import json
from collections import namedtuple
from pathlib import Path

import pandas as pd
import pyodbc

# Immutable configuration records (namedtuples kept for backward compatibility).
DbCreateConfig = namedtuple(
    "DbCreateConfig",
    "name csv_file clients filter source_dsn target_dsn stage_dir batch_dir",
)
DsnConfig = namedtuple("DsnConfig", "user password server database driver schema")

# Default job configuration (CARLO).
# NOTE(review): credentials are hard-coded in source — consider moving them to a
# protected config file or environment variables.
cfg = DbCreateConfig(
    **{
        "name": "CARLO",
        "csv_file": "CARLO.csv",
        "clients": {"1": "M und S Fahrzeughandel GmbH"},
        "filter": ["01.01.2018", "01.01.2019"],
        "source_dsn": {
            "user": "sa",
            "password": "Mffu3011#",
            "server": "GC-SERVER1\\GLOBALCUBE",
            "database": "DE0017",
            "driver": "mssql",
            "schema": "dbo",
        },
        "target_dsn": {
            "user": "sa",
            "password": "Mffu3011#",
            "server": "GC-SERVER1\\GLOBALCUBE",
            "database": "CARLO2",
            "driver": "mssql",
            "schema": "import",
        },
        "stage_dir": "..\\temp",
        "batch_dir": "..\\batch",
    }
)


class database_inspect:
    """Thin wrapper around a pyodbc connection for schema inspection.

    Provides table/column discovery plus the connection parameter strings
    needed by the ``bcp``/``sqlcmd`` command-line tools.
    """

    def __init__(self, dsn):
        # Per-instance table cache. Fixed: this used to be a shared, mutable
        # class attribute (``tables = []``), a classic Python pitfall.
        self.tables = []
        self.dsn = DsnConfig(**dsn)
        self.cursor = self.connect()

    def conn_string(self):
        """Build the connection string for the configured driver.

        Falls back to treating ``server`` as a preconfigured ODBC DSN name
        when the driver is neither ``mssql`` nor ``mysql``.
        """
        if self.dsn.driver == "mssql":
            return (
                "Driver={SQL Server Native Client 11.0};"
                + f"Server={self.dsn.server};Database={self.dsn.database};Uid={self.dsn.user};Pwd={self.dsn.password}"
            )
        if self.dsn.driver == "mysql":
            return f"mysql+pymysql://{self.dsn.user}:{self.dsn.password}@{self.dsn.server}/{self.dsn.database}?charset=utf8mb4"
        return f"DSN={self.dsn.server};UID={self.dsn.user};PWD={self.dsn.password}"

    def bcp_conn_params(self):
        """Connection arguments for the ``bcp`` and ``sqlcmd`` CLI tools."""
        return f"-S {self.dsn.server} -d {self.dsn.database} -U {self.dsn.user} -P {self.dsn.password}"

    def connect(self):
        """Open the connection and return a cursor on it."""
        c = pyodbc.connect(self.conn_string())
        return c.cursor()

    def get_tables(self):
        """Cache and return the names of all tables and views."""
        tables = [x[2] for x in self.cursor.tables(tableType="TABLE")]
        views = [x[2] for x in self.cursor.tables(tableType="VIEW")]
        self.tables = tables + views
        return self.tables

    def get_prefix(self):
        """Return the client prefixes found in the cached table names.

        Builds a ``{1: prefix, ...}`` dict from the distinct parts before
        ``$`` (Navision-style names). If no ``$``-table exists, returns the
        list of database names on the server instead.
        NOTE(review): the two branches return different types (dict vs list);
        callers only print the result, so this behavior is preserved.
        """
        if not self.tables:
            self.get_tables()
        source_tables_prefix = dict(
            enumerate(sorted({t.split("$")[0] for t in self.tables if "$" in t}), 1)
        )
        if len(source_tables_prefix) == 0:
            q = self.cursor.execute("select name FROM sys.databases")
            source_tables_prefix = [x[0] for x in q.fetchall()]
        return source_tables_prefix

    def get_columns(self, table):
        """Return the column names of *table*.

        Falls back to querying ``information_schema.columns`` when the ODBC
        metadata call yields nothing (e.g. for bracketed/qualified names).
        NOTE(review): the fallback builds SQL via f-string — table names come
        from the local CSV config, not untrusted input, but parameterizing
        would be safer.
        """
        source_insp_cols = [col.column_name for col in self.cursor.columns(table=table)]
        if len(source_insp_cols) == 0:
            q = self.cursor.execute(
                "SELECT COLUMN_NAME as column_name FROM information_schema.columns "
                + f"WHERE TABLE_NAME = '{self.convert_table(table)}'"
            )
            source_insp_cols = [col[0] for col in q.fetchall()]
        return source_insp_cols

    def convert_table(self, table):
        """Strip a schema qualifier and surrounding brackets from a table name."""
        if "." in table:
            table = table.split(".")[-1]
        if "[" in table:
            table = table[1:-1]
        return table


def create(config_file="dbtools/OPTIMA.json"):
    """Generate one bcp export/import batch file per target table.

    Reads the job definition from *config_file* (JSON, latin-1) and the
    table mapping CSV it references, compares source and target schemas,
    and writes Windows batch scripts (cp850) into ``cfg.batch_dir``:
    one per target table plus a master ``_<name>.bat`` calling them all.

    Tables missing on either side are skipped with a message both in the
    batch file and on stdout.
    """
    # Fixed: the config file handle was opened without ever being closed.
    with open(config_file, "r", encoding="latin-1") as config_fh:
        cfg_import = json.load(config_fh)
    base_dir = Path(config_file).resolve().parent
    cfg_import["name"] = Path(config_file).stem
    # Resolve "..\"-relative stage/batch dirs against the config file location.
    if cfg_import["stage_dir"][:2] == "..":
        cfg_import["stage_dir"] = str(
            base_dir.joinpath(cfg_import["stage_dir"]).resolve()
        )
    if cfg_import["batch_dir"][:2] == "..":
        cfg_import["batch_dir"] = str(
            base_dir.joinpath(cfg_import["batch_dir"]).resolve()
        )
    cfg = DbCreateConfig(**cfg_import)

    df = pd.read_csv(f"{base_dir}/{cfg.csv_file}", sep=";", encoding="latin-1")
    # Only rows with a target table are processed.
    config = df[df["target"].notnull()]
    print(config.head())

    source_db = database_inspect(cfg.source_dsn)
    source_tables = source_db.get_tables()
    print(source_db.get_prefix())

    target_db = database_inspect(cfg.target_dsn)
    target_tables = target_db.get_tables()

    for index, current_table in config.iterrows():
        with open(
            f"{cfg.batch_dir}/{current_table['target']}.bat", "w", encoding="cp850"
        ) as f:
            f.write("@echo off \n")
            f.write("rem ==" + current_table["target"] + "==\n")

            if current_table["target"] not in target_tables:
                f.write(
                    f"echo Ziel-Tabelle '{current_table['target']}' existiert nicht!\n"
                )
                print(f"Ziel-Tabelle '{current_table['target']}' existiert nicht!")
                continue

            # Clean the stage dir and truncate the target table first.
            f.write(
                f"del {cfg.stage_dir}\\{current_table['target']}*.* /Q /F >nul 2>nul \n"
            )
            f.write(
                f"sqlcmd.exe {target_db.bcp_conn_params()} -p "
                + f"-Q \"TRUNCATE TABLE [{cfg.target_dsn['schema']}].[{current_table['target']}]\" \n"
            )

            target_columns_list = target_db.get_columns(current_table["target"])
            # Normalize casing of the client column to "Client_DB".
            if "CLIENT_DB" in target_columns_list:
                target_columns_list.remove("CLIENT_DB")
                target_columns_list.append("Client_DB")
            target_columns = set(target_columns_list)

            for client_db, prefix in cfg.clients.items():
                source_table = current_table["source"].format(prefix)
                if source_table not in source_tables:
                    # Retry with schema qualifier/brackets stripped.
                    source_table2 = source_db.convert_table(source_table)
                    if source_table2 not in source_tables:
                        f.write(
                            f"echo Quell-Tabelle '{source_table}' existiert nicht!\n"
                        )
                        print(f"Quell-Tabelle '{source_table}' existiert nicht!")
                        continue

                source_columns = set(source_db.get_columns(source_table))
                intersect = source_columns.intersection(target_columns)
                diff1 = source_columns.difference(target_columns)
                if len(diff1) > 0:
                    f.write("rem Nur in Quelle: " + ";".join(diff1) + "\n")
                diff2 = target_columns.difference(source_columns)
                # The target must provide a "Client_DB" column that the source
                # does not have — it is filled with the client id below.
                if "Client_DB" not in diff2:
                    f.write("echo Spalte 'Client_DB' fehlt!\n")
                    print(
                        f"Ziel-Tabelle '{current_table['target']}' Spalte 'Client_DB' fehlt!"
                    )
                    continue
                diff2.remove("Client_DB")
                if len(diff2) > 0:
                    f.write("rem Nur in Ziel: " + ";".join(diff2) + "\n")

                # Build the SELECT: explicit query from the CSV wins, otherwise
                # a schema-qualified (or quoted) SELECT * over the source table.
                if not pd.isnull(current_table["query"]):
                    select_query = current_table["query"].format(
                        prefix, cfg.filter[0], cfg.filter[1]
                    )
                elif "." in source_table or cfg.source_dsn["schema"] == "":
                    select_query = f'SELECT T1.* FROM \\"{source_table}\\" T1 '
                else:
                    select_query = f"SELECT T1.* FROM [{cfg.source_dsn['schema']}].[{source_table}] T1 "

                if not pd.isnull(current_table["filter"]):
                    select_query += " WHERE " + current_table["filter"].format(
                        "", cfg.filter[0], cfg.filter[1]
                    )

                # Expand T1.* into the target column order; missing columns
                # become empty strings, Client_DB becomes the client id.
                select_columns = ""
                for col in target_columns_list:
                    if col in intersect:
                        select_columns += f"T1.[{col}], "
                    elif col == "Client_DB":
                        select_columns += "'" + client_db + '\' as \\"Client_DB\\", '
                    else:
                        select_columns += "'' as \\\"" + col + '\\", '

                select_query = select_query.replace("T1.*", select_columns[:-2])
                # Escape % so cmd.exe does not expand it inside the batch file.
                select_query = select_query.replace("%", "%%")

                stage_csv = (
                    f"{cfg.stage_dir}\\{current_table['target']}_{client_db}.csv"
                )

                # Export via bcp (mssql) or cet, then bulk-load into the target.
                bulk_copy = "bcp" if cfg.source_dsn["driver"] == "mssql" else "cet"
                f.write(
                    f'{bulk_copy} "{select_query}" queryout "{stage_csv}" {source_db.bcp_conn_params()} -c -C 65001 -m 1000 '
                    + f'-e "{stage_csv[:-4]}.queryout.log" > "{stage_csv[:-4]}.bcp1.log" \n'
                )
                f.write(f'type "{stage_csv[:-4]}.bcp1.log" | findstr -v "1000" \n')
                f.write(
                    f"bcp [{cfg.target_dsn['schema']}].[{current_table['target']}] in \"{stage_csv}\" {target_db.bcp_conn_params()} "
                    + f'-c -C 65001 -m 1000 -e "{stage_csv[:-4]}.in.log" > "{stage_csv[:-4]}.bcp2.log" \n'
                )
                f.write(f'type "{stage_csv[:-4]}.bcp2.log" | findstr -v "1000" \n')
                f.write(f'del "{stage_csv}" /F >nul 2>nul \n')

    # Master batch file that clears the stage dir and calls every table script.
    with open(f"{cfg.batch_dir}/_{cfg.name}.bat", "w", encoding="cp850") as f:
        f.write("@echo off & cd /d %~dp0 \n")
        f.write(f"del {cfg.stage_dir}\\*.* /Q /F >nul 2>nul \n\n")
        for index, current_table in config.iterrows():
            f.write(f"echo =={current_table['target']}==\n")
            f.write(f"echo {current_table['target']} >CON \n")
            f.write(f"call {current_table['target']}.bat\n\n")


if __name__ == "__main__":
    create()