# db_create.py

  1. import json
  2. from pathlib import Path
  3. import pandas as pd
  4. from model import DatabaseInspect, load_config
  5. def get_import_config(filename: str, db_name: str):
  6. df = pd.read_csv(filename, sep=";", encoding="latin-1")
  7. if "cols" not in df.columns:
  8. df["target_db"] = db_name
  9. df["cols"] = ""
  10. df[["source", "target", "target_db", "filter", "query", "iterative", "cols"]].to_csv(
  11. filename, sep=";", encoding="latin-1", index=False
  12. )
  13. return df[df["target"].notnull()]
  14. def create(config_file: str = "database/CARLO.json"):
  15. cfg = load_config(config_file)
  16. base_dir = str(Path(cfg.batch_dir).parent)
  17. config = get_import_config(f"{base_dir}/config/{cfg.csv_file}", cfg.target_dsn.database)
  18. source_db = DatabaseInspect(cfg.source_dsn, source=True)
  19. source_tables = source_db.get_tables()
  20. print(json.dumps(source_db.get_prefix(), indent=2))
  21. target_db = DatabaseInspect(cfg.target_dsn)
  22. target_tables = target_db.get_tables()
  23. for _, current_table in config.iterrows():
  24. with open(f"{cfg.batch_dir}/{current_table['target']}.bat", "w", encoding="cp850") as f:
  25. f.write("@echo off \n")
  26. f.write("rem ==" + current_table["target"] + "==\n")
  27. if not current_table["target"] in target_tables:
  28. f.write(f"echo Ziel-Tabelle '{current_table['target']}' existiert nicht!\n")
  29. print(f"Ziel-Tabelle '{current_table['target']}' existiert nicht!")
  30. continue
  31. f.write(f"del {cfg.stage_dir}\\{current_table['target']}*.* /Q /F >nul 2>nul \n")
  32. f.write(
  33. f"sqlcmd.exe {target_db.bcp_conn_params} -p "
  34. + f"-Q \"TRUNCATE TABLE [{cfg.target_dsn.schema}].[{current_table['target']}]\" \n"
  35. )
  36. target_columns_list = target_db.get_columns(current_table["target"])
  37. target_column_types = target_db.get_columns_is_typeof_str(current_table["target"])
  38. if "CLIENT_DB" in target_columns_list:
  39. target_columns_list.remove("CLIENT_DB")
  40. target_columns_list.append("Client_DB")
  41. target_columns = set(target_columns_list)
  42. for client_db, prefix in cfg.clients.items():
  43. source_table = current_table["source"].format(prefix)
  44. if source_table not in source_tables:
  45. source_table2 = source_db.convert_table(source_table)
  46. if source_table2 not in source_tables:
  47. f.write(f"echo Quell-Tabelle '{source_table}' existiert nicht!\n")
  48. print(f"Quell-Tabelle '{source_table}' existiert nicht!")
  49. continue
  50. source_columns = set(source_db.get_columns(source_table))
  51. intersect = source_columns.intersection(target_columns)
  52. # print("Auf beiden Seiten: " + ";".join(intersect))
  53. diff1 = source_columns.difference(target_columns)
  54. if len(diff1) > 0:
  55. f.write("rem Nur in Quelle: " + ";".join(diff1) + "\n")
  56. diff2 = target_columns.difference(source_columns)
  57. if "Client_DB" not in diff2:
  58. f.write("echo Spalte 'Client_DB' fehlt!\n")
  59. print(f"Ziel-Tabelle '{current_table['target']}' Spalte 'Client_DB' fehlt!")
  60. continue
  61. diff2.remove("Client_DB")
  62. if len(diff2) > 0:
  63. f.write("rem Nur in Ziel: " + ";".join(diff2) + "\n")
  64. if not pd.isnull(current_table["query"]):
  65. select_query = current_table["query"].format(prefix, cfg.filter[0], cfg.filter[1])
  66. elif "." in source_table or cfg.source_dsn.schema == "":
  67. if source_table[0] != "[":
  68. source_table = f"[{source_table}]"
  69. select_query = f"SELECT T1.* FROM {source_table} T1 "
  70. else:
  71. select_query = f"SELECT T1.* FROM [{cfg.source_dsn.schema}].[{source_table}] T1 "
  72. if not pd.isnull(current_table["filter"]):
  73. select_query += " WHERE " + current_table["filter"].format("", cfg.filter[0], cfg.filter[1])
  74. # select_columns = "T1.[" + "], T1.[".join(intersect) + "],"
  75. select_columns = ""
  76. for col, is_char_type in zip(target_columns_list, target_column_types):
  77. if col in intersect:
  78. if is_char_type:
  79. select_columns += f"dbo.cln(T1.[{col}]), "
  80. else:
  81. select_columns += f"T1.[{col}], "
  82. elif col == "Client_DB":
  83. select_columns += f"'{client_db}' as \\\"Client_DB\\\", "
  84. else:
  85. select_columns += "'' as \\\"" + col + '\\", '
  86. select_query = select_query.replace("T1.*", select_columns[:-2])
  87. if "timestamp" in source_columns:
  88. select_query += " ORDER BY T1.[timestamp] "
  89. else:
  90. print(current_table["target"] + " hat kein timestamp-Feld")
  91. pkey = target_db.get_pkey(current_table["target"])
  92. if len(pkey) == 0:
  93. print(current_table["target"] + " hat keinen Primaerschluessel")
  94. select_query = select_query.replace("%", "%%") # batch-Problem
  95. stage_csv = f"{cfg.stage_dir}\\{current_table['target']}_{client_db}.csv"
  96. logfile = f"{cfg.logs_dir}\\{current_table['target']}_{client_db}"
  97. # insert_query = f"LOAD DATA INFILE '{stage_csv}' INTO TABLE {current_table['target']} FIELDS TERMINATED BY ','
  98. # ENCLOSED BY '\"' LINES TERMINATED BY '\n';"
  99. # print(select_query)
  100. bulk_copy = "bcp" if cfg.source_dsn.driver == "mssql" else "cet"
  101. f.write(
  102. f'{bulk_copy} "{select_query}" queryout "{stage_csv}" {source_db.bcp_conn_params} -c -C 65001 -m 1000 '
  103. + f'-e "{logfile}.queryout.log" > "{logfile}.bcp1.log" \n'
  104. )
  105. f.write(f'type "{logfile}.bcp1.log" | findstr -v "1000" \n')
  106. f.write(
  107. f"bcp [{cfg.target_dsn.schema}].[{current_table['target']}] in \"{stage_csv}\" {target_db.bcp_conn_params} "
  108. + f'-c -C 65001 -m 1000 -e "{logfile}.in.log" > "{logfile}.bcp2.log" \n'
  109. )
  110. f.write(f'type "{logfile}.bcp2.log" | findstr -v "1000" \n')
  111. f.write(f'del "{stage_csv}" /F >nul 2>nul \n')
  112. with open(f"{cfg.batch_dir}/_{cfg.name}.bat", "w", encoding="cp850") as f:
  113. f.write("@echo off & cd /d %~dp0 \n")
  114. f.write(f"del {cfg.stage_dir}\\*.* /Q /F >nul 2>nul \n\n")
  115. for index, current_table in config.iterrows():
  116. f.write(f"echo =={current_table['target']}==\n")
  117. f.write(f"echo {current_table['target']} >CON \n")
  118. f.write(f"call {current_table['target']}.bat\n\n")
# Script entry point: generate the batch files using create()'s default configuration file.
if __name__ == "__main__":
    create()