import json
from pathlib import Path

import pandas as pd

from database.model import DatabaseInspect, create_db_ini, load_config

# Appended to every generated full-load SELECT whose source table has a ``timestamp`` column.
_TIMESTAMP_ORDER_BY = " ORDER BY T1.[timestamp] "
# Condition injected for incremental loads. %TS% is a batch variable; presumably it is set by
# sql_timestamp.bat (not visible here) to the last loaded timestamp — confirm.
_TIMESTAMP_CONDITION = "T1.[timestamp] > convert(binary(8), '%TS%', 1)"


def get_import_config(filename: str, db_name: str):
    """Load the table-import configuration CSV and return the rows that have a destination.

    Older config files name the destination column ``target`` instead of ``dest``. Such a
    file is migrated in place. ``dest`` is copied from ``target``, ``dest_db`` is set to
    *db_name*, and an empty ``cols`` column is added. The file is then rewritten
    (``;``-separated, latin-1) with the columns
    ``source, dest, dest_db, filter, query, iterative, cols``.

    Args:
        filename: Path of the ``;``-separated, latin-1 encoded config CSV.
        db_name: Destination database name written to ``dest_db`` during migration.

    Returns:
        The config DataFrame restricted to rows whose ``dest`` is not null.
    """
    df = pd.read_csv(filename, sep=";", encoding="latin-1")
    if "dest" not in df.columns:
        df["dest"] = df["target"]
        df["dest_db"] = db_name
        df["cols"] = ""
        df[["source", "dest", "dest_db", "filter", "query", "iterative", "cols"]].to_csv(
            filename, sep=";", encoding="latin-1", index=False
        )
    return df[df["dest"].notnull()]


def _build_select_columns(
    dest_columns_list: list[str],
    dest_column_types,
    shared_columns: set[str],
    client_db: str,
    clean_char_columns: bool = False,
) -> str:
    """Return the comma-separated SELECT list in destination column order.

    - Columns present in both source and destination are read from ``T1``.
    - ``Client_DB`` is filled with the literal *client_db*.
    - Any other destination-only column is filled with ``''``.

    When *clean_char_columns* is true, shared columns whose type flag is truthy are wrapped
    in ``dbo.cln(...)``. This is off by default because the feature is disabled for now.
    """
    parts = []
    for col, is_char_type in zip(dest_columns_list, dest_column_types):
        if col in shared_columns:
            if clean_char_columns and is_char_type:
                parts.append(f"dbo.cln(T1.[{col}])")
            else:
                parts.append(f"T1.[{col}]")
        elif col == "Client_DB":
            parts.append(f"'{client_db}' as [Client_DB]")
        else:
            parts.append("'' as [" + col + "]")
    return ", ".join(parts)


def _add_timestamp_filter(select_query: str) -> str:
    """Restrict a full-load SELECT to rows newer than the batch variable %TS%.

    If the query has a WHERE clause, the condition is AND-ed onto it. Otherwise it is
    inserted in front of the ``ORDER BY T1.[timestamp]`` that was appended to the query.
    Only that exact marker is targeted, so column names containing "ORDER" are not
    rewritten. If neither is present, a warning is printed and the query is returned
    unchanged.
    """
    if "WHERE" in select_query:
        return select_query.replace("WHERE", f"WHERE {_TIMESTAMP_CONDITION} AND")
    if _TIMESTAMP_ORDER_BY in select_query:
        return select_query.replace(
            _TIMESTAMP_ORDER_BY, f" WHERE {_TIMESTAMP_CONDITION}{_TIMESTAMP_ORDER_BY}"
        )
    print("Dont know where to put WHERE")
    return select_query


def _write_full_load(f, cfg, current_table, source_db, source_tables, dest_db) -> dict[str, str]:
    """Write one bcp export/import pair per client for a full load of *current_table*.

    Returns the generated SELECT per ``<dest>_<client_db>`` key. Clients that were skipped
    (missing source table, or destination without an own ``Client_DB`` column) have no entry.
    """
    dest = current_table["dest"]
    dest_columns_list = dest_db.get_columns(dest)
    dest_column_types = list(dest_db.get_columns_is_typeof_str(dest))

    if "CLIENT_DB" in dest_columns_list:
        # Normalize the spelling and move the column to the end of the select list. Move its
        # type flag along so the zip() in _build_select_columns keeps pairs aligned.
        pos = dest_columns_list.index("CLIENT_DB")
        dest_columns_list.remove("CLIENT_DB")
        dest_columns_list.append("Client_DB")
        if pos < len(dest_column_types):
            dest_column_types.append(dest_column_types.pop(pos))
    dest_columns = set(dest_columns_list)

    select_queries: dict[str, str] = {}
    for client_db, prefix in cfg.clients.items():
        table_client = f"{dest}_{client_db}"

        source_table = current_table["source"].format(prefix)
        # NOTE(review): if only the convert_table() spelling is listed in source_tables, the
        # unconverted name is still used for get_columns() and the FROM clause — confirm intended.
        if source_table not in source_tables and source_db.convert_table(source_table) not in source_tables:
            f.write(f"echo Quell-Tabelle '{source_table}' existiert nicht!\n")
            print(f"Quell-Tabelle '{source_table}' existiert nicht!")
            continue

        source_columns = set(source_db.get_columns(source_table))
        shared_columns = source_columns.intersection(dest_columns)

        only_in_source = source_columns.difference(dest_columns)
        if only_in_source:
            f.write("rem Nur in Quelle: " + ";".join(only_in_source) + "\n")
        only_in_dest = dest_columns.difference(source_columns)
        # Client_DB must exist only in the destination; it is filled per client below.
        if "Client_DB" not in only_in_dest:
            f.write("echo Spalte 'Client_DB' fehlt!\n")
            print(f"Ziel-Tabelle '{dest}' Spalte 'Client_DB' fehlt!")
            continue
        only_in_dest.remove("Client_DB")
        if only_in_dest:
            f.write("rem Nur in Ziel: " + ";".join(only_in_dest) + "\n")

        if not pd.isnull(current_table["query"]):
            select_query = current_table["query"].format(prefix, cfg.filter[0], cfg.filter[1])
        elif "." in source_table or cfg.source_dsn.schema == "":
            if not source_table.startswith("["):
                source_table = f"[{source_table}]"
            select_query = f"SELECT T1.* FROM {source_table} T1 "
        else:
            select_query = f"SELECT T1.* FROM [{cfg.source_dsn.schema}].[{source_table}] T1 "

        if not pd.isnull(current_table["filter"]):
            select_query += " WHERE " + current_table["filter"].format("", cfg.filter[0], cfg.filter[1])

        select_columns = _build_select_columns(dest_columns_list, dest_column_types, shared_columns, client_db)
        select_query = select_query.replace("T1.*", select_columns)
        if "timestamp" in source_columns:
            select_query += _TIMESTAMP_ORDER_BY
        else:
            print(dest + " hat kein timestamp-Feld")
        # Escape for cmd.exe: presumably % is expanded once when the .bat is parsed and once
        # more by `call`, so each literal % must be written as %%%%.
        select_query = select_query.replace("%", "%%%%")
        select_queries[table_client] = select_query
        f.write(f' call bcp_queryout.bat "{table_client}" "{select_query}"\n')
        f.write(
            f' call bcp_in.bat "{table_client}" '
            f'"[{cfg.dest_dsn.schema}].[{dest}]" "{current_table["dest_db"]}"\n'
        )
    return select_queries


def _write_incremental_load(f, cfg, current_table, full_table_name, select_queries, dest_db) -> None:
    """Write the ``:increment`` section.

    The section does the following:
    - Loads rows newer than %TS% into the temp table.
    - Deletes matching primary keys from the destination, when a primary key exists.
    - Inserts all temp rows into the destination.
    """
    dest = current_table["dest"]
    f.write(":increment\n")
    temp_table_name = f"[{cfg.temp_db}].[temp].[{dest}]"
    f.write(f' call sql_query.bat "TRUNCATE TABLE {temp_table_name}"\n\n')

    for client_db, _prefix in cfg.clients.items():
        table_client = f"{dest}_{client_db}"
        if table_client not in select_queries:
            # Skipped (and reported) while writing the full-load section; no query to run.
            continue
        select_query = _add_timestamp_filter(select_queries[table_client])
        f.write(f' call sql_timestamp.bat "{table_client}" "{full_table_name}" "{client_db}"\n')
        f.write(f' call bcp_queryout.bat "{table_client}" "{select_query}"\n')
        f.write(f' call bcp_in.bat "{table_client}" "[temp].[{dest}]" "{cfg.temp_db}"\n\n')

    insert_query = f"INSERT INTO {full_table_name} SELECT * FROM {temp_table_name} T1"
    pkey = dest_db.get_pkey(dest, current_table["dest_db"])
    if len(pkey) == 0:
        print(dest + " hat keinen Primaerschluessel")
        # The newline is required: without it the following INSERT call ended up inside
        # this rem comment and was never executed.
        # NOTE(review): without a primary key nothing is deleted first, so re-exported rows
        # are appended again — confirm that is acceptable.
        f.write(f" rem {dest} hat keinen Primaerschluessel\n")
    else:
        pkey_join = " AND ".join(f"T1.[{col}] = T2.[{col}]" for col in pkey)
        delete_query = f"DELETE T1 FROM {full_table_name} T1 INNER JOIN {temp_table_name} T2 ON {pkey_join}"
        f.write(f' call sql_query.bat "{delete_query}"\n')
    f.write(f' call sql_query.bat "{insert_query}"\n')


def _write_master_batch(cfg, config) -> None:
    """Write ``_<name>.bat``.

    It clears the stage directory, then calls every table batch with argument ``1``
    (i.e. the incremental section).
    """
    with open(f"{cfg.batch_dir}/_{cfg.name}.bat", "w", encoding="cp850") as f:
        f.write("@echo off & cd /d %~dp0\n")
        f.write(f"del {cfg.stage_dir}\\*.* /Q /F >nul 2>nul\n\n")
        for _, current_table in config.iterrows():
            f.write(f"echo =={current_table['dest']}==\n")
            f.write(f"echo {current_table['dest']} >CON\n")
            f.write(f"call {cfg.batch_dir}\\{current_table['dest']}.bat 1\n\n")


def create(config_file: str = "database/CARLO.json"):
    """Generate the Windows batch files that copy the configured tables into the destination DB.

    For every config row with a destination table, ``<batch_dir>/<dest>.bat`` is written.
    - Without arguments, the batch runs a full load: TRUNCATE the destination, then export
      and import per client via bcp_queryout.bat / bcp_in.bat.
    - With any argument, it jumps to the incremental section.
    - Both paths finish by deleting the per-client stage CSVs.

    A master batch ``_<name>.bat`` that runs all table batches incrementally is written last.

    Args:
        config_file: Path of the JSON config passed to ``load_config``. The import CSV is
            read from ``<config_file>/../../config/<cfg.csv_file>``.
    """
    cfg = load_config(config_file)
    create_db_ini(cfg)
    base_dir = str(Path(config_file).parent.parent.resolve())
    config = get_import_config(f"{base_dir}/config/{cfg.csv_file}", cfg.dest_dsn.database)

    source_db = DatabaseInspect(cfg.source_dsn, source=True)
    source_tables = source_db.get_tables()
    print(json.dumps(source_db.get_prefix(), indent=2))

    dest_db = DatabaseInspect(cfg.dest_dsn)
    dest_tables = dest_db.get_tables()

    for _, current_table in config.iterrows():
        dest = current_table["dest"]
        with open(f"{cfg.batch_dir}/{dest}.bat", "w", encoding="cp850") as f:
            full_table_name = f'[{current_table["dest_db"]}].[{cfg.dest_dsn.schema}].[{dest}]'
            f.write("@echo off\n")
            f.write(f'call "{cfg.scripts_dir}\\config2.bat"\n')
            f.write("rem ==" + dest + "==\n")

            if dest not in dest_tables:
                f.write(f"echo Ziel-Tabelle '{dest}' existiert nicht!\n")
                print(f"Ziel-Tabelle '{dest}' existiert nicht!")
                continue

            f.write(f"del {cfg.logs_dir}\\{dest}*.* /Q /F >nul 2>nul\n\n")
            f.write('if not "%1"=="" goto :increment\n')
            f.write("\n:full\n")
            f.write(f' call sql_query.bat "TRUNCATE TABLE {full_table_name}"\n')
            select_queries = _write_full_load(f, cfg, current_table, source_db, source_tables, dest_db)
            f.write(" goto :cleanup\n\n")

            _write_incremental_load(f, cfg, current_table, full_table_name, select_queries, dest_db)

            f.write("\n:cleanup\n")
            for client_db, _prefix in cfg.clients.items():
                stage_csv = f"{cfg.stage_dir}\\{dest}_{client_db}.csv"
                f.write(f' del "{stage_csv}" /F >nul 2>nul\n')

    _write_master_batch(cfg, config)


if __name__ == "__main__":
    create()