|
@@ -1,5 +1,7 @@
|
|
|
import codecs
|
|
|
+import re
|
|
|
import sys
|
|
|
+from datetime import datetime
|
|
|
|
|
|
import pandas as pd
|
|
|
from pyodbc import ProgrammingError
|
|
@@ -13,6 +15,20 @@ def decode_ts(ts_binary):
|
|
|
return "0x" + codecs.encode(ts_binary, "hex_codec").decode()
|
|
|
|
|
|
|
|
|
+def convert_column(col):
|
|
|
+ if col is None:
|
|
|
+ return "null"
|
|
|
+ if isinstance(col, str):
|
|
|
+ return f"'{col}'"
|
|
|
+ if isinstance(col, bytes):
|
|
|
+ return decode_ts(col)
|
|
|
+ if isinstance(col, datetime):
|
|
|
+ return f"'{col.isoformat()[:19]}'"
|
|
|
+ if isinstance(col, bool):
|
|
|
+ return "1" if col else "0"
|
|
|
+ return str(col)
|
|
|
+
|
|
|
+
|
|
|
def compare(config_file: str = "database/CARLO.json"):
|
|
|
cfg = DbCreateConfig.load_config(config_file)
|
|
|
table_config = get_table_config(cfg)
|
|
@@ -68,10 +84,12 @@ def compare(config_file: str = "database/CARLO.json"):
|
|
|
|
|
|
if dest_row_count.get(client_db, 0) != source_row_count.get(client_db, 0):
|
|
|
print(f"Tabelle {dest_table.dest} mit Client {client_db} stimmt nicht ueberein.")
|
|
|
+ print(f" Timestamp: {ts}")
|
|
|
print(f" Quelle: {source_row_count.get(client_db, 0):>8}")
|
|
|
print(f" Quelle (bis ts): {source_row_count_ts.get(client_db, 0):>8}")
|
|
|
print(f" dest: {dest_row_count.get(client_db, 0):>8}")
|
|
|
- compare_details(source_table, dest_table, query_count_source, cfg)
|
|
|
+ res = compare_details(source_table, dest_table, query_count_source, cfg)
|
|
|
+ resolve_mismatch(source_table, dest_table, cfg, res)
|
|
|
|
|
|
|
|
|
def compare_details(source_table: SourceTable2, dest_table: DestTable, query_count_source: str, cfg: DbCreateConfig):
|
|
@@ -89,49 +107,111 @@ def compare_details(source_table: SourceTable2, dest_table: DestTable, query_cou
|
|
|
f"WHERE T1.[Client_DB] = '{source_table.client_db}' ORDER BY {query_cols}"
|
|
|
)
|
|
|
|
|
|
- source_file = f"{cfg.stage_dir}\\source\\{source_table.table_client}.csv"
|
|
|
source_data = pd.read_sql(query_source, cfg.source_inspect.sqlalchemy_engine)
|
|
|
source_data["timestamp"] = source_data["timestamp"].apply(decode_ts)
|
|
|
- source_data.to_csv(source_file, index=False)
|
|
|
+ # source_file = f"{cfg.stage_dir}\\source\\{source_table.table_client}.csv"
|
|
|
+ # source_data.to_csv(source_file, index=False)
|
|
|
|
|
|
- dest_file = f"{cfg.stage_dir}\\dest\\{source_table.table_client}.csv"
|
|
|
dest_data = pd.read_sql(query_dest, cfg.dest_inspect.sqlalchemy_engine)
|
|
|
dest_data["timestamp"] = dest_data["timestamp"].apply(decode_ts)
|
|
|
- dest_data.to_csv(dest_file, index=False)
|
|
|
+ # dest_file = f"{cfg.stage_dir}\\dest\\{source_table.table_client}.csv"
|
|
|
+ # dest_data.to_csv(dest_file, index=False)
|
|
|
|
|
|
cols_without_ts = cols[:-1]
|
|
|
|
|
|
- only_in_source_file = f"{cfg.stage_dir}\\diff\\{source_table.table_client}_only_in_source.sql"
|
|
|
only_in_source = pd.merge(source_data, dest_data, how="left", on=cols_without_ts)
|
|
|
only_in_source = only_in_source[pd.isna(only_in_source["timestamp_y"])]
|
|
|
+ only_in_source_file = f"{cfg.stage_dir}\\diff\\{source_table.table_client}_only_in_source.sql"
|
|
|
+ ts_list1 = "0x0000000000000000"
|
|
|
if only_in_source.shape[0] > 0:
|
|
|
- ts_list = ", ".join(only_in_source.to_dict(orient="list")["timestamp_x"])
|
|
|
+ ts_list1 = ", ".join(only_in_source.to_dict(orient="list")["timestamp_x"])
|
|
|
query = query_count_source.replace("COUNT(*) as [Rows]", "T1.*")
|
|
|
- query += f" AND T1.[timestamp] IN ({ts_list})"
|
|
|
+ query += f" AND T1.[timestamp] IN ({ts_list1})"
|
|
|
with open(only_in_source_file, "w") as fwh:
|
|
|
fwh.write(query)
|
|
|
|
|
|
- only_in_dest_file = f"{cfg.stage_dir}\\diff\\{source_table.table_client}_only_in_dest.sql"
|
|
|
+ not_updated = pd.merge(source_data, dest_data, how="inner", on=cols_without_ts)
|
|
|
+ not_updated = not_updated[not_updated["timestamp_x"] != not_updated["timestamp_y"]]
|
|
|
+ not_updated_file = f"{cfg.stage_dir}\\diff\\{source_table.table_client}_not_updated.sql"
|
|
|
+ ts_list2 = "0x0000000000000000"
|
|
|
+ if not_updated.shape[0] > 0:
|
|
|
+ ts_list2 = ", ".join(not_updated.to_dict(orient="list")["timestamp_x"])
|
|
|
+ query = query_count_source.replace("COUNT(*) as [Rows]", "T1.*")
|
|
|
+ query += f" AND T1.[timestamp] IN ({ts_list2})"
|
|
|
+ with open(not_updated_file, "w") as fwh:
|
|
|
+ fwh.write(query)
|
|
|
+
|
|
|
only_in_dest = pd.merge(dest_data, source_data, how="left", on=cols_without_ts)
|
|
|
only_in_dest = only_in_dest[pd.isna(only_in_dest["timestamp_y"])]
|
|
|
+ only_in_dest_file = f"{cfg.stage_dir}\\diff\\{source_table.table_client}_only_in_dest.sql"
|
|
|
+ ts_list0 = ""
|
|
|
if only_in_dest.shape[0] > 0:
|
|
|
- ts_list = ", ".join(only_in_dest.to_dict(orient="list")["timestamp_x"])
|
|
|
+ ts_list0 = ", ".join(only_in_dest.to_dict(orient="list")["timestamp_x"])
|
|
|
query = (
|
|
|
f"SELECT T1.* FROM {dest_table.full_table_name} T1 "
|
|
|
- f"WHERE T1.[Client_DB] = '{source_table.client_db}' AND T1.[timestamp] IN ({ts_list})"
|
|
|
+ f"WHERE T1.[Client_DB] = '{source_table.client_db}' AND T1.[timestamp] IN ({ts_list0})"
|
|
|
)
|
|
|
with open(only_in_dest_file, "w") as fwh:
|
|
|
fwh.write(query)
|
|
|
+ return [ts_list0, ts_list1 + ", " + ts_list2]
|
|
|
|
|
|
- not_updated_file = f"{cfg.stage_dir}\\diff\\{source_table.table_client}_not_updated.sql"
|
|
|
- not_updated = pd.merge(source_data, dest_data, how="inner", on=cols_without_ts)
|
|
|
- not_updated = not_updated[not_updated["timestamp_x"] != not_updated["timestamp_y"]]
|
|
|
- if not_updated.shape[0] > 0:
|
|
|
- ts_list = ", ".join(not_updated.to_dict(orient="list")["timestamp_x"])
|
|
|
- query = query_count_source.replace("COUNT(*) as [Rows]", "T1.*")
|
|
|
- query += f" AND T1.[timestamp] IN ({ts_list})"
|
|
|
- with open(not_updated_file, "w") as fwh:
|
|
|
- fwh.write(query)
|
|
|
+
|
|
|
+def resolve_mismatch(source_table: SourceTable2, dest_table: DestTable, cfg: DbCreateConfig, ts_lists: list[str]):
|
|
|
+ if ts_lists[0] != "":
|
|
|
+ delete_query = (
|
|
|
+ f"DELETE FROM {dest_table.full_table_name} "
|
|
|
+ f"WHERE [Client_DB] = '{source_table.client_db}' AND [timestamp] IN ({ts_lists[0]})"
|
|
|
+ )
|
|
|
+ res = cfg.dest_inspect.cursor.execute(delete_query)
|
|
|
+ print(res.rowcount)
|
|
|
+ res.commit()
|
|
|
+
|
|
|
+ res = cfg.dest_inspect.cursor.execute(f"TRUNCATE TABLE {dest_table.temp_table_name}")
|
|
|
+ res.commit()
|
|
|
+ print(res.rowcount)
|
|
|
+ # meta = MetaData(schema="temp")
|
|
|
+ # meta.reflect(bind=cfg.temp_db_sqlalchemy_engine)
|
|
|
+ # dest_table_obj = meta.tables[dest_table.dest]
|
|
|
+
|
|
|
+ query = re.sub(r"WHERE(.*)", f"WHERE T1.[timestamp] IN ({ts_lists[1]})", source_table.select_query_with_columns)
|
|
|
+ # df = pd.read_sql(query, cfg.source_inspect.cursor.connection)
|
|
|
+ # df["timestamp"] = df["timestamp"].apply(decode_ts)
|
|
|
+ res = cfg.source_inspect.cursor.execute(query)
|
|
|
+
|
|
|
+ chunk = []
|
|
|
+ for row in res.fetchall():
|
|
|
+ chunk.append("(" + ", ".join([convert_column(c) for c in row]) + ")")
|
|
|
+ print(len(chunk))
|
|
|
+ insert_query = f"INSERT INTO {dest_table.temp_table_name} VALUES \n" + ", \n".join(chunk)
|
|
|
+
|
|
|
+ try:
|
|
|
+ res2 = cfg.dest_inspect.cursor.execute(insert_query)
|
|
|
+ print(res2.rowcount)
|
|
|
+ res2.commit()
|
|
|
+ except Exception:
|
|
|
+ with open(f"{cfg.stage_dir}\\diff\\{source_table.table_client}_insert.sql", "w") as fwh:
|
|
|
+ fwh.write(insert_query)
|
|
|
+ print(f"Tabellen-Import fuer {source_table.table_client} abgebrochen")
|
|
|
+
|
|
|
+ # with cfg.temp_db_sqlalchemy_engine.connect() as dest_conn:
|
|
|
+ # with cfg.source_inspect.sqlalchemy_engine.execution_options(stream_results=True).connect() as source_conn:
|
|
|
+ # chunks = source_conn.execute(query)
|
|
|
+ # while True:
|
|
|
+ # chunk = chunks.fetchmany(10_000)
|
|
|
+ # res = []
|
|
|
+ # if not chunk:
|
|
|
+ # break
|
|
|
+ # res = [dict(row.items()) for row in chunk]
|
|
|
+ # insert_query = dest_table_obj.insert().values(res)
|
|
|
+ # res2 = cfg.dest_inspect.cursor.execute(insert_query)
|
|
|
+ # print(res2.rowcount)
|
|
|
+
|
|
|
+ res = cfg.dest_inspect.cursor.execute(dest_table.delete_query)
|
|
|
+ print(res.rowcount)
|
|
|
+ res.commit()
|
|
|
+ res = cfg.dest_inspect.cursor.execute(dest_table.insert_query)
|
|
|
+ print(res.rowcount)
|
|
|
+ res.commit()
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|