# db_create.py
  1. import io
  2. import json
  3. import sys
  4. from dataclasses import dataclass
  5. from functools import cached_property
  6. from pathlib import Path
  7. import pandas as pd
  8. sys.path.insert(0, "C:\\Projekte\\tools")
  9. from database.model import ( # noqa:E402
  10. DatabaseInspect,
  11. DbCreateConfig,
  12. create_db_ini,
  13. load_config,
  14. )
  15. def get_import_config(filename: str, db_name: str):
  16. df = pd.read_csv(filename, sep=";", encoding="latin-1")
  17. if "dest" not in df.columns:
  18. df["dest"] = df["target"]
  19. df["dest_db"] = db_name
  20. df["cols"] = ""
  21. df[["source", "dest", "dest_db", "filter", "query", "iterative", "cols"]].to_csv(
  22. filename, sep=";", encoding="latin-1", index=False
  23. )
  24. return df[df["dest"].notnull()]
  25. @dataclass
  26. class SourceTable:
  27. source: str
  28. client_db: str
  29. prefix: str
  30. @property
  31. def stage_csv(self) -> str:
  32. return ""
  33. @property
  34. def table_client(self) -> str:
  35. return ""
  36. @property
  37. def table_name(self) -> str:
  38. return ""
  39. @property
  40. def select_query(self) -> str:
  41. return ""
  42. @dataclass
  43. class DestTable:
  44. source: str
  45. dest: str
  46. dest_db: str
  47. filter_: str
  48. query: str
  49. iterative: str
  50. cols: str
  51. source_tables: list[SourceTable] = None
  52. dest_inspect: DatabaseInspect = None
  53. def table_batch_file(self, batch_dir: str) -> str:
  54. return f"{batch_dir}/{self.dest}.bat"
  55. def full_table_name(self, schema_name: str) -> str:
  56. return f"[{self.dest_db}].[{schema_name}].[{self.dest}]"
  57. def temp_table_name(self, temp_db: str) -> str:
  58. return f"[{temp_db}].[temp].[{self.dest}]"
  59. @cached_property
  60. def columns_list(self) -> list[str]:
  61. res = self.dest_inspect.get_columns(self.dest)
  62. if "CLIENT_DB" in res:
  63. res.remove("CLIENT_DB")
  64. res.append("Client_DB")
  65. return res
  66. @cached_property
  67. def column_types(self) -> list[str]:
  68. return self.dest_inspect.get_columns_is_typeof_str(self.dest)
  69. @cached_property
  70. def primary_key(self) -> list[str]:
  71. return self.dest_inspect.get_pkey(self.dest, self.dest_db)
  72. @property
  73. def insert_query(self) -> str:
  74. return f"INSERT INTO {self.full_table_name} SELECT * FROM {self.temp_table_name} T1"
  75. @property
  76. def delete_query(self) -> str:
  77. # pkey = self.primary_key
  78. if len(self.primary_key) == 0:
  79. return ""
  80. pkey_join_list = [f"T1.[{col}] = T2.[{col}]" for col in self.primary_key]
  81. pkey_join = " AND ".join(pkey_join_list)
  82. return f"DELETE T1 FROM {self.full_table_name} T1 INNER JOIN {self.temp_table_name} T2 ON {pkey_join}"
  83. class SourceTable2(SourceTable):
  84. dest_table: DestTable
  85. cfg: DbCreateConfig
  86. _select_query: str = None
  87. source_inspect: DatabaseInspect = None
  88. info: str = ""
  89. @property
  90. def table_client(self) -> str:
  91. return f"{self.dest_table.dest}_{self.client_db}"
  92. @property
  93. def stage_csv(self) -> str:
  94. return f"{self.cfg.stage_dir}\\{self.table_client}.csv"
  95. @property
  96. def table_name(self) -> str:
  97. return self.source.format(self.prefix)
  98. @cached_property
  99. def select_query(self):
  100. f = io.StringIO()
  101. source_columns = set(self.source_inspect.get_columns(self.table_name))
  102. intersect = source_columns.intersection(self.dest_table.columns_list)
  103. # print("Auf beiden Seiten: " + ";".join(intersect))
  104. diff1 = source_columns.difference(self.dest_table.columns_list)
  105. if len(diff1) > 0:
  106. f.write("rem Nur in Quelle: " + ";".join(diff1) + "\n")
  107. diff2 = set(self.dest_table.columns_list).difference(source_columns)
  108. if "Client_DB" not in diff2:
  109. f.write("echo Spalte 'Client_DB' fehlt!\n")
  110. return
  111. diff2.remove("Client_DB")
  112. if len(diff2) > 0:
  113. f.write("rem Nur in Ziel: " + ";".join(diff2) + "\n")
  114. if not pd.isnull(self.dest_table.query):
  115. select_query = self.dest_table.query.format(self.prefix, self.cfg.filter[0], self.cfg.filter[1])
  116. elif "." in self.table_name or self.cfg.source_dsn.schema == "":
  117. if self.table_name[0] != "[":
  118. self.table_name = f"[{self.table_name}]"
  119. select_query = f"SELECT T1.* FROM {self.table_name} T1 "
  120. else:
  121. select_query = f"SELECT T1.* FROM [{self.cfg.source_dsn.schema}].[{self.table_name}] T1 "
  122. if not pd.isnull(self.dest_table.filter_):
  123. select_query += " WHERE " + self.dest_table.filter_.format("", self.cfg.filter[0], self.cfg.filter[1])
  124. elif "WHERE" not in select_query:
  125. select_query += " WHERE 1 = 1"
  126. # select_columns = "T1.[" + "], T1.[".join(intersect) + "],"
  127. select_columns = ""
  128. for col, is_char_type in zip(self.dest_table.columns_list, self.dest_table.column_types):
  129. if col in intersect:
  130. if False and is_char_type: # vorerst deaktiviert
  131. select_columns += f"dbo.cln(T1.[{col}]), "
  132. else:
  133. select_columns += f"T1.[{col}], "
  134. elif col == "Client_DB":
  135. select_columns += f"'{self.client_db}' as [Client_DB], "
  136. else:
  137. select_columns += "'' as [" + col + "], "
  138. select_query = select_query.replace("T1.*", select_columns[:-2])
  139. if "timestamp" in source_columns:
  140. select_query += " ORDER BY T1.[timestamp] "
  141. else:
  142. print(self.dest_table.dest + " hat kein timestamp-Feld")
  143. self.info = f.getvalue()
  144. return select_query
  145. def create(config_file: str = "database/CARLO.json"):
  146. cfg = load_config(config_file)
  147. create_db_ini(cfg)
  148. base_dir = str(Path(config_file).parent.parent.resolve())
  149. source_inspect = DatabaseInspect(cfg.source_dsn, source=True)
  150. print(json.dumps(source_inspect.get_prefix(), indent=2))
  151. SourceTable2.source_inspect = source_inspect
  152. SourceTable2.cfg = cfg
  153. dest_inspect = DatabaseInspect(cfg.dest_dsn)
  154. # DestTable.dest_inspect = dest_inspect
  155. config = get_import_config(f"{base_dir}/config/{cfg.csv_file}", cfg.dest_dsn.database)
  156. table_config = [DestTable(*row.values()) for row in config.to_dict(orient="records")]
  157. for dest_table in table_config:
  158. dest_table.dest_inspect = dest_inspect
  159. dest_table.source_tables = []
  160. for client_db, prefix in cfg.clients.items():
  161. st = SourceTable2(dest_table.source, client_db, prefix)
  162. st.dest_table = dest_table
  163. dest_table.source_tables.append(st)
  164. for dest_table in table_config:
  165. with open(dest_table.table_batch_file(cfg.batch_dir), "w", encoding="cp850") as f:
  166. full_table_name = dest_table.full_table_name(cfg.dest_dsn.schema)
  167. f.write("@echo off\n")
  168. f.write(f'call "{cfg.scripts_dir}\\config2.bat"\n')
  169. f.write("rem ==" + dest_table.dest + "==\n")
  170. if dest_table.dest not in dest_inspect.tables_list:
  171. f.write(f"echo Ziel-Tabelle '{dest_table.dest}' existiert nicht!\n")
  172. print(f"Ziel-Tabelle '{dest_table.dest}' existiert nicht!")
  173. continue
  174. f.write(f"del {cfg.logs_dir}\\{dest_table.dest}*.* /Q /F >nul 2>nul\n\n")
  175. f.write('if not "%1"=="" goto :increment\n')
  176. f.write("\n:full\n")
  177. f.write(f' call sql_query.bat "TRUNCATE TABLE {full_table_name}"\n')
  178. for source_table in dest_table.source_tables:
  179. if source_table.table_name not in source_inspect.tables_list:
  180. source_table2 = source_inspect.convert_table(source_table.table_name)
  181. if source_table2 not in source_inspect.tables_list:
  182. f.write(f"echo Quell-Tabelle '{source_table.table_name}' existiert nicht!\n")
  183. print(f"Quell-Tabelle '{source_table.table_name}' existiert nicht!")
  184. continue
  185. select_query = source_table.select_query.replace("%", "%%%%") # batch-Problem
  186. f.write(source_table.info)
  187. if select_query == "":
  188. print(f"Ziel-Tabelle '{dest_table.dest}' Spalte 'Client_DB' fehlt!")
  189. continue
  190. f.write(f' call bcp_queryout.bat "{source_table.table_client}" "{select_query}"\n')
  191. f.write(
  192. f' call bcp_in.bat "{source_table.table_client}" '
  193. f'"[{cfg.dest_dsn.schema}].[{dest_table.dest}]" "{dest_table.dest_db}"\n'
  194. )
  195. f.write(" goto :cleanup\n\n")
  196. f.write(":increment\n")
  197. f.write(f' call sql_query.bat "TRUNCATE TABLE {dest_table.temp_table_name}"\n\n')
  198. for source_table in dest_table.source_tables:
  199. select_query = source_table.select_query
  200. convert_timestamp = "T1.[timestamp] > convert(binary(8), '%TS%', 1)"
  201. if "WHERE" in select_query:
  202. select_query = select_query.replace("WHERE", f"WHERE {convert_timestamp} AND")
  203. else:
  204. print("Dont know where to put WHERE")
  205. f.write(f' call sql_timestamp.bat "{source_table.table_client}" "{full_table_name}" "{client_db}"\n')
  206. f.write(f' call bcp_queryout.bat "{source_table.table_client}" "{select_query}"\n')
  207. f.write(
  208. f' call bcp_in.bat "{source_table.table_client}" "[temp].[{dest_table.dest}]" "{cfg.temp_db}"\n\n'
  209. )
  210. if dest_table.delete_query == "":
  211. print(dest_table.dest + " hat keinen Primaerschluessel")
  212. f.write(f" rem {dest_table.dest} hat keinen Primaerschluessel")
  213. else:
  214. f.write(f' call sql_query.bat "{dest_table.delete_query}"\n')
  215. f.write(f' call sql_query.bat "{dest_table.insert_query}"\n')
  216. f.write("\n:cleanup\n")
  217. for source_table in dest_table.source_tables:
  218. f.write(f' call delete.bat "{source_table.stage_csv}"\n')
  219. with open(f"{cfg.batch_dir}/_{cfg.name}.bat", "w", encoding="cp850") as f:
  220. f.write("@echo off & cd /d %~dp0\n")
  221. f.write(f"del {cfg.stage_dir}\\*.* /Q /F >nul 2>nul\n\n")
  222. for dest_table in table_config:
  223. f.write(f"echo =={dest_table.dest}==\n")
  224. f.write(f"echo {dest_table.dest} >CON\n")
  225. f.write(f"call {cfg.batch_dir}\\{dest_table.dest}.bat 1\n\n")
  226. with open(f"{cfg.batch_dir}/_{cfg.name}_full_load.bat", "w", encoding="cp850") as f:
  227. f.write("@echo off & cd /d %~dp0\n")
  228. f.write(f"del {cfg.stage_dir}\\*.* /Q /F >nul 2>nul\n\n")
  229. for dest_table in table_config:
  230. f.write(f"echo =={dest_table.dest}==\n")
  231. f.write(f"echo {dest_table.dest} >CON\n")
  232. f.write(f"call {cfg.batch_dir}\\{dest_table.dest}.bat\n\n")
# Script entry point: run create() with its default configuration file.
if __name__ == "__main__":
    create()