12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576 |
- import json
- from pathlib import Path
- import pandas as pd
- from database.db_create import get_import_config
- from database.model import DatabaseInspect, create_db_ini, load_config
def _rows_to_dict(cursor, query: str) -> dict:
    """Execute ``query`` on ``cursor`` and map column 0 of each row to column 1."""
    return {row[0]: row[1] for row in cursor.execute(query).fetchall()}


def _timestamp_literal(ts) -> str:
    """Render ``ts`` as the ``0x…`` hex string expected by ``CONVERT(binary(8), …, 1)``.

    - ``None`` (no destination rows for the client, or ``MAX`` over only NULLs)
      becomes the zero timestamp.
    - Binary values are hex-encoded. This assumes the driver returns
      binary/rowversion columns as ``bytes``, as pyodbc does.
    - Anything else is used via ``str()``.
    """
    if ts is None:
        return "0x0000000000000000"
    if isinstance(ts, (bytes, bytearray, memoryview)):
        return "0x" + bytes(ts).hex()
    return str(ts)


def _source_count_query(current_table, source_table: str, prefix: str, cfg) -> str:
    """Build the ``COUNT(*)`` query for one client's source table.

    Query selection:
    - A custom ``query`` from the import config takes precedence.
    - Otherwise the table is selected directly. It is bracketed as-is when its
      name contains a ``.`` or no source schema is configured; otherwise it is
      qualified with ``cfg.source_dsn.schema``.

    An optional ``filter`` is then appended as a ``WHERE`` clause. Finally
    ``T1.*`` is replaced by ``COUNT(*) as [Rows]``.
    """
    if not pd.isnull(current_table["query"]):
        select_query = current_table["query"].format(prefix, cfg.filter[0], cfg.filter[1])
    elif "." in source_table or cfg.source_dsn.schema == "":
        if not source_table.startswith("["):
            source_table = f"[{source_table}]"
        select_query = f"SELECT T1.* FROM {source_table} T1 "
    else:
        select_query = f"SELECT T1.* FROM [{cfg.source_dsn.schema}].[{source_table}] T1 "

    if not pd.isnull(current_table["filter"]):
        select_query += " WHERE " + current_table["filter"].format("", cfg.filter[0], cfg.filter[1])
    return select_query.replace("T1.*", "COUNT(*) as [Rows]")


def _limit_to_timestamp(query: str, ts) -> str:
    """Restrict ``query`` to rows whose ``[timestamp]`` is not greater than ``ts``."""
    condition = f"[timestamp] <= convert(binary(8), '{_timestamp_literal(ts)}', 1)"
    if "WHERE" in query:
        # NOTE(review): every occurrence of "WHERE" is rewritten, including any
        # inside subqueries of a custom query. The condition is also joined with
        # AND without parentheses, so a filter containing a top-level OR is only
        # partly restricted. Confirm the configured filters/queries allow this.
        return query.replace("WHERE", f"WHERE {condition} AND")
    return query + f" WHERE {condition}"


def compare(config_file: str = "database/CARLO.json") -> None:
    """Compare per-client row counts between source and destination tables.

    For every table listed in the import config, this:

    1. Counts destination rows per ``[Client_DB]`` and reads their maximum
       ``timestamp``.
    2. Counts the rows of each client's source table. It also counts them again,
       limited to rows whose ``[timestamp]`` does not exceed that destination
       maximum.
    3. Prints a report for each client whose destination count differs from
       the total source count.

    Args:
        config_file: Path to the JSON configuration. The import CSV
            ``cfg.csv_file`` is read from the ``config`` directory located in
            the parent of the directory containing ``config_file``.
    """
    cfg = load_config(config_file)
    create_db_ini(cfg)
    base_dir = str(Path(config_file).parent.parent.resolve())
    config = get_import_config(f"{base_dir}/config/{cfg.csv_file}", cfg.dest_dsn.database)

    source_db = DatabaseInspect(cfg.source_dsn, source=True)
    source_tables = source_db.get_tables()
    print(json.dumps(source_db.get_prefix(), indent=2))

    dest_db = DatabaseInspect(cfg.dest_dsn)
    dest_tables = dest_db.get_tables()

    for _, current_table in config.iterrows():
        # Per-client aggregates of the destination table (empty if it is missing).
        dest_row_count = {}
        dest_timestamp = {}
        if current_table["dest"] in dest_tables:
            full_table_name = f'[{current_table["dest_db"]}].[{cfg.dest_dsn.schema}].[{current_table["dest"]}]'
            dest_row_count = _rows_to_dict(
                dest_db.cursor,
                f"SELECT [Client_DB], COUNT(*) as [Rows] FROM {full_table_name} GROUP BY [Client_DB]",
            )
            dest_timestamp = _rows_to_dict(
                dest_db.cursor,
                f"SELECT [Client_DB], max(timestamp) as [TS] FROM {full_table_name} GROUP BY [Client_DB]",
            )

        source_row_count = {}
        source_row_count_ts = {}
        for client_db, prefix in cfg.clients.items():
            source_table = current_table["source"].format(prefix)
            if source_table in source_tables:
                query_count_source = _source_count_query(current_table, source_table, prefix, cfg)
                source_row_count[client_db] = source_db.cursor.execute(query_count_source).fetchone()[0]

                # Count only source rows whose timestamp is not newer than the
                # destination's latest timestamp for this client.
                query_ts = _limit_to_timestamp(query_count_source, dest_timestamp.get(client_db))
                source_row_count_ts[client_db] = source_db.cursor.execute(query_ts).fetchone()[0]

            # Runs for every client, so a missing source table still reports a
            # mismatch when the destination holds rows for that client.
            if dest_row_count.get(client_db, 0) != source_row_count.get(client_db, 0):
                print(f"Tabelle {current_table['dest']} mit Client {client_db} stimmt nicht ueberein.")
                print(f" Quelle: {source_row_count.get(client_db, 0):>8}")
                print(f" Quelle (bis ts): {source_row_count_ts.get(client_db, 0):>8}")
                print(f" dest: {dest_row_count.get(client_db, 0):>8}")
|