db_compare.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. import codecs
  2. import json
  3. from pathlib import Path
  4. import pandas as pd
  5. from pyodbc import ProgrammingError
  6. from database.db_create import get_import_config
  7. from database.model import DatabaseInspect, DbCreateConfig, create_db_ini, load_config
  8. def decode_ts(ts_binary):
  9. return "0x" + codecs.encode(ts_binary, "hex_codec").decode()
  10. def compare(config_file: str = "database/CARLO.json"):
  11. cfg = load_config(config_file)
  12. create_db_ini(cfg)
  13. base_dir = str(Path(config_file).parent.parent.resolve())
  14. config = get_import_config(f"{base_dir}/config/{cfg.csv_file}", cfg.dest_dsn.database)
  15. source_db = DatabaseInspect(cfg.source_dsn, source=True)
  16. source_tables = source_db.tables_list()
  17. print(json.dumps(source_db.get_prefix(), indent=2))
  18. dest_db = DatabaseInspect(cfg.dest_dsn)
  19. dest_tables = dest_db.tables_list()
  20. for _, current_table in config.iterrows():
  21. dest_row_count = {}
  22. dest_timestamp = {}
  23. if current_table["dest"] in dest_tables:
  24. full_table_name = f'[{current_table["dest_db"]}].[{cfg.dest_dsn.schema}].[{current_table["dest"]}]'
  25. query_count_dest = f"SELECT [Client_DB], COUNT(*) as [Rows] FROM {full_table_name} GROUP BY [Client_DB]"
  26. q = dest_db.cursor.execute(query_count_dest)
  27. dest_row_count = dict([(col[0], col[1]) for col in q.fetchall()])
  28. query_timestamp_dest = (
  29. f"SELECT [Client_DB], max(timestamp) as [TS] FROM {full_table_name} GROUP BY [Client_DB]"
  30. )
  31. q = dest_db.cursor.execute(query_timestamp_dest)
  32. dest_timestamp = dict([(col[0], decode_ts(col[1])) for col in q.fetchall()])
  33. source_row_count = {}
  34. source_row_count_ts = {}
  35. for client_db, prefix in cfg.clients.items():
  36. source_table = current_table["source"].format(prefix)
  37. source_table2 = source_table.split(".")[-1][1:-1]
  38. if source_table in source_tables or source_table2 in source_tables:
  39. if not pd.isnull(current_table["query"]):
  40. select_query = current_table["query"].format(prefix, cfg.filter[0], cfg.filter[1])
  41. elif "." in source_table or cfg.source_dsn.schema == "":
  42. if source_table[0] != "[":
  43. source_table = f"[{source_table}]"
  44. select_query = f"SELECT T1.* FROM {source_table} T1 "
  45. else:
  46. select_query = f"SELECT T1.* FROM [{cfg.source_dsn.schema}].[{source_table}] T1 "
  47. if not pd.isnull(current_table["filter"]):
  48. select_query += " WHERE " + current_table["filter"].format("", cfg.filter[0], cfg.filter[1])
  49. elif "WHERE" not in select_query:
  50. select_query += " WHERE 1 = 1"
  51. query_count_source = select_query.replace("T1.*", "COUNT(*) as [Rows]")
  52. # print(query_count_source)
  53. q = source_db.cursor.execute(query_count_source)
  54. source_row_count[client_db] = q.fetchone()[0]
  55. query_ts = query_count_source
  56. ts = dest_timestamp.get(client_db, "0x0000000000000000")
  57. if "WHERE" in query_ts:
  58. query_ts = query_ts.replace("WHERE", f"WHERE T1.[timestamp] <= convert(binary(8), '{ts}', 1) AND")
  59. else:
  60. query_ts += f" WHERE T1.[timestamp] <= convert(binary(8), '{ts}', 1)"
  61. # print(query_ts)
  62. try:
  63. q = source_db.cursor.execute(query_ts)
  64. source_row_count_ts[client_db] = q.fetchone()[0]
  65. except ProgrammingError:
  66. pass
  67. if dest_row_count.get(client_db, 0) != source_row_count.get(client_db, 0):
  68. print(f"Tabelle {current_table['dest']} mit Client {client_db} stimmt nicht ueberein.")
  69. print(f" Quelle: {source_row_count.get(client_db, 0):>8}")
  70. print(f" Quelle (bis ts): {source_row_count_ts.get(client_db, 0):>8}")
  71. print(f" dest: {dest_row_count.get(client_db, 0):>8}")
  72. compare_details(current_table, client_db, source_db, dest_db, query_count_source, full_table_name, cfg)
  73. def compare_details(
  74. current_table: pd.Series,
  75. client_db: str,
  76. source_db: DatabaseInspect,
  77. dest_db: DatabaseInspect,
  78. query_count_source: str,
  79. full_table_name: str,
  80. cfg: DbCreateConfig,
  81. ):
  82. table_client = f'{current_table["dest"]}_{client_db}'
  83. pkey = dest_db.get_pkey(current_table["dest"], current_table["dest_db"])
  84. cols = pkey + ["timestamp"]
  85. if "Client_DB" in cols:
  86. cols.remove("Client_DB")
  87. if "CLIENT_DB" in cols:
  88. cols.remove("CLIENT_DB")
  89. query_cols = ", ".join([f"T1.[{c}]" for c in cols])
  90. query_source = query_count_source.replace("COUNT(*) as [Rows]", query_cols)
  91. query_source += f" ORDER BY {query_cols}"
  92. query_dest = (
  93. f"SELECT {query_cols} FROM {full_table_name} T1 WHERE T1.[Client_DB] = '{client_db}' ORDER BY {query_cols}"
  94. )
  95. source_file = f"{cfg.stage_dir}\\source\\{table_client}.csv"
  96. source_data = pd.read_sql(query_source, source_db.sqlalchemy_engine)
  97. source_data["timestamp"] = source_data["timestamp"].apply(decode_ts)
  98. source_data.to_csv(source_file, index=False)
  99. dest_file = f"{cfg.stage_dir}\\dest\\{table_client}.csv"
  100. dest_data = pd.read_sql(query_dest, dest_db.sqlalchemy_engine)
  101. dest_data["timestamp"] = dest_data["timestamp"].apply(decode_ts)
  102. dest_data.to_csv(dest_file, index=False)
  103. cols_without_ts = cols[:-1]
  104. only_in_source_file = f"{cfg.stage_dir}\\diff\\{table_client}_only_in_source.sql"
  105. only_in_source = pd.merge(source_data, dest_data, how="left", on=cols_without_ts)
  106. only_in_source = only_in_source[pd.isna(only_in_source["timestamp_y"])]
  107. if only_in_source.shape[0] > 0:
  108. ts_list = ", ".join(only_in_source.to_dict(orient="list")["timestamp_x"])
  109. query = query_count_source.replace("COUNT(*) as [Rows]", "T1.*")
  110. query += f" AND T1.[timestamp] IN ({ts_list})"
  111. with open(only_in_source_file, "w") as fwh:
  112. fwh.write(query)
  113. only_in_dest_file = f"{cfg.stage_dir}\\diff\\{table_client}_only_in_dest.sql"
  114. only_in_dest = pd.merge(dest_data, source_data, how="left", on=cols_without_ts)
  115. only_in_dest = only_in_dest[pd.isna(only_in_dest["timestamp_y"])]
  116. if only_in_dest.shape[0] > 0:
  117. ts_list = ", ".join(only_in_dest.to_dict(orient="list")["timestamp_x"])
  118. query = f"SELECT T1.* FROM {full_table_name} T1 WHERE T1.[Client_DB] = '{client_db}' AND T1.[timestamp] IN ({ts_list})"
  119. with open(only_in_dest_file, "w") as fwh:
  120. fwh.write(query)
  121. not_updated_file = f"{cfg.stage_dir}\\diff\\{table_client}_not_updated.sql"
  122. not_updated = pd.merge(source_data, dest_data, how="inner", on=cols_without_ts)
  123. not_updated = not_updated[not_updated["timestamp_x"] != not_updated["timestamp_y"]]
  124. if not_updated.shape[0] > 0:
  125. ts_list = ", ".join(not_updated.to_dict(orient="list")["timestamp_x"])
  126. query = query_count_source.replace("COUNT(*) as [Rows]", "T1.*")
  127. query += f" AND T1.[timestamp] IN ({ts_list})"
  128. with open(not_updated_file, "w") as fwh:
  129. fwh.write(query)