|
@@ -64,7 +64,6 @@ def compare(config_file: str = "database/CARLO.json"):
|
|
|
or source_table2 in cfg.source_inspect.tables_list
|
|
|
):
|
|
|
query_count_source = source_table.select_query.replace("T1.*", "COUNT(*) as [Rows]")
|
|
|
- # print(query_count_source)
|
|
|
|
|
|
q = cfg.source_inspect.cursor.execute(query_count_source)
|
|
|
source_row_count[client_db] = q.fetchone()[0]
|
|
@@ -75,7 +74,7 @@ def compare(config_file: str = "database/CARLO.json"):
|
|
|
query_ts = query_ts.replace("WHERE", f"WHERE T1.[timestamp] <= convert(binary(8), '{ts}', 1) AND")
|
|
|
else:
|
|
|
query_ts += f" WHERE T1.[timestamp] <= convert(binary(8), '{ts}', 1)"
|
|
|
- # print(query_ts)
|
|
|
+
|
|
|
try:
|
|
|
q = cfg.source_inspect.cursor.execute(query_ts)
|
|
|
source_row_count_ts[client_db] = q.fetchone()[0]
|
|
@@ -169,13 +168,8 @@ def resolve_mismatch(source_table: SourceTable2, dest_table: DestTable, cfg: DbC
|
|
|
res = cfg.dest_inspect.cursor.execute(f"TRUNCATE TABLE {dest_table.temp_table_name}")
|
|
|
res.commit()
|
|
|
print(res.rowcount)
|
|
|
- # meta = MetaData(schema="temp")
|
|
|
- # meta.reflect(bind=cfg.temp_db_sqlalchemy_engine)
|
|
|
- # dest_table_obj = meta.tables[dest_table.dest]
|
|
|
|
|
|
query = re.sub(r"WHERE(.*)", f"WHERE T1.[timestamp] IN ({ts_lists[1]})", source_table.select_query_with_columns)
|
|
|
- # df = pd.read_sql(query, cfg.source_inspect.cursor.connection)
|
|
|
- # df["timestamp"] = df["timestamp"].apply(decode_ts)
|
|
|
res = cfg.source_inspect.cursor.execute(query)
|
|
|
|
|
|
chunk = []
|
|
@@ -193,19 +187,6 @@ def resolve_mismatch(source_table: SourceTable2, dest_table: DestTable, cfg: DbC
|
|
|
fwh.write(insert_query)
|
|
|
print(f"Tabellen-Import fuer {source_table.table_client} abgebrochen")
|
|
|
|
|
|
- # with cfg.temp_db_sqlalchemy_engine.connect() as dest_conn:
|
|
|
- # with cfg.source_inspect.sqlalchemy_engine.execution_options(stream_results=True).connect() as source_conn:
|
|
|
- # chunks = source_conn.execute(query)
|
|
|
- # while True:
|
|
|
- # chunk = chunks.fetchmany(10_000)
|
|
|
- # res = []
|
|
|
- # if not chunk:
|
|
|
- # break
|
|
|
- # res = [dict(row.items()) for row in chunk]
|
|
|
- # insert_query = dest_table_obj.insert().values(res)
|
|
|
- # res2 = cfg.dest_inspect.cursor.execute(insert_query)
|
|
|
- # print(res2.rowcount)
|
|
|
-
|
|
|
res = cfg.dest_inspect.cursor.execute(dest_table.delete_query)
|
|
|
print(res.rowcount)
|
|
|
res.commit()
|