# db_compare.py
  1. import codecs
  2. import itertools
  3. import re
  4. import sys
  5. from datetime import datetime
  6. import pandas as pd
  7. from pyodbc import Error, ProgrammingError
  8. sys.path.insert(0, "C:\\Projekte\\tools")
  9. from database.db_create import get_table_config # noqa:E402
  10. from database.model import DbCreateConfig, DestTable, SourceTable2 # noqa:E402
  11. def decode_ts(ts_binary):
  12. return "0x" + codecs.encode(ts_binary, "hex_codec").decode()
  13. def convert_column(col):
  14. if col is None:
  15. return "null"
  16. if isinstance(col, str):
  17. return f"'{col}'"
  18. if isinstance(col, bytes):
  19. return decode_ts(col)
  20. if isinstance(col, datetime):
  21. return f"'{col.isoformat()[:19]}'"
  22. if isinstance(col, bool):
  23. return "1" if col else "0"
  24. return str(col)
def compare(config_file: str = "database/CARLO.json"):
    """Compare per-``Client_DB`` row counts between source and destination tables.

    For every destination table in the config, the row count and the maximum
    ``timestamp`` (rowversion) are read per ``Client_DB`` and compared against
    the corresponding source tables.  On a count mismatch the differing rows
    are identified (``compare_details``) and synchronized (``resolve_mismatch``).

    Args:
        config_file: path to the JSON config consumed by ``DbCreateConfig``.
    """
    cfg = DbCreateConfig.load_config(config_file)
    table_config = get_table_config(cfg)
    for dest_table in table_config:
        dest_row_count = {}
        dest_timestamp = {}
        # Destination table missing entirely — report and skip.
        if dest_table.dest not in cfg.dest_inspect.tables_list:
            print(f"Ziel-Tabelle '{dest_table.dest}' existiert nicht!")
            continue
        # Row count per Client_DB on the destination side.
        query_count_dest = (
            f"SELECT [Client_DB], COUNT(*) as [Rows] FROM {dest_table.full_table_name} GROUP BY [Client_DB]"
        )
        q = cfg.dest_inspect.cursor.execute(query_count_dest)
        dest_row_count = dict([(col[0], col[1]) for col in q.fetchall()])
        # Highest replicated rowversion per Client_DB (hex-encoded for SQL reuse).
        query_timestamp_dest = (
            f"SELECT [Client_DB], max(timestamp) as [TS] FROM {dest_table.full_table_name} GROUP BY [Client_DB]"
        )
        q = cfg.dest_inspect.cursor.execute(query_timestamp_dest)
        dest_timestamp = dict([(col[0], decode_ts(col[1])) for col in q.fetchall()])
        source_row_count = {}
        source_row_count_ts = {}
        for source_table in dest_table.source_tables:
            source_table2 = cfg.source_inspect.convert_table(source_table.table_name)
            client_db = source_table.client_db
            if (
                source_table.table_name in cfg.source_inspect.tables_list
                or source_table2 in cfg.source_inspect.tables_list
            ):
                # Reuse the configured select query as a COUNT(*) query.
                query_count_source = source_table.select_query.replace("T1.*", "COUNT(*) as [Rows]")
                q = cfg.source_inspect.cursor.execute(query_count_source)
                source_row_count[client_db] = q.fetchone()[0]
                # Second count restricted to rows at or below the destination's
                # max rowversion — rows newer than that are expected to be missing.
                query_ts = query_count_source
                ts = dest_timestamp.get(client_db, "0x0000000000000000")
                if "WHERE" in query_ts:
                    query_ts = query_ts.replace("WHERE", f"WHERE T1.[timestamp] <= convert(binary(8), '{ts}', 1) AND")
                else:
                    query_ts += f" WHERE T1.[timestamp] <= convert(binary(8), '{ts}', 1)"
                try:
                    q = cfg.source_inspect.cursor.execute(query_ts)
                    source_row_count_ts[client_db] = q.fetchone()[0]
                except ProgrammingError:
                    # Best-effort: e.g. source table has no [timestamp] column.
                    pass
                # NOTE(review): this check sits inside the tables_list branch
                # because it reads `ts`, which is only bound here — confirm
                # against the original (pre-mangling) indentation.
                if dest_row_count.get(client_db, 0) != source_row_count.get(client_db, 0):
                    print(f"Tabelle {dest_table.dest} mit Client {client_db} stimmt nicht ueberein.")
                    print(f" Timestamp: {ts}")
                    print(f" Quelle: {source_row_count.get(client_db, 0):>8}")
                    print(f" Quelle (bis ts): {source_row_count_ts.get(client_db, 0):>8}")
                    print(f" dest: {dest_row_count.get(client_db, 0):>8}")
                    res = compare_details(source_table, dest_table, query_count_source, cfg)
                    resolve_mismatch(source_table, dest_table, cfg, res)
def compare_details(source_table: SourceTable2, dest_table: DestTable, query_count_source: str, cfg: DbCreateConfig):
    """Identify which rows differ between a source table and its destination.

    Loads primary key + ``timestamp`` for both sides into pandas frames and
    classifies rows into three buckets by merging on the primary key:
    only-in-source, not-updated (key matches, timestamp differs), and
    only-in-dest.  For each bucket a diagnostic SQL file is written to
    ``<stage_dir>\\diff\\``.

    Args:
        source_table: source table descriptor (client_db, select query, …).
        dest_table: destination table descriptor (full name, primary key, …).
        query_count_source: the COUNT(*) query built by ``compare``; reused
            as a template for column selects.
        cfg: active configuration with both DB inspectors.

    Returns:
        list[str]: ``[ts_list_only_in_dest, ts_list_to_reimport]`` — comma-
        separated hex rowversion lists consumed by ``resolve_mismatch``.
    """
    # Compare on primary key plus rowversion; Client_DB is implied by the
    # per-client queries, so drop it from the key columns.
    cols = dest_table.primary_key + ["timestamp"]
    if "Client_DB" in cols:
        cols.remove("Client_DB")
    if "CLIENT_DB" in cols:
        cols.remove("CLIENT_DB")
    query_cols = ", ".join([f"T1.[{c}]" for c in cols])
    query_source = query_count_source.replace("COUNT(*) as [Rows]", query_cols)
    query_source += f" ORDER BY {query_cols}"
    query_dest = (
        f"SELECT {query_cols} FROM {dest_table.full_table_name} T1 "
        f"WHERE T1.[Client_DB] = '{source_table.client_db}' ORDER BY {query_cols}"
    )
    source_data = pd.read_sql(query_source, cfg.source_inspect.sqlalchemy_engine)
    source_data["timestamp"] = source_data["timestamp"].apply(decode_ts)
    # source_file = f"{cfg.stage_dir}\\source\\{source_table.table_client}.csv"
    # source_data.to_csv(source_file, index=False)
    dest_data = pd.read_sql(query_dest, cfg.dest_inspect.sqlalchemy_engine)
    dest_data["timestamp"] = dest_data["timestamp"].apply(decode_ts)
    # dest_file = f"{cfg.stage_dir}\\dest\\{source_table.table_client}.csv"
    # dest_data.to_csv(dest_file, index=False)
    cols_without_ts = cols[:-1]
    # Left merge on the key: rows whose right-side timestamp is NaN exist
    # only on the left side (pandas suffixes the clashing column _x/_y).
    only_in_source = pd.merge(source_data, dest_data, how="left", on=cols_without_ts)
    only_in_source = only_in_source[pd.isna(only_in_source["timestamp_y"])]
    only_in_source_file = f"{cfg.stage_dir}\\diff\\{source_table.table_client}_only_in_source.sql"
    ts_list1 = "0x0000000000000000"
    if only_in_source.shape[0] > 0:
        ts_list1 = ", ".join(only_in_source.to_dict(orient="list")["timestamp_x"])
        query = query_count_source.replace("COUNT(*) as [Rows]", "T1.*")
        query += f" AND T1.[timestamp] IN ({ts_list1})"
        with open(only_in_source_file, "w") as fwh:
            fwh.write(query)
    # Key matches on both sides but the rowversion differs: stale rows.
    not_updated = pd.merge(source_data, dest_data, how="inner", on=cols_without_ts)
    not_updated = not_updated[not_updated["timestamp_x"] != not_updated["timestamp_y"]]
    not_updated_file = f"{cfg.stage_dir}\\diff\\{source_table.table_client}_not_updated.sql"
    ts_list2 = "0x0000000000000000"
    if not_updated.shape[0] > 0:
        ts_list2 = ", ".join(not_updated.to_dict(orient="list")["timestamp_x"])
        query = query_count_source.replace("COUNT(*) as [Rows]", "T1.*")
        query += f" AND T1.[timestamp] IN ({ts_list2})"
        with open(not_updated_file, "w") as fwh:
            fwh.write(query)
    # Reverse direction: rows present in dest but no longer in the source.
    only_in_dest = pd.merge(dest_data, source_data, how="left", on=cols_without_ts)
    only_in_dest = only_in_dest[pd.isna(only_in_dest["timestamp_y"])]
    only_in_dest_file = f"{cfg.stage_dir}\\diff\\{source_table.table_client}_only_in_dest.sql"
    ts_list0 = ""
    if only_in_dest.shape[0] > 0:
        ts_list0 = ", ".join(only_in_dest.to_dict(orient="list")["timestamp_x"])
        query = (
            f"SELECT T1.* FROM {dest_table.full_table_name} T1 "
            f"WHERE T1.[Client_DB] = '{source_table.client_db}' AND T1.[timestamp] IN ({ts_list0})"
        )
        with open(only_in_dest_file, "w") as fwh:
            fwh.write(query)
    # [0]: rows to delete from dest; [1]: rows to (re)import from source.
    # The dummy entry 0x0000000000000000 keeps the IN (…) list non-empty.
    return [ts_list0, ts_list1 + ", " + ts_list2]
def resolve_mismatch(source_table: SourceTable2, dest_table: DestTable, cfg: DbCreateConfig, ts_lists: list[str]):
    """Synchronize the destination table using the diff lists from ``compare_details``.

    Deletes dest-only rows, then re-imports missing/stale source rows via the
    temp table (TRUNCATE, chunked INSERT, then the configured delete/insert
    queries to move the data into the destination table).

    Args:
        source_table: source table descriptor.
        dest_table: destination table descriptor (temp table, delete/insert queries).
        cfg: active configuration with both DB inspectors.
        ts_lists: ``[ts_list_only_in_dest, ts_list_to_reimport]`` — comma-
            separated hex rowversion lists.
    """
    # Step 1: remove rows that exist only in dest (empty string = nothing to delete).
    if ts_lists[0] != "":
        delete_query = (
            f"DELETE FROM {dest_table.full_table_name} "
            f"WHERE [Client_DB] = '{source_table.client_db}' AND [timestamp] IN ({ts_lists[0]})"
        )
        res = cfg.dest_inspect.cursor.execute(delete_query)
        print(res.rowcount)
        res.commit()
    # Guard: a very long IN-list means too many rows to sync this way.
    # (100_000 chars of the list string, not a row count.)
    if len(ts_lists[1]) > 100_000:
        print(f"Tabellen-Import fuer {source_table.table_client} abgebrochen: zu viele Zeilen!")
        return
    # Step 2: stage the rows to import in the temp table.
    res = cfg.dest_inspect.cursor.execute(f"TRUNCATE TABLE {dest_table.temp_table_name}")
    res.commit()
    print(res.rowcount)
    # Replace the entire WHERE clause of the configured select with a
    # rowversion filter for exactly the rows to re-import.
    query = re.sub(r"WHERE(.*)", f"WHERE T1.[timestamp] IN ({ts_lists[1]})", source_table.select_query_with_columns)
    res = cfg.source_inspect.cursor.execute(query)
    # Chunked transfer: 1000 rows per INSERT statement.
    for i in itertools.count(start=1):
        insert_rows = []
        chunk = res.fetchmany(1_000)
        if not chunk:
            break
        for row in chunk:
            insert_rows.append("(" + ", ".join([convert_column(c) for c in row]) + ")")
        print(len(insert_rows))
        if len(insert_rows) == 0:
            break
        insert_query = f"INSERT INTO {dest_table.temp_table_name} with (TABLOCK) VALUES \n" + ", \n".join(insert_rows)
        try:
            res2 = cfg.dest_inspect.cursor.execute(insert_query)
            print(res2.rowcount)
            res2.commit()
        except Error as e:
            # Dump the failing statement for offline inspection.
            with open(f"{cfg.stage_dir}\\diff\\{source_table.table_client}_insert_{i}.sql", "w") as fwh:
                fwh.write(insert_query)
            print(f"Tabellen-Import fuer {source_table.table_client} abgebrochen")
            print(e)
            # NOTE(review): despite the "abgebrochen" message the loop is NOT
            # aborted here — subsequent chunks and the final delete/insert
            # below still run. Confirm whether a return/break is intended.
    # Step 3: move staged rows from the temp table into the destination table.
    res = cfg.dest_inspect.cursor.execute(dest_table.delete_query)
    print(res.rowcount)
    res.commit()
    res = cfg.dest_inspect.cursor.execute(dest_table.insert_query)
    print(res.rowcount)
    res.commit()
if __name__ == "__main__":
    # CLI entry point: compare all tables using the default config file.
    compare()