import codecs
import json
import re
from pathlib import Path

import pandas as pd
import pyodbc

from database.db_create import get_import_config
from database.model import DatabaseInspect, create_db_ini, load_config

# Matches the SQL keyword WHERE as a whole word, regardless of case, so that
# custom queries written with "where" are handled and identifiers that merely
# contain the letters (e.g. "[NOWHERE]") are not.
_WHERE_RE = re.compile(r"\bWHERE\b", re.IGNORECASE)


def _dest_counts(dest_db, full_table_name):
    """Return per-client row counts and maximum timestamps of a destination table.

    Args:
        dest_db: ``DatabaseInspect`` for the destination; its ``cursor`` is used.
        full_table_name: Fully bracketed ``[db].[schema].[table]`` name.

    Returns:
        Tuple ``(row_count, timestamp)`` of dicts keyed by the ``[Client_DB]``
        column value. Timestamps are rendered as ``"0x…"`` lowercase hex
        strings, suitable for ``convert(binary(8), '<ts>', 1)`` in T-SQL.
    """
    query_count = f"SELECT [Client_DB], COUNT(*) as [Rows] FROM {full_table_name} GROUP BY [Client_DB]"
    count_rows = dest_db.cursor.execute(query_count).fetchall()
    row_count = {row[0]: row[1] for row in count_rows}

    query_timestamp = f"SELECT [Client_DB], max(timestamp) as [TS] FROM {full_table_name} GROUP BY [Client_DB]"
    ts_rows = dest_db.cursor.execute(query_timestamp).fetchall()
    # NOTE(review): assumes the driver returns the timestamp as bytes (as for a
    # SQL Server rowversion column) — codecs' hex_codec requires bytes-like input.
    timestamp = {row[0]: "0x" + codecs.encode(row[1], "hex_codec").decode() for row in ts_rows}
    return row_count, timestamp


def _source_select_query(current_table, cfg, prefix, source_table):
    """Build the ``SELECT T1.* …`` query reading one client's source table.

    A custom ``query`` from the import config takes precedence; otherwise the
    table is selected directly, qualified with the source schema unless the
    name already contains a ``.`` or the source schema is empty. A configured
    ``filter`` is appended as a WHERE clause in every case.

    Args:
        current_table: Row of the import config (needs ``query`` and ``filter``).
        cfg: Loaded configuration (uses ``filter`` and ``source_dsn.schema``).
        prefix: Client-specific prefix substituted into the config templates.
        source_table: Source table name already formatted with ``prefix``.

    Returns:
        The SQL text, always aliasing the source table as ``T1``.
    """
    if not pd.isnull(current_table["query"]):
        select_query = current_table["query"].format(prefix, cfg.filter[0], cfg.filter[1])
    elif "." in source_table or cfg.source_dsn.schema == "":
        # startswith() instead of [0] so an empty name cannot raise IndexError.
        if not source_table.startswith("["):
            source_table = f"[{source_table}]"
        select_query = f"SELECT T1.* FROM {source_table} T1 "
    else:
        select_query = f"SELECT T1.* FROM [{cfg.source_dsn.schema}].[{source_table}] T1 "

    if not pd.isnull(current_table["filter"]):
        select_query += " WHERE " + current_table["filter"].format("", cfg.filter[0], cfg.filter[1])
    return select_query


def _with_timestamp_limit(query, ts):
    """Restrict *query* to rows whose ``T1.[timestamp]`` is at most *ts*.

    If the query already has a WHERE clause, the condition is AND-ed in front
    of the first one only (later WHEREs usually belong to subqueries);
    otherwise a new WHERE clause is appended.

    Args:
        query: SQL text aliasing the source table as ``T1``.
        ts: ``"0x…"`` hex literal of the upper timestamp bound.

    Returns:
        The rewritten SQL text.
    """
    condition = f"T1.[timestamp] <= convert(binary(8), '{ts}', 1)"
    if _WHERE_RE.search(query):
        replacement = f"WHERE {condition} AND"
        # Callable replacement: the text is inserted literally, with no
        # backslash/group-reference interpretation by re.sub.
        return _WHERE_RE.sub(lambda _match: replacement, query, count=1)
    return query + f" WHERE {condition}"


def compare(config_file: str = "database/CARLO.json"):
    """Compare per-client row counts of every imported table between source and destination.

    For each table listed in the import config, the destination rows are counted
    per ``[Client_DB]``. The matching source table of every configured client is
    counted twice: once in total, and once limited to rows whose timestamp is at
    most the destination's maximum timestamp for that client. Whenever the total
    source count differs from the destination count, a short report is printed.

    Args:
        config_file: Path of the JSON configuration passed to ``load_config``.
            The import CSV is resolved as ``<config_file>/../../config/<csv_file>``.
    """
    cfg = load_config(config_file)
    create_db_ini(cfg)
    base_dir = str(Path(config_file).parent.parent.resolve())
    config = get_import_config(f"{base_dir}/config/{cfg.csv_file}", cfg.dest_dsn.database)

    source_db = DatabaseInspect(cfg.source_dsn, source=True)
    source_tables = source_db.get_tables()
    print(json.dumps(source_db.get_prefix(), indent=2))

    dest_db = DatabaseInspect(cfg.dest_dsn)
    dest_tables = dest_db.get_tables()

    for _, current_table in config.iterrows():
        # Stay empty when the destination table does not exist yet, so every
        # client with source rows is reported as a mismatch.
        dest_row_count = {}
        dest_timestamp = {}
        if current_table["dest"] in dest_tables:
            full_table_name = f'[{current_table["dest_db"]}].[{cfg.dest_dsn.schema}].[{current_table["dest"]}]'
            dest_row_count, dest_timestamp = _dest_counts(dest_db, full_table_name)

        source_row_count = {}
        source_row_count_ts = {}
        for client_db, prefix in cfg.clients.items():
            source_table = current_table["source"].format(prefix)
            # Last dotted part with its first/last character dropped — intended
            # to strip the brackets, e.g. "[db].[dbo].[tbl]" -> "tbl".
            source_table2 = source_table.split(".")[-1][1:-1]
            if source_table in source_tables or source_table2 in source_tables:
                select_query = _source_select_query(current_table, cfg, prefix, source_table)
                query_count_source = select_query.replace("T1.*", "COUNT(*) as [Rows]")
                source_row_count[client_db] = source_db.cursor.execute(query_count_source).fetchone()[0]

                # Clients missing in the destination get the smallest possible
                # bound, i.e. no source row counts as already transferred.
                ts = dest_timestamp.get(client_db, "0x0000000000000000")
                query_ts = _with_timestamp_limit(query_count_source, ts)
                try:
                    source_row_count_ts[client_db] = source_db.cursor.execute(query_ts).fetchone()[0]
                except pyodbc.ProgrammingError:
                    # Best effort: the limited count is informational only.
                    # Presumably this fails for source tables without a
                    # [timestamp] column — TODO confirm. The report then shows 0.
                    pass

            if dest_row_count.get(client_db, 0) != source_row_count.get(client_db, 0):
                print(f"Tabelle {current_table['dest']} mit Client {client_db} stimmt nicht ueberein.")
                print(f" Quelle: {source_row_count.get(client_db, 0):>8}")
                print(f" Quelle (bis ts): {source_row_count_ts.get(client_db, 0):>8}")
                print(f" dest: {dest_row_count.get(client_db, 0):>8}")