import codecs import itertools import re import sys from datetime import datetime import pandas as pd from pyodbc import Error, ProgrammingError sys.path.insert(0, "C:\\Projekte\\tools") from database.db_create import get_table_config # noqa:E402 from database.model import DbCreateConfig, DestTable, SourceTable2 # noqa:E402 def decode_ts(ts_binary): return "0x" + codecs.encode(ts_binary, "hex_codec").decode() def convert_column(col): if col is None: return "null" if isinstance(col, str): col = col.replace("'", "") return f"'{col}'" if isinstance(col, bytes): return decode_ts(col) if isinstance(col, datetime): return f"'{col.isoformat()[:19]}'" if isinstance(col, bool): return "1" if col else "0" return str(col) def compare(config_file: str = "database/CARLO.json"): cfg = DbCreateConfig.load_config(config_file) table_config = get_table_config(cfg) for dest_table in table_config: dest_row_count = {} dest_timestamp = {} if dest_table.dest not in cfg.dest_inspect.tables_list: print(f"Ziel-Tabelle '{dest_table.dest}' existiert nicht!") continue query_count_dest = ( f"SELECT [Client_DB], COUNT(*) as [Rows] FROM {dest_table.full_table_name} GROUP BY [Client_DB]" ) q = cfg.dest_inspect.cursor.execute(query_count_dest) dest_row_count = dict([(col[0], col[1]) for col in q.fetchall()]) query_timestamp_dest = ( f"SELECT [Client_DB], max(timestamp) as [TS] FROM {dest_table.full_table_name} GROUP BY [Client_DB]" ) q = cfg.dest_inspect.cursor.execute(query_timestamp_dest) dest_timestamp = dict([(col[0], decode_ts(col[1])) for col in q.fetchall()]) source_row_count = {} source_row_count_ts = {} for source_table in dest_table.source_tables: source_table2 = cfg.source_inspect.convert_table(source_table.table_name) client_db = source_table.client_db if ( source_table.table_name in cfg.source_inspect.tables_list or source_table2 in cfg.source_inspect.tables_list ): query_count_source = source_table.select_query.replace("T1.*", "COUNT(*) as [Rows]") q = cfg.source_inspect.cursor.execute(query_count_source) source_row_count[client_db] = q.fetchone()[0] query_ts = query_count_source ts = dest_timestamp.get(client_db, "0x0000000000000000") if "WHERE" in query_ts: query_ts = query_ts.replace("WHERE", f"WHERE T1.[timestamp] <= convert(binary(8), '{ts}', 1) AND") else: query_ts += f" WHERE T1.[timestamp] <= convert(binary(8), '{ts}', 1)" try: q = cfg.source_inspect.cursor.execute(query_ts) source_row_count_ts[client_db] = q.fetchone()[0] except ProgrammingError: pass if dest_row_count.get(client_db, 0) != source_row_count.get(client_db, 0): print(f"Tabelle {dest_table.dest} mit Client {client_db} stimmt nicht ueberein.") print(f" Timestamp: {ts}") print(f" Quelle: {source_row_count.get(client_db, 0):>8}") print(f" Quelle (bis ts): {source_row_count_ts.get(client_db, 0):>8}") print(f" dest: {dest_row_count.get(client_db, 0):>8}") res = compare_details(source_table, dest_table, query_count_source, cfg) resolve_mismatch(source_table, dest_table, cfg, res) def compare_details(source_table: SourceTable2, dest_table: DestTable, query_count_source: str, cfg: DbCreateConfig): cols = dest_table.primary_key + ["timestamp"] if "Client_DB" in cols: cols.remove("Client_DB") if "CLIENT_DB" in cols: cols.remove("CLIENT_DB") query_cols = ", ".join([f"T1.[{c}]" for c in cols]) query_source = query_count_source.replace("COUNT(*) as [Rows]", query_cols) query_source += f" ORDER BY {query_cols}" query_dest = ( f"SELECT {query_cols} FROM {dest_table.full_table_name} T1 " f"WHERE T1.[Client_DB] = '{source_table.client_db}' ORDER BY {query_cols}" ) source_data = pd.read_sql(query_source, cfg.source_inspect.sqlalchemy_engine) source_data["timestamp"] = source_data["timestamp"].apply(decode_ts) # source_file = f"{cfg.stage_dir}\\source\\{source_table.table_client}.csv" # source_data.to_csv(source_file, index=False) dest_data = pd.read_sql(query_dest, cfg.dest_inspect.sqlalchemy_engine) dest_data["timestamp"] = dest_data["timestamp"].apply(decode_ts) # dest_file = f"{cfg.stage_dir}\\dest\\{source_table.table_client}.csv" # dest_data.to_csv(dest_file, index=False) cols_without_ts = cols[:-1] only_in_source = pd.merge(source_data, dest_data, how="left", on=cols_without_ts) only_in_source = only_in_source[pd.isna(only_in_source["timestamp_y"])] only_in_source_file = f"{cfg.stage_dir}\\diff\\{source_table.table_client}_only_in_source.sql" ts_list1 = "0x0000000000000000" if only_in_source.shape[0] > 0: ts_list1 = ", ".join(only_in_source.to_dict(orient="list")["timestamp_x"]) query = query_count_source.replace("COUNT(*) as [Rows]", "T1.*") query += f" AND T1.[timestamp] IN ({ts_list1})" with open(only_in_source_file, "w") as fwh: fwh.write(query) not_updated = pd.merge(source_data, dest_data, how="inner", on=cols_without_ts) not_updated = not_updated[not_updated["timestamp_x"] != not_updated["timestamp_y"]] not_updated_file = f"{cfg.stage_dir}\\diff\\{source_table.table_client}_not_updated.sql" ts_list2 = "0x0000000000000000" if not_updated.shape[0] > 0: ts_list2 = ", ".join(not_updated.to_dict(orient="list")["timestamp_x"]) query = query_count_source.replace("COUNT(*) as [Rows]", "T1.*") query += f" AND T1.[timestamp] IN ({ts_list2})" with open(not_updated_file, "w") as fwh: fwh.write(query) only_in_dest = pd.merge(dest_data, source_data, how="left", on=cols_without_ts) only_in_dest = only_in_dest[pd.isna(only_in_dest["timestamp_y"])] only_in_dest_file = f"{cfg.stage_dir}\\diff\\{source_table.table_client}_only_in_dest.sql" ts_list0 = "" if only_in_dest.shape[0] > 0: ts_list0 = ", ".join(only_in_dest.to_dict(orient="list")["timestamp_x"]) query = ( f"SELECT T1.* FROM {dest_table.full_table_name} T1 " f"WHERE T1.[Client_DB] = '{source_table.client_db}' AND T1.[timestamp] IN ({ts_list0})" ) with open(only_in_dest_file, "w") as fwh: fwh.write(query) return [ts_list0, ts_list1 + ", " + ts_list2] def resolve_mismatch(source_table: SourceTable2, dest_table: DestTable, cfg: DbCreateConfig, ts_lists: list[str]): if len(ts_lists[0]) > 100_000 or len(ts_lists[1]) > 100_000: print(f"!! Tabellen-Import fuer {source_table.table_client} abgebrochen: zu viele Zeilen !!") print(" Veraltete Datensaetze in Ziel: " + str(ts_lists[0].count(",") + 1)) print(" Fehlende Datensaetze in Ziel: " + str(ts_lists[1].count(",") + 1)) return if ts_lists[0] != "": delete_query = ( f"DELETE FROM {dest_table.full_table_name} " f"WHERE [Client_DB] = '{source_table.client_db}' AND [timestamp] IN ({ts_lists[0]})" ) res = cfg.dest_inspect.cursor.execute(delete_query) print(res.rowcount) res.commit() res = cfg.dest_inspect.cursor.execute(f"TRUNCATE TABLE {dest_table.temp_table_name}") res.commit() print(res.rowcount) query = re.sub(r"WHERE(.*)", f"WHERE T1.[timestamp] IN ({ts_lists[1]})", source_table.select_query_with_columns) res = cfg.source_inspect.cursor.execute(query) for i in itertools.count(start=1): insert_rows = [] chunk = res.fetchmany(1_000) if not chunk: break for row in chunk: insert_rows.append("(" + ", ".join([convert_column(c) for c in row]) + ")") print(len(insert_rows)) if len(insert_rows) == 0: break insert_query = f"INSERT INTO {dest_table.temp_table_name} with (TABLOCK) VALUES \n" + ", \n".join(insert_rows) try: res2 = cfg.dest_inspect.cursor.execute(insert_query) print(res2.rowcount) res2.commit() except Error as e: with open(f"{cfg.stage_dir}\\diff\\{source_table.table_client}_insert_{i}.sql", "w") as fwh: fwh.write(insert_query) print(f"Tabellen-Import fuer {source_table.table_client} abgebrochen") print(e) res = cfg.dest_inspect.cursor.execute(dest_table.delete_query) print(res.rowcount) res.commit() res = cfg.dest_inspect.cursor.execute(dest_table.insert_query) print(res.rowcount) res.commit() if __name__ == "__main__": compare()