# db_compare.py (6.2 KB)

  1. import codecs
  2. import sys
  3. import pandas as pd
  4. from pyodbc import ProgrammingError
  5. sys.path.insert(0, "C:\\Projekte\\tools")
  6. from database.db_create import get_table_config # noqa:E402
  7. from database.model import DbCreateConfig, DestTable, SourceTable2 # noqa:E402
  8. def decode_ts(ts_binary):
  9. return "0x" + codecs.encode(ts_binary, "hex_codec").decode()
  10. def compare(config_file: str = "database/CARLO.json"):
  11. cfg = DbCreateConfig.load_config(config_file)
  12. table_config = get_table_config(cfg)
  13. for dest_table in table_config:
  14. dest_row_count = {}
  15. dest_timestamp = {}
  16. if dest_table.dest not in cfg.dest_inspect.tables_list:
  17. print(f"Ziel-Tabelle '{dest_table.dest}' existiert nicht!")
  18. continue
  19. query_count_dest = (
  20. f"SELECT [Client_DB], COUNT(*) as [Rows] FROM {dest_table.full_table_name} GROUP BY [Client_DB]"
  21. )
  22. q = cfg.dest_inspect.cursor.execute(query_count_dest)
  23. dest_row_count = dict([(col[0], col[1]) for col in q.fetchall()])
  24. query_timestamp_dest = (
  25. f"SELECT [Client_DB], max(timestamp) as [TS] FROM {dest_table.full_table_name} GROUP BY [Client_DB]"
  26. )
  27. q = cfg.dest_inspect.cursor.execute(query_timestamp_dest)
  28. dest_timestamp = dict([(col[0], decode_ts(col[1])) for col in q.fetchall()])
  29. source_row_count = {}
  30. source_row_count_ts = {}
  31. for source_table in dest_table.source_tables:
  32. source_table2 = cfg.source_inspect.convert_table(source_table.table_name)
  33. client_db = source_table.client_db
  34. if (
  35. source_table.table_name in cfg.source_inspect.tables_list
  36. or source_table2 in cfg.source_inspect.tables_list
  37. ):
  38. query_count_source = source_table.select_query.replace("T1.*", "COUNT(*) as [Rows]")
  39. # print(query_count_source)
  40. q = cfg.source_inspect.cursor.execute(query_count_source)
  41. source_row_count[client_db] = q.fetchone()[0]
  42. query_ts = query_count_source
  43. ts = dest_timestamp.get(client_db, "0x0000000000000000")
  44. if "WHERE" in query_ts:
  45. query_ts = query_ts.replace("WHERE", f"WHERE T1.[timestamp] <= convert(binary(8), '{ts}', 1) AND")
  46. else:
  47. query_ts += f" WHERE T1.[timestamp] <= convert(binary(8), '{ts}', 1)"
  48. # print(query_ts)
  49. try:
  50. q = cfg.source_inspect.cursor.execute(query_ts)
  51. source_row_count_ts[client_db] = q.fetchone()[0]
  52. except ProgrammingError:
  53. pass
  54. if dest_row_count.get(client_db, 0) != source_row_count.get(client_db, 0):
  55. print(f"Tabelle {dest_table.dest} mit Client {client_db} stimmt nicht ueberein.")
  56. print(f" Quelle: {source_row_count.get(client_db, 0):>8}")
  57. print(f" Quelle (bis ts): {source_row_count_ts.get(client_db, 0):>8}")
  58. print(f" dest: {dest_row_count.get(client_db, 0):>8}")
  59. compare_details(source_table, dest_table, query_count_source, cfg)
  60. def compare_details(source_table: SourceTable2, dest_table: DestTable, query_count_source: str, cfg: DbCreateConfig):
  61. cols = dest_table.primary_key + ["timestamp"]
  62. if "Client_DB" in cols:
  63. cols.remove("Client_DB")
  64. if "CLIENT_DB" in cols:
  65. cols.remove("CLIENT_DB")
  66. query_cols = ", ".join([f"T1.[{c}]" for c in cols])
  67. query_source = query_count_source.replace("COUNT(*) as [Rows]", query_cols)
  68. query_source += f" ORDER BY {query_cols}"
  69. query_dest = (
  70. f"SELECT {query_cols} FROM {dest_table.full_table_name} T1 "
  71. f"WHERE T1.[Client_DB] = '{source_table.client_db}' ORDER BY {query_cols}"
  72. )
  73. source_file = f"{cfg.stage_dir}\\source\\{source_table.table_client}.csv"
  74. source_data = pd.read_sql(query_source, cfg.source_inspect.sqlalchemy_engine)
  75. source_data["timestamp"] = source_data["timestamp"].apply(decode_ts)
  76. source_data.to_csv(source_file, index=False)
  77. dest_file = f"{cfg.stage_dir}\\dest\\{source_table.table_client}.csv"
  78. dest_data = pd.read_sql(query_dest, cfg.dest_inspect.sqlalchemy_engine)
  79. dest_data["timestamp"] = dest_data["timestamp"].apply(decode_ts)
  80. dest_data.to_csv(dest_file, index=False)
  81. cols_without_ts = cols[:-1]
  82. only_in_source_file = f"{cfg.stage_dir}\\diff\\{source_table.table_client}_only_in_source.sql"
  83. only_in_source = pd.merge(source_data, dest_data, how="left", on=cols_without_ts)
  84. only_in_source = only_in_source[pd.isna(only_in_source["timestamp_y"])]
  85. if only_in_source.shape[0] > 0:
  86. ts_list = ", ".join(only_in_source.to_dict(orient="list")["timestamp_x"])
  87. query = query_count_source.replace("COUNT(*) as [Rows]", "T1.*")
  88. query += f" AND T1.[timestamp] IN ({ts_list})"
  89. with open(only_in_source_file, "w") as fwh:
  90. fwh.write(query)
  91. only_in_dest_file = f"{cfg.stage_dir}\\diff\\{source_table.table_client}_only_in_dest.sql"
  92. only_in_dest = pd.merge(dest_data, source_data, how="left", on=cols_without_ts)
  93. only_in_dest = only_in_dest[pd.isna(only_in_dest["timestamp_y"])]
  94. if only_in_dest.shape[0] > 0:
  95. ts_list = ", ".join(only_in_dest.to_dict(orient="list")["timestamp_x"])
  96. query = (
  97. f"SELECT T1.* FROM {dest_table.full_table_name} T1 "
  98. f"WHERE T1.[Client_DB] = '{source_table.client_db}' AND T1.[timestamp] IN ({ts_list})"
  99. )
  100. with open(only_in_dest_file, "w") as fwh:
  101. fwh.write(query)
  102. not_updated_file = f"{cfg.stage_dir}\\diff\\{source_table.table_client}_not_updated.sql"
  103. not_updated = pd.merge(source_data, dest_data, how="inner", on=cols_without_ts)
  104. not_updated = not_updated[not_updated["timestamp_x"] != not_updated["timestamp_y"]]
  105. if not_updated.shape[0] > 0:
  106. ts_list = ", ".join(not_updated.to_dict(orient="list")["timestamp_x"])
  107. query = query_count_source.replace("COUNT(*) as [Rows]", "T1.*")
  108. query += f" AND T1.[timestamp] IN ({ts_list})"
  109. with open(not_updated_file, "w") as fwh:
  110. fwh.write(query)
  111. if __name__ == "__main__":
  112. compare()