# db_compare.py
  1. import codecs
  2. import re
  3. import sys
  4. from datetime import datetime
  5. import pandas as pd
  6. from pyodbc import ProgrammingError
  7. sys.path.insert(0, "C:\\Projekte\\tools")
  8. from database.db_create import get_table_config # noqa:E402
  9. from database.model import DbCreateConfig, DestTable, SourceTable2 # noqa:E402
  10. def decode_ts(ts_binary):
  11. return "0x" + codecs.encode(ts_binary, "hex_codec").decode()
  12. def convert_column(col):
  13. if col is None:
  14. return "null"
  15. if isinstance(col, str):
  16. return f"'{col}'"
  17. if isinstance(col, bytes):
  18. return decode_ts(col)
  19. if isinstance(col, datetime):
  20. return f"'{col.isoformat()[:19]}'"
  21. if isinstance(col, bool):
  22. return "1" if col else "0"
  23. return str(col)
  24. def compare(config_file: str = "database/CARLO.json"):
  25. cfg = DbCreateConfig.load_config(config_file)
  26. table_config = get_table_config(cfg)
  27. for dest_table in table_config:
  28. dest_row_count = {}
  29. dest_timestamp = {}
  30. if dest_table.dest not in cfg.dest_inspect.tables_list:
  31. print(f"Ziel-Tabelle '{dest_table.dest}' existiert nicht!")
  32. continue
  33. query_count_dest = (
  34. f"SELECT [Client_DB], COUNT(*) as [Rows] FROM {dest_table.full_table_name} GROUP BY [Client_DB]"
  35. )
  36. q = cfg.dest_inspect.cursor.execute(query_count_dest)
  37. dest_row_count = dict([(col[0], col[1]) for col in q.fetchall()])
  38. query_timestamp_dest = (
  39. f"SELECT [Client_DB], max(timestamp) as [TS] FROM {dest_table.full_table_name} GROUP BY [Client_DB]"
  40. )
  41. q = cfg.dest_inspect.cursor.execute(query_timestamp_dest)
  42. dest_timestamp = dict([(col[0], decode_ts(col[1])) for col in q.fetchall()])
  43. source_row_count = {}
  44. source_row_count_ts = {}
  45. for source_table in dest_table.source_tables:
  46. source_table2 = cfg.source_inspect.convert_table(source_table.table_name)
  47. client_db = source_table.client_db
  48. if (
  49. source_table.table_name in cfg.source_inspect.tables_list
  50. or source_table2 in cfg.source_inspect.tables_list
  51. ):
  52. query_count_source = source_table.select_query.replace("T1.*", "COUNT(*) as [Rows]")
  53. # print(query_count_source)
  54. q = cfg.source_inspect.cursor.execute(query_count_source)
  55. source_row_count[client_db] = q.fetchone()[0]
  56. query_ts = query_count_source
  57. ts = dest_timestamp.get(client_db, "0x0000000000000000")
  58. if "WHERE" in query_ts:
  59. query_ts = query_ts.replace("WHERE", f"WHERE T1.[timestamp] <= convert(binary(8), '{ts}', 1) AND")
  60. else:
  61. query_ts += f" WHERE T1.[timestamp] <= convert(binary(8), '{ts}', 1)"
  62. # print(query_ts)
  63. try:
  64. q = cfg.source_inspect.cursor.execute(query_ts)
  65. source_row_count_ts[client_db] = q.fetchone()[0]
  66. except ProgrammingError:
  67. pass
  68. if dest_row_count.get(client_db, 0) != source_row_count.get(client_db, 0):
  69. print(f"Tabelle {dest_table.dest} mit Client {client_db} stimmt nicht ueberein.")
  70. print(f" Timestamp: {ts}")
  71. print(f" Quelle: {source_row_count.get(client_db, 0):>8}")
  72. print(f" Quelle (bis ts): {source_row_count_ts.get(client_db, 0):>8}")
  73. print(f" dest: {dest_row_count.get(client_db, 0):>8}")
  74. res = compare_details(source_table, dest_table, query_count_source, cfg)
  75. resolve_mismatch(source_table, dest_table, cfg, res)
def compare_details(source_table: SourceTable2, dest_table: DestTable, query_count_source: str, cfg: DbCreateConfig):
    """Row-level diff (primary key + rowversion) of one source table vs. dest.

    Writes up to three SQL files into <stage_dir>\\diff\\ that select the
    offending rows: *_only_in_source.sql, *_not_updated.sql, *_only_in_dest.sql.

    Returns:
        [ts_list0, ts_list1 + ", " + ts_list2] -- comma-separated hex timestamp
        literals: rows present only in dest, and rows to (re-)import from source.
        Unused lists keep the sentinel '0x0000000000000000' (matches no real row).
    """
    # key columns to fetch on both sides; Client_DB is implied by the source table
    cols = dest_table.primary_key + ["timestamp"]
    if "Client_DB" in cols:
        cols.remove("Client_DB")
    if "CLIENT_DB" in cols:
        cols.remove("CLIENT_DB")
    query_cols = ", ".join([f"T1.[{c}]" for c in cols])
    # reuse the (already filtered) count query as the source key query
    query_source = query_count_source.replace("COUNT(*) as [Rows]", query_cols)
    query_source += f" ORDER BY {query_cols}"
    query_dest = (
        f"SELECT {query_cols} FROM {dest_table.full_table_name} T1 "
        f"WHERE T1.[Client_DB] = '{source_table.client_db}' ORDER BY {query_cols}"
    )
    source_data = pd.read_sql(query_source, cfg.source_inspect.sqlalchemy_engine)
    # normalize the binary rowversion to a hex string so it merges/compares cleanly
    source_data["timestamp"] = source_data["timestamp"].apply(decode_ts)
    # source_file = f"{cfg.stage_dir}\\source\\{source_table.table_client}.csv"
    # source_data.to_csv(source_file, index=False)
    dest_data = pd.read_sql(query_dest, cfg.dest_inspect.sqlalchemy_engine)
    dest_data["timestamp"] = dest_data["timestamp"].apply(decode_ts)
    # dest_file = f"{cfg.stage_dir}\\dest\\{source_table.table_client}.csv"
    # dest_data.to_csv(dest_file, index=False)
    # merge on the primary key only; the timestamp columns become _x/_y suffixed
    cols_without_ts = cols[:-1]
    # rows whose key exists in source but not in dest (timestamp_y is NaN)
    only_in_source = pd.merge(source_data, dest_data, how="left", on=cols_without_ts)
    only_in_source = only_in_source[pd.isna(only_in_source["timestamp_y"])]
    only_in_source_file = f"{cfg.stage_dir}\\diff\\{source_table.table_client}_only_in_source.sql"
    ts_list1 = "0x0000000000000000"
    if only_in_source.shape[0] > 0:
        ts_list1 = ", ".join(only_in_source.to_dict(orient="list")["timestamp_x"])
        # NOTE(review): appending AND assumes the source query has a WHERE clause
        query = query_count_source.replace("COUNT(*) as [Rows]", "T1.*")
        query += f" AND T1.[timestamp] IN ({ts_list1})"
        with open(only_in_source_file, "w") as fwh:
            fwh.write(query)
    # rows present on both sides but with differing rowversions (stale in dest)
    not_updated = pd.merge(source_data, dest_data, how="inner", on=cols_without_ts)
    not_updated = not_updated[not_updated["timestamp_x"] != not_updated["timestamp_y"]]
    not_updated_file = f"{cfg.stage_dir}\\diff\\{source_table.table_client}_not_updated.sql"
    ts_list2 = "0x0000000000000000"
    if not_updated.shape[0] > 0:
        ts_list2 = ", ".join(not_updated.to_dict(orient="list")["timestamp_x"])
        query = query_count_source.replace("COUNT(*) as [Rows]", "T1.*")
        query += f" AND T1.[timestamp] IN ({ts_list2})"
        with open(not_updated_file, "w") as fwh:
            fwh.write(query)
    # rows whose key exists in dest but not in source (deleted at the source)
    only_in_dest = pd.merge(dest_data, source_data, how="left", on=cols_without_ts)
    only_in_dest = only_in_dest[pd.isna(only_in_dest["timestamp_y"])]
    only_in_dest_file = f"{cfg.stage_dir}\\diff\\{source_table.table_client}_only_in_dest.sql"
    ts_list0 = ""
    if only_in_dest.shape[0] > 0:
        ts_list0 = ", ".join(only_in_dest.to_dict(orient="list")["timestamp_x"])
        query = (
            f"SELECT T1.* FROM {dest_table.full_table_name} T1 "
            f"WHERE T1.[Client_DB] = '{source_table.client_db}' AND T1.[timestamp] IN ({ts_list0})"
        )
        with open(only_in_dest_file, "w") as fwh:
            fwh.write(query)
    return [ts_list0, ts_list1 + ", " + ts_list2]
def resolve_mismatch(source_table: SourceTable2, dest_table: DestTable, cfg: DbCreateConfig, ts_lists: list[str]):
    """Repair a detected mismatch for one source table / client.

    ts_lists comes from compare_details(): element 0 holds timestamps of rows
    present only in dest (to be deleted), element 1 holds timestamps of rows
    missing or stale in dest (to be re-copied source -> temp -> dest).
    """
    # 1) delete rows that no longer exist at the source
    if ts_lists[0] != "":
        delete_query = (
            f"DELETE FROM {dest_table.full_table_name} "
            f"WHERE [Client_DB] = '{source_table.client_db}' AND [timestamp] IN ({ts_lists[0]})"
        )
        res = cfg.dest_inspect.cursor.execute(delete_query)
        print(res.rowcount)
        res.commit()
    # 2) stage the affected source rows in the temp table
    res = cfg.dest_inspect.cursor.execute(f"TRUNCATE TABLE {dest_table.temp_table_name}")
    res.commit()
    print(res.rowcount)
    # meta = MetaData(schema="temp")
    # meta.reflect(bind=cfg.temp_db_sqlalchemy_engine)
    # dest_table_obj = meta.tables[dest_table.dest]
    # swap the original WHERE clause for a filter on the affected timestamps
    query = re.sub(r"WHERE(.*)", f"WHERE T1.[timestamp] IN ({ts_lists[1]})", source_table.select_query_with_columns)
    # df = pd.read_sql(query, cfg.source_inspect.cursor.connection)
    # df["timestamp"] = df["timestamp"].apply(decode_ts)
    res = cfg.source_inspect.cursor.execute(query)
    chunk = []
    for row in res.fetchall():
        # inline every value as a SQL literal (see convert_column)
        chunk.append("(" + ", ".join([convert_column(c) for c in row]) + ")")
    print(len(chunk))
    # NOTE(review): a single multi-row INSERT; fails when chunk is empty or
    # exceeds the 1000-row VALUES limit -- the except below catches that
    insert_query = f"INSERT INTO {dest_table.temp_table_name} VALUES \n" + ", \n".join(chunk)
    try:
        res2 = cfg.dest_inspect.cursor.execute(insert_query)
        print(res2.rowcount)
        res2.commit()
    except Exception:
        # keep the failing statement on disk for manual replay
        with open(f"{cfg.stage_dir}\\diff\\{source_table.table_client}_insert.sql", "w") as fwh:
            fwh.write(insert_query)
        print(f"Tabellen-Import fuer {source_table.table_client} abgebrochen")
    # with cfg.temp_db_sqlalchemy_engine.connect() as dest_conn:
    #     with cfg.source_inspect.sqlalchemy_engine.execution_options(stream_results=True).connect() as source_conn:
    #         chunks = source_conn.execute(query)
    #         while True:
    #             chunk = chunks.fetchmany(10_000)
    #             res = []
    #             if not chunk:
    #                 break
    #             res = [dict(row.items()) for row in chunk]
    #             insert_query = dest_table_obj.insert().values(res)
    #             res2 = cfg.dest_inspect.cursor.execute(insert_query)
    #             print(res2.rowcount)
    # 3) move the staged rows from temp into the destination table
    res = cfg.dest_inspect.cursor.execute(dest_table.delete_query)
    print(res.rowcount)
    res.commit()
    res = cfg.dest_inspect.cursor.execute(dest_table.insert_query)
    print(res.rowcount)
    res.commit()
# CLI entry point: run the comparison with the default CARLO config.
if __name__ == "__main__":
    compare()