csv_import.py

import plac
from pathlib import Path
import csv
import re
import pandas as pd
from sqlalchemy import create_engine, inspect
import json

csv_dir = Path("C:\\GlobalCube\\System\\AUTOLINE\\Datenbank\\Full_zusammengesetllt")
target_dsn = {
    "user": "sa",
    "pass": "Mffu3011#",
    "server": "GC-SERVER1\\GLOBALCUBE",
    "database": "AUTOLINE",
}
temp_schema = "temp"
target_schema = "import"
transform = []


def get_dtype(db_type):
    if db_type == "DATETIME":
        return "datetime64"
    if db_type in ("DECIMAL(28, 8)", "DECIMAL(18, 0)", "NUMERIC(18, 0)"):
        return "float64"
    return "object"


def conn_string(dsn):
    return f"mssql+pyodbc://{dsn['user']}:{dsn['pass']}@{dsn['server']}/{dsn['database']}?driver=SQL+Server+Native+Client+11.0"


def conn_params(dsn):
    return f"-S {dsn['server']} -d {dsn['database']} -U {dsn['user']} -P {dsn['pass']}"


def columns_from_csv(source_csv):
    df = pd.read_csv(source_csv, sep=",", encoding="utf-8", decimal=".", nrows=1)
    source_columns_list = list(df.columns)
    # pandas appends ".1" to duplicated column names; rename those in place
    for i, col in enumerate(source_columns_list):
        if col[-2:] == ".1":
            source_columns_list[i] = col[:-2] + "2"
    source_columns_list = [col.replace(".", " ") for col in source_columns_list]
    return source_columns_list


def transform_template(target_insp, source_csv, table, target_schema):
    target_insp_cols = target_insp.get_columns(table, schema=target_schema)
    target_columns_list = [col["name"] for col in target_insp_cols]
    source_columns_list = columns_from_csv(source_csv)
    target_columns = set(target_columns_list)
    source_columns = set(source_columns_list)
    # intersect = source_columns.intersection(target_columns)
    # print("In both source and target: " + ";".join(intersect))
    diff1 = source_columns.difference(target_columns)
    if len(diff1) > 0:
        print("rem Only in source: " + ";".join(diff1))
    diff2 = target_columns.difference(source_columns)
    if len(diff2) > 0:
        print("rem Only in target: " + ";".join(diff2))
    template = []
    for i, col in enumerate(target_columns_list):
        if col in source_columns_list:
            pos = source_columns_list.index(col)
        else:
            pos = -1
        template.append((pos, get_dtype(str(target_insp_cols[i]["type"]))))
    return template


def transform_line(line):
    pattern = re.compile(r"\d{4}-\d\d-\d\d \d\d:\d\d")
    pattern2 = re.compile(r"(\d{2})[/\.](\d{2})[/\.](\d{4})")
    result = []
    for pos, f in transform:
        e = ""
        if pos > -1 and pos < len(line):
            e = line[pos]
        if f == "float64":
            e = e.replace(",", ".")
        if f == "datetime64":
            m = pattern2.match(e)
            if m:
                e = f"{m[3]}-{m[2]}-{m[1]}"
            elif not pattern.match(e):
                e = ""
            # e += ":00.000"
        result.append(e)
    return result


def fix_nulls(s):
    for line in s:
        yield line.replace("\0", " ")


def transform_file(source_csv, template):
    global transform
    transform = json.loads(template)
    stage_csv = Path(f"{source_csv.parent}\\stage\\{source_csv.name}")
    if stage_csv.exists() and stage_csv.stat().st_ctime > source_csv.stat().st_ctime:
        print(f"Stage CSV '{stage_csv.name}' is already up to date.")
        return False
    print(f"Importing {source_csv.name}...")
    with open(
        source_csv, "r", encoding="utf-8", errors="ignore", newline=""
    ) as source_file, open(stage_csv, "w", encoding="utf-8", newline="") as target_file:
        csv_read = csv.reader(fix_nulls(source_file), delimiter=",")
        csv_write = csv.writer(target_file, delimiter="\t")
        next(csv_read)  # ignore header
        i = 0
        for cols in csv_read:
            csv_write.writerow(transform_line(cols))
            i += 1
        print(f"...{i} rows converted.")


def csv_tables(csv_dir, target_tables_ci):
    p = re.compile(r"_\d+$")
    result = []
    if not csv_dir.is_dir():
        print(f"Directory {csv_dir} does not exist!")
        return result
    for source_csv in csv_dir.glob("*.csv"):
        if source_csv.is_dir():
            continue
        table = source_csv.name[:-4].lower()
        if table not in target_tables_ci:
            table = p.sub("", table)
            if table not in target_tables_ci:
                print(f"rem Target table '{table}' does not exist!")
                continue
        result.append((table, source_csv))
    return result


def target_tables(target_dsn, target_schema):
    engine = create_engine(conn_string(target_dsn))
    target_insp = inspect(engine)
    target_tables = target_insp.get_table_names(schema=target_schema)
    return (target_insp, list(map(str.lower, target_tables)))


def batch(csv_dir, action):
    target_insp, target_tables_ci = target_tables(target_dsn, target_schema)
    stage_schema = target_schema if action == "overwrite" else temp_schema
    print("@echo off")
    print("cd /d %~dp0")
    print('set PYTHON="C:\\dev\\Python\\Python38-32"')
    for table, source_csv in csv_tables(csv_dir, target_tables_ci):
        print(f"echo =={table}==")
        stage_csv = Path(f"{source_csv.parent}\\stage\\{source_csv.name}")
        try:
            tf_template = transform_template(
                target_insp, source_csv, table, target_schema
            )
            template_json = json.dumps(tf_template).replace('"', '\\"')
            print(
                f'sqlcmd.exe {conn_params(target_dsn)} -p -Q "TRUNCATE TABLE [{stage_schema}].[{table}]" '
            )
            print(
                f'%PYTHON%\\python.exe csv_import.py transform "{source_csv}" -t "{template_json}" '
            )
            print(
                f'bcp.exe [{stage_schema}].[{table}] in "{stage_csv}" {conn_params(target_dsn)} -c -C 65001 -e "{stage_csv}.log" '
            )
            pkeys = target_insp.get_pk_constraint(table, schema=target_schema)
            if len(pkeys["constrained_columns"]) > 0:
                delete_sql = (
                    f"DELETE T1 FROM [{target_schema}].[{table}] T1 INNER JOIN [{temp_schema}].[{table}] T2 ON "
                    + " AND ".join(
                        [
                            f"T1.[{col}] = T2.[{col}]"
                            for col in pkeys["constrained_columns"]
                        ]
                    )
                )
                print(f'sqlcmd.exe {conn_params(target_dsn)} -p -Q "{delete_sql}" ')
                insert_sql = f"INSERT INTO [{target_schema}].[{table}] SELECT * FROM [{temp_schema}].[{table}]"
                print(f'sqlcmd.exe {conn_params(target_dsn)} -p -Q "{insert_sql}" ')
            print("")
        except Exception:
            print(f"rem {source_csv} failed!")


@plac.pos("action", "", choices=["batch", "transform"])
@plac.pos("csv_dir", "", type=Path)
@plac.opt("mode", "", choices=["overwrite", "append", "update"])
@plac.opt("template", "")
def main(action, csv_dir, mode="overwrite", template="[]"):
    if action == "transform":
        transform_file(csv_dir, template)
    else:
        batch(csv_dir, mode)


if __name__ == "__main__":
    plac.call(main)
    # main("batch", csv_dir, "append")
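
# Example invocations, inferred from the plac interface above and from the
# commands that batch() emits; the paths and the template value are
# placeholders, not part of the original script:
#   python csv_import.py batch "C:\GlobalCube\...\Full_zusammengesetllt" -m append > import_csv.bat
#   python csv_import.py transform "C:\...\stage\ADDRESSES.csv" -t "[[0, \"object\"], [3, \"datetime64\"]]"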