"""Generate Windows batch files that copy client source tables into a destination database.

For every row of the import configuration CSV one ``<dest>.bat`` file is
written (full load and incremental load sections), plus two master batch
files that call all table batch files in sequence.
"""

import io
import json
import sys
from dataclasses import dataclass
from functools import cached_property
from pathlib import Path

import pandas as pd

sys.path.insert(0, "C:\\Projekte\\tools")
from database.model import (  # noqa:E402
    DatabaseInspect,
    DbCreateConfig,
    create_db_ini,
    load_config,
)


def get_import_config(filename: str, db_name: str):
    """Read the import configuration CSV and return the rows that have a destination.

    If the file has no ``dest`` column it is migrated in place: ``dest`` is
    copied from ``target``, ``dest_db`` is set to *db_name*, an empty ``cols``
    column is added and the file is rewritten with the seven expected columns.

    Args:
        filename: Path of the semicolon-separated, latin-1 encoded CSV file.
        db_name: Destination database name used when migrating the file.

    Returns:
        The DataFrame restricted to rows whose ``dest`` value is not null.
    """
    df = pd.read_csv(filename, sep=";", encoding="latin-1")
    if "dest" not in df.columns:
        df["dest"] = df["target"]
        df["dest_db"] = db_name
        df["cols"] = ""
        df[["source", "dest", "dest_db", "filter", "query", "iterative", "cols"]].to_csv(
            filename, sep=";", encoding="latin-1", index=False
        )
    return df[df["dest"].notnull()]


@dataclass
class SourceTable:
    """Base description of one source table for one client; all properties are stubs."""

    source: str
    client_db: str
    prefix: str

    @property
    def stage_csv(self) -> str:
        return ""

    @property
    def table_client(self) -> str:
        return ""

    @property
    def table_name(self) -> str:
        return ""

    @property
    def select_query(self) -> str:
        return ""


@dataclass
class DestTable:
    """One destination table as configured by a row of the import CSV."""

    source: str
    dest: str
    dest_db: str
    filter_: str
    query: str
    iterative: str
    cols: str
    source_tables: list[SourceTable] = None
    dest_inspect: DatabaseInspect = None
    # Needed by insert_query / delete_query to resolve schema and temp database.
    cfg: DbCreateConfig = None

    def table_batch_file(self, batch_dir: str) -> str:
        """Return the path of this table's batch file inside *batch_dir*."""
        return f"{batch_dir}/{self.dest}.bat"

    def full_table_name(self, schema_name: str) -> str:
        """Return the bracket-quoted ``[db].[schema].[table]`` destination name."""
        return f"[{self.dest_db}].[{schema_name}].[{self.dest}]"

    def temp_table_name(self, temp_db: str) -> str:
        """Return the bracket-quoted name of the staging table in *temp_db*."""
        return f"[{temp_db}].[temp].[{self.dest}]"

    @cached_property
    def columns_list(self) -> list[str]:
        """Destination column names, with ``CLIENT_DB`` renamed to ``Client_DB`` and moved last."""
        res = self.dest_inspect.get_columns(self.dest)
        if "CLIENT_DB" in res:
            res.remove("CLIENT_DB")
            res.append("Client_DB")
        return res

    @cached_property
    def column_types(self) -> list[str]:
        """Result of ``dest_inspect.get_columns_is_typeof_str`` for this table."""
        return self.dest_inspect.get_columns_is_typeof_str(self.dest)

    @cached_property
    def primary_key(self) -> list[str]:
        """Primary key columns as reported by ``dest_inspect.get_pkey``."""
        return self.dest_inspect.get_pkey(self.dest, self.dest_db)

    def _resolved_names(self) -> tuple[str, str]:
        """Return ``(full_table_name, temp_table_name)`` resolved through ``cfg``.

        Raises:
            ValueError: If ``cfg`` has not been assigned.
        """
        if self.cfg is None:
            raise ValueError(f"DestTable '{self.dest}': cfg must be set before building queries")
        return (
            self.full_table_name(self.cfg.dest_dsn.schema),
            self.temp_table_name(self.cfg.temp_db),
        )

    @property
    def insert_query(self) -> str:
        """SQL that copies all rows from the staging table into the destination table."""
        full_name, temp_name = self._resolved_names()
        return f"INSERT INTO {full_name} SELECT * FROM {temp_name} T1"

    @property
    def delete_query(self) -> str:
        """SQL that deletes destination rows matching staged rows by primary key.

        Returns an empty string when the table has no primary key.
        """
        if len(self.primary_key) == 0:
            return ""
        full_name, temp_name = self._resolved_names()
        pkey_join = " AND ".join(f"T1.[{col}] = T2.[{col}]" for col in self.primary_key)
        return f"DELETE T1 FROM {full_name} T1 INNER JOIN {temp_name} T2 ON {pkey_join}"


class SourceTable2(SourceTable):
    """Source table bound to a destination table and to the shared configuration.

    ``cfg`` and ``source_inspect`` are assigned on the class by ``create``;
    ``dest_table`` is assigned per instance.
    """

    dest_table: DestTable
    cfg: DbCreateConfig
    _select_query: str = None
    source_inspect: DatabaseInspect = None
    info: str = ""

    @property
    def table_client(self) -> str:
        return f"{self.dest_table.dest}_{self.client_db}"

    @property
    def stage_csv(self) -> str:
        return f"{self.cfg.stage_dir}\\{self.table_client}.csv"

    @property
    def table_name(self) -> str:
        return self.source.format(self.prefix)

    @cached_property
    def select_query(self):
        """Build the SELECT statement that exports this source table.

        Batch-file notes about column mismatches are stored in ``self.info``.

        Returns:
            The SELECT statement, or an empty string when ``Client_DB`` is not
            among the columns that exist only in the destination.
        """
        notes = io.StringIO()
        # table_name is a read-only property; work on a local copy so it can be quoted.
        table_name = self.table_name
        dest_columns = self.dest_table.columns_list
        source_columns = set(self.source_inspect.get_columns(table_name))

        intersect = source_columns.intersection(dest_columns)
        only_in_source = source_columns.difference(dest_columns)
        if only_in_source:
            # sorted() keeps the generated batch file stable between runs.
            notes.write("rem Nur in Quelle: " + ";".join(sorted(only_in_source)) + "\n")

        only_in_dest = set(dest_columns).difference(source_columns)
        if "Client_DB" not in only_in_dest:
            notes.write("echo Spalte 'Client_DB' fehlt!\n")
            self.info = notes.getvalue()
            # Callers compare against "" to detect this case.
            return ""
        only_in_dest.remove("Client_DB")
        if only_in_dest:
            notes.write("rem Nur in Ziel: " + ";".join(sorted(only_in_dest)) + "\n")

        if not pd.isnull(self.dest_table.query):
            select_query = self.dest_table.query.format(self.prefix, self.cfg.filter[0], self.cfg.filter[1])
        elif "." in table_name or self.cfg.source_dsn.schema == "":
            if table_name[0] != "[":
                table_name = f"[{table_name}]"
            select_query = f"SELECT T1.* FROM {table_name} T1 "
        else:
            select_query = f"SELECT T1.* FROM [{self.cfg.source_dsn.schema}].[{table_name}] T1 "

        if not pd.isnull(self.dest_table.filter_):
            select_query += " WHERE " + self.dest_table.filter_.format("", self.cfg.filter[0], self.cfg.filter[1])
        elif "WHERE" not in select_query:
            # Guarantees a WHERE clause that the incremental load can extend.
            select_query += " WHERE 1 = 1"

        select_columns = []
        for col, is_char_type in zip(dest_columns, self.dest_table.column_types):
            if col in intersect:
                if False and is_char_type:  # disabled for now
                    select_columns.append(f"dbo.cln(T1.[{col}])")
                else:
                    select_columns.append(f"T1.[{col}]")
            elif col == "Client_DB":
                select_columns.append(f"'{self.client_db}' as [Client_DB]")
            else:
                # Column exists only in the destination: export an empty string.
                select_columns.append("'' as [" + col + "]")
        select_query = select_query.replace("T1.*", ", ".join(select_columns))

        if "timestamp" in source_columns:
            select_query += " ORDER BY T1.[timestamp] "
        else:
            print(self.dest_table.dest + " hat kein timestamp-Feld")
        self.info = notes.getvalue()
        return select_query


def _dest_table_from_row(row: dict) -> DestTable:
    """Build a DestTable from one CSV record, mapping columns by name instead of position."""
    return DestTable(
        source=row["source"],
        dest=row["dest"],
        dest_db=row["dest_db"],
        filter_=row["filter"],
        query=row["query"],
        iterative=row["iterative"],
        cols=row["cols"],
    )


def _write_table_batch(f, dest_table: DestTable, cfg, source_inspect, dest_inspect) -> None:
    """Write the full-load and incremental-load batch script for one destination table."""
    full_table_name = dest_table.full_table_name(cfg.dest_dsn.schema)
    temp_table_name = dest_table.temp_table_name(cfg.temp_db)

    f.write("@echo off\n")
    f.write(f'call "{cfg.scripts_dir}\\config2.bat"\n')
    f.write("rem ==" + dest_table.dest + "==\n")
    if dest_table.dest not in dest_inspect.tables_list:
        f.write(f"echo Ziel-Tabelle '{dest_table.dest}' existiert nicht!\n")
        print(f"Ziel-Tabelle '{dest_table.dest}' existiert nicht!")
        return

    f.write(f"del {cfg.logs_dir}\\{dest_table.dest}*.* /Q /F >nul 2>nul\n\n")
    f.write('if not "%1"=="" goto :increment\n')

    f.write("\n:full\n")
    f.write(f' call sql_query.bat "TRUNCATE TABLE {full_table_name}"\n')
    for source_table in dest_table.source_tables:
        if source_table.table_name not in source_inspect.tables_list:
            converted_name = source_inspect.convert_table(source_table.table_name)
            if converted_name not in source_inspect.tables_list:
                f.write(f"echo Quell-Tabelle '{source_table.table_name}' existiert nicht!\n")
                print(f"Quell-Tabelle '{source_table.table_name}' existiert nicht!")
                continue
        select_query = source_table.select_query.replace("%", "%%%%")  # batch escaping of '%'
        f.write(source_table.info)
        if select_query == "":
            print(f"Ziel-Tabelle '{dest_table.dest}' Spalte 'Client_DB' fehlt!")
            continue
        f.write(f' call bcp_queryout.bat "{source_table.table_client}" "{select_query}"\n')
        f.write(
            f' call bcp_in.bat "{source_table.table_client}" '
            f'"[{cfg.dest_dsn.schema}].[{dest_table.dest}]" "{dest_table.dest_db}"\n'
        )
    f.write(" goto :cleanup\n\n")

    f.write(":increment\n")
    f.write(f' call sql_query.bat "TRUNCATE TABLE {temp_table_name}"\n\n')
    convert_timestamp = "T1.[timestamp] > convert(binary(8), '%TS%', 1)"
    for source_table in dest_table.source_tables:
        # NOTE(review): unlike the full load, '%' is not escaped and the source
        # table's existence is not checked here - confirm whether that is intended.
        select_query = source_table.select_query
        if select_query == "":
            # No usable query (Client_DB missing); nothing to export for this client.
            continue
        if "WHERE" in select_query:
            select_query = select_query.replace("WHERE", f"WHERE {convert_timestamp} AND")
        else:
            print("Dont know where to put WHERE")
        f.write(
            f' call sql_timestamp.bat "{source_table.table_client}" "{full_table_name}" "{source_table.client_db}"\n'
        )
        f.write(f' call bcp_queryout.bat "{source_table.table_client}" "{select_query}"\n')
        f.write(f' call bcp_in.bat "{source_table.table_client}" "[temp].[{dest_table.dest}]" "{cfg.temp_db}"\n\n')

    delete_query = dest_table.delete_query
    if delete_query == "":
        print(dest_table.dest + " hat keinen Primaerschluessel")
        # The trailing newline keeps the following insert call off the rem line.
        f.write(f" rem {dest_table.dest} hat keinen Primaerschluessel\n")
    else:
        f.write(f' call sql_query.bat "{delete_query}"\n')
    f.write(f' call sql_query.bat "{dest_table.insert_query}"\n')

    f.write("\n:cleanup\n")
    for source_table in dest_table.source_tables:
        f.write(f' call delete.bat "{source_table.stage_csv}"\n')


def _write_master_batch(filename: str, cfg, table_config: list[DestTable], call_args: str) -> None:
    """Write a batch file that clears the stage directory and calls every table batch file.

    *call_args* is appended verbatim after each ``<dest>.bat`` call.
    """
    with open(filename, "w", encoding="cp850") as f:
        f.write("@echo off & cd /d %~dp0\n")
        f.write(f"del {cfg.stage_dir}\\*.* /Q /F >nul 2>nul\n\n")
        for dest_table in table_config:
            f.write(f"echo =={dest_table.dest}==\n")
            f.write(f"echo {dest_table.dest} >CON\n")
            f.write(f"call {cfg.batch_dir}\\{dest_table.dest}.bat{call_args}\n\n")


def create(config_file: str = "database/CARLO.json"):
    """Create all import batch files described by *config_file*.

    Loads the configuration, inspects source and destination databases, reads
    the import CSV and writes one batch file per destination table plus the
    two master batch files ``_<name>.bat`` (calls each table batch with the
    argument ``1``) and ``_<name>_full_load.bat`` (calls them without argument).

    Args:
        config_file: Path of the JSON configuration passed to ``load_config``.
    """
    cfg = load_config(config_file)
    create_db_ini(cfg)
    base_dir = str(Path(config_file).parent.parent.resolve())

    source_inspect = DatabaseInspect(cfg.source_dsn, source=True)
    print(json.dumps(source_inspect.get_prefix(), indent=2))
    # Shared by all SourceTable2 instances.
    SourceTable2.source_inspect = source_inspect
    SourceTable2.cfg = cfg
    dest_inspect = DatabaseInspect(cfg.dest_dsn)

    config = get_import_config(f"{base_dir}/config/{cfg.csv_file}", cfg.dest_dsn.database)
    table_config = [_dest_table_from_row(row) for row in config.to_dict(orient="records")]

    for dest_table in table_config:
        dest_table.dest_inspect = dest_inspect
        dest_table.cfg = cfg
        dest_table.source_tables = []
        for client_db, prefix in cfg.clients.items():
            st = SourceTable2(dest_table.source, client_db, prefix)
            st.dest_table = dest_table
            dest_table.source_tables.append(st)

    for dest_table in table_config:
        with open(dest_table.table_batch_file(cfg.batch_dir), "w", encoding="cp850") as f:
            _write_table_batch(f, dest_table, cfg, source_inspect, dest_inspect)

    _write_master_batch(f"{cfg.batch_dir}/_{cfg.name}.bat", cfg, table_config, " 1")
    _write_master_batch(f"{cfg.batch_dir}/_{cfg.name}_full_load.bat", cfg, table_config, "")


if __name__ == "__main__":
    create()