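"""Generate Windows batch files that bulk-copy client tables into a target database.

For every row of an import-control CSV, a per-table ``.bat`` file is written that
truncates the target table and runs a bcp export/import round trip per client
database; a master batch file then calls them all in sequence.

Note: ``DatabaseInspect`` and ``load_config`` come from the local ``model``
module; their exact behaviour is inferred from how they are used below.
"""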
import json
from pathlib import Path

import pandas as pd

from model import DatabaseInspect, load_config


def get_import_config(filename: str, db_name: str):
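    """Read the import-control CSV and return the rows that name a target table.

    On first contact the file is upgraded in place: a ``target_db`` and an empty
    ``cols`` column are appended so that later runs see a stable layout.
    """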
    df = pd.read_csv(filename, sep=";", encoding="latin-1")
    if "cols" not in df.columns:
        df["target_db"] = db_name
        df["cols"] = ""
        df[["source", "target", "target_db", "filter", "query", "iterative", "cols"]].to_csv(
            filename, sep=";", encoding="latin-1", index=False
        )
    return df[df["target"].notnull()]


def create(config_file: str = "database/CARLO.json"):
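    """Generate one batch file per import table plus a master batch file."""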
    cfg = load_config(config_file)
    base_dir = str(Path(cfg.batch_dir).parent)
    config = get_import_config(f"{base_dir}/config/{cfg.csv_file}", cfg.target_dsn.database)
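
    # Probe both databases once up front; their table and column lists drive the generated batches.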
    source_db = DatabaseInspect(cfg.source_dsn, source=True)
    source_tables = source_db.get_tables()
    print(json.dumps(source_db.get_prefix(), indent=2))

    target_db = DatabaseInspect(cfg.target_dsn)
    target_tables = target_db.get_tables()
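
    # One generated batch file per target table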
    for _, current_table in config.iterrows():
        with open(f"{cfg.batch_dir}/{current_table['target']}.bat", "w", encoding="cp850") as f:
            f.write("@echo off \n")
            f.write("rem ==" + current_table["target"] + "==\n")
            if current_table["target"] not in target_tables:
                f.write(f"echo Ziel-Tabelle '{current_table['target']}' existiert nicht!\n")
                print(f"Ziel-Tabelle '{current_table['target']}' existiert nicht!")
                continue
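
            # Start clean: drop stale stage files and truncate the target table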
- f.write(f"del {cfg.stage_dir}\\{current_table['target']}*.* /Q /F >nul 2>nul \n")
- f.write(
- f"sqlcmd.exe {target_db.bcp_conn_params} -p "
- + f"-Q \"TRUNCATE TABLE [{cfg.target_dsn.schema}].[{current_table['target']}]\" \n"
- )
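
            # Target column inventory; the legacy CLIENT_DB spelling is renamed
            # to Client_DB and moved to the end of the list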
            target_columns_list = target_db.get_columns(current_table["target"])
            target_column_types = target_db.get_columns_is_typeof_str(current_table["target"])
            if "CLIENT_DB" in target_columns_list:
                target_columns_list.remove("CLIENT_DB")
                target_columns_list.append("Client_DB")
            target_columns = set(target_columns_list)
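
            # Emit one export/import round trip per client database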
            for client_db, prefix in cfg.clients.items():
                source_table = current_table["source"].format(prefix)
                if source_table not in source_tables:
                    source_table2 = source_db.convert_table(source_table)
                    if source_table2 not in source_tables:
                        f.write(f"echo Quell-Tabelle '{source_table}' existiert nicht!\n")
                        print(f"Quell-Tabelle '{source_table}' existiert nicht!")
                        continue
                source_columns = set(source_db.get_columns(source_table))
                intersect = source_columns.intersection(target_columns)
                # print("Auf beiden Seiten: " + ";".join(intersect))
                diff1 = source_columns.difference(target_columns)
                if len(diff1) > 0:
                    f.write("rem Nur in Quelle: " + ";".join(diff1) + "\n")
                diff2 = target_columns.difference(source_columns)
                if "Client_DB" not in diff2:
                    f.write("echo Spalte 'Client_DB' fehlt!\n")
                    print(f"Ziel-Tabelle '{current_table['target']}' Spalte 'Client_DB' fehlt!")
                    continue
                diff2.remove("Client_DB")
                if len(diff2) > 0:
                    f.write("rem Nur in Ziel: " + ";".join(diff2) + "\n")
                if not pd.isnull(current_table["query"]):
                    select_query = current_table["query"].format(prefix, cfg.filter[0], cfg.filter[1])
                elif "." in source_table or cfg.source_dsn.schema == "":
                    if source_table[0] != "[":
                        source_table = f"[{source_table}]"
                    select_query = f"SELECT T1.* FROM {source_table} T1 "
                else:
                    select_query = f"SELECT T1.* FROM [{cfg.source_dsn.schema}].[{source_table}] T1 "
                if not pd.isnull(current_table["filter"]):
                    select_query += " WHERE " + current_table["filter"].format("", cfg.filter[0], cfg.filter[1])
                # select_columns = "T1.[" + "], T1.[".join(intersect) + "],"
                select_columns = ""
                for col, is_char_type in zip(target_columns_list, target_column_types):
                    if col in intersect:
                        if is_char_type:
                            select_columns += f"dbo.cln(T1.[{col}]), "
                        else:
                            select_columns += f"T1.[{col}], "
                    elif col == "Client_DB":
                        select_columns += f"'{client_db}' as \\\"Client_DB\\\", "
                    else:
                        select_columns += f"'' as \\\"{col}\\\", "
                select_query = select_query.replace("T1.*", select_columns[:-2])
- if "timestamp" in source_columns:
- select_query += " ORDER BY T1.[timestamp] "
- else:
- print(current_table["target"] + " hat kein timestamp-Feld")
- pkey = target_db.get_pkey(current_table["target"])
- if len(pkey) == 0:
- print(current_table["target"] + " hat keinen Primaerschluessel")
- select_query = select_query.replace("%", "%%") # batch-Problem
                stage_csv = f"{cfg.stage_dir}\\{current_table['target']}_{client_db}.csv"
                logfile = f"{cfg.logs_dir}\\{current_table['target']}_{client_db}"
                # insert_query = f"LOAD DATA INFILE '{stage_csv}' INTO TABLE {current_table['target']} FIELDS TERMINATED BY ','
                # ENCLOSED BY '\"' LINES TERMINATED BY '\n';"
                # print(select_query)
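
                # Export the query result to a stage CSV, then bulk-insert it into
                # the target table; findstr -v "1000" apparently hides bcp's
                # per-batch "1000 rows ..." progress lines from the log dump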
                bulk_copy = "bcp" if cfg.source_dsn.driver == "mssql" else "cet"
                f.write(
                    f'{bulk_copy} "{select_query}" queryout "{stage_csv}" {source_db.bcp_conn_params} -c -C 65001 -m 1000 '
                    + f'-e "{logfile}.queryout.log" > "{logfile}.bcp1.log" \n'
                )
                f.write(f'type "{logfile}.bcp1.log" | findstr -v "1000" \n')
                f.write(
                    f"bcp [{cfg.target_dsn.schema}].[{current_table['target']}] in \"{stage_csv}\" {target_db.bcp_conn_params} "
                    + f'-c -C 65001 -m 1000 -e "{logfile}.in.log" > "{logfile}.bcp2.log" \n'
                )
                f.write(f'type "{logfile}.bcp2.log" | findstr -v "1000" \n')
                f.write(f'del "{stage_csv}" /F >nul 2>nul \n')
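
    # Master batch file that runs every per-table batch in sequence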
- with open(f"{cfg.batch_dir}/_{cfg.name}.bat", "w", encoding="cp850") as f:
- f.write("@echo off & cd /d %~dp0 \n")
- f.write(f"del {cfg.stage_dir}\\*.* /Q /F >nul 2>nul \n\n")
- for index, current_table in config.iterrows():
- f.write(f"echo =={current_table['target']}==\n")
- f.write(f"echo {current_table['target']} >CON \n")
- f.write(f"call {current_table['target']}.bat\n\n")


if __name__ == "__main__":
    create()