import plac
import pandas as pd
from sqlalchemy import create_engine, inspect
from database import bcp_conn_params, conn_string
import json
from pathlib import Path
from collections import namedtuple

DbCreateConfig = namedtuple('DbCreateConfig', 'name csv_file clients filter source_dsn target_dsn stage_dir batch_dir')

# Default/example configuration; create() builds its own DbCreateConfig from the JSON file passed on the command line.
cfg = DbCreateConfig(**{
    'name': 'CARLO',
    'csv_file': 'CARLO.csv',
    'clients': {'1': 'M und S Fahrzeughandel GmbH'},
    'filter': ['01.01.2018', '01.01.2019'],
    'source_dsn': {'user': 'sa', 'pass': 'Mffu3011#', 'server': 'GC-SERVER1\\GLOBALCUBE', 'database': 'DE0017', 'driver': 'mssql', 'schema': 'dbo'},
    'target_dsn': {'user': 'sa', 'pass': 'Mffu3011#', 'server': 'GC-SERVER1\\GLOBALCUBE', 'database': 'CARLO2', 'driver': 'mssql', 'schema': 'import'},
    'stage_dir': '..\\temp',
    'batch_dir': '..\\batch'
})


class database_inspect:
    def __init__(self, dsn):
        self.dsn = dsn
        self.engine = create_engine(conn_string(self.dsn))
        self.insp = inspect(self.engine)

    def get_tables(self):
        self.tables = self.insp.get_table_names(schema=self.dsn['schema']) + self.insp.get_view_names(schema=self.dsn['schema'])
        return self.tables

    def get_prefix(self):
        source_tables_prefix = dict(enumerate(sorted(list(set([t.split('$')[0] for t in self.tables if '$' in t]))), 1))
        if len(source_tables_prefix) == 0:
            q = self.engine.execute('select name FROM sys.databases')
            source_tables_prefix = [x[0] for x in q.fetchall()]
        return source_tables_prefix

    def get_columns(self, table):
        source_insp_cols = self.insp.get_columns(table)
        if len(source_insp_cols) == 0:
            q = self.engine.execute(f"SELECT COLUMN_NAME as name FROM information_schema.columns WHERE TABLE_NAME = '{self.convert_table(table)}'")
            source_insp_cols = q.fetchall()
        # Return a list so the column order reported by the database is preserved for the bcp export.
        return [col['name'] for col in source_insp_cols]

    def convert_table(self, table):
        if '.' in table:
            table = table.split('.')[-1]
        if '[' in table:
            table = table[1:-1]
        return table
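# create() reads a semicolon-separated table-mapping CSV located next to the JSON
# config; the columns used below are: target, source, query, filter.
# Illustrative row layout (assumed, not taken from an actual CARLO.csv):
#
#   target;source;query;filter
#   Posts;{0}$Posts;;T1.[Datum] >= '{1}'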
@plac.pos('config_file', '', type=str)
def create(config_file='dbtools/OPTIMA.json'):
    cfg_import = json.load(open(config_file, 'r', encoding='ansi'))
    base_dir = Path(config_file).resolve().parent
    cfg_import['name'] = Path(config_file).stem
    if cfg_import['stage_dir'][:2] == '..':
        cfg_import['stage_dir'] = str(base_dir.joinpath(cfg_import['stage_dir']).resolve())
    if cfg_import['batch_dir'][:2] == '..':
        cfg_import['batch_dir'] = str(base_dir.joinpath(cfg_import['batch_dir']).resolve())
    cfg = DbCreateConfig(**cfg_import)

    df = pd.read_csv(f"{base_dir}/{cfg.csv_file}", sep=';', encoding='ansi')
    config = df[df['target'].notnull()]
    print(config.head())

    source_db = database_inspect(cfg.source_dsn)
    source_tables = source_db.get_tables()
    print(source_db.get_prefix())

    target_db = database_inspect(cfg.target_dsn)
    target_tables = target_db.get_tables()

    # One batch file per target table: delete old staging files, truncate the target
    # table, then export from the source and import into the target via bcp per client.
    for index, current_table in config.iterrows():
        with open(f"{cfg.batch_dir}/{current_table['target']}.bat", 'w', encoding='cp850') as f:
            f.write('@echo off \n')
            f.write('rem ==' + current_table['target'] + '==\n')

            if current_table['target'] not in target_tables:
                f.write(f"echo Ziel-Tabelle '{current_table['target']}' existiert nicht!\n")
                print(f"Ziel-Tabelle '{current_table['target']}' existiert nicht!")
                continue

            f.write(f"del {cfg.stage_dir}\\{current_table['target']}*.* /Q /F >nul 2>nul \n")
            f.write(f"sqlcmd.exe {bcp_conn_params(cfg.target_dsn)} -p -Q \"TRUNCATE TABLE [{cfg.target_dsn['schema']}].[{current_table['target']}]\" \n")

            target_columns_list = target_db.get_columns(current_table['target'])
            if 'CLIENT_DB' in target_columns_list:
                target_columns_list.remove('CLIENT_DB')
                target_columns_list.append('Client_DB')
            target_columns = set(target_columns_list)

            for client_db, prefix in cfg.clients.items():
                source_table = current_table['source'].format(prefix)
                if source_table not in source_tables:
                    source_table2 = source_db.convert_table(source_table)
                    if source_table2 not in source_tables:
                        f.write(f"echo Quell-Tabelle '{source_table}' existiert nicht!\n")
                        print(f"Quell-Tabelle '{source_table}' existiert nicht!")
                        continue
                source_columns = set(source_db.get_columns(source_table))
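                # The optional 'query' column may carry its own SELECT with str.format
                # placeholders: {0} = client prefix, {1} and {2} = the two dates from
                # cfg.filter. Illustrative example (assumed CSV content, not verified):
                #   SELECT T1.* FROM [{0}].[dbo].[Invoices] T1 WHERE T1.[Datum] >= '{1}'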
                if not pd.isnull(current_table['query']):
                    select_query = current_table['query'].format(prefix, cfg.filter[0], cfg.filter[1])
                elif '.' in source_table:
                    select_query = f"SELECT T1.* FROM {source_table} T1 "
                else:
                    select_query = f"SELECT T1.* FROM [{cfg.source_dsn['schema']}].[{source_table}] T1 "

                if not pd.isnull(current_table['filter']):
                    select_query += " WHERE " + current_table['filter'].format("", cfg.filter[0], cfg.filter[1])

                intersect = source_columns.intersection(target_columns)
                # print("Auf beiden Seiten: " + ";".join(intersect))
                diff1 = source_columns.difference(target_columns)
                if len(diff1) > 0:
                    f.write("rem Nur in Quelle: " + ";".join(diff1) + "\n")
                diff2 = target_columns.difference(source_columns)
                if 'Client_DB' not in diff2:
                    f.write("echo Spalte 'Client_DB' fehlt!\n")
                    print(f"Ziel-Tabelle '{current_table['target']}' Spalte 'Client_DB' fehlt!")
                    continue
                diff2.remove('Client_DB')
                if len(diff2) > 0:
                    f.write("rem Nur in Ziel: " + ";".join(diff2) + "\n")

                # select_columns = "T1.[" + "], T1.[".join(intersect) + "],"
                select_columns = ''
                for col in target_columns_list:
                    if col in intersect:
                        select_columns += f"T1.[{col}], "
                    elif col == 'Client_DB':
                        select_columns += "'" + client_db + "' as \\\"Client_DB\\\", "
                    else:
                        select_columns += "'' as \\\"" + col + "\\\", "

                select_query = select_query.replace("T1.*", select_columns[:-2])
                select_query = select_query.replace("%", "%%")  # '%' must be doubled inside batch files
                stage_csv = f"{cfg.stage_dir}\\{current_table['target']}_{client_db}.csv"
                # insert_query = f"LOAD DATA INFILE '{stage_csv}' INTO TABLE {current_table['target']} FIELDS TERMINATED BY ',' ENCLOSED BY '\"' LINES TERMINATED BY '\n';"
                # print(select_query)
                f.write(f"bcp \"{select_query}\" queryout \"{stage_csv}\" {bcp_conn_params(cfg.source_dsn)} -c -C 65001 -e \"{stage_csv[:-4]}.queryout.log\" > \"{stage_csv[:-4]}.bcp1.log\" \n")
                f.write(f"type \"{stage_csv[:-4]}.bcp1.log\" | findstr -v \"1000\" \n")
                f.write(f"bcp [{cfg.target_dsn['schema']}].[{current_table['target']}] in \"{stage_csv}\" {bcp_conn_params(cfg.target_dsn)} -c -C 65001 -e \"{stage_csv[:-4]}.in.log\" > \"{stage_csv[:-4]}.bcp2.log\" \n")
                f.write(f"type \"{stage_csv[:-4]}.bcp2.log\" | findstr -v \"1000\" \n")

    # Master batch file that runs all per-table batch files in sequence.
    with open(f"{cfg.batch_dir}/_{cfg.name}.bat", 'w', encoding='cp850') as f:
        f.write("@echo off & cd /d %~dp0 \n")
        f.write(f"del {cfg.stage_dir}\\*.* /Q /F >nul 2>nul \n\n")
        for index, current_table in config.iterrows():
            f.write(f"echo =={current_table['target']}==\n")
            f.write(f"echo {current_table['target']} >CON \n")
            f.write(f"call {current_table['target']}.bat\n\n")


if __name__ == '__main__':
    plac.call(create)
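# Command-line usage via plac (the script file name here is illustrative):
#   python db_create.py path\to\CARLO.json
# Without an argument, the default 'dbtools/OPTIMA.json' is used.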