import plac
from pathlib import Path
import csv
import re
import pandas as pd
from sqlalchemy import create_engine, inspect
import json

csv_dir = Path("C:\\GlobalCube\\System\\CARLO\\Export")
target_dsn = {
    'user': "sa",
    'pass': "Mffu3011#",
    'server': "GC-SERVER1\\GLOBALCUBE",
    'database': "GC"
}
temp_schema = "temp"
target_schema = "carlo"


def get_dtype(db_type):
    # Map the SQL type reported by the SQLAlchemy inspector to a pandas dtype.
    if db_type == "DATETIME":
        return "datetime64"
    if db_type in ("DECIMAL(28, 8)", "DECIMAL(18, 0)", "NUMERIC(18, 0)"):
        return "float64"
    return "object"


def conn_string(dsn):
    return (f"mssql+pyodbc://{dsn['user']}:{dsn['pass']}@{dsn['server']}"
            f"/{dsn['database']}?driver=SQL+Server+Native+Client+11.0")


def conn_params(dsn):
    return f"-S {dsn['server']} -d {dsn['database']} -U {dsn['user']} -P {dsn['pass']}"


def columns_from_csv(source_csv, target_insp_cols):
    # Read a few rows only to discover the source column layout.
    # "ansi" is the Windows code-page encoding alias; the script targets Windows paths anyway.
    target_col_dict = {col['name']: get_dtype(str(col['type']))
                       for col in target_insp_cols if str(col['type']) != "DATETIME"}
    target_dates = [col['name'] for col in target_insp_cols if str(col['type']) == "DATETIME"]
    df = pd.read_csv(source_csv, sep=";", encoding="ansi", decimal=",",
                     dtype=target_col_dict, parse_dates=target_dates, nrows=10)
    source_columns_list = list(df.columns)
    # pandas renames duplicate headers to "name.1"; map those to "name2" instead.
    # (The original loop enumerated the filtered list and overwrote the wrong indices.)
    for i, col in enumerate(source_columns_list):
        if col.endswith(".1"):
            source_columns_list[i] = col[:-2] + "2"
    return source_columns_list


def transform_template(target_insp, source_csv, table, target_schema):
    target_insp_cols = target_insp.get_columns(table, schema=target_schema)
    target_columns_list = [col['name'] for col in target_insp_cols]
    source_columns_list = columns_from_csv(source_csv, target_insp_cols)

    target_columns = set(target_columns_list)
    source_columns = set(source_columns_list)
    # intersect = source_columns.intersection(target_columns)
    # print("Present on both sides: " + ";".join(intersect))
    diff1 = source_columns.difference(target_columns)
    if len(diff1) > 0:
        print("Only in source: " + ";".join(diff1))
    diff2 = target_columns.difference(source_columns)
    if len(diff2) > 0:
        print("Only in target: " + ";".join(diff2))

    # One entry per target column: (position in the source row, or -1 if the
    # column is missing in the source; target dtype).
    transform = []
    for i, col in enumerate(target_columns_list):
        pos = source_columns_list.index(col) if col in source_columns_list else -1
        transform.append((pos, get_dtype(str(target_insp_cols[i]['type']))))
    return transform


def transform_line(line, transform):
    pattern = re.compile(r"\d{4}-\d\d-\d\d \d\d:\d\d")
    result = []
    for pos, f in transform:
        e = ""
        if pos > -1:
            e = line[pos]
        if f == "float64":
            e = e.replace(",", ".")
        if f == "datetime64":
            if not pattern.match(e):
                e = ""
            # e += ":00.000"
        result.append(e)
    return result


def transform_file(source_csv, transform):
    stage_csv = source_csv.parent / "stage" / source_csv.name
    # os.stat_result is not subscriptable; use the st_ctime attribute.
    if stage_csv.exists() and stage_csv.stat().st_ctime > source_csv.stat().st_ctime:
        print(f"Stage CSV '{stage_csv.name}' is already up to date.")
        return False
    stage_csv.parent.mkdir(exist_ok=True)  # create the stage directory on first run
    # csv.reader/csv.writer are not context managers; manage the files instead.
    with open(source_csv, "r", encoding="ansi", newline="") as src, \
         open(stage_csv, "w", encoding="utf8", newline="") as tgt:
        source_file = csv.reader(src, delimiter=";")
        target_file = csv.writer(tgt, delimiter="\t")
        next(source_file)  # skip header
        for cols in source_file:
            target_file.writerow(transform_line(cols, transform))
    return True


def csv_tables(csv_dir, target_tables_ci):
    p = re.compile(r"_\d+$")
    result = []
    if not csv_dir.is_dir():
        print(f"Directory {csv_dir} does not exist!")
        return result
    for source_csv in csv_dir.glob("*.csv"):
        if source_csv.is_dir():
            continue
        table = source_csv.stem.lower()
        if table not in target_tables_ci:
            # strip a numeric suffix, e.g. "orders_2" -> "orders"
            table = p.sub("", table)
            if table not in target_tables_ci:
                print(f"Target table '{table}' does not exist!")
                continue
        result.append((table, source_csv))
    return result


def target_tables(target_dsn, target_schema):
    engine = create_engine(conn_string(target_dsn))
    target_insp = inspect(engine)
    tables = target_insp.get_table_names(schema=target_schema)
    return (target_insp, list(map(str.lower, tables)))


def batch(csv_dir, action):
    target_insp, target_tables_ci = target_tables(target_dsn, target_schema)
    # csv_tables yields (table, source_csv) pairs; unpack them instead of
    # rebuilding the path from the table name, which breaks for suffixed files.
    for table, source_csv in csv_tables(csv_dir, target_tables_ci):
        print(f"=={table}==")
        stage_csv = source_csv.parent / "stage" / source_csv.name
        print(f"sqlcmd.exe {conn_params(target_dsn)} -p "
              f"-Q \"TRUNCATE TABLE [{target_schema}].[{table}]\" ")
        tf_template = transform_template(target_insp, source_csv, table, target_schema)
        print(f"%PYTHON%\\python.exe csv_import.py transform \"{source_csv}\" "
              f"\"{json.dumps(tf_template)}\" ")
        print(f"bcp.exe [{target_schema}].[{table}] in \"{stage_csv}\" "
              f"{conn_params(target_dsn)} -c -C 65001 -e \"{stage_csv}.log\" ")
        print("")


@plac.pos('csv_dir', "", type=Path)
@plac.opt('action', "", choices=['overwrite', 'update', 'transform'])
@plac.opt('template', "")
def main(csv_dir, action="overwrite", template="[]"):
    if action == "transform":
        # The template arrives as the JSON string that batch mode printed.
        transform_file(csv_dir, json.loads(template))
    else:
        batch(csv_dir, action)


if __name__ == '__main__':
    # plac.call(main)
    main(csv_dir)
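
# Usage sketch (assumption: plac.call(main) above is re-enabled so the
# -a/--action and -t/--template options are actually parsed; as committed,
# the script always runs batch mode against the hard-coded csv_dir):
#
#   python csv_import.py C:\GlobalCube\System\CARLO\Export
#       Batch mode: for every export CSV with a matching table in the "carlo"
#       schema, print the sqlcmd TRUNCATE, the transform call, and the bcp
#       bulk-load command.
#
#   python csv_import.py -a transform -t "[[0, \"object\"], [2, \"float64\"]]" file.csv
#       Transform mode (template values here are hypothetical): rewrite one CSV
#       into its stage\ sibling using the JSON template emitted by batch mode.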