db_compare.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. import codecs
  2. import itertools
  3. import re
  4. import sys
  5. from datetime import datetime
  6. import pandas as pd
  7. from pyodbc import Error, ProgrammingError
  8. sys.path.insert(0, "C:\\Projekte\\tools")
  9. from database.db_create import get_table_config # noqa:E402
  10. from database.model import DbCreateConfig, DestTable, SourceTable2 # noqa:E402
  11. def decode_ts(ts_binary):
  12. return "0x" + codecs.encode(ts_binary, "hex_codec").decode()
  13. def convert_column(col):
  14. if col is None:
  15. return "null"
  16. if isinstance(col, str):
  17. col = col.replace("'", "")
  18. return f"'{col}'"
  19. if isinstance(col, bytes):
  20. return decode_ts(col)
  21. if isinstance(col, datetime):
  22. return f"'{col.isoformat()[:19]}'"
  23. if isinstance(col, bool):
  24. return "1" if col else "0"
  25. return str(col)
  26. def compare(config_file: str = "database/CARLO.json"):
  27. cfg = DbCreateConfig.load_config(config_file)
  28. table_config = get_table_config(cfg)
  29. for dest_table in table_config:
  30. dest_row_count = {}
  31. dest_timestamp = {}
  32. if dest_table.dest not in cfg.dest_inspect.tables_list:
  33. print(f"Ziel-Tabelle '{dest_table.dest}' existiert nicht!")
  34. continue
  35. query_count_dest = (
  36. f"SELECT [Client_DB], COUNT(*) as [Rows] FROM {dest_table.full_table_name} GROUP BY [Client_DB]"
  37. )
  38. q = cfg.dest_inspect.cursor.execute(query_count_dest)
  39. dest_row_count = dict([(col[0], col[1]) for col in q.fetchall()])
  40. query_timestamp_dest = (
  41. f"SELECT [Client_DB], max(timestamp) as [TS] FROM {dest_table.full_table_name} GROUP BY [Client_DB]"
  42. )
  43. q = cfg.dest_inspect.cursor.execute(query_timestamp_dest)
  44. dest_timestamp = dict([(col[0], decode_ts(col[1])) for col in q.fetchall()])
  45. source_row_count = {}
  46. source_row_count_ts = {}
  47. for source_table in dest_table.source_tables:
  48. source_table2 = cfg.source_inspect.convert_table(source_table.table_name)
  49. client_db = source_table.client_db
  50. if (
  51. source_table.table_name in cfg.source_inspect.tables_list
  52. or source_table2 in cfg.source_inspect.tables_list
  53. ):
  54. query_count_source = source_table.select_query.replace("T1.*", "COUNT(*) as [Rows]")
  55. q = cfg.source_inspect.cursor.execute(query_count_source)
  56. source_row_count[client_db] = q.fetchone()[0]
  57. query_ts = query_count_source
  58. ts = dest_timestamp.get(client_db, "0x0000000000000000")
  59. if "WHERE" in query_ts:
  60. query_ts = query_ts.replace("WHERE", f"WHERE T1.[timestamp] <= convert(binary(8), '{ts}', 1) AND")
  61. else:
  62. query_ts += f" WHERE T1.[timestamp] <= convert(binary(8), '{ts}', 1)"
  63. try:
  64. q = cfg.source_inspect.cursor.execute(query_ts)
  65. source_row_count_ts[client_db] = q.fetchone()[0]
  66. except ProgrammingError:
  67. pass
  68. if dest_row_count.get(client_db, 0) != source_row_count.get(client_db, 0):
  69. print(f"Tabelle {dest_table.dest} mit Client {client_db} stimmt nicht ueberein.")
  70. print(f" Timestamp: {ts}")
  71. print(f" Quelle: {source_row_count.get(client_db, 0):>8}")
  72. print(f" Quelle (bis ts): {source_row_count_ts.get(client_db, 0):>8}")
  73. print(f" dest: {dest_row_count.get(client_db, 0):>8}")
  74. res = compare_details(source_table, dest_table, query_count_source, cfg)
  75. resolve_mismatch(source_table, dest_table, cfg, res)
  76. def compare_details(source_table: SourceTable2, dest_table: DestTable, query_count_source: str, cfg: DbCreateConfig):
  77. cols = dest_table.primary_key + ["timestamp"]
  78. if "Client_DB" in cols:
  79. cols.remove("Client_DB")
  80. if "CLIENT_DB" in cols:
  81. cols.remove("CLIENT_DB")
  82. query_cols = ", ".join([f"T1.[{c}]" for c in cols])
  83. query_source = query_count_source.replace("COUNT(*) as [Rows]", query_cols)
  84. query_source += f" ORDER BY {query_cols}"
  85. query_dest = (
  86. f"SELECT {query_cols} FROM {dest_table.full_table_name} T1 "
  87. f"WHERE T1.[Client_DB] = '{source_table.client_db}' ORDER BY {query_cols}"
  88. )
  89. source_data = pd.read_sql(query_source, cfg.source_inspect.sqlalchemy_engine)
  90. source_data["timestamp"] = source_data["timestamp"].apply(decode_ts)
  91. # source_file = f"{cfg.stage_dir}\\source\\{source_table.table_client}.csv"
  92. # source_data.to_csv(source_file, index=False)
  93. dest_data = pd.read_sql(query_dest, cfg.dest_inspect.sqlalchemy_engine)
  94. dest_data["timestamp"] = dest_data["timestamp"].apply(decode_ts)
  95. # dest_file = f"{cfg.stage_dir}\\dest\\{source_table.table_client}.csv"
  96. # dest_data.to_csv(dest_file, index=False)
  97. cols_without_ts = cols[:-1]
  98. only_in_source = pd.merge(source_data, dest_data, how="left", on=cols_without_ts)
  99. only_in_source = only_in_source[pd.isna(only_in_source["timestamp_y"])]
  100. only_in_source_file = f"{cfg.stage_dir}\\diff\\{source_table.table_client}_only_in_source.sql"
  101. ts_list1 = "0x0000000000000000"
  102. if only_in_source.shape[0] > 0:
  103. ts_list1 = ", ".join(only_in_source.to_dict(orient="list")["timestamp_x"])
  104. query = query_count_source.replace("COUNT(*) as [Rows]", "T1.*")
  105. query += f" AND T1.[timestamp] IN ({ts_list1})"
  106. with open(only_in_source_file, "w") as fwh:
  107. fwh.write(query)
  108. not_updated = pd.merge(source_data, dest_data, how="inner", on=cols_without_ts)
  109. not_updated = not_updated[not_updated["timestamp_x"] != not_updated["timestamp_y"]]
  110. not_updated_file = f"{cfg.stage_dir}\\diff\\{source_table.table_client}_not_updated.sql"
  111. ts_list2 = "0x0000000000000000"
  112. if not_updated.shape[0] > 0:
  113. ts_list2 = ", ".join(not_updated.to_dict(orient="list")["timestamp_x"])
  114. query = query_count_source.replace("COUNT(*) as [Rows]", "T1.*")
  115. query += f" AND T1.[timestamp] IN ({ts_list2})"
  116. with open(not_updated_file, "w") as fwh:
  117. fwh.write(query)
  118. only_in_dest = pd.merge(dest_data, source_data, how="left", on=cols_without_ts)
  119. only_in_dest = only_in_dest[pd.isna(only_in_dest["timestamp_y"])]
  120. only_in_dest_file = f"{cfg.stage_dir}\\diff\\{source_table.table_client}_only_in_dest.sql"
  121. ts_list0 = ""
  122. if only_in_dest.shape[0] > 0:
  123. ts_list0 = ", ".join(only_in_dest.to_dict(orient="list")["timestamp_x"])
  124. query = (
  125. f"SELECT T1.* FROM {dest_table.full_table_name} T1 "
  126. f"WHERE T1.[Client_DB] = '{source_table.client_db}' AND T1.[timestamp] IN ({ts_list0})"
  127. )
  128. with open(only_in_dest_file, "w") as fwh:
  129. fwh.write(query)
  130. return [ts_list0, ts_list1 + ", " + ts_list2]
  131. def resolve_mismatch(source_table: SourceTable2, dest_table: DestTable, cfg: DbCreateConfig, ts_lists: list[str]):
  132. if len(ts_lists[0]) > 100_000 or len(ts_lists[1]) > 100_000:
  133. print(f"!! Tabellen-Import fuer {source_table.table_client} abgebrochen: zu viele Zeilen !!")
  134. print(" Veraltete Datensaetze in Ziel: " + str(ts_lists[0].count(",") + 1))
  135. print(" Fehlende Datensaetze in Ziel: " + str(ts_lists[1].count(",") + 1))
  136. return
  137. if ts_lists[0] != "":
  138. delete_query = (
  139. f"DELETE FROM {dest_table.full_table_name} "
  140. f"WHERE [Client_DB] = '{source_table.client_db}' AND [timestamp] IN ({ts_lists[0]})"
  141. )
  142. res = cfg.dest_inspect.cursor.execute(delete_query)
  143. print(res.rowcount)
  144. res.commit()
  145. res = cfg.dest_inspect.cursor.execute(f"TRUNCATE TABLE {dest_table.temp_table_name}")
  146. res.commit()
  147. print(res.rowcount)
  148. query = re.sub(r"WHERE(.*)", f"WHERE T1.[timestamp] IN ({ts_lists[1]})", source_table.select_query_with_columns)
  149. res = cfg.source_inspect.cursor.execute(query)
  150. for i in itertools.count(start=1):
  151. insert_rows = []
  152. chunk = res.fetchmany(1_000)
  153. if not chunk:
  154. break
  155. for row in chunk:
  156. insert_rows.append("(" + ", ".join([convert_column(c) for c in row]) + ")")
  157. print(len(insert_rows))
  158. if len(insert_rows) == 0:
  159. break
  160. insert_query = f"INSERT INTO {dest_table.temp_table_name} with (TABLOCK) VALUES \n" + ", \n".join(insert_rows)
  161. try:
  162. res2 = cfg.dest_inspect.cursor.execute(insert_query)
  163. print(res2.rowcount)
  164. res2.commit()
  165. except Error as e:
  166. with open(f"{cfg.stage_dir}\\diff\\{source_table.table_client}_insert_{i}.sql", "w") as fwh:
  167. fwh.write(insert_query)
  168. print(f"Tabellen-Import fuer {source_table.table_client} abgebrochen")
  169. print(e)
  170. res = cfg.dest_inspect.cursor.execute(dest_table.delete_query)
  171. print(res.rowcount)
  172. res.commit()
  173. res = cfg.dest_inspect.cursor.execute(dest_table.insert_query)
  174. print(res.rowcount)
  175. res.commit()
if __name__ == "__main__":
    # Run the full source/destination comparison with the default CARLO config.
    compare()