|
@@ -1,9 +1,13 @@
|
|
|
from datetime import datetime
|
|
|
import calendar
|
|
|
import csv
|
|
|
+import pyodbc
|
|
|
from pathlib import Path
|
|
|
|
|
|
|
|
|
+DSN = "dsn=GC_OPTIMA_64;uid=gaps;pwd=Gcbs12ma"
|
|
|
+
|
|
|
+
|
|
|
class DatevConfig:
|
|
|
data_path: str = "datev/data"
|
|
|
export_path: str = "datev/export"
|
|
@@ -204,18 +208,47 @@ class DatevConfig:
|
|
|
return template.format(**datev_header)
|
|
|
|
|
|
|
|
|
-def export_extf(period):
|
|
|
- cfg = DatevConfig()
|
|
|
- cfg.periode = period
|
|
|
- import_file = Path(f"datev/data/{period}.csv")
|
|
|
- cfg.csv_date = datetime.fromtimestamp(import_file.stat().st_mtime)
|
|
|
-
|
|
|
+def get_translation(cfg: DatevConfig):
|
|
|
+ translation = {}
|
|
|
with Path(cfg.translation_file).open("r", encoding="latin-1") as frh:
|
|
|
- translation = {}
|
|
|
for line in csv.reader(frh, delimiter=";"):
|
|
|
acct_no = line[0][:4] + "0"
|
|
|
acct_details = "11" + line[0][11:].replace("-", "")
|
|
|
translation[line[2]] = (acct_no, acct_details)
|
|
|
+ return translation
|
|
|
+
|
|
|
+
|
|
|
+def from_database(period):
|
|
|
+ with pyodbc.connect(DSN) as conn:
|
|
|
+ cursor = conn.cursor()
|
|
|
+ query = (
|
|
|
+ "SELECT * FROM [import].[DATEV_Buchungsstapel] "
|
|
|
+ + f"WHERE [BOOKKEEP_PERIOD] = '{period}' ORDER BY [BOOKKEEP_DATE], [UNIQUE_IDENT]"
|
|
|
+ )
|
|
|
+ cursor.execute(query)
|
|
|
+ for row in cursor.fetchall():
|
|
|
+ yield list(map(str, row[:9]))
|
|
|
+
|
|
|
+
|
|
|
+def from_csv(import_file):
|
|
|
+ with import_file.open("r", encoding="latin-1") as frh:
|
|
|
+ csv_reader = csv.reader(frh, delimiter=";")
|
|
|
+ next(csv_reader)
|
|
|
+ for row in csv_reader:
|
|
|
+ yield row
|
|
|
+
|
|
|
+
|
|
|
+def export_extf(period, import_method="csv"):
|
|
|
+ cfg = DatevConfig()
|
|
|
+ cfg.periode = period
|
|
|
+ translation = get_translation(cfg)
|
|
|
+
|
|
|
+ if import_method == "csv":
|
|
|
+ import_file = Path(f"datev/data/{period}.csv")
|
|
|
+ cfg.csv_date = datetime.fromtimestamp(import_file.stat().st_mtime)
|
|
|
+ get_row = from_csv(import_file)
|
|
|
+ else:
|
|
|
+ get_row = from_database(cfg.periode)
|
|
|
|
|
|
missing = []
|
|
|
|
|
@@ -223,18 +256,19 @@ def export_extf(period):
|
|
|
fwh.write(cfg.header + "\r\n")
|
|
|
fwh.write(cfg.header2 + "\r\n")
|
|
|
|
|
|
- with import_file.open("r", encoding="latin-1") as frh:
|
|
|
- csv_reader = csv.reader(frh, delimiter=";")
|
|
|
- next(csv_reader)
|
|
|
- for row in csv_reader:
|
|
|
- row[0] = row[0].replace(".", ",")
|
|
|
- row.extend(translation.get(row[3], (row[3], "11000000")))
|
|
|
- if row[9] == row[3]:
|
|
|
- missing.append(row[3])
|
|
|
- fwh.write(cfg.row_template.format(*row) + "\r\n")
|
|
|
+ for row in get_row:
|
|
|
+ row[0] = row[0].replace(".", ",")
|
|
|
+ row.extend(translation.get(row[3], (row[3], "11000000")))
|
|
|
+ if row[9] == row[3]:
|
|
|
+ missing.append(row[3])
|
|
|
+ fwh.write(cfg.row_template.format(*row) + "\r\n")
|
|
|
print(set(missing))
|
|
|
|
|
|
|
|
|
+def export_all_periods():
|
|
|
+ for period in range(202301, 202313):
|
|
|
+ export_extf(str(period), "db")
|
|
|
+
|
|
|
+
|
|
|
if __name__ == "__main__":
|
|
|
- export_extf("202301")
|
|
|
- export_extf("202302")
|
|
|
+ export_all_periods()
|