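"""Stage CARLO export CSVs for bulk import into the GC database.

Compares the columns of each export CSV with the matching target table,
rewrites the data into a tab-separated stage file and prints the
sqlcmd/bcp command lines needed to truncate and reload each table.
"""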
import csv
import json
import re
from pathlib import Path

import pandas as pd
import plac
from sqlalchemy import create_engine, inspect

csv_dir = Path("C:\\GlobalCube\\System\\CARLO\\Export")
target_dsn = {'user': "sa", 'pass': "Mffu3011#", 'server': "GC-SERVER1\\GLOBALCUBE", 'database': "GC"}
temp_schema = "temp"
target_schema = "carlo"

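# Map SQL Server column types (as reported by the SQLAlchemy inspector) to the
# pandas dtypes used when parsing the CSV.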
def get_dtype(db_type):
    if db_type == "DATETIME":
        return "datetime64"
    if db_type in ("DECIMAL(28, 8)", "DECIMAL(18, 0)", "NUMERIC(18, 0)"):
        return "float64"
    return "object"

def conn_string(dsn):
    return f"mssql+pyodbc://{dsn['user']}:{dsn['pass']}@{dsn['server']}/{dsn['database']}?driver=SQL+Server+Native+Client+11.0"


def conn_params(dsn):
    return f"-S {dsn['server']} -d {dsn['database']} -U {dsn['user']} -P {dsn['pass']}"

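# Read a sample of the export CSV to discover its column order; duplicate
# column names (pandas suffixes them with ".1") are renamed to "<name>2".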
def columns_from_csv(source_csv, target_insp_cols):
    target_col_dict = {col['name']: get_dtype(str(col['type'])) for col in target_insp_cols if str(col['type']) != "DATETIME"}
    target_dates = [col['name'] for col in target_insp_cols if str(col['type']) == "DATETIME"]
    # "ansi" is not a registered Python codec; cp1252 is assumed to match the Windows ANSI codepage of the export
    df = pd.read_csv(source_csv, sep=";", encoding="cp1252", decimal=",", dtype=target_col_dict, parse_dates=target_dates, nrows=10)
    source_columns_list = list(df.columns)
    for i, col in enumerate(source_columns_list):
        if col.endswith(".1"):
            source_columns_list[i] = col[:-2] + "2"
    return source_columns_list

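# For every target column, record where it sits in the source CSV (-1 if it is
# missing) together with the pandas dtype it has to be converted to.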
def transform_template(target_insp, source_csv, table, target_schema):
    target_insp_cols = target_insp.get_columns(table, schema=target_schema)
    target_columns_list = [col['name'] for col in target_insp_cols]
    source_columns_list = columns_from_csv(source_csv, target_insp_cols)

    target_columns = set(target_columns_list)
    source_columns = set(source_columns_list)
    # intersect = source_columns.intersection(target_columns)
    # print("Auf beiden Seiten: " + ";".join(intersect))
    diff1 = source_columns.difference(target_columns)
    if len(diff1) > 0:
        print("Nur in Quelle: " + ";".join(diff1))
    diff2 = target_columns.difference(source_columns)
    if len(diff2) > 0:
        print("Nur in Ziel: " + ";".join(diff2))

    transform = []
    for i, col in enumerate(target_columns_list):
        pos = source_columns_list.index(col) if col in source_columns_list else -1
        transform.append((pos, get_dtype(str(target_insp_cols[i]['type']))))
    return transform

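# Apply one transform tuple per target column to a single CSV row: pick the
# source field, convert decimal commas for floats and blank out values that do
# not look like a datetime.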
def transform_line(line, transform):
    pattern = re.compile(r"\d{4}-\d\d-\d\d \d\d:\d\d")
    result = []
    for pos, f in transform:
        e = ""
        if pos > -1:
            e = line[pos]
        if f == "float64":
            e = e.replace(",", ".")
        if f == "datetime64":
            if not pattern.match(e):
                e = ""
            # e += ":00.000"
        result.append(e)
    return result

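# Rewrite the source CSV into a tab-separated stage file in the "stage"
# subdirectory, skipping the work if the stage file is already newer.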
def transform_file(source_csv, transform):
    stage_csv = source_csv.parent / "stage" / source_csv.name
    if stage_csv.exists() and stage_csv.stat().st_ctime > source_csv.stat().st_ctime:
        print(f"Stage-CSV '{stage_csv.name}' ist bereits aktuell.")
        return False
    # csv.reader/csv.writer are not context managers; open the files instead
    with open(source_csv, "r", encoding="cp1252", newline="") as source_handle, \
         open(stage_csv, "w", encoding="utf8", newline="") as target_handle:
        source_file = csv.reader(source_handle, delimiter=";")
        target_file = csv.writer(target_handle, delimiter="\t")
        next(source_file)  # ignore header
        for cols in source_file:
            target_file.writerow(transform_line(cols, transform))
    return True

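# Pair every CSV in the export directory with an existing target table; a
# trailing "_<number>" suffix in the file name is stripped when matching.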
def csv_tables(csv_dir, target_tables_ci):
    p = re.compile(r"_\d+$")
    result = []
    if not csv_dir.is_dir():
        print(f"Verzeichnis {csv_dir} existiert nicht!")
        return result

    for source_csv in csv_dir.glob("*.csv"):
        if source_csv.is_dir():
            continue
        table = source_csv.name[:-4].lower()
        if table not in target_tables_ci:
            table = p.sub("", table)
            if table not in target_tables_ci:
                print(f"Ziel-Tabelle '{table}' existiert nicht!")
                continue
        result.append((table, source_csv))
    return result

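# Inspect the target database once and return the inspector together with the
# lower-cased names of all tables in the target schema.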
def target_tables(target_dsn, target_schema):
    engine = create_engine(conn_string(target_dsn))
    target_insp = inspect(engine)
    tables = target_insp.get_table_names(schema=target_schema)
    return (target_insp, list(map(str.lower, tables)))

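# batch does not load anything itself: for every matching table it prints a
# sqlcmd TRUNCATE, a transform call for this script and a bcp bulk load of the
# stage file, ready to be redirected into a batch file.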
def batch(csv_dir, action):
    target_insp, target_tables_ci = target_tables(target_dsn, target_schema)
    for table, source_csv in csv_tables(csv_dir, target_tables_ci):
        print(f"=={table}==")
        stage_csv = csv_dir / "stage" / source_csv.name
        print(f"sqlcmd.exe {conn_params(target_dsn)} -p -Q \"TRUNCATE TABLE [{target_schema}].[{table}]\" ")
        tf_template = transform_template(target_insp, source_csv, table, target_schema)
        print(f"%PYTHON%\\python.exe csv_import.py transform \"{source_csv}\" \"{json.dumps(tf_template)}\" ")
        print(f"bcp.exe [{target_schema}].[{table}] in \"{stage_csv}\" {conn_params(target_dsn)} -c -C 65001 -e \"{stage_csv}.log\" ")
        print("")

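# CLI: called for the export directory it prints the import commands; the
# "transform" action is used by those generated commands to rewrite a single
# CSV according to the JSON transform template.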
@plac.pos('csv_dir', "", type=Path)
@plac.opt('action', "", choices=['overwrite', 'update', 'transform'])
@plac.opt('template', "")
def main(csv_dir, action="overwrite", template="[]"):
    if action == "transform":
        # the template arrives as a JSON string and has to be decoded first
        transform_file(csv_dir, json.loads(template))
    else:
        batch(csv_dir, action)

if __name__ == '__main__':
    # plac.call(main)
    main(csv_dir)
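# Example invocation (assumed usage, not part of the original script):
#   python csv_import.py > import_carlo.bat
# import_carlo.bat then contains the sqlcmd/transform/bcp lines for every table.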