|
@@ -6,10 +6,11 @@ import pandas as pd
|
|
|
from sqlalchemy import create_engine, inspect, event
|
|
|
import json
|
|
|
|
|
|
-csv_dir = Path("C:\\GlobalCube\\System\\CARLO\\Export")
|
|
|
-target_dsn = { 'user': "sa", 'pass': "Mffu3011#", 'server': "GC-SERVER1\\GLOBALCUBE", 'database': "GC" }
|
|
|
+csv_dir = Path("C:\\GlobalCube\\System\\AUTOLINE\\Datenbank\\Full_zusammengesetllt")
|
|
|
+target_dsn = { 'user': "sa", 'pass': "Mffu3011#", 'server': "GC-SERVER1\\GLOBALCUBE", 'database': "AUTOLINE" }
|
|
|
temp_schema = "temp"
|
|
|
-target_schema = "carlo"
|
|
|
+target_schema = "import"
|
|
|
+transform = []
|
|
|
|
|
|
def get_dtype (db_type):
|
|
|
if db_type == "DATETIME":
|
|
@@ -24,19 +25,18 @@ def conn_string (dsn):
|
|
|
def conn_params (dsn):
|
|
|
return f"-S {dsn['server']} -d {dsn['database']} -U {dsn['user']} -P {dsn['pass']}"
|
|
|
|
|
|
-def columns_from_csv (source_csv, target_insp_cols):
|
|
|
- target_col_dict = dict([(col['name'], get_dtype(str(col['type']))) for col in target_insp_cols if str(col['type']) != "DATETIME"])
|
|
|
- target_dates = [col['name'] for col in target_insp_cols if str(col['type']) == "DATETIME"]
|
|
|
- df = pd.read_csv(source_csv, sep=";", encoding="ansi", decimal=",", dtype=target_col_dict, parse_dates=target_dates, nrows=10)
|
|
|
+def columns_from_csv (source_csv):
|
|
|
+ df = pd.read_csv(source_csv, sep=",", encoding="utf8", decimal=".", nrows=1)
|
|
|
source_columns_list = list(df.columns)
|
|
|
for i,col in enumerate([col for col in source_columns_list if col[-2:] == ".1"]):
|
|
|
source_columns_list[i] = col[:-2] + "2"
|
|
|
+ source_columns_list = [col.replace(".", " ") for col in source_columns_list]
|
|
|
return source_columns_list
|
|
|
|
|
|
def transform_template(target_insp, source_csv, table, target_schema):
|
|
|
target_insp_cols = target_insp.get_columns(table, schema=target_schema)
|
|
|
target_columns_list = [col['name'] for col in target_insp_cols]
|
|
|
- source_columns_list = columns_from_csv(source_csv, target_insp_cols)
|
|
|
+ source_columns_list = columns_from_csv(source_csv)
|
|
|
|
|
|
target_columns = set(target_columns_list)
|
|
|
source_columns = set(source_columns_list)
|
|
@@ -44,21 +44,21 @@ def transform_template(target_insp, source_csv, table, target_schema):
|
|
|
#print("Auf beiden Seiten: " + ";".join(intersect))
|
|
|
diff1 = source_columns.difference(target_columns)
|
|
|
if len(diff1) > 0:
|
|
|
- print("Nur in Quelle: " + ";".join(diff1))
|
|
|
+ print("rem Nur in Quelle: " + ";".join(diff1))
|
|
|
diff2 = target_columns.difference(source_columns)
|
|
|
if len(diff2) > 0:
|
|
|
- print("Nur in Ziel: " + ";".join(diff2))
|
|
|
+ print("rem Nur in Ziel: " + ";".join(diff2))
|
|
|
|
|
|
- transform = []
|
|
|
+ template = []
|
|
|
for i, col in enumerate(target_columns_list):
|
|
|
if col in source_columns_list:
|
|
|
pos = source_columns_list.index(col)
|
|
|
else:
|
|
|
pos = -1
|
|
|
- transform.append((pos, get_dtype(str(target_insp_cols[i]['type']))))
|
|
|
- return transform
|
|
|
+ template.append((pos, get_dtype(str(target_insp_cols[i]['type']))))
|
|
|
+ return template
|
|
|
|
|
|
-def transform_line (line, transform):
|
|
|
+def transform_line (line):
|
|
|
pattern = re.compile(r"\d{4}-\d\d-\d\d \d\d:\d\d")
|
|
|
result = []
|
|
|
for pos, f in transform:
|
|
@@ -75,22 +75,28 @@ def transform_line (line, transform):
|
|
|
result.append(e)
|
|
|
return result
|
|
|
|
|
|
-def transform_file(source_csv, transform):
|
|
|
- stage_csv = Path("\\".join(source_csv.parts[:-1] + list("stage") + source_csv.parts[-1:]))
|
|
|
+def transform_file(source_csv, template):
|
|
|
+ global transform
|
|
|
+ transform = json.loads(template)
|
|
|
+ stage_csv = Path(f"{source_csv.parent}\\stage\\{source_csv.name}")
|
|
|
|
|
|
- if stage_csv.exists() and stage_csv.stat()['st_ctime'] > source_csv.stat()['st_ctime']:
|
|
|
+ if stage_csv.exists() and stage_csv.stat().st_ctime > source_csv.stat().st_ctime:
|
|
|
print(f"Stage-CSV '{stage_csv.name}' ist bereits aktuell.")
|
|
|
return False
|
|
|
|
|
|
- with csv.reader(open(source_csv, "r", encoding="ansi", newline=""), delimiter=";") as source_file:
|
|
|
- with csv.writer(open(stage_csv, "w", encoding="utf8", newline=""), delimiter="\t") as target_file:
|
|
|
- next(source_file) # ignore header
|
|
|
- for cols in source_file:
|
|
|
- target_file.writerow(transform_line(cols, transform))
|
|
|
+ print(source_csv)
|
|
|
+ print(stage_csv)
|
|
|
+ with open(source_csv, "r", encoding="utf8", newline="") as source_file, open(stage_csv, "w", encoding="utf8", newline="") as target_file:
|
|
|
+ csv_read = csv.reader(source_file, delimiter=",")
|
|
|
+ csv_write = csv.writer(target_file, delimiter="\t")
|
|
|
+
|
|
|
+ next(csv_read) # ignore header
|
|
|
+ for cols in csv_read:
|
|
|
+ csv_write.writerow(transform_line(cols))
|
|
|
|
|
|
|
|
|
def csv_tables (csv_dir, target_tables_ci):
|
|
|
- p = re.compile("_\d+$")
|
|
|
+ p = re.compile(r"_\d+$")
|
|
|
result = []
|
|
|
if not csv_dir.is_dir():
|
|
|
print(f"Verzeichnis {csv_dir} existiert nicht!")
|
|
@@ -104,7 +110,7 @@ def csv_tables (csv_dir, target_tables_ci):
|
|
|
if not table in target_tables_ci:
|
|
|
table = p.sub("", table)
|
|
|
if not table in target_tables_ci:
|
|
|
- print(f"Ziel-Tabelle '{table}' existiert nicht!")
|
|
|
+ print(f"rem Ziel-Tabelle '{table}' existiert nicht!")
|
|
|
continue
|
|
|
result.append((table, source_csv))
|
|
|
return result
|
|
@@ -119,28 +125,42 @@ def target_tables (target_dsn, target_schema):
|
|
|
|
|
|
def batch (csv_dir, action):
|
|
|
target_insp, target_tables_ci = target_tables(target_dsn, target_schema)
|
|
|
-
|
|
|
- for table in csv_tables(csv_dir, target_tables_ci):
|
|
|
- source_csv = f"{csv_dir}\\{table}.csv"
|
|
|
- print(f"=={table}==")
|
|
|
- stage_csv = f"{csv_dir}\\stage\\{table}.csv"
|
|
|
- print(f"sqlcmd.exe {conn_params(target_dsn)} -p -Q \"TRUNCATE TABLE [{target_schema}].[{table}]\" ")
|
|
|
- tf_template = transform_template(target_insp, source_csv, table, target_schema)
|
|
|
- print(f"%PYTHON%\\python.exe csv_import.py transform \"{source_csv}\" \"{json.dumps(tf_template)}\" ")
|
|
|
- print(f"bcp.exe [{target_schema}].[{table}] in \"{stage_csv}\" {conn_params(target_dsn)} -c -C 65001 -e \"{stage_csv}.log\" ")
|
|
|
- print("")
|
|
|
-
|
|
|
-
|
|
|
+ stage_schema = target_schema if action == "overwrite" else temp_schema
|
|
|
+ print("@echo off")
|
|
|
+ print("cd /d %~dp0")
|
|
|
+ print("set PYTHON=\"C:\\dev\\Python\\Python38-32\"")
|
|
|
+
|
|
|
+ for (table, source_csv) in csv_tables(csv_dir, target_tables_ci):
|
|
|
+ print(f"rem =={table}==")
|
|
|
+ stage_csv = Path(f"{source_csv.parent}\\stage\\{source_csv.name}")
|
|
|
+ try:
|
|
|
+ tf_template = transform_template(target_insp, source_csv, table, target_schema)
|
|
|
+ template_json = json.dumps(tf_template).replace("\"", "\\\"")
|
|
|
+ print(f"sqlcmd.exe {conn_params(target_dsn)} -p -Q \"TRUNCATE TABLE [{stage_schema}].[{table}]\" ")
|
|
|
+ print(f"%PYTHON%\\python.exe csv_import.py transform \"{source_csv}\" -t \"{template_json}\" ")
|
|
|
+
|
|
|
+ print(f"bcp.exe [{stage_schema}].[{table}] in \"{stage_csv}\" {conn_params(target_dsn)} -c -C 65001 -e \"{stage_csv}.log\" ")
|
|
|
+ pkeys = target_insp.get_pk_constraint(table, schema=target_schema)
|
|
|
+ delete_sql = f"DELETE T1 FROM [{target_schema}].[{table}] T1 INNER JOIN [{temp_schema}].[{table}] T2 ON " + " AND ".join([f"T1.[{col}] = T2.[{col}]" for col in pkeys['constrained_columns']])
|
|
|
+ print(f"sqlcmd.exe {conn_params(target_dsn)} -p -Q \"{delete_sql}\" ")
|
|
|
+ insert_sql = f"INSERT INTO [{target_schema}].[{table}] SELECT * FROM [{temp_schema}].[{table}]"
|
|
|
+ print(f"sqlcmd.exe {conn_params(target_dsn)} -p -Q \"{insert_sql}\" ")
|
|
|
+ print("")
|
|
|
+ except:
|
|
|
+ print(f"rem {source_csv} fehlerhaft!")
|
|
|
+
|
|
|
+
|
|
|
+@plac.pos('action', "", choices=['batch', 'transform'])
|
|
|
@plac.pos('csv_dir', "", type=Path)
|
|
|
-@plac.opt('action', "", choices=['overwrite', 'update', 'transform'])
|
|
|
+@plac.opt('mode', "", choices=['overwrite', 'append', 'update'])
|
|
|
@plac.opt('template', "")
|
|
|
-def main(csv_dir, action="overwrite", template="[]"):
|
|
|
+def main(action, csv_dir, mode="overwrite", template="[]"):
|
|
|
if action == "transform":
|
|
|
transform_file(csv_dir, template)
|
|
|
else:
|
|
|
- batch(csv_dir, action)
|
|
|
+ batch(csv_dir, mode)
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
- #plac.call(main)
|
|
|
- main(csv_dir)
|
|
|
+ plac.call(main)
|
|
|
+ #main("batch", csv_dir, "append")
|