db_compare.py 8.9 KB


import codecs
import itertools
import re
import sys
from datetime import date, datetime, time

import pandas as pd
from pyodbc import Error, ProgrammingError

sys.path.insert(0, "C:\\Projekte\\tools")
from database.db_create import get_table_config  # noqa:E402
from database.model import DbCreateConfig, DestTable, SourceTable2  # noqa:E402
  11. def decode_ts(ts_binary):
  12. return "0x" + codecs.encode(ts_binary, "hex_codec").decode()
  13. def convert_column(col):
  14. if col is None:
  15. return "null"
  16. if isinstance(col, str):
  17. col = col.replace("'", "")
  18. return f"'{col}'"
  19. if isinstance(col, bytes):
  20. return decode_ts(col)
  21. if isinstance(col, datetime):
  22. return f"'{col.isoformat()[:19]}'"
  23. if isinstance(col, bool):
  24. return "1" if col else "0"
  25. return str(col)
  26. def compare(config_file: str = "database/CARLO.json"):
  27. cfg = DbCreateConfig.load_config(config_file)
  28. table_config = get_table_config(cfg)
  29. for dest_table in table_config:
  30. dest_row_count = {}
  31. dest_timestamp = {}
  32. if dest_table.dest not in cfg.dest_inspect.tables_list:
  33. print(f"Ziel-Tabelle '{dest_table.dest}' existiert nicht!")
  34. continue
  35. query_count_dest = (
  36. f"SELECT [Client_DB], COUNT(*) as [Rows] FROM {dest_table.full_table_name} GROUP BY [Client_DB]"
  37. )
  38. q = cfg.dest_inspect.cursor.execute(query_count_dest)
  39. dest_row_count = dict([(col[0], col[1]) for col in q.fetchall()])
  40. query_timestamp_dest = (
  41. f"SELECT [Client_DB], max(timestamp) as [TS] FROM {dest_table.full_table_name} GROUP BY [Client_DB]"
  42. )
  43. q = cfg.dest_inspect.cursor.execute(query_timestamp_dest)
  44. dest_timestamp = dict([(col[0], decode_ts(col[1])) for col in q.fetchall()])
  45. source_row_count = {}
  46. source_row_count_ts = {}
  47. for source_table in dest_table.source_tables:
  48. source_table2 = cfg.source_inspect.convert_table(source_table.table_name)
  49. client_db = source_table.client_db
  50. if (
  51. source_table.table_name in cfg.source_inspect.tables_list
  52. or source_table2 in cfg.source_inspect.tables_list
  53. ):
  54. query_count_source = source_table.select_query.replace("T1.*", "COUNT(*) as [Rows]")
  55. q = cfg.source_inspect.cursor.execute(query_count_source)
  56. source_row_count[client_db] = q.fetchone()[0]
  57. query_ts = query_count_source
  58. ts = dest_timestamp.get(client_db, "0x0000000000000000")
  59. if "WHERE" in query_ts:
  60. query_ts = query_ts.replace("WHERE", f"WHERE T1.[timestamp] <= convert(binary(8), '{ts}', 1) AND")
  61. else:
  62. query_ts += f" WHERE T1.[timestamp] <= convert(binary(8), '{ts}', 1)"
  63. try:
  64. q = cfg.source_inspect.cursor.execute(query_ts)
  65. source_row_count_ts[client_db] = q.fetchone()[0]
  66. except ProgrammingError:
  67. pass
  68. if dest_row_count.get(client_db, 0) != source_row_count.get(client_db, 0):
  69. print(f"Tabelle {dest_table.dest} mit Client {client_db} stimmt nicht ueberein.")
  70. print(f" Timestamp: {ts}")
  71. print(f" Quelle: {source_row_count.get(client_db, 0):>8}")
  72. print(f" Quelle (bis ts): {source_row_count_ts.get(client_db, 0):>8}")
  73. print(f" dest: {dest_row_count.get(client_db, 0):>8}")
  74. res = compare_details(source_table, dest_table, query_count_source, cfg)
  75. resolve_mismatch(source_table, dest_table, cfg, res)
  76. def compare_details(source_table: SourceTable2, dest_table: DestTable, query_count_source: str, cfg: DbCreateConfig):
  77. cols = dest_table.primary_key + ["timestamp"]
  78. if "Client_DB" in cols:
  79. cols.remove("Client_DB")
  80. if "CLIENT_DB" in cols:
  81. cols.remove("CLIENT_DB")
  82. query_cols = ", ".join([f"T1.[{c}]" for c in cols])
  83. query_source = query_count_source.replace("COUNT(*) as [Rows]", query_cols)
  84. query_source += f" ORDER BY {query_cols}"
  85. query_dest = (
  86. f"SELECT {query_cols} FROM {dest_table.full_table_name} T1 "
  87. f"WHERE T1.[Client_DB] = '{source_table.client_db}' ORDER BY {query_cols}"
  88. )
  89. source_data = pd.read_sql(query_source, cfg.source_inspect.sqlalchemy_engine)
  90. source_data["timestamp"] = source_data["timestamp"].apply(decode_ts)
  91. # source_file = f"{cfg.stage_dir}\\source\\{source_table.table_client}.csv"
  92. # source_data.to_csv(source_file, index=False)
  93. dest_data = pd.read_sql(query_dest, cfg.dest_inspect.sqlalchemy_engine)
  94. dest_data["timestamp"] = dest_data["timestamp"].apply(decode_ts)
  95. # dest_file = f"{cfg.stage_dir}\\dest\\{source_table.table_client}.csv"
  96. # dest_data.to_csv(dest_file, index=False)
  97. cols_without_ts = cols[:-1]
  98. only_in_source = pd.merge(source_data, dest_data, how="left", on=cols_without_ts)
  99. only_in_source = only_in_source[pd.isna(only_in_source["timestamp_y"])]
  100. only_in_source_file = f"{cfg.stage_dir}\\diff\\{source_table.table_client}_only_in_source.sql"
  101. ts_list1 = "0x0000000000000000"
  102. if only_in_source.shape[0] > 0:
  103. ts_list1 = ", ".join(only_in_source.to_dict(orient="list")["timestamp_x"])
  104. query = query_count_source.replace("COUNT(*) as [Rows]", "T1.*")
  105. query += f" AND T1.[timestamp] IN ({ts_list1})"
  106. with open(only_in_source_file, "w") as fwh:
  107. fwh.write(query)
  108. not_updated = pd.merge(source_data, dest_data, how="inner", on=cols_without_ts)
  109. not_updated = not_updated[not_updated["timestamp_x"] != not_updated["timestamp_y"]]
  110. not_updated_file = f"{cfg.stage_dir}\\diff\\{source_table.table_client}_not_updated.sql"
  111. ts_list2 = "0x0000000000000000"
  112. if not_updated.shape[0] > 0:
  113. ts_list2 = ", ".join(not_updated.to_dict(orient="list")["timestamp_x"])
  114. query = query_count_source.replace("COUNT(*) as [Rows]", "T1.*")
  115. query += f" AND T1.[timestamp] IN ({ts_list2})"
  116. with open(not_updated_file, "w") as fwh:
  117. fwh.write(query)
  118. only_in_dest = pd.merge(dest_data, source_data, how="left", on=cols_without_ts)
  119. only_in_dest = only_in_dest[pd.isna(only_in_dest["timestamp_y"])]
  120. only_in_dest_file = f"{cfg.stage_dir}\\diff\\{source_table.table_client}_only_in_dest.sql"
  121. ts_list0 = ""
  122. if only_in_dest.shape[0] > 0:
  123. ts_list0 = ", ".join(only_in_dest.to_dict(orient="list")["timestamp_x"])
  124. query = (
  125. f"SELECT T1.* FROM {dest_table.full_table_name} T1 "
  126. f"WHERE T1.[Client_DB] = '{source_table.client_db}' AND T1.[timestamp] IN ({ts_list0})"
  127. )
  128. with open(only_in_dest_file, "w") as fwh:
  129. fwh.write(query)
  130. return [ts_list0, ts_list1 + ", " + ts_list2]
  131. def resolve_mismatch(source_table: SourceTable2, dest_table: DestTable, cfg: DbCreateConfig, ts_lists: list[str]):
  132. if len(ts_lists[0]) > 100_000 or len(ts_lists[1]) > 100_000:
  133. print(f"!! Tabellen-Import fuer {source_table.table_client} abgebrochen: zu viele Zeilen !!")
  134. print(" Veraltete Datensaetze in Ziel: " + str(ts_lists[0].count(",") + 1))
  135. print(" Fehlende Datensaetze in Ziel: " + str(ts_lists[1].count(",") + 1))
  136. return
  137. if ts_lists[0] != "":
  138. delete_query = (
  139. f"DELETE FROM {dest_table.full_table_name} "
  140. f"WHERE [Client_DB] = '{source_table.client_db}' AND [timestamp] IN ({ts_lists[0]})"
  141. )
  142. res = cfg.dest_inspect.cursor.execute(delete_query)
  143. print(res.rowcount)
  144. res.commit()
  145. res = cfg.dest_inspect.cursor.execute(f"TRUNCATE TABLE {dest_table.temp_table_name}")
  146. res.commit()
  147. print(res.rowcount)
  148. query = re.sub(r"WHERE(.*)", f"WHERE T1.[timestamp] IN ({ts_lists[1]})", source_table.select_query_with_columns)
  149. res = cfg.source_inspect.cursor.execute(query)
  150. for i in itertools.count(start=1):
  151. insert_rows = []
  152. chunk = res.fetchmany(1_000)
  153. if not chunk:
  154. break
  155. for row in chunk:
  156. insert_rows.append("(" + ", ".join([convert_column(c) for c in row]) + ")")
  157. print(len(insert_rows))
  158. if len(insert_rows) == 0:
  159. break
  160. insert_query = f"INSERT INTO {dest_table.temp_table_name} with (TABLOCK) VALUES \n" + ", \n".join(insert_rows)
  161. try:
  162. res2 = cfg.dest_inspect.cursor.execute(insert_query)
  163. print(res2.rowcount)
  164. res2.commit()
  165. except Error as e:
  166. with open(f"{cfg.stage_dir}\\diff\\{source_table.table_client}_insert_{i}.sql", "w") as fwh:
  167. fwh.write(insert_query)
  168. print(f"Tabellen-Import fuer {source_table.table_client} abgebrochen")
  169. print(e)
  170. res = cfg.dest_inspect.cursor.execute(dest_table.delete_query)
  171. print(res.rowcount)
  172. res.commit()
  173. res = cfg.dest_inspect.cursor.execute(dest_table.insert_query)
  174. print(res.rowcount)
  175. res.commit()
  176. if __name__ == "__main__":
  177. compare()