import codecs
from pathlib import Path

import pandas as pd
from pyodbc import ProgrammingError

from database.db_create import get_import_config
from database.model import DatabaseInspect, DbCreateConfig


def decode_ts(ts_binary):
    """Render a SQL Server rowversion/timestamp value as a hex literal (e.g. "0x00000000000007D1")."""
    return "0x" + codecs.encode(ts_binary, "hex_codec").decode()


def compare(config_file: str = "database/CARLO.json"):
    """Compare per-client row counts between source and destination tables and report mismatches."""
    cfg = DbCreateConfig.load_config(config_file)
    base_dir = str(Path(config_file).parent.parent.resolve())
    config = get_import_config(f"{base_dir}/config/{cfg.csv_file}", cfg.dest_dsn.database)

    source_db = DatabaseInspect(cfg.source_dsn, source=True)
    source_tables = source_db.tables_list()

    dest_db = DatabaseInspect(cfg.dest_dsn)
    dest_tables = dest_db.tables_list()

    for _, current_table in config.iterrows():
        dest_row_count = {}
        dest_timestamp = {}
        if current_table["dest"] in dest_tables:
            full_table_name = f'[{current_table["dest_db"]}].[{cfg.dest_dsn.schema}].[{current_table["dest"]}]'

            # Row count per client in the destination table
            query_count_dest = f"SELECT [Client_DB], COUNT(*) as [Rows] FROM {full_table_name} GROUP BY [Client_DB]"
            q = dest_db.cursor.execute(query_count_dest)
            dest_row_count = {row[0]: row[1] for row in q.fetchall()}

            # Latest replicated rowversion per client in the destination table
            query_timestamp_dest = (
                f"SELECT [Client_DB], max(timestamp) as [TS] FROM {full_table_name} GROUP BY [Client_DB]"
            )
            q = dest_db.cursor.execute(query_timestamp_dest)
            dest_timestamp = {row[0]: decode_ts(row[1]) for row in q.fetchall()}

        source_row_count = {}
        source_row_count_ts = {}

        for client_db, prefix in cfg.clients.items():
            source_table = current_table["source"].format(prefix)
            source_table2 = source_table.split(".")[-1][1:-1]
            if source_table in source_tables or source_table2 in source_tables:
                # Build the source SELECT either from an explicit query or from the table name
                if not pd.isnull(current_table["query"]):
                    select_query = current_table["query"].format(prefix, cfg.filter[0], cfg.filter[1])
                elif "." in source_table or cfg.source_dsn.schema == "":
                    if source_table[0] != "[":
                        source_table = f"[{source_table}]"
                    select_query = f"SELECT T1.* FROM {source_table} T1 "
                else:
                    select_query = f"SELECT T1.* FROM [{cfg.source_dsn.schema}].[{source_table}] T1 "

                if not pd.isnull(current_table["filter"]):
                    select_query += " WHERE " + current_table["filter"].format("", cfg.filter[0], cfg.filter[1])
                elif "WHERE" not in select_query:
                    select_query += " WHERE 1 = 1"

                # Total row count in the source
                query_count_source = select_query.replace("T1.*", "COUNT(*) as [Rows]")
                # print(query_count_source)
                q = source_db.cursor.execute(query_count_source)
                source_row_count[client_db] = q.fetchone()[0]

                # Source row count restricted to rows up to the destination's latest timestamp
                query_ts = query_count_source
                ts = dest_timestamp.get(client_db, "0x0000000000000000")
                if "WHERE" in query_ts:
                    query_ts = query_ts.replace(
                        "WHERE", f"WHERE T1.[timestamp] <= convert(binary(8), '{ts}', 1) AND"
                    )
                else:
                    query_ts += f" WHERE T1.[timestamp] <= convert(binary(8), '{ts}', 1)"
                # print(query_ts)
                try:
                    q = source_db.cursor.execute(query_ts)
                    source_row_count_ts[client_db] = q.fetchone()[0]
                except ProgrammingError:
                    pass

                if dest_row_count.get(client_db, 0) != source_row_count.get(client_db, 0):
                    print(f"Table {current_table['dest']} does not match for client {client_db}.")
                    print(f"   Source:            {source_row_count.get(client_db, 0):>8}")
                    print(f"   Source (up to ts): {source_row_count_ts.get(client_db, 0):>8}")
                    print(f"   Dest:              {dest_row_count.get(client_db, 0):>8}")
                    if current_table["dest"] in dest_tables:
                        # full_table_name is only defined when the destination table exists
                        compare_details(
                            current_table, client_db, source_db, dest_db, query_count_source, full_table_name, cfg
                        )


def compare_details(
    current_table: pd.Series,
    client_db: str,
    source_db: DatabaseInspect,
    dest_db: DatabaseInspect,
    query_count_source: str,
    full_table_name: str,
    cfg: DbCreateConfig,
):
    """Dump primary-key/timestamp sets of a mismatched table to CSV and write diff SQL scripts."""
    table_client = f'{current_table["dest"]}_{client_db}'
    pkey = dest_db.get_pkey(current_table["dest"], current_table["dest_db"])
    cols = pkey + ["timestamp"]
    if "Client_DB" in cols:
        cols.remove("Client_DB")
    if "CLIENT_DB" in cols:
        cols.remove("CLIENT_DB")
    query_cols = ", ".join([f"T1.[{c}]" for c in cols])

    query_source = query_count_source.replace("COUNT(*) as [Rows]", query_cols)
    query_source += f" ORDER BY {query_cols}"

    query_dest = (
        f"SELECT {query_cols} FROM {full_table_name} T1 WHERE T1.[Client_DB] = '{client_db}' ORDER BY {query_cols}"
    )

    # Export key columns plus timestamp from both sides for comparison
    source_file = f"{cfg.stage_dir}\\source\\{table_client}.csv"
    source_data = pd.read_sql(query_source, source_db.sqlalchemy_engine)
    source_data["timestamp"] = source_data["timestamp"].apply(decode_ts)
    source_data.to_csv(source_file, index=False)

    dest_file = f"{cfg.stage_dir}\\dest\\{table_client}.csv"
    dest_data = pd.read_sql(query_dest, dest_db.sqlalchemy_engine)
    dest_data["timestamp"] = dest_data["timestamp"].apply(decode_ts)
    dest_data.to_csv(dest_file, index=False)

    cols_without_ts = cols[:-1]

    # Rows present in the source but missing in the destination
    only_in_source_file = f"{cfg.stage_dir}\\diff\\{table_client}_only_in_source.sql"
    only_in_source = pd.merge(source_data, dest_data, how="left", on=cols_without_ts)
    only_in_source = only_in_source[pd.isna(only_in_source["timestamp_y"])]
    if only_in_source.shape[0] > 0:
        ts_list = ", ".join(only_in_source.to_dict(orient="list")["timestamp_x"])
        query = query_count_source.replace("COUNT(*) as [Rows]", "T1.*")
        query += f" AND T1.[timestamp] IN ({ts_list})"
        with open(only_in_source_file, "w") as fwh:
            fwh.write(query)

    # Rows present in the destination but missing in the source
    only_in_dest_file = f"{cfg.stage_dir}\\diff\\{table_client}_only_in_dest.sql"
    only_in_dest = pd.merge(dest_data, source_data, how="left", on=cols_without_ts)
    only_in_dest = only_in_dest[pd.isna(only_in_dest["timestamp_y"])]
    if only_in_dest.shape[0] > 0:
        ts_list = ", ".join(only_in_dest.to_dict(orient="list")["timestamp_x"])
        query = (
            f"SELECT T1.* FROM {full_table_name} T1 "
            f"WHERE T1.[Client_DB] = '{client_db}' AND T1.[timestamp] IN ({ts_list})"
        )
        with open(only_in_dest_file, "w") as fwh:
            fwh.write(query)

    # Rows present on both sides but with diverging timestamps (not yet updated in the destination)
    not_updated_file = f"{cfg.stage_dir}\\diff\\{table_client}_not_updated.sql"
    not_updated = pd.merge(source_data, dest_data, how="inner", on=cols_without_ts)
    not_updated = not_updated[not_updated["timestamp_x"] != not_updated["timestamp_y"]]
    if not_updated.shape[0] > 0:
        ts_list = ", ".join(not_updated.to_dict(orient="list")["timestamp_x"])
        query = query_count_source.replace("COUNT(*) as [Rows]", "T1.*")
        query += f" AND T1.[timestamp] IN ({ts_list})"
        with open(not_updated_file, "w") as fwh:
            fwh.write(query)