| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295 | 
							- import json
 
- import os
 
- from dataclasses import dataclass, field
 
- from pathlib import Path
 
- import pandas as pd
 
- import pyodbc
 
@dataclass
class DsnConfig:
    """Connection settings (DSN) for one database endpoint.

    Defaults describe the local MSSQL instance this project targets.
    """

    # NOTE(review): hard-coded credentials in source control — should come
    # from an environment variable or secrets store; confirm before shipping.
    user: str = "sa"
    password: str = "Mffu3011#"
    server: str = "LOCALHOST\\GLOBALCUBE"
    database: str = "CARLO"
    # Driver selector used by database_inspect.conn_string:
    # "mssql" -> SQL Server Native Client, "mysql" -> pymysql URL,
    # anything else -> PostgreSQL Unicode ODBC.
    driver: str = "mssql"
    # Schema name interpolated into the generated sqlcmd/bcp commands.
    schema: str = "import"
 
@dataclass
class DbCreateConfig:
    """Top-level configuration for the batch-file generator.

    Normally loaded from a JSON file via ``load_config``; any key missing
    from the JSON falls back to the defaults below.
    """

    name: str = "CARLO"
    csv_file: str = "CARLO.csv"
    # Mapping of client id -> table-name prefix (substituted into the
    # ``source`` templates of the import CSV).
    clients: dict[str, str] = field(default_factory=lambda: {"1": "M und S Fahrzeughandel GmbH"})
    # Date range [from, to] substituted into filter/query templates.
    # Fixed: the previous default was ``(["..."],)`` — a 1-tuple wrapping the
    # list (stray trailing comma), so ``filter[1]`` raised IndexError whenever
    # the default was used.  A default_factory yields a fresh plain list.
    filter: list[str] = field(default_factory=lambda: ["2018-01-01T00:00:00", "2022-01-01T00:00:00"])
    # Populated by load_config; None only before a config has been loaded.
    source_dsn: "DsnConfig" = None
    target_dsn: "DsnConfig" = None
    stage_dir: str = "..\\temp"
    batch_dir: str = "..\\batch"
    logs_dir: str = "..\\logs"
 
class database_inspect:
    """Inspect a database via an ODBC cursor: tables, columns, primary keys.

    Also renders connection strings for pyodbc, bcp command-line flags and
    SOURCE_/DEST_ variables for batch ``.ini`` files.
    """

    def __init__(self, dsn: "DsnConfig", source=False):
        self.dsn = dsn
        self.type = "SOURCE" if source else "DEST"
        # Fixed: ``tables`` used to be a mutable class attribute shared by
        # every instance (source and target inspector would clobber each
        # other's cache); it is now per-instance state.
        self.tables: list[str] = []
        self.cursor = self.connect()

    @property
    def conn_string(self):
        """pyodbc connection string for the configured driver."""
        if self.dsn.driver == "mssql":
            return ";".join(
                [
                    "Driver={SQL Server Native Client 11.0}",
                    f"Server={self.dsn.server}",
                    f"Database={self.dsn.database}",
                    f"Uid={self.dsn.user}",
                    f"Pwd={self.dsn.password}",
                ]
            )
        if self.dsn.driver == "mysql":
            return f"mysql+pymysql://{self.dsn.user}:{self.dsn.password}@{self.dsn.server}/{self.dsn.database}?charset=utf8mb4"
        # Any other driver value falls back to the PostgreSQL ODBC driver.
        return ";".join(
            [
                "Driver={PostgreSQL Unicode}",
                f"Server={self.dsn.server}",
                "Port=5432",
                f"Database={self.dsn.database}",
                f"Uid={self.dsn.user}",
                f"Pwd={self.dsn.password}",
            ]
        )
        # f"DSN={self.dsn.server};UID={self.dsn.user};PWD={self.dsn.password}"

    @property
    def conn_ini(self):
        """SOURCE_/DEST_ variable lines for a batch ``.ini`` file (CRLF-joined)."""
        return "\r\n".join(
            [
                f'{self.type}_SERVER="{self.dsn.server}"',
                f'{self.type}_USER="{self.dsn.user}"',
                f'{self.type}_PASSWORD="{self.dsn.password}"',
                f'{self.type}_DATABASE="{self.dsn.database}"',
            ]
        )

    @property
    def bcp_conn_params(self):
        """Connection flags for the bcp/sqlcmd command line."""
        return f"-S {self.dsn.server} -d {self.dsn.database} -U {self.dsn.user} -P {self.dsn.password}"

    def connect(self):
        """Open the ODBC connection and return a cursor."""
        c = pyodbc.connect(self.conn_string)
        return c.cursor()

    def get_tables(self):
        """Return (and cache in ``self.tables``) all table and view names."""
        tables = [x[2] for x in self.cursor.tables(tableType="TABLE")]
        views = [x[2] for x in self.cursor.tables(tableType="VIEW")]
        self.tables = tables + views
        return self.tables

    def get_prefix(self):
        """Map 1-based index -> distinct prefix of ``prefix$name`` tables.

        Falls back to a plain list of database names when no table follows
        the ``$`` naming convention.  NOTE(review): the two branches return
        different types (dict vs list); the only caller just prints the
        result today.
        """
        if len(self.tables) == 0:
            self.get_tables()
        source_tables_prefix = dict(
            enumerate(sorted({t.split("$")[0] for t in self.tables if "$" in t}), 1)
        )
        if len(source_tables_prefix) == 0:
            q = self.cursor.execute("select name FROM sys.databases")
            source_tables_prefix = [x[0] for x in q.fetchall()]
        return source_tables_prefix

    def get_columns(self, table):
        """Column names of ``table``; falls back to information_schema."""
        source_insp_cols = [col.column_name for col in self.cursor.columns(table=table)]
        if len(source_insp_cols) == 0:
            # HACK: table name is interpolated into the SQL string.  Names
            # come from the trusted config CSV, not user input — do not reuse
            # this pattern for untrusted data.
            q = self.cursor.execute(
                "SELECT COLUMN_NAME as column_name FROM information_schema.columns "
                + f"WHERE TABLE_NAME = '{self.convert_table(table)}'"
            )
            source_insp_cols = [col[0] for col in q.fetchall()]
        return source_insp_cols

    def get_columns_is_typeof_str(self, table):
        """Per column (in table order): True when the column is a char type."""
        source_insp_cols = [
            col.data_type in [pyodbc.SQL_CHAR, pyodbc.SQL_VARCHAR] for col in self.cursor.columns(table=table)
        ]
        if len(source_insp_cols) == 0:
            q = self.cursor.execute(
                "SELECT COLLATION_NAME as column_collation FROM information_schema.columns "
                + f"WHERE TABLE_NAME = '{self.convert_table(table)}'"
            )
            # Fixed: COLLATION_NAME is NULL (None) for non-character columns,
            # which made ``len(col[0])`` raise TypeError; NULL now means
            # "not a string column".
            source_insp_cols = [col[0] is not None and len(col[0]) > 0 for col in q.fetchall()]
        return source_insp_cols

    def get_pkey(self, table):
        """Primary-key column names of ``table``; information_schema fallback."""
        source_insp_cols = [col.column_name for col in self.cursor.primaryKeys(table=table)]
        if len(source_insp_cols) == 0:
            q = self.cursor.execute(
                "SELECT COLUMN_NAME "
                "FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE "
                "WHERE OBJECTPROPERTY(OBJECT_ID(CONSTRAINT_SCHEMA + '.' + QUOTENAME(CONSTRAINT_NAME)), 'IsPrimaryKey') = 1 "
                f"AND TABLE_NAME = '{self.convert_table(table)}' "  # AND TABLE_SCHEMA = 'dbo'"
            )
            source_insp_cols = [col[0] for col in q.fetchall()]
        return source_insp_cols

    def convert_table(self, table):
        """Strip a schema prefix and surrounding brackets from a table name."""
        if "." in table:
            table = table.split(".")[-1]
        if "[" in table:
            table = table[1:-1]
        return table
 
def load_config(config_file: str):
    """Load a ``DbCreateConfig`` from a JSON file.

    The config's ``name`` is derived from the file name; ``..``-relative
    stage/batch/logs directories are resolved against the config file's
    parent directory and created when missing; the DSN sub-dicts are
    converted into ``DsnConfig`` instances.
    """
    # Fixed: the file handle used to be leaked via json.load(open(...));
    # a context manager guarantees it is closed.
    with open(config_file, "r", encoding="latin-1") as f:
        cfg_import = json.load(f)
    cfg_path = Path(config_file).resolve()
    base_dir = cfg_path.parent
    cfg_import["name"] = cfg_path.stem
    if "logs_dir" not in cfg_import:
        cfg_import["logs_dir"] = "..\\logs"
    for folder in ["stage_dir", "batch_dir", "logs_dir"]:
        # Resolve "..\\x" relative to the config file, then ensure it exists.
        if cfg_import[folder].startswith(".."):
            cfg_import[folder] = str(base_dir.joinpath(cfg_import[folder]).resolve())
        os.makedirs(cfg_import[folder], exist_ok=True)
    cfg_import["source_dsn"] = DsnConfig(**cfg_import["source_dsn"])
    cfg_import["target_dsn"] = DsnConfig(**cfg_import["target_dsn"])
    return DbCreateConfig(**cfg_import)
 
def get_import_config(filename: str, db_name: str):
    """Read the import-table CSV and return only rows with a target table.

    Legacy files that lack a ``cols`` column are upgraded in place: the
    ``target_db`` and ``cols`` columns are added and the file is rewritten
    with the canonical column order.
    """
    table_config = pd.read_csv(filename, sep=";", encoding="latin-1")
    legacy_format = "cols" not in table_config.columns
    if legacy_format:
        table_config["target_db"] = db_name
        table_config["cols"] = ""
        canonical_order = ["source", "target", "target_db", "filter", "query", "iterative", "cols"]
        table_config[canonical_order].to_csv(filename, sep=";", encoding="latin-1", index=False)
    has_target = table_config["target"].notnull()
    return table_config[has_target]
 
def create(config_file: str = "database/CARLO.json"):
    """Generate one bcp export/import batch file per target table, plus a
    master batch file that calls them all.

    Reads the JSON config and the import CSV, inspects source and target
    databases, and writes ``<target>.bat`` files into ``cfg.batch_dir``.
    """
    cfg = load_config(config_file)
    base_dir = str(Path(cfg.batch_dir).parent)
    config = get_import_config(f"{base_dir}/config/{cfg.csv_file}", cfg.target_dsn.database)

    source_db = database_inspect(cfg.source_dsn, source=True)
    source_tables = source_db.get_tables()
    print(json.dumps(source_db.get_prefix(), indent=2))

    target_db = database_inspect(cfg.target_dsn)
    target_tables = target_db.get_tables()

    for _, current_table in config.iterrows():
        # One batch file per target table; cp850 because cmd.exe reads it.
        with open(f"{cfg.batch_dir}/{current_table['target']}.bat", "w", encoding="cp850") as f:
            f.write("@echo off \n")
            f.write("rem ==" + current_table["target"] + "==\n")

            # Skip (but still emit a stub batch file) when the target table
            # does not exist.
            if not current_table["target"] in target_tables:
                f.write(f"echo Ziel-Tabelle '{current_table['target']}' existiert nicht!\n")
                print(f"Ziel-Tabelle '{current_table['target']}' existiert nicht!")
                continue

            # Clear stale stage files, then truncate the target table.
            f.write(f"del {cfg.stage_dir}\\{current_table['target']}*.* /Q /F >nul 2>nul \n")
            f.write(
                f"sqlcmd.exe {target_db.bcp_conn_params} -p "
                + f"-Q \"TRUNCATE TABLE [{cfg.target_dsn.schema}].[{current_table['target']}]\" \n"
            )

            target_columns_list = target_db.get_columns(current_table["target"])
            target_column_types = target_db.get_columns_is_typeof_str(current_table["target"])

            # Normalize the client-id column casing to "Client_DB".
            if "CLIENT_DB" in target_columns_list:
                target_columns_list.remove("CLIENT_DB")
                target_columns_list.append("Client_DB")

            target_columns = set(target_columns_list)

            # One export/import pair per configured client database.
            for client_db, prefix in cfg.clients.items():
                source_table = current_table["source"].format(prefix)
                if source_table not in source_tables:
                    # Retry with schema/brackets stripped before giving up.
                    source_table2 = source_db.convert_table(source_table)
                    if source_table2 not in source_tables:
                        f.write(f"echo Quell-Tabelle '{source_table}' existiert nicht!\n")
                        print(f"Quell-Tabelle '{source_table}' existiert nicht!")
                        continue
                source_columns = set(source_db.get_columns(source_table))

                intersect = source_columns.intersection(target_columns)
                # print("Auf beiden Seiten: " + ";".join(intersect))
                diff1 = source_columns.difference(target_columns)
                if len(diff1) > 0:
                    f.write("rem Nur in Quelle: " + ";".join(diff1) + "\n")
                diff2 = target_columns.difference(source_columns)
                # The target must provide a Client_DB column that the source
                # lacks — it is filled with the client id below.
                if "Client_DB" not in diff2:
                    f.write("echo Spalte 'Client_DB' fehlt!\n")
                    print(f"Ziel-Tabelle '{current_table['target']}' Spalte 'Client_DB' fehlt!")
                    continue
                diff2.remove("Client_DB")
                if len(diff2) > 0:
                    f.write("rem Nur in Ziel:   " + ";".join(diff2) + "\n")

                # Build the SELECT: explicit query template wins; otherwise a
                # bracketed/schema-less form or a schema-qualified form.
                if not pd.isnull(current_table["query"]):
                    select_query = current_table["query"].format(prefix, cfg.filter[0], cfg.filter[1])
                elif "." in source_table or cfg.source_dsn.schema == "":
                    if source_table[0] != "[":
                        source_table = f"[{source_table}]"
                    select_query = f"SELECT T1.* FROM {source_table} T1 "
                else:
                    select_query = f"SELECT T1.* FROM [{cfg.source_dsn.schema}].[{source_table}] T1 "

                if not pd.isnull(current_table["filter"]):
                    select_query += " WHERE " + current_table["filter"].format("", cfg.filter[0], cfg.filter[1])
                # select_columns = "T1.[" + "], T1.[".join(intersect) + "],"
                # Column list in target order: char columns are cleaned via
                # dbo.cln, Client_DB is injected as a constant, and columns
                # missing from the source are padded with ''.
                select_columns = ""
                for col, is_char_type in zip(target_columns_list, target_column_types):
                    if col in intersect:
                        if is_char_type:
                            select_columns += f"dbo.cln(T1.[{col}]), "
                        else:
                            select_columns += f"T1.[{col}], "
                    elif col == "Client_DB":
                        select_columns += f"'{client_db}' as \\\"Client_DB\\\", "
                    else:
                        select_columns += "'' as \\\"" + col + '\\", '

                # [:-2] drops the trailing ", ".
                select_query = select_query.replace("T1.*", select_columns[:-2])

                if "timestamp" in source_columns:
                    select_query += " ORDER BY T1.[timestamp] "
                else:
                    print(current_table["target"] + " hat kein timestamp-Feld")

                pkey = target_db.get_pkey(current_table["target"])
                if len(pkey) == 0:
                    print(current_table["target"] + " hat keinen Primaerschluessel")

                # Escape % so cmd.exe does not expand it inside the .bat file.
                select_query = select_query.replace("%", "%%")  # batch-Problem

                stage_csv = f"{cfg.stage_dir}\\{current_table['target']}_{client_db}.csv"
                logfile = f"{cfg.logs_dir}\\{current_table['target']}_{client_db}"
                # insert_query = f"LOAD DATA INFILE '{stage_csv}' INTO TABLE {current_table['target']} FIELDS TERMINATED BY ','
                # ENCLOSED BY '\"' LINES TERMINATED BY '\n';"

                # print(select_query)
                # Export (bcp queryout) then import (bcp in); findstr -v "1000"
                # filters the per-1000-rows progress lines from the logs.
                bulk_copy = "bcp" if cfg.source_dsn.driver == "mssql" else "cet"
                f.write(
                    f'{bulk_copy} "{select_query}" queryout "{stage_csv}" {source_db.bcp_conn_params} -c -C 65001 -m 1000 '
                    + f'-e "{logfile}.queryout.log" > "{logfile}.bcp1.log" \n'
                )
                f.write(f'type "{logfile}.bcp1.log" | findstr -v "1000" \n')
                f.write(
                    f"bcp [{cfg.target_dsn.schema}].[{current_table['target']}] in \"{stage_csv}\" {target_db.bcp_conn_params} "
                    + f'-c -C 65001 -m 1000 -e "{logfile}.in.log" > "{logfile}.bcp2.log" \n'
                )
                f.write(f'type "{logfile}.bcp2.log" | findstr -v "1000" \n')
                f.write(f'del "{stage_csv}" /F >nul 2>nul \n')

    # Master batch file that clears the stage dir and calls every per-table
    # batch file in CSV order.
    with open(f"{cfg.batch_dir}/_{cfg.name}.bat", "w", encoding="cp850") as f:
        f.write("@echo off & cd /d %~dp0 \n")
        f.write(f"del {cfg.stage_dir}\\*.* /Q /F >nul 2>nul \n\n")
        for index, current_table in config.iterrows():
            f.write(f"echo =={current_table['target']}==\n")
            f.write(f"echo {current_table['target']} >CON \n")
            f.write(f"call {current_table['target']}.bat\n\n")
 
if __name__ == "__main__":
    # Script entry point: generate the batch files from the default config.
    create()
 
 
  |