|
@@ -1,4 +1,5 @@
|
|
|
import codecs
|
|
|
+import itertools
|
|
|
import re
|
|
|
import sys
|
|
|
from datetime import datetime
|
|
@@ -172,20 +173,26 @@ def resolve_mismatch(source_table: SourceTable2, dest_table: DestTable, cfg: DbC
|
|
|
query = re.sub(r"WHERE(.*)", f"WHERE T1.[timestamp] IN ({ts_lists[1]})", source_table.select_query_with_columns)
|
|
|
res = cfg.source_inspect.cursor.execute(query)
|
|
|
|
|
|
- chunk = []
|
|
|
- for row in res.fetchall():
|
|
|
- chunk.append("(" + ", ".join([convert_column(c) for c in row]) + ")")
|
|
|
- print(len(chunk))
|
|
|
- insert_query = f"INSERT INTO {dest_table.temp_table_name} VALUES \n" + ", \n".join(chunk)
|
|
|
-
|
|
|
- try:
|
|
|
- res2 = cfg.dest_inspect.cursor.execute(insert_query)
|
|
|
- print(res2.rowcount)
|
|
|
- res2.commit()
|
|
|
- except Exception:
|
|
|
- with open(f"{cfg.stage_dir}\\diff\\{source_table.table_client}_insert.sql", "w") as fwh:
|
|
|
- fwh.write(insert_query)
|
|
|
- print(f"Tabellen-Import fuer {source_table.table_client} abgebrochen")
|
|
|
+ for i in itertools.count(start=1):
|
|
|
+ insert_rows = []
|
|
|
+ chunk = res.fetchmany(1_000)
|
|
|
+ if not chunk:
|
|
|
+ break
|
|
|
+ for row in chunk:
|
|
|
+ insert_rows.append("(" + ", ".join([convert_column(c) for c in row]) + ")")
|
|
|
+ print(len(insert_rows))
|
|
|
+ if len(insert_rows) == 0:
|
|
|
+ break
|
|
|
+ insert_query = f"INSERT INTO {dest_table.temp_table_name} VALUES \n" + ", \n".join(insert_rows)
|
|
|
+
|
|
|
+ try:
|
|
|
+ res2 = cfg.dest_inspect.cursor.execute(insert_query)
|
|
|
+ print(res2.rowcount)
|
|
|
+ res2.commit()
|
|
|
+ except Exception:
|
|
|
+ with open(f"{cfg.stage_dir}\\diff\\{source_table.table_client}_insert_{i}.sql", "w") as fwh:
|
|
|
+ fwh.write(insert_query)
|
|
|
+ print(f"Tabellen-Import fuer {source_table.table_client} abgebrochen")
|
|
|
|
|
|
res = cfg.dest_inspect.cursor.execute(dest_table.delete_query)
|
|
|
print(res.rowcount)
|