| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280 | import jsonfrom collections import namedtuplefrom pathlib import Pathimport pandas as pdimport pyodbc# from re import escape# from numpy import select# from dataclasses import dataclassDbCreateConfig = namedtuple(    "DbCreateConfig",    "name csv_file clients filter source_dsn target_dsn stage_dir batch_dir",)DsnConfig = namedtuple("DsnConfig", "user password server database driver schema")cfg = DbCreateConfig(    **{        "name": "CARLO",        "csv_file": "CARLO.csv",        "clients": {"1": "M und S Fahrzeughandel GmbH"},        "filter": ["01.01.2018", "01.01.2019"],        "source_dsn": {            "user": "sa",            "password": "Mffu3011#",            "server": "GC-SERVER1\\GLOBALCUBE",            "database": "DE0017",            "driver": "mssql",            "schema": "dbo",        },        "target_dsn": {            "user": "sa",            "password": "Mffu3011#",            "server": "GC-SERVER1\\GLOBALCUBE",            "database": "CARLO2",            "driver": "mssql",            "schema": "import",        },        "stage_dir": "..\\temp",        "batch_dir": "..\\batch",        "logs_dir": "..\\logs",    })class database_inspect:    tables = []    def __init__(self, dsn, source=False):        self.dsn = DsnConfig(**dsn)        self.type = "SOURCE" if source else "DEST"        self.cursor = self.connect()    def conn_string(self):        if self.dsn.driver == "mssql":            return ";".join(                [                    "Driver={SQL Server Native Client 11.0}",                    f"Server={self.dsn.server}",                    f"Database={self.dsn.database}",                    f"Uid={self.dsn.user}",                    f"Pwd={self.dsn.password}",                ]            )        if self.dsn.driver == "mysql":            return f"mysql+pymysql://{self.dsn.user}:{self.dsn.password}@{self.dsn.server}/{self.dsn.database}?charset=utf8mb4"        return f"DSN={self.dsn.server};UID={self.dsn.user};PWD={self.dsn.password}"    def conn_ini(self):        return "\r\n".join(            [                f'{self.type}_SERVER="{self.dsn.server}"',                f'{self.type}_USER="{self.dsn.user}"',                f'{self.type}_PASSWORD="{self.dsn.password}"',                f'{self.type}_DATABASE="{self.dsn.database}"',            ]        )    def bcp_conn_params(self):        return f"-S {self.dsn.server} -d {self.dsn.database} -U {self.dsn.user} -P {self.dsn.password}"    def connect(self):        c = pyodbc.connect(self.conn_string())        return c.cursor()    def get_tables(self):        tables = [x[2] for x in self.cursor.tables(tableType="TABLE")]        views = [x[2] for x in self.cursor.tables(tableType="VIEW")]        self.tables = tables + views        return self.tables    def get_prefix(self):        if (len(self.tables)) == 0:            self.get_tables()        source_tables_prefix = dict(            enumerate(                sorted(list(set([t.split("$")[0] for t in self.tables if "$" in t]))), 1            )        )        if len(source_tables_prefix) == 0:            q = self.cursor.execute("select name FROM sys.databases")            source_tables_prefix = [x[0] for x in q.fetchall()]        return source_tables_prefix    def get_columns(self, table):        source_insp_cols = [col.column_name for col in self.cursor.columns(table=table)]        if len(source_insp_cols) == 0:            q = self.cursor.execute(                "SELECT COLUMN_NAME as column_name FROM information_schema.columns "                + f"WHERE TABLE_NAME = '{self.convert_table(table)}'"            )            source_insp_cols = [col[0] for col in q.fetchall()]        return source_insp_cols    def convert_table(self, table):        if "." in table:            table = table.split(".")[-1]        if "[" in table:            table = table[1:-1]        return tabledef load_config(config_file: str):    cfg_import = json.load(open(config_file, "r", encoding="latin-1"))    base_dir = Path(config_file).resolve().parent    cfg_import["name"] = Path(config_file).stem    if cfg_import["stage_dir"][:2] == "..":        cfg_import["stage_dir"] = str(            base_dir.joinpath(cfg_import["stage_dir"]).resolve()        )    if cfg_import["batch_dir"][:2] == "..":        cfg_import["batch_dir"] = str(            base_dir.joinpath(cfg_import["batch_dir"]).resolve()        )    if "logs_dir" not in cfg_import:        cfg_import["logs_dir"] = "..\\logs"    if cfg_import["batch_dir"][:2] == "..":        cfg_import["batch_dir"] = str(            base_dir.joinpath(cfg_import["logs_dir"]).resolve()        )    return DbCreateConfig(**cfg_import)def create(config_file="dbtools/OPTIMA.json"):  #    cfg = load_config(config_file)    base_dir = str(Path(cfg.batch_dir).parent)    df = pd.read_csv(f"{base_dir}/{cfg.csv_file}", sep=";", encoding="latin-1")    if "cols" not in df.columns:        df["target_db"] = ""        df["cols"] = ""        df.to_csv(f"{base_dir}/{cfg.csv_file}", sep=";", encoding="latin-1")    config = df[df["target"].notnull()]    # print(config.head())    source_db = database_inspect(cfg.source_dsn, source=True)    source_tables = source_db.get_tables()    print(source_db.get_prefix())    target_db = database_inspect(cfg.target_dsn)    target_tables = target_db.get_tables()    for _, current_table in config.iterrows():        with open(            f"{cfg.batch_dir}/{current_table['target']}.bat", "w", encoding="cp850"        ) as f:            f.write("@echo off \n")            f.write("rem ==" + current_table["target"] + "==\n")            if not current_table["target"] in target_tables:                f.write(                    f"echo Ziel-Tabelle '{current_table['target']}' existiert nicht!\n"                )                print(f"Ziel-Tabelle '{current_table['target']}' existiert nicht!")                continue            f.write(                f"del {cfg.stage_dir}\\{current_table['target']}*.* /Q /F >nul 2>nul \n"            )            f.write(                f"sqlcmd.exe {target_db.bcp_conn_params()} -p "                + f"-Q \"TRUNCATE TABLE [{cfg.target_dsn['schema']}].[{current_table['target']}]\" \n"            )            target_columns_list = target_db.get_columns(current_table["target"])            if "CLIENT_DB" in target_columns_list:                target_columns_list.remove("CLIENT_DB")                target_columns_list.append("Client_DB")            target_columns = set(target_columns_list)            for client_db, prefix in cfg.clients.items():                source_table = current_table["source"].format(prefix)                if source_table not in source_tables:                    source_table2 = source_db.convert_table(source_table)                    if source_table2 not in source_tables:                        f.write(                            f"echo Quell-Tabelle '{source_table}' existiert nicht!\n"                        )                        print(f"Quell-Tabelle '{source_table}' existiert nicht!")                        continue                source_columns = set(source_db.get_columns(source_table))                intersect = source_columns.intersection(target_columns)                # print("Auf beiden Seiten: " + ";".join(intersect))                diff1 = source_columns.difference(target_columns)                if len(diff1) > 0:                    f.write("rem Nur in Quelle: " + ";".join(diff1) + "\n")                diff2 = target_columns.difference(source_columns)                if "Client_DB" not in diff2:                    f.write("echo Spalte 'Client_DB' fehlt!\n")                    print(                        f"Ziel-Tabelle '{current_table['target']}' Spalte 'Client_DB' fehlt!"                    )                    continue                diff2.remove("Client_DB")                if len(diff2) > 0:                    f.write("rem Nur in Ziel:   " + ";".join(diff2) + "\n")                if not pd.isnull(current_table["query"]):                    select_query = current_table["query"].format(                        prefix, cfg.filter[0], cfg.filter[1]                    )                elif "." in source_table or cfg.source_dsn["schema"] == "":                    if source_table[0] != "[":                        source_table = f"[{source_table}]"                    select_query = f"SELECT T1.* FROM {source_table} T1 "                else:                    select_query = f"SELECT T1.* FROM [{cfg.source_dsn['schema']}].[{source_table}] T1 "                if not pd.isnull(current_table["filter"]):                    select_query += " WHERE " + current_table["filter"].format(                        "", cfg.filter[0], cfg.filter[1]                    )                # select_columns = "T1.[" + "], T1.[".join(intersect) + "],"                select_columns = ""                for col in target_columns_list:                    if col in intersect:                        select_columns += f"T1.[{col}], "                    elif col == "Client_DB":                        select_columns += "'" + client_db + '\' as \\"Client_DB\\", '                    else:                        select_columns += "'' as \\\"" + col + '\\", '                select_query = select_query.replace("T1.*", select_columns[:-2])                select_query = select_query.replace("%", "%%")  # batch-Problem                stage_csv = (                    f"{cfg.stage_dir}\\{current_table['target']}_{client_db}.csv"                )                # insert_query = f"LOAD DATA INFILE '{stage_csv}' INTO TABLE {current_table['target']} FIELDS TERMINATED BY ','                # ENCLOSED BY '\"' LINES TERMINATED BY '\n';"                # print(select_query)                bulk_copy = "bcp" if cfg.source_dsn["driver"] == "mssql" else "cet"                f.write(                    f'{bulk_copy} "{select_query}" queryout "{stage_csv}" {source_db.bcp_conn_params()} -c -C 65001 -m 1000 '                    + f'-e "{stage_csv[:-4]}.queryout.log" > "{stage_csv[:-4]}.bcp1.log" \n'                )                f.write(f'type "{stage_csv[:-4]}.bcp1.log" | findstr -v "1000" \n')                f.write(                    f"bcp [{cfg.target_dsn['schema']}].[{current_table['target']}] in \"{stage_csv}\" {target_db.bcp_conn_params()} "                    + f'-c -C 65001 -m 1000 -e "{stage_csv[:-4]}.in.log" > "{stage_csv[:-4]}.bcp2.log" \n'                )                f.write(f'type "{stage_csv[:-4]}.bcp2.log" | findstr -v "1000" \n')                f.write(f'del "{stage_csv}" /F >nul 2>nul \n')    with open(f"{cfg.batch_dir}/_{cfg.name}.bat", "w", encoding="cp850") as f:        f.write("@echo off & cd /d %~dp0 \n")        f.write(f"del {cfg.stage_dir}\\*.* /Q /F >nul 2>nul \n\n")        for index, current_table in config.iterrows():            f.write(f"echo =={current_table['target']}==\n")            f.write(f"echo {current_table['target']} >CON \n")            f.write(f"call {current_table['target']}.bat\n\n")if __name__ == "__main__":    create()
 |