import plac
from pathlib import Path
import csv
import re
import pandas as pd
from sqlalchemy import create_engine, inspect
import json

csv_dir = Path("C:\\GlobalCube\\System\\AUTOLINE\\Datenbank\\Full_zusammengesetllt")
target_dsn = {
    "user": "sa",
    "pass": "Mffu3011#",
    "server": "GC-SERVER1\\GLOBALCUBE",
    "database": "AUTOLINE",
}
temp_schema = "temp"
target_schema = "import"
transform = []


def get_dtype(db_type):
    # Map an MSSQL column type (as reported by SQLAlchemy) to the
    # conversion applied per field in transform_line().
    if db_type == "DATETIME":
        return "datetime64"
    if db_type in ("DECIMAL(28, 8)", "DECIMAL(18, 0)", "NUMERIC(18, 0)"):
        return "float64"
    return "object"


def conn_string(dsn):
    return (
        f"mssql+pyodbc://{dsn['user']}:{dsn['pass']}@{dsn['server']}"
        f"/{dsn['database']}?driver=SQL+Server+Native+Client+11.0"
    )


def conn_params(dsn):
    # Connection arguments shared by sqlcmd.exe and bcp.exe.
    return f"-S {dsn['server']} -d {dsn['database']} -U {dsn['user']} -P {dsn['pass']}"


def columns_from_csv(source_csv):
    # Read only the header row to determine the source column layout.
    df = pd.read_csv(source_csv, sep=",", encoding="utf-8", decimal=".", nrows=1)
    source_columns_list = list(df.columns)
    # Pandas suffixes duplicate column names with ".1"; rename those in place
    # to "...2" before dots are replaced with spaces.
    for i, col in enumerate(source_columns_list):
        if col.endswith(".1"):
            source_columns_list[i] = col[:-2] + "2"
    source_columns_list = [col.replace(".", " ") for col in source_columns_list]
    return source_columns_list


def transform_template(target_insp, source_csv, table, target_schema):
    # Build the column-mapping template for one table: for every target
    # column, record the matching source-CSV column index (-1 if missing)
    # and the dtype conversion to apply.
    target_insp_cols = target_insp.get_columns(table, schema=target_schema)
    target_columns_list = [col["name"] for col in target_insp_cols]
    source_columns_list = columns_from_csv(source_csv)

    target_columns = set(target_columns_list)
    source_columns = set(source_columns_list)

    # intersect = source_columns.intersection(target_columns)
    # print("In both source and target: " + ";".join(intersect))
    diff1 = source_columns.difference(target_columns)
    if diff1:
        print("rem Only in source: " + ";".join(diff1))
    diff2 = target_columns.difference(source_columns)
    if diff2:
        print("rem Only in target: " + ";".join(diff2))

    template = []
    for i, col in enumerate(target_columns_list):
        pos = source_columns_list.index(col) if col in source_columns_list else -1
        template.append((pos, get_dtype(str(target_insp_cols[i]["type"]))))
    return template


def transform_line(line):
    # Uses the module-level "transform" template set by transform_file().
    # Already in ISO format: "YYYY-MM-DD hh:mm"
    pattern = re.compile(r"\d{4}-\d\d-\d\d \d\d:\d\d")
    # German-style date: "dd.mm.yyyy" or "dd/mm/yyyy"
    pattern2 = re.compile(r"(\d{2})[/\.](\d{2})[/\.](\d{4})")
    result = []
    for pos, f in transform:
        e = ""
        if -1 < pos < len(line):
            e = line[pos]
        if f == "float64":
            # Decimal comma -> decimal point
            e = e.replace(",", ".")
        if f == "datetime64":
            m = pattern2.match(e)
            if m:
                e = f"{m[3]}-{m[2]}-{m[1]}"
            elif not pattern.match(e):
                e = ""
            # e += ":00.000"
        result.append(e)
    return result


def fix_nulls(s):
    # NUL bytes break the csv reader; replace them with spaces.
    for line in s:
        yield line.replace("\0", " ")


def transform_file(source_csv, template):
    global transform
    transform = json.loads(template)
    stage_csv = Path(f"{source_csv.parent}\\stage\\{source_csv.name}")
    # Skip conversion if the staged file is newer than its source.
    if stage_csv.exists() and stage_csv.stat().st_ctime > source_csv.stat().st_ctime:
        print(f"Stage CSV '{stage_csv.name}' is already up to date.")
        return False
    print(f"Importing {source_csv.name}...")
    stage_csv.parent.mkdir(parents=True, exist_ok=True)
    with open(
        source_csv, "r", encoding="utf-8", errors="ignore", newline=""
    ) as source_file, open(stage_csv, "w", encoding="utf-8", newline="") as target_file:
        csv_read = csv.reader(fix_nulls(source_file), delimiter=",")
        csv_write = csv.writer(target_file, delimiter="\t")
        next(csv_read)  # skip header
        i = 0
        for cols in csv_read:
            csv_write.writerow(transform_line(cols))
            i += 1
        print(f"...{i} rows converted.")
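
# Illustration (hypothetical values, not part of the import): the template is
# a JSON list of [source_position, dtype] pairs, one per target column. With
#   transform = [[0, "object"], [2, "datetime64"], [1, "float64"]]
# the source row ["A", "1,5", "31.12.2020"] is reordered and converted by
# transform_line() to ["A", "2020-12-31", "1.5"]: the German date is
# normalized to ISO and the decimal comma becomes a decimal point.
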
def csv_tables(csv_dir, target_tables_ci):
    # Pair every CSV file in csv_dir with its (lower-cased) target table.
    # A numeric suffix such as "_1" is stripped if the plain file name does
    # not match any table.
    p = re.compile(r"_\d+$")
    result = []
    if not csv_dir.is_dir():
        print(f"Directory {csv_dir} does not exist!")
        return result
    for source_csv in csv_dir.glob("*.csv"):
        if source_csv.is_dir():
            continue
        table = source_csv.name[:-4].lower()
        if table not in target_tables_ci:
            table = p.sub("", table)
            if table not in target_tables_ci:
                print(f"rem Target table '{table}' does not exist!")
                continue
        result.append((table, source_csv))
    return result


def target_tables(target_dsn, target_schema):
    engine = create_engine(conn_string(target_dsn))
    target_insp = inspect(engine)
    tables = target_insp.get_table_names(schema=target_schema)
    return (target_insp, list(map(str.lower, tables)))


def batch(csv_dir, action):
    # Emit a Windows batch script (to stdout) that truncates the staging
    # tables, converts each CSV via the "transform" action, bulk-loads the
    # staged file with bcp and, when staging in the temp schema, merges the
    # rows into the target schema.
    target_insp, target_tables_ci = target_tables(target_dsn, target_schema)
    stage_schema = target_schema if action == "overwrite" else temp_schema
    print("@echo off")
    print("cd /d %~dp0")
    print('set PYTHON="C:\\dev\\Python\\Python38-32"')

    for table, source_csv in csv_tables(csv_dir, target_tables_ci):
        print(f"echo =={table}==")
        stage_csv = Path(f"{source_csv.parent}\\stage\\{source_csv.name}")
        try:
            tf_template = transform_template(
                target_insp, source_csv, table, target_schema
            )
            template_json = json.dumps(tf_template).replace('"', '\\"')
            print(
                f'sqlcmd.exe {conn_params(target_dsn)} -p -Q "TRUNCATE TABLE '
                f'[{stage_schema}].[{table}]" '
            )
            print(
                f'%PYTHON%\\python.exe csv_import.py transform "{source_csv}" '
                f'-t "{template_json}" '
            )
            print(
                f'bcp.exe [{stage_schema}].[{table}] in "{stage_csv}" '
                f'{conn_params(target_dsn)} -c -C 65001 -e "{stage_csv}.log" '
            )
            if stage_schema == temp_schema:
                # Append/update mode only: delete target rows that match the
                # staged rows on the primary key, then insert the staged rows.
                pkeys = target_insp.get_pk_constraint(table, schema=target_schema)
                if len(pkeys["constrained_columns"]) > 0:
                    delete_sql = (
                        f"DELETE T1 FROM [{target_schema}].[{table}] T1 "
                        f"INNER JOIN [{temp_schema}].[{table}] T2 ON "
                        + " AND ".join(
                            f"T1.[{col}] = T2.[{col}]"
                            for col in pkeys["constrained_columns"]
                        )
                    )
                    print(f'sqlcmd.exe {conn_params(target_dsn)} -p -Q "{delete_sql}" ')
                insert_sql = (
                    f"INSERT INTO [{target_schema}].[{table}] "
                    f"SELECT * FROM [{temp_schema}].[{table}]"
                )
                print(f'sqlcmd.exe {conn_params(target_dsn)} -p -Q "{insert_sql}" ')
            print("")
        except Exception:
            print(f"rem {source_csv} failed!")


@plac.pos("action", "", choices=["batch", "transform"])
@plac.pos("csv_dir", "", type=Path)
@plac.opt("mode", "", choices=["overwrite", "append", "update"])
@plac.opt("template", "")
def main(action, csv_dir, mode="overwrite", template="[]"):
    # For "transform", csv_dir is the path of a single source CSV;
    # for "batch", it is the directory containing all CSV files.
    if action == "transform":
        transform_file(csv_dir, template)
    else:
        batch(csv_dir, mode)


if __name__ == "__main__":
    plac.call(main)
    # main("batch", csv_dir, "append")
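
# Example invocations (hypothetical paths; assumes this file is saved as
# csv_import.py, the name the generated batch script itself calls):
#   python csv_import.py batch "C:\GlobalCube\...\Full_zusammengesetllt" -m append > csv_import.bat
#   python csv_import.py transform "C:\...\table.csv" -t "[[0, \"object\"], [1, \"float64\"]]"
# "batch" writes the generated .bat script to stdout, so it is typically
# redirected into a file and executed afterwards.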