# db_compare.py (4.1 KB)

import codecs
import json
from pathlib import Path

import pandas as pd
import pyodbc

from database.db_create import get_import_config
from database.model import DatabaseInspect, create_db_ini, load_config
  8. def compare(config_file: str = "database/CARLO.json"):
  9. cfg = load_config(config_file)
  10. create_db_ini(cfg)
  11. base_dir = str(Path(config_file).parent.parent.resolve())
  12. config = get_import_config(f"{base_dir}/config/{cfg.csv_file}", cfg.dest_dsn.database)
  13. source_db = DatabaseInspect(cfg.source_dsn, source=True)
  14. source_tables = source_db.get_tables()
  15. print(json.dumps(source_db.get_prefix(), indent=2))
  16. dest_db = DatabaseInspect(cfg.dest_dsn)
  17. dest_tables = dest_db.get_tables()
  18. for _, current_table in config.iterrows():
  19. dest_row_count = {}
  20. dest_timestamp = {}
  21. if current_table["dest"] in dest_tables:
  22. full_table_name = f'[{current_table["dest_db"]}].[{cfg.dest_dsn.schema}].[{current_table["dest"]}]'
  23. # pkey = dest_db.get_pkey(current_table["dest"], current_table["dest_db"])
  24. # cols = pkey + ["timestamp"]
  25. query_count_dest = f"SELECT [Client_DB], COUNT(*) as [Rows] FROM {full_table_name} GROUP BY [Client_DB]"
  26. q = dest_db.cursor.execute(query_count_dest)
  27. dest_row_count = dict([(col[0], col[1]) for col in q.fetchall()])
  28. query_timestamp_dest = (
  29. f"SELECT [Client_DB], max(timestamp) as [TS] FROM {full_table_name} GROUP BY [Client_DB]"
  30. )
  31. q = dest_db.cursor.execute(query_timestamp_dest)
  32. dest_timestamp = dict(
  33. [(col[0], "0x" + codecs.encode(col[1], "hex_codec").decode()) for col in q.fetchall()]
  34. )
  35. source_row_count = {}
  36. source_row_count_ts = {}
  37. for client_db, prefix in cfg.clients.items():
  38. # table_client = f'{current_table["dest"]}_{client_db}'
  39. source_table = current_table["source"].format(prefix)
  40. source_table2 = source_table.split(".")[-1][1:-1]
  41. if source_table in source_tables or source_table2 in source_tables:
  42. if not pd.isnull(current_table["query"]):
  43. select_query = current_table["query"].format(prefix, cfg.filter[0], cfg.filter[1])
  44. elif "." in source_table or cfg.source_dsn.schema == "":
  45. if source_table[0] != "[":
  46. source_table = f"[{source_table}]"
  47. select_query = f"SELECT T1.* FROM {source_table} T1 "
  48. else:
  49. select_query = f"SELECT T1.* FROM [{cfg.source_dsn.schema}].[{source_table}] T1 "
  50. if not pd.isnull(current_table["filter"]):
  51. select_query += " WHERE " + current_table["filter"].format("", cfg.filter[0], cfg.filter[1])
  52. query_count_source = select_query.replace("T1.*", "COUNT(*) as [Rows]")
  53. # print(query_count_source)
  54. q = source_db.cursor.execute(query_count_source)
  55. source_row_count[client_db] = q.fetchone()[0]
  56. query_ts = query_count_source
  57. ts = dest_timestamp.get(client_db, "0x0000000000000000")
  58. if "WHERE" in query_ts:
  59. query_ts = query_ts.replace("WHERE", f"WHERE T1.[timestamp] <= convert(binary(8), '{ts}', 1) AND")
  60. else:
  61. query_ts += f" WHERE T1.[timestamp] <= convert(binary(8), '{ts}', 1)"
  62. # print(query_ts)
  63. try:
  64. q = source_db.cursor.execute(query_ts)
  65. source_row_count_ts[client_db] = q.fetchone()[0]
  66. except pyodbc.ProgrammingError:
  67. pass
  68. if dest_row_count.get(client_db, 0) != source_row_count.get(client_db, 0):
  69. print(f"Tabelle {current_table['dest']} mit Client {client_db} stimmt nicht ueberein.")
  70. print(f" Quelle: {source_row_count.get(client_db, 0):>8}")
  71. print(f" Quelle (bis ts): {source_row_count_ts.get(client_db, 0):>8}")
  72. print(f" dest: {dest_row_count.get(client_db, 0):>8}")