# csv_import.py
  1. import plac
  2. from pathlib import Path
  3. import csv
  4. import re
  5. import pandas as pd
  6. from sqlalchemy import create_engine, inspect, event
  7. import json
# Directory containing the assembled AUTOLINE full-export CSV files.
csv_dir = Path("C:\\GlobalCube\\System\\AUTOLINE\\Datenbank\\Full_zusammengesetllt")
# Target MS SQL Server connection data.
# NOTE(review): credentials are hard-coded in source -- consider moving them
# to a config file or environment variables.
target_dsn = { 'user': "sa", 'pass': "Mffu3011#", 'server': "GC-SERVER1\\GLOBALCUBE", 'database': "AUTOLINE" }
# Staging schema used for append/update runs; final schema for the import.
temp_schema = "temp"
target_schema = "import"
# Active transform template: list of (csv_position, dtype) pairs.
# Written by transform_file, read by transform_line.
transform = []
  13. def get_dtype (db_type):
  14. if db_type == "DATETIME":
  15. return "datetime64"
  16. if db_type == "DECIMAL(28, 8)" or db_type == "DECIMAL(18, 0)" or db_type == "NUMERIC(18, 0)":
  17. return "float64"
  18. return "object"
  19. def conn_string (dsn):
  20. return f"mssql+pyodbc://{dsn['user']}:{dsn['pass']}@{dsn['server']}/{dsn['database']}?driver=SQL+Server+Native+Client+11.0"
  21. def conn_params (dsn):
  22. return f"-S {dsn['server']} -d {dsn['database']} -U {dsn['user']} -P {dsn['pass']}"
  23. def columns_from_csv (source_csv):
  24. df = pd.read_csv(source_csv, sep=",", encoding="utf8", decimal=".", nrows=1)
  25. source_columns_list = list(df.columns)
  26. for i,col in enumerate([col for col in source_columns_list if col[-2:] == ".1"]):
  27. source_columns_list[i] = col[:-2] + "2"
  28. source_columns_list = [col.replace(".", " ") for col in source_columns_list]
  29. return source_columns_list
  30. def transform_template(target_insp, source_csv, table, target_schema):
  31. target_insp_cols = target_insp.get_columns(table, schema=target_schema)
  32. target_columns_list = [col['name'] for col in target_insp_cols]
  33. source_columns_list = columns_from_csv(source_csv)
  34. target_columns = set(target_columns_list)
  35. source_columns = set(source_columns_list)
  36. #intersect = source_columns.intersection(target_columns)
  37. #print("Auf beiden Seiten: " + ";".join(intersect))
  38. diff1 = source_columns.difference(target_columns)
  39. if len(diff1) > 0:
  40. print("rem Nur in Quelle: " + ";".join(diff1))
  41. diff2 = target_columns.difference(source_columns)
  42. if len(diff2) > 0:
  43. print("rem Nur in Ziel: " + ";".join(diff2))
  44. template = []
  45. for i, col in enumerate(target_columns_list):
  46. if col in source_columns_list:
  47. pos = source_columns_list.index(col)
  48. else:
  49. pos = -1
  50. template.append((pos, get_dtype(str(target_insp_cols[i]['type']))))
  51. return template
  52. def transform_line (line):
  53. pattern = re.compile(r"\d{4}-\d\d-\d\d \d\d:\d\d")
  54. result = []
  55. for pos, f in transform:
  56. e = ""
  57. if pos > -1:
  58. e = line[pos]
  59. if f == "float64":
  60. e = e.replace(",", ".")
  61. if f == "datetime64":
  62. if not pattern.match(e):
  63. e = ""
  64. #e += ":00.000"
  65. result.append(e)
  66. return result
  67. def fix_nulls(s):
  68. for line in s:
  69. yield line.replace('\0', ' ')
  70. def transform_file(source_csv, template):
  71. global transform
  72. transform = json.loads(template)
  73. stage_csv = Path(f"{source_csv.parent}\\stage\\{source_csv.name}")
  74. if stage_csv.exists() and stage_csv.stat().st_ctime > source_csv.stat().st_ctime:
  75. print(f"Stage-CSV '{stage_csv.name}' ist bereits aktuell.")
  76. return False
  77. print(source_csv.name)
  78. with open(source_csv, "r", encoding="utf8", newline="") as source_file, open(stage_csv, "w", encoding="utf8", newline="") as target_file:
  79. csv_read = csv.reader(fix_nulls(source_file), delimiter=",")
  80. csv_write = csv.writer(target_file, delimiter="\t")
  81. next(csv_read) # ignore header
  82. for cols in csv_read:
  83. csv_write.writerow(transform_line(cols))
  84. def csv_tables (csv_dir, target_tables_ci):
  85. p = re.compile(r"_\d+$")
  86. result = []
  87. if not csv_dir.is_dir():
  88. print(f"Verzeichnis {csv_dir} existiert nicht!")
  89. return result
  90. for source_csv in csv_dir.glob("*.csv"):
  91. if source_csv.is_dir():
  92. continue
  93. table = source_csv.name[:-4].lower()
  94. if not table in target_tables_ci:
  95. table = p.sub("", table)
  96. if not table in target_tables_ci:
  97. print(f"rem Ziel-Tabelle '{table}' existiert nicht!")
  98. continue
  99. result.append((table, source_csv))
  100. return result
  101. def target_tables (target_dsn, target_schema):
  102. engine = create_engine(conn_string(target_dsn))
  103. target_insp = inspect(engine)
  104. target_tables = target_insp.get_table_names(schema=target_schema)
  105. return (target_insp, list(map(str.lower, target_tables)))
  106. def batch (csv_dir, action):
  107. target_insp, target_tables_ci = target_tables(target_dsn, target_schema)
  108. stage_schema = target_schema if action == "overwrite" else temp_schema
  109. print("@echo off")
  110. print("cd /d %~dp0")
  111. print("set PYTHON=\"C:\\dev\\Python\\Python38-32\"")
  112. for (table, source_csv) in csv_tables(csv_dir, target_tables_ci):
  113. print(f"echo =={table}==")
  114. stage_csv = Path(f"{source_csv.parent}\\stage\\{source_csv.name}")
  115. try:
  116. tf_template = transform_template(target_insp, source_csv, table, target_schema)
  117. template_json = json.dumps(tf_template).replace("\"", "\\\"")
  118. print(f"sqlcmd.exe {conn_params(target_dsn)} -p -Q \"TRUNCATE TABLE [{stage_schema}].[{table}]\" ")
  119. print(f"%PYTHON%\\python.exe csv_import.py transform \"{source_csv}\" -t \"{template_json}\" ")
  120. print(f"bcp.exe [{stage_schema}].[{table}] in \"{stage_csv}\" {conn_params(target_dsn)} -c -C 65001 -e \"{stage_csv}.log\" ")
  121. pkeys = target_insp.get_pk_constraint(table, schema=target_schema)
  122. if len(pkeys['constrained_columns']) > 0:
  123. delete_sql = f"DELETE T1 FROM [{target_schema}].[{table}] T1 INNER JOIN [{temp_schema}].[{table}] T2 ON " + " AND ".join([f"T1.[{col}] = T2.[{col}]" for col in pkeys['constrained_columns']])
  124. print(f"sqlcmd.exe {conn_params(target_dsn)} -p -Q \"{delete_sql}\" ")
  125. insert_sql = f"INSERT INTO [{target_schema}].[{table}] SELECT * FROM [{temp_schema}].[{table}]"
  126. print(f"sqlcmd.exe {conn_params(target_dsn)} -p -Q \"{insert_sql}\" ")
  127. print("")
  128. except:
  129. print(f"rem {source_csv} fehlerhaft!")
# CLI entry point (plac-based).
# action "transform": csv_dir is actually a single CSV *file*; it is
#   transformed into its stage file using the JSON template (-t).
# action "batch": csv_dir is the export directory; a batch script for the
#   chosen mode is printed to stdout.
@plac.pos('action', "", choices=['batch', 'transform'])
@plac.pos('csv_dir', "", type=Path)
@plac.opt('mode', "", choices=['overwrite', 'append', 'update'])
@plac.opt('template', "")
def main(action, csv_dir, mode="overwrite", template="[]"):
    if action == "transform":
        transform_file(csv_dir, template)
    else:
        batch(csv_dir, mode)
if __name__ == '__main__':
    plac.call(main)
  141. #main("batch", csv_dir, "append")