# db_create.py
import json
from typing import TextIO

import pandas as pd

from database.model import DbCreateConfig, DestTable, SourceTable2  # noqa:E402
  4. def get_import_config(filename: str, db_name: str) -> pd.DataFrame:
  5. df = pd.read_csv(filename, sep=";", encoding="latin-1")
  6. if "dest" not in df.columns:
  7. df["dest"] = df["target"]
  8. df["dest_db"] = db_name
  9. df["cols"] = ""
  10. df[["source", "dest", "dest_db", "filter", "query", "iterative", "cols"]].to_csv(
  11. filename, sep=";", encoding="latin-1", index=False
  12. )
  13. return df[df["dest"].notnull()]
  14. def get_table_config(cfg: DbCreateConfig) -> list[DestTable]:
  15. config = get_import_config(cfg.csv_file, cfg.dest_dsn.database)
  16. SourceTable2.source_inspect = cfg.source_inspect
  17. SourceTable2.cfg = cfg
  18. table_config = [DestTable(*row.values()) for row in config.to_dict(orient="records")]
  19. for dest_table in table_config:
  20. dest_table.cfg = cfg
  21. dest_table.dest_inspect = cfg.dest_inspect
  22. dest_table.source_tables = []
  23. for client_db, prefix in cfg.clients.items():
  24. st = SourceTable2(dest_table.source, client_db, prefix)
  25. st.dest_table = dest_table
  26. dest_table.source_tables.append(st)
  27. return table_config
  28. def create(config_file: str = "database/CARLO.json"):
  29. cfg = DbCreateConfig.load_config(config_file)
  30. cfg.create_db_ini()
  31. print("Clients:")
  32. print(json.dumps(cfg.clients, indent=2))
  33. print("Vorschlaege fuer Clients:")
  34. print(json.dumps(cfg.source_inspect.get_prefix(), indent=2))
  35. table_config = get_table_config(cfg)
  36. for dest_table in table_config:
  37. with open(f"{cfg.sql_import_full_dir}\\{dest_table.dest}.sql", "w", encoding="latin-1") as f:
  38. if dest_table.dest not in cfg.dest_inspect.tables_list:
  39. f.write(f"-- Ziel-Tabelle '{dest_table.dest}' existiert nicht!\n")
  40. continue
  41. f.write(f"TRUNCATE TABLE {dest_table.full_table_name}\nGO\n\n")
  42. for source_table in dest_table.source_tables:
  43. if source_table.table_name not in cfg.source_inspect.tables_list:
  44. source_table_str = cfg.source_inspect.convert_table(source_table.table_name)
  45. if source_table_str not in cfg.source_inspect.tables_list:
  46. f.write(f"-- Quell-Tabelle '{source_table.table_name}' existiert nicht!\n")
  47. continue
  48. select_query = source_table.select_query_with_columns.replace(
  49. "[dbo]", f"[{cfg.source_inspect.dsn.server}].[{cfg.source_inspect.dsn.database}].[dbo]"
  50. ).replace(",", "\n" + " " * 5 + ",")
  51. for tag in ["SELECT", "FROM", "INNER", "LEFT", "WHERE", "ORDER BY"]:
  52. select_query = select_query.replace(tag, "\n" + tag)
  53. for tag in ["AND", "ON"]:
  54. select_query = select_query.replace(tag, "\n " + tag)
  55. f.write(source_table.info.replace("rem ", "-- "))
  56. if select_query == "":
  57. f.write(f"-- Ziel-Tabelle '{dest_table.dest}' Spalte 'Client_DB' fehlt!\n")
  58. continue
  59. f.write(
  60. f"INSERT INTO [{cfg.dest_dsn.schema}].[{dest_table.dest}]\nWITH (TABLOCK)\n{select_query}\nGO\n"
  61. )
  62. for dest_table in table_config:
  63. with open(dest_table.table_batch_file, "w", encoding="cp850") as f:
  64. f.write("@echo off\n")
  65. f.write(f'call "{cfg.scripts_dir}\\config2.bat"\n')
  66. f.write("rem ==" + dest_table.dest + "==\n")
  67. if dest_table.dest not in cfg.dest_inspect.tables_list:
  68. f.write(f"echo Ziel-Tabelle '{dest_table.dest}' existiert nicht!\n")
  69. print(f"Ziel-Tabelle '{dest_table.dest}' existiert nicht!")
  70. continue
  71. f.write(f"del {cfg.logs_dir}\\{dest_table.dest}*.* /Q /F >nul 2>nul\n\n")
  72. f.write('if not "%1"=="" goto :increment\n')
  73. f.write("\n:full\n")
  74. f.write(f' call sql_query.bat "TRUNCATE TABLE {dest_table.full_table_name}"\n')
  75. for source_table in dest_table.source_tables:
  76. if source_table.table_name not in cfg.source_inspect.tables_list:
  77. source_table_str = cfg.source_inspect.convert_table(source_table.table_name)
  78. if source_table_str not in cfg.source_inspect.tables_list:
  79. f.write(f"echo Quell-Tabelle '{source_table.table_name}' existiert nicht!\n")
  80. print(f"Quell-Tabelle '{source_table.table_name}' existiert nicht!")
  81. continue
  82. select_query = source_table.select_query_with_columns.replace("%", "%%%%") # batch-Problem
  83. f.write(source_table.info)
  84. if select_query == "":
  85. print(f"Ziel-Tabelle '{dest_table.dest}' Spalte 'Client_DB' fehlt!")
  86. continue
  87. f.write(f' call bcp_queryout.bat "{source_table.table_client}" "{select_query}"\n')
  88. f.write(
  89. f' call bcp_in.bat "{source_table.table_client}" '
  90. f'"[{cfg.dest_dsn.schema}].[{dest_table.dest}]" "{dest_table.dest_db}"\n'
  91. )
  92. f.write(" goto :cleanup\n\n")
  93. f.write(":increment\n")
  94. f.write(f' call sql_query.bat "TRUNCATE TABLE {dest_table.temp_table_name}"\n\n')
  95. for source_table in dest_table.source_tables:
  96. select_query = source_table.select_query_with_columns.replace("%", "%%%%") # batch-Problem
  97. convert_timestamp = "T1.[timestamp] > convert(binary(8), '%TS%', 1)"
  98. if "WHERE" in select_query:
  99. select_query = select_query.replace("WHERE", f"WHERE {convert_timestamp} AND")
  100. else:
  101. print("Dont know where to put WHERE")
  102. f.write(
  103. f' call sql_timestamp.bat "{source_table.table_client}" "{dest_table.full_table_name}" "{source_table.client_db}"\n'
  104. )
  105. f.write(f' call bcp_queryout.bat "{source_table.table_client}" "{select_query}"\n')
  106. f.write(
  107. f' call bcp_in.bat "{source_table.table_client}" "[temp].[{dest_table.dest}]" "{cfg.temp_db}"\n\n'
  108. )
  109. if dest_table.delete_query == "":
  110. print(dest_table.dest + " hat keinen Primaerschluessel")
  111. f.write(f" rem {dest_table.dest} hat keinen Primaerschluessel")
  112. else:
  113. f.write(f' call sql_query.bat "{dest_table.delete_query}"\n')
  114. f.write(f' call sql_query.bat "{dest_table.insert_query}"\n')
  115. f.write("\n:cleanup\n")
  116. for source_table in dest_table.source_tables:
  117. f.write(f' call delete.bat "{source_table.stage_csv}"\n')
  118. with open(f"{cfg.batch_dir}/_{cfg.name}.bat", "w", encoding="cp850") as f:
  119. f.write("@echo off & cd /d %~dp0\n")
  120. f.write(f"del {cfg.stage_dir}\\*.* /Q /F >nul 2>nul\n\n")
  121. for dest_table in table_config:
  122. f.write(f"echo =={dest_table.dest}==\n")
  123. f.write(f"echo {dest_table.dest} >CON\n")
  124. f.write(f"call {cfg.batch_dir}\\{dest_table.dest}.bat 1\n\n")
  125. with open(f"{cfg.batch_dir}/_{cfg.name}_full_load.bat", "w", encoding="cp850") as f:
  126. f.write("@echo off & cd /d %~dp0\n")
  127. f.write(f"del {cfg.stage_dir}\\*.* /Q /F >nul 2>nul\n\n")
  128. for dest_table in table_config:
  129. f.write(f"echo =={dest_table.dest}==\n")
  130. f.write(f"echo {dest_table.dest} >CON\n")
  131. f.write(f"call {cfg.batch_dir}\\{dest_table.dest}.bat\n\n")