@@ -6,17 +6,26 @@ import pandas as pd
 from sqlalchemy import create_engine, inspect
 import json
 
-csv_dir = Path('C:\\GlobalCube\\System\\AUTOLINE\\Datenbank\\Full_zusammengesetllt')
-target_dsn = {'user': 'sa', 'pass': 'Mffu3011#', 'server': 'GC-SERVER1\\GLOBALCUBE', 'database': 'AUTOLINE'}
-temp_schema = 'temp'
-target_schema = 'import'
+csv_dir = Path("C:\\GlobalCube\\System\\AUTOLINE\\Datenbank\\Full_zusammengesetllt")
+target_dsn = {
+    "user": "sa",
+    "pass": "Mffu3011#",
+    "server": "GC-SERVER1\\GLOBALCUBE",
+    "database": "AUTOLINE",
+}
+temp_schema = "temp"
+target_schema = "import"
 transform = []
 
 
 def get_dtype(db_type):
     if db_type == "DATETIME":
         return "datetime64"
-    if db_type == "DECIMAL(28, 8)" or db_type == "DECIMAL(18, 0)" or db_type == "NUMERIC(18, 0)":
+    if (
+        db_type == "DECIMAL(28, 8)"
+        or db_type == "DECIMAL(18, 0)"
+        or db_type == "NUMERIC(18, 0)"
+    ):
         return "float64"
     return "object"
 
@@ -40,7 +49,7 @@ def columns_from_csv(source_csv):
 
 def transform_template(target_insp, source_csv, table, target_schema):
     target_insp_cols = target_insp.get_columns(table, schema=target_schema)
-    target_columns_list = [col['name'] for col in target_insp_cols]
+    target_columns_list = [col["name"] for col in target_insp_cols]
     source_columns_list = columns_from_csv(source_csv)
 
     target_columns = set(target_columns_list)
@@ -60,7 +69,7 @@ def transform_template(target_insp, source_csv, table, target_schema):
             pos = source_columns_list.index(col)
         else:
             pos = -1
-        template.append((pos, get_dtype(str(target_insp_cols[i]['type']))))
+        template.append((pos, get_dtype(str(target_insp_cols[i]["type"]))))
     return template
 
 
@@ -88,7 +97,7 @@ def transform_line(line):
 
 def fix_nulls(s):
     for line in s:
-        yield line.replace('\0', ' ')
+        yield line.replace("\0", " ")
 
 
 def transform_file(source_csv, template):
@@ -101,12 +110,13 @@ def transform_file(source_csv, template):
         return False
 
     print(f"Importiere {source_csv.name}...")
-    with open(source_csv, "r", encoding="utf-8", errors="ignore", newline="") as source_file, \
-            open(stage_csv, "w", encoding="utf-8", newline="") as target_file:
+    with open(
+        source_csv, "r", encoding="utf-8", errors="ignore", newline=""
+    ) as source_file, open(stage_csv, "w", encoding="utf-8", newline="") as target_file:
         csv_read = csv.reader(fix_nulls(source_file), delimiter=",")
         csv_write = csv.writer(target_file, delimiter="\t")
 
-        next(csv_read) # ignore header
+        next(csv_read)  # ignore header
         i = 0
         for cols in csv_read:
             csv_write.writerow(transform_line(cols))
@@ -147,34 +157,49 @@ def batch(csv_dir, action):
     stage_schema = target_schema if action == "overwrite" else temp_schema
     print("@echo off")
     print("cd /d %~dp0")
-    print("set PYTHON=\"C:\\dev\\Python\\Python38-32\"")
+    print('set PYTHON="C:\\dev\\Python\\Python38-32"')
 
-    for (table, source_csv) in csv_tables(csv_dir, target_tables_ci):
+    for table, source_csv in csv_tables(csv_dir, target_tables_ci):
         print(f"echo =={table}==")
         stage_csv = Path(f"{source_csv.parent}\\stage\\{source_csv.name}")
         try:
-            tf_template = transform_template(target_insp, source_csv, table, target_schema)
-            template_json = json.dumps(tf_template).replace("\"", "\\\"")
-            print(f"sqlcmd.exe {conn_params(target_dsn)} -p -Q \"TRUNCATE TABLE [{stage_schema}].[{table}]\" ")
-            print(f"%PYTHON%\\python.exe csv_import.py transform \"{source_csv}\" -t \"{template_json}\" ")
-
-            print(f"bcp.exe [{stage_schema}].[{table}] in \"{stage_csv}\" {conn_params(target_dsn)} -c -C 65001 -e \"{stage_csv}.log\" ")
+            tf_template = transform_template(
+                target_insp, source_csv, table, target_schema
+            )
+            template_json = json.dumps(tf_template).replace('"', '\\"')
+            print(
+                f'sqlcmd.exe {conn_params(target_dsn)} -p -Q "TRUNCATE TABLE [{stage_schema}].[{table}]" '
+            )
+            print(
+                f'%PYTHON%\\python.exe csv_import.py transform "{source_csv}" -t "{template_json}" '
+            )
+
+            print(
+                f'bcp.exe [{stage_schema}].[{table}] in "{stage_csv}" {conn_params(target_dsn)} -c -C 65001 -e "{stage_csv}.log" '
            )
             pkeys = target_insp.get_pk_constraint(table, schema=target_schema)
-            if len(pkeys['constrained_columns']) > 0:
-                delete_sql = f"DELETE T1 FROM [{target_schema}].[{table}] T1 INNER JOIN [{temp_schema}].[{table}] T2 ON " + \
-                    " AND ".join([f"T1.[{col}] = T2.[{col}]" for col in pkeys['constrained_columns']])
-                print(f"sqlcmd.exe {conn_params(target_dsn)} -p -Q \"{delete_sql}\" ")
+            if len(pkeys["constrained_columns"]) > 0:
+                delete_sql = (
+                    f"DELETE T1 FROM [{target_schema}].[{table}] T1 INNER JOIN [{temp_schema}].[{table}] T2 ON "
+                    + " AND ".join(
+                        [
+                            f"T1.[{col}] = T2.[{col}]"
+                            for col in pkeys["constrained_columns"]
+                        ]
+                    )
+                )
+                print(f'sqlcmd.exe {conn_params(target_dsn)} -p -Q "{delete_sql}" ')
             insert_sql = f"INSERT INTO [{target_schema}].[{table}] SELECT * FROM [{temp_schema}].[{table}]"
-            print(f"sqlcmd.exe {conn_params(target_dsn)} -p -Q \"{insert_sql}\" ")
+            print(f'sqlcmd.exe {conn_params(target_dsn)} -p -Q "{insert_sql}" ')
             print("")
         except Exception:
             print(f"rem {source_csv} fehlerhaft!")
 
 
-@plac.pos('action', "", choices=['batch', 'transform'])
-@plac.pos('csv_dir', "", type=Path)
-@plac.opt('mode', "", choices=['overwrite', 'append', 'update'])
-@plac.opt('template', "")
+@plac.pos("action", "", choices=["batch", "transform"])
+@plac.pos("csv_dir", "", type=Path)
+@plac.opt("mode", "", choices=["overwrite", "append", "update"])
+@plac.opt("template", "")
 def main(action, csv_dir, mode="overwrite", template="[]"):
     if action == "transform":
         transform_file(csv_dir, template)
@@ -182,6 +207,6 @@ def main(action, csv_dir, mode="overwrite", template="[]"):
         batch(csv_dir, mode)
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     plac.call(main)
     # main("batch", csv_dir, "append")