db_create.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. import json
  2. from pathlib import Path
  3. import pandas as pd
  4. from database.model import DatabaseInspect, create_db_ini, load_config
  5. def get_import_config(filename: str, db_name: str):
  6. df = pd.read_csv(filename, sep=";", encoding="latin-1")
  7. if "dest" not in df.columns:
  8. df["dest"] = df["target"]
  9. df["dest_db"] = db_name
  10. df["cols"] = ""
  11. df[["source", "dest", "dest_db", "filter", "query", "iterative", "cols"]].to_csv(
  12. filename, sep=";", encoding="latin-1", index=False
  13. )
  14. return df[df["dest"].notnull()]
  15. def create(config_file: str = "database/CARLO.json"):
  16. cfg = load_config(config_file)
  17. create_db_ini(cfg)
  18. base_dir = str(Path(config_file).parent.parent.resolve())
  19. config = get_import_config(f"{base_dir}/config/{cfg.csv_file}", cfg.dest_dsn.database)
  20. source_db = DatabaseInspect(cfg.source_dsn, source=True)
  21. source_tables = source_db.get_tables()
  22. print(json.dumps(source_db.get_prefix(), indent=2))
  23. dest_db = DatabaseInspect(cfg.dest_dsn)
  24. dest_tables = dest_db.get_tables()
  25. for _, current_table in config.iterrows():
  26. with open(f"{cfg.batch_dir}/{current_table['dest']}.bat", "w", encoding="cp850") as f:
  27. full_table_name = f'[{current_table["dest_db"]}].[{cfg.dest_dsn.schema}].[{current_table["dest"]}]'
  28. f.write("@echo off\n")
  29. f.write(f'call "{cfg.scripts_dir}\\config2.bat"\n')
  30. f.write("rem ==" + current_table["dest"] + "==\n")
  31. if not current_table["dest"] in dest_tables:
  32. f.write(f"echo Ziel-Tabelle '{current_table['dest']}' existiert nicht!\n")
  33. print(f"Ziel-Tabelle '{current_table['dest']}' existiert nicht!")
  34. continue
  35. f.write(f"del {cfg.logs_dir}\\{current_table['dest']}*.* /Q /F >nul 2>nul\n\n")
  36. f.write('if not "%1"=="" goto :increment\n')
  37. f.write("\n:full\n")
  38. f.write(f' call sql_query.bat "TRUNCATE TABLE {full_table_name}"\n')
  39. dest_columns_list = dest_db.get_columns(current_table["dest"])
  40. dest_column_types = dest_db.get_columns_is_typeof_str(current_table["dest"])
  41. if "CLIENT_DB" in dest_columns_list:
  42. dest_columns_list.remove("CLIENT_DB")
  43. dest_columns_list.append("Client_DB")
  44. dest_columns = set(dest_columns_list)
  45. select_queries: dict[str, str] = {}
  46. for client_db, prefix in cfg.clients.items():
  47. table_client = f'{current_table["dest"]}_{client_db}'
  48. source_table = current_table["source"].format(prefix)
  49. if source_table not in source_tables:
  50. source_table2 = source_db.convert_table(source_table)
  51. if source_table2 not in source_tables:
  52. f.write(f"echo Quell-Tabelle '{source_table}' existiert nicht!\n")
  53. print(f"Quell-Tabelle '{source_table}' existiert nicht!")
  54. continue
  55. source_columns = set(source_db.get_columns(source_table))
  56. intersect = source_columns.intersection(dest_columns)
  57. # print("Auf beiden Seiten: " + ";".join(intersect))
  58. diff1 = source_columns.difference(dest_columns)
  59. if len(diff1) > 0:
  60. f.write("rem Nur in Quelle: " + ";".join(diff1) + "\n")
  61. diff2 = dest_columns.difference(source_columns)
  62. if "Client_DB" not in diff2:
  63. f.write("echo Spalte 'Client_DB' fehlt!\n")
  64. print(f"Ziel-Tabelle '{current_table['dest']}' Spalte 'Client_DB' fehlt!")
  65. continue
  66. diff2.remove("Client_DB")
  67. if len(diff2) > 0:
  68. f.write("rem Nur in Ziel: " + ";".join(diff2) + "\n")
  69. if not pd.isnull(current_table["query"]):
  70. select_query = current_table["query"].format(prefix, cfg.filter[0], cfg.filter[1])
  71. elif "." in source_table or cfg.source_dsn.schema == "":
  72. if source_table[0] != "[":
  73. source_table = f"[{source_table}]"
  74. select_query = f"SELECT T1.* FROM {source_table} T1 "
  75. else:
  76. select_query = f"SELECT T1.* FROM [{cfg.source_dsn.schema}].[{source_table}] T1 "
  77. if not pd.isnull(current_table["filter"]):
  78. select_query += " WHERE " + current_table["filter"].format("", cfg.filter[0], cfg.filter[1])
  79. # select_columns = "T1.[" + "], T1.[".join(intersect) + "],"
  80. select_columns = ""
  81. for col, is_char_type in zip(dest_columns_list, dest_column_types):
  82. if col in intersect:
  83. if False and is_char_type: # vorerst deaktiviert
  84. select_columns += f"dbo.cln(T1.[{col}]), "
  85. else:
  86. select_columns += f"T1.[{col}], "
  87. elif col == "Client_DB":
  88. select_columns += f"'{client_db}' as [Client_DB], "
  89. else:
  90. select_columns += "'' as [" + col + "], "
  91. select_query = select_query.replace("T1.*", select_columns[:-2])
  92. if "timestamp" in source_columns:
  93. select_query += " ORDER BY T1.[timestamp] "
  94. else:
  95. print(current_table["dest"] + " hat kein timestamp-Feld")
  96. select_query = select_query.replace("%", "%%%%") # batch-Problem
  97. select_queries[table_client] = select_query
  98. f.write(f' call bcp_queryout.bat "{table_client}" "{select_query}"\n')
  99. f.write(
  100. f' call bcp_in.bat "{table_client}" '
  101. f'"[{cfg.dest_dsn.schema}].[{current_table["dest"]}]" "{current_table["dest_db"]}"\n'
  102. )
  103. f.write(" goto :cleanup\n\n")
  104. f.write(":increment\n")
  105. temp_table_name = f"[{cfg.temp_db}].[temp].[{current_table['dest']}]"
  106. f.write(f' call sql_query.bat "TRUNCATE TABLE {temp_table_name}"\n\n')
  107. for client_db, prefix in cfg.clients.items():
  108. table_client = f'{current_table["dest"]}_{client_db}'
  109. select_query = select_queries[table_client]
  110. convert_timestamp = "T1.[timestamp] > convert(binary(8), '%TS%', 1)"
  111. if "WHERE" in select_query:
  112. select_query = select_query.replace("WHERE", f"WHERE {convert_timestamp} AND")
  113. elif "ORDER" in select_query:
  114. select_query = select_query.replace("ORDER", f"WHERE {convert_timestamp} ORDER")
  115. else:
  116. print("Dont know where to put WHERE")
  117. f.write(f' call sql_timestamp.bat "{table_client}" "{full_table_name}" "{client_db}"\n')
  118. f.write(f' call bcp_queryout.bat "{table_client}" "{select_query}"\n')
  119. f.write(f' call bcp_in.bat "{table_client}" "[temp].[{current_table["dest"]}]" "{cfg.temp_db}"\n\n')
  120. insert_query = f"INSERT INTO {full_table_name} SELECT * FROM {temp_table_name} T1"
  121. pkey = dest_db.get_pkey(current_table["dest"], current_table["dest_db"])
  122. if len(pkey) == 0:
  123. print(current_table["dest"] + " hat keinen Primaerschluessel")
  124. f.write(f" rem {current_table['dest']} hat keinen Primaerschluessel")
  125. else:
  126. pkey_join_list = [f"T1.[{col}] = T2.[{col}]" for col in pkey]
  127. pkey_join = " AND ".join(pkey_join_list)
  128. delete_query = f"DELETE T1 FROM {full_table_name} T1 INNER JOIN {temp_table_name} T2 ON {pkey_join}"
  129. f.write(f' call sql_query.bat "{delete_query}"\n')
  130. f.write(f' call sql_query.bat "{insert_query}"\n')
  131. f.write("\n:cleanup\n")
  132. for client_db, prefix in cfg.clients.items():
  133. stage_csv = f"{cfg.stage_dir}\\{current_table['dest']}_{client_db}.csv"
  134. f.write(f' del "{stage_csv}" /F >nul 2>nul\n')
  135. with open(f"{cfg.batch_dir}/_{cfg.name}.bat", "w", encoding="cp850") as f:
  136. f.write("@echo off & cd /d %~dp0\n")
  137. f.write(f"del {cfg.stage_dir}\\*.* /Q /F >nul 2>nul\n\n")
  138. for index, current_table in config.iterrows():
  139. f.write(f"echo =={current_table['dest']}==\n")
  140. f.write(f"echo {current_table['dest']} >CON\n")
  141. f.write(f"call {cfg.batch_dir}\\{current_table['dest']}.bat 1\n\n")
  142. if __name__ == "__main__":
  143. create()