|
@@ -1,11 +1,12 @@
|
|
|
import codecs
|
|
|
-from pathlib import Path
|
|
|
+import sys
|
|
|
|
|
|
import pandas as pd
|
|
|
from pyodbc import ProgrammingError
|
|
|
|
|
|
-from database.db_create import get_import_config
|
|
|
-from database.model import DatabaseInspect, DbCreateConfig
|
|
|
+sys.path.insert(0, "C:\\Projekte\\tools")
|
|
|
+from database.db_create import get_table_config
|
|
|
+from database.model import DbCreateConfig, DestTable, SourceTable2
|
|
|
|
|
|
|
|
|
def decode_ts(ts_binary):
|
|
@@ -14,55 +15,42 @@ def decode_ts(ts_binary):
|
|
|
|
|
|
def compare(config_file: str = "database/CARLO.json"):
|
|
|
cfg = DbCreateConfig.load_config(config_file)
|
|
|
- base_dir = str(Path(config_file).parent.parent.resolve())
|
|
|
- config = get_import_config(f"{base_dir}/config/{cfg.csv_file}", cfg.dest_dsn.database)
|
|
|
+ table_config = get_table_config(cfg)
|
|
|
|
|
|
- source_db = DatabaseInspect(cfg.source_dsn, source=True)
|
|
|
- source_tables = source_db.tables_list()
|
|
|
-
|
|
|
- dest_db = DatabaseInspect(cfg.dest_dsn)
|
|
|
- dest_tables = dest_db.tables_list()
|
|
|
-
|
|
|
- for _, current_table in config.iterrows():
|
|
|
+ for dest_table in table_config:
|
|
|
dest_row_count = {}
|
|
|
dest_timestamp = {}
|
|
|
- if current_table["dest"] in dest_tables:
|
|
|
- full_table_name = f'[{current_table["dest_db"]}].[{cfg.dest_dsn.schema}].[{current_table["dest"]}]'
|
|
|
- query_count_dest = f"SELECT [Client_DB], COUNT(*) as [Rows] FROM {full_table_name} GROUP BY [Client_DB]"
|
|
|
- q = dest_db.cursor.execute(query_count_dest)
|
|
|
- dest_row_count = dict([(col[0], col[1]) for col in q.fetchall()])
|
|
|
-
|
|
|
- query_timestamp_dest = (
|
|
|
- f"SELECT [Client_DB], max(timestamp) as [TS] FROM {full_table_name} GROUP BY [Client_DB]"
|
|
|
- )
|
|
|
- q = dest_db.cursor.execute(query_timestamp_dest)
|
|
|
- dest_timestamp = dict([(col[0], decode_ts(col[1])) for col in q.fetchall()])
|
|
|
+ if dest_table.dest not in cfg.dest_inspect.tables_list:
|
|
|
+ print(f"Ziel-Tabelle '{dest_table.dest}' existiert nicht!")
|
|
|
+ continue
|
|
|
+
|
|
|
+ query_count_dest = (
|
|
|
+ f"SELECT [Client_DB], COUNT(*) as [Rows] FROM {dest_table.full_table_name} GROUP BY [Client_DB]"
|
|
|
+ )
|
|
|
+ q = cfg.dest_inspect.cursor.execute(query_count_dest)
|
|
|
+ dest_row_count = dict([(col[0], col[1]) for col in q.fetchall()])
|
|
|
+
|
|
|
+ query_timestamp_dest = (
|
|
|
+ f"SELECT [Client_DB], max(timestamp) as [TS] FROM {dest_table.full_table_name} GROUP BY [Client_DB]"
|
|
|
+ )
|
|
|
+ q = cfg.dest_inspect.cursor.execute(query_timestamp_dest)
|
|
|
+ dest_timestamp = dict([(col[0], decode_ts(col[1])) for col in q.fetchall()])
|
|
|
|
|
|
source_row_count = {}
|
|
|
source_row_count_ts = {}
|
|
|
|
|
|
- for client_db, prefix in cfg.clients.items():
|
|
|
- source_table = current_table["source"].format(prefix)
|
|
|
- source_table2 = source_table.split(".")[-1][1:-1]
|
|
|
-
|
|
|
- if source_table in source_tables or source_table2 in source_tables:
|
|
|
- if not pd.isnull(current_table["query"]):
|
|
|
- select_query = current_table["query"].format(prefix, cfg.filter[0], cfg.filter[1])
|
|
|
- elif "." in source_table or cfg.source_dsn.schema == "":
|
|
|
- if source_table[0] != "[":
|
|
|
- source_table = f"[{source_table}]"
|
|
|
- select_query = f"SELECT T1.* FROM {source_table} T1 "
|
|
|
- else:
|
|
|
- select_query = f"SELECT T1.* FROM [{cfg.source_dsn.schema}].[{source_table}] T1 "
|
|
|
+ for source_table in dest_table.source_tables:
|
|
|
+ source_table2 = cfg.source_inspect.convert_table(source_table.table_name)
|
|
|
+ client_db = source_table.client_db
|
|
|
|
|
|
- if not pd.isnull(current_table["filter"]):
|
|
|
- select_query += " WHERE " + current_table["filter"].format("", cfg.filter[0], cfg.filter[1])
|
|
|
- elif "WHERE" not in select_query:
|
|
|
- select_query += " WHERE 1 = 1"
|
|
|
- query_count_source = select_query.replace("T1.*", "COUNT(*) as [Rows]")
|
|
|
+ if (
|
|
|
+ source_table.table_name in cfg.source_inspect.tables_list
|
|
|
+ or source_table2 in cfg.source_inspect.tables_list
|
|
|
+ ):
|
|
|
+ query_count_source = source_table.select_query.replace("T1.*", "COUNT(*) as [Rows]")
|
|
|
|
|
|
|
|
|
- q = source_db.cursor.execute(query_count_source)
|
|
|
+ q = cfg.source_inspect.cursor.execute(query_count_source)
|
|
|
source_row_count[client_db] = q.fetchone()[0]
|
|
|
|
|
|
query_ts = query_count_source
|
|
@@ -73,31 +61,21 @@ def compare(config_file: str = "database/CARLO.json"):
|
|
|
query_ts += f" WHERE T1.[timestamp] <= convert(binary(8), '{ts}', 1)"
|
|
|
|
|
|
try:
|
|
|
- q = source_db.cursor.execute(query_ts)
|
|
|
+ q = cfg.source_inspect.cursor.execute(query_ts)
|
|
|
source_row_count_ts[client_db] = q.fetchone()[0]
|
|
|
except ProgrammingError:
|
|
|
pass
|
|
|
|
|
|
if dest_row_count.get(client_db, 0) != source_row_count.get(client_db, 0):
|
|
|
- print(f"Tabelle {current_table['dest']} mit Client {client_db} stimmt nicht ueberein.")
|
|
|
+ print(f"Tabelle {dest_table.dest} mit Client {client_db} stimmt nicht ueberein.")
|
|
|
print(f" Quelle: {source_row_count.get(client_db, 0):>8}")
|
|
|
print(f" Quelle (bis ts): {source_row_count_ts.get(client_db, 0):>8}")
|
|
|
print(f" dest: {dest_row_count.get(client_db, 0):>8}")
|
|
|
- compare_details(current_table, client_db, source_db, dest_db, query_count_source, full_table_name, cfg)
|
|
|
-
|
|
|
-
|
|
|
-def compare_details(
|
|
|
- current_table: pd.Series,
|
|
|
- client_db: str,
|
|
|
- source_db: DatabaseInspect,
|
|
|
- dest_db: DatabaseInspect,
|
|
|
- query_count_source: str,
|
|
|
- full_table_name: str,
|
|
|
- cfg: DbCreateConfig,
|
|
|
-):
|
|
|
- table_client = f'{current_table["dest"]}_{client_db}'
|
|
|
- pkey = dest_db.get_pkey(current_table["dest"], current_table["dest_db"])
|
|
|
- cols = pkey + ["timestamp"]
|
|
|
+ compare_details(source_table, dest_table, query_count_source, cfg)
|
|
|
+
|
|
|
+
|
|
|
+def compare_details(source_table: SourceTable2, dest_table: DestTable, query_count_source: str, cfg: DbCreateConfig):
|
|
|
+ cols = dest_table.primary_key + ["timestamp"]
|
|
|
if "Client_DB" in cols:
|
|
|
cols.remove("Client_DB")
|
|
|
if "CLIENT_DB" in cols:
|
|
@@ -107,22 +85,23 @@ def compare_details(
|
|
|
query_source = query_count_source.replace("COUNT(*) as [Rows]", query_cols)
|
|
|
query_source += f" ORDER BY {query_cols}"
|
|
|
query_dest = (
|
|
|
- f"SELECT {query_cols} FROM {full_table_name} T1 WHERE T1.[Client_DB] = '{client_db}' ORDER BY {query_cols}"
|
|
|
+ f"SELECT {query_cols} FROM {dest_table.full_table_name} T1 "
|
|
|
+ f"WHERE T1.[Client_DB] = '{source_table.client_db}' ORDER BY {query_cols}"
|
|
|
)
|
|
|
|
|
|
- source_file = f"{cfg.stage_dir}\\source\\{table_client}.csv"
|
|
|
- source_data = pd.read_sql(query_source, source_db.sqlalchemy_engine)
|
|
|
+ source_file = f"{cfg.stage_dir}\\source\\{source_table.table_client}.csv"
|
|
|
+ source_data = pd.read_sql(query_source, cfg.source_inspect.sqlalchemy_engine)
|
|
|
source_data["timestamp"] = source_data["timestamp"].apply(decode_ts)
|
|
|
source_data.to_csv(source_file, index=False)
|
|
|
|
|
|
- dest_file = f"{cfg.stage_dir}\\dest\\{table_client}.csv"
|
|
|
- dest_data = pd.read_sql(query_dest, dest_db.sqlalchemy_engine)
|
|
|
+ dest_file = f"{cfg.stage_dir}\\dest\\{source_table.table_client}.csv"
|
|
|
+ dest_data = pd.read_sql(query_dest, cfg.dest_inspect.sqlalchemy_engine)
|
|
|
dest_data["timestamp"] = dest_data["timestamp"].apply(decode_ts)
|
|
|
dest_data.to_csv(dest_file, index=False)
|
|
|
|
|
|
cols_without_ts = cols[:-1]
|
|
|
|
|
|
- only_in_source_file = f"{cfg.stage_dir}\\diff\\{table_client}_only_in_source.sql"
|
|
|
+ only_in_source_file = f"{cfg.stage_dir}\\diff\\{source_table.table_client}_only_in_source.sql"
|
|
|
only_in_source = pd.merge(source_data, dest_data, how="left", on=cols_without_ts)
|
|
|
only_in_source = only_in_source[pd.isna(only_in_source["timestamp_y"])]
|
|
|
if only_in_source.shape[0] > 0:
|
|
@@ -132,16 +111,19 @@ def compare_details(
|
|
|
with open(only_in_source_file, "w") as fwh:
|
|
|
fwh.write(query)
|
|
|
|
|
|
- only_in_dest_file = f"{cfg.stage_dir}\\diff\\{table_client}_only_in_dest.sql"
|
|
|
+ only_in_dest_file = f"{cfg.stage_dir}\\diff\\{source_table.table_client}_only_in_dest.sql"
|
|
|
only_in_dest = pd.merge(dest_data, source_data, how="left", on=cols_without_ts)
|
|
|
only_in_dest = only_in_dest[pd.isna(only_in_dest["timestamp_y"])]
|
|
|
if only_in_dest.shape[0] > 0:
|
|
|
ts_list = ", ".join(only_in_dest.to_dict(orient="list")["timestamp_x"])
|
|
|
- query = f"SELECT T1.* FROM {full_table_name} T1 WHERE T1.[Client_DB] = '{client_db}' AND T1.[timestamp] IN ({ts_list})"
|
|
|
+ query = (
|
|
|
+ f"SELECT T1.* FROM {dest_table.full_table_name} T1 "
|
|
|
+ f"WHERE T1.[Client_DB] = '{source_table.client_db}' AND T1.[timestamp] IN ({ts_list})"
|
|
|
+ )
|
|
|
with open(only_in_dest_file, "w") as fwh:
|
|
|
fwh.write(query)
|
|
|
|
|
|
- not_updated_file = f"{cfg.stage_dir}\\diff\\{table_client}_not_updated.sql"
|
|
|
+ not_updated_file = f"{cfg.stage_dir}\\diff\\{source_table.table_client}_not_updated.sql"
|
|
|
not_updated = pd.merge(source_data, dest_data, how="inner", on=cols_without_ts)
|
|
|
not_updated = not_updated[not_updated["timestamp_x"] != not_updated["timestamp_y"]]
|
|
|
if not_updated.shape[0] > 0:
|
|
@@ -150,3 +132,7 @@ def compare_details(
|
|
|
query += f" AND T1.[timestamp] IN ({ts_list})"
|
|
|
with open(not_updated_file, "w") as fwh:
|
|
|
fwh.write(query)
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ compare()
|