import io
import json
import os
from dataclasses import dataclass, field
from functools import cached_property
from pathlib import Path

import pandas as pd
import pyodbc
from sqlalchemy import Engine, MetaData, create_engine


@dataclass
class DsnConfig:
    """Connection parameters for one database (source or destination).

    SECURITY NOTE(review): credentials (including a default password) are
    hard-coded here and written in plain text by ``conn_ini`` — consider
    moving them to environment variables or a secrets store.
    """

    user: str = "sa"
    password: str = "Mffu3011#"
    server: str = "LOCALHOST\\GLOBALCUBE"
    database: str = "CARLO"
    driver: str = "mssql"
    schema: str = "import"

    def conn_ini(self, db_type: str) -> str:
        """Render this DSN as ``{db_type}_KEY="value"`` lines for DB.ini."""
        return "\n".join(
            [
                f'{db_type}_SERVER="{self.server}"',
                f'{db_type}_USER="{self.user}"',
                f'{db_type}_PASSWORD="{self.password}"',
                f'{db_type}_DATABASE="{self.database}"',
            ]
        )


class DatabaseInspect:
    """Schema inspection helper over a pyodbc connection.

    Wraps one :class:`DsnConfig` and lazily opens a pyodbc cursor and a
    SQLAlchemy engine for it.  Where the ODBC catalog functions return
    nothing (e.g. some drivers), falls back to INFORMATION_SCHEMA queries.
    """

    # Lazily-created singletons per instance (reassigned in the properties).
    _cursor: pyodbc.Cursor = None
    _sqlalchemy_engine: Engine = None

    def __init__(self, dsn: DsnConfig, source=False):
        self.dsn = dsn
        self.type = "SOURCE" if source else "DEST"

    @property
    def conn_string(self) -> str:
        """ODBC connection string for :func:`pyodbc.connect` (driver-dependent)."""
        if self.dsn.driver == "mssql":
            return ";".join(
                [
                    "Driver={SQL Server Native Client 11.0}",
                    f"Server={self.dsn.server}",
                    f"Database={self.dsn.database}",
                    f"Uid={self.dsn.user}",
                    f"Pwd={self.dsn.password}",
                ]
            )
        if self.dsn.driver == "mysql":
            return f"mysql+pymysql://{self.dsn.user}:{self.dsn.password}@{self.dsn.server}/{self.dsn.database}?charset=utf8mb4"
        # Anything else is treated as PostgreSQL via ODBC.
        return ";".join(
            [
                "Driver={PostgreSQL Unicode}",
                f"Server={self.dsn.server}",
                "Port=5432",
                f"Database={self.dsn.database}",
                f"Uid={self.dsn.user}",
                f"Pwd={self.dsn.password}",
            ]
        )
        # f"DSN={self.dsn.server};UID={self.dsn.user};PWD={self.dsn.password}"

    @property
    def conn_string_sqlalchemy(self) -> str:
        """SQLAlchemy URL equivalent of :attr:`conn_string`."""
        if self.dsn.driver == "mssql":
            return (
                f"mssql+pyodbc://{self.dsn.user}:{self.dsn.password}@{self.dsn.server}/{self.dsn.database}?"
                "driver=SQL+Server+Native+Client+11.0"
            )
        if self.dsn.driver == "mysql":
            return f"mysql+pymysql://{self.dsn.user}:{self.dsn.password}@{self.dsn.server}/{self.dsn.database}?charset=utf8mb4"
        return f"pyodbc://{self.dsn.user}:{self.dsn.password}@{self.dsn.server}/{self.dsn.database}?driver={self.dsn.driver}"

    @property
    def bcp_conn_params(self) -> str:
        """Command-line switches for the MS ``bcp`` bulk-copy utility."""
        return f"-S {self.dsn.server} -d {self.dsn.database} -U {self.dsn.user} -P {self.dsn.password}"

    @property
    def cursor(self) -> pyodbc.Cursor:
        """Open (once) and return the shared pyodbc cursor."""
        if not self._cursor:
            self._cursor = self.connect()
        return self._cursor

    @property
    def sqlalchemy_engine(self) -> Engine:
        """Create (once) and return the SQLAlchemy engine."""
        if not self._sqlalchemy_engine:
            self._sqlalchemy_engine = create_engine(self.conn_string_sqlalchemy)
        return self._sqlalchemy_engine

    def connect(self) -> pyodbc.Cursor:
        """Open a new connection and return a cursor on it."""
        c = pyodbc.connect(self.conn_string)
        return c.cursor()

    @cached_property
    def tables_list(self) -> list[str]:
        """All table and view names visible through the ODBC catalog."""
        tables = [x[2] for x in self.cursor.tables(tableType="TABLE")]
        views = [x[2] for x in self.cursor.tables(tableType="VIEW")]
        return tables + views

    def get_prefix(self) -> dict[str, str]:
        """Return company prefixes (the part before ``$`` in table names), 1-indexed.

        NOTE(review): despite the annotation, keys are ints from ``enumerate``,
        and the fallback branch returns a *list* of database names instead of
        a dict — callers apparently handle both; verify before tightening.
        """
        source_tables_prefix = dict(
            enumerate(sorted(list(set([t.split("$")[0] for t in self.tables_list if "$" in t]))), 1)
        )
        if len(source_tables_prefix) == 0:
            q = self.cursor.execute("select name FROM sys.databases")
            source_tables_prefix = [x[0] for x in q.fetchall()]
        return source_tables_prefix

    def get_columns(self, table: str) -> list[str]:
        """Column names of *table*, with an INFORMATION_SCHEMA fallback.

        NOTE(review): the fallback interpolates the table name into SQL;
        acceptable only because table names come from trusted config.
        """
        source_insp_cols = [col.column_name for col in self.cursor.columns(table=table)]
        if len(source_insp_cols) == 0:
            q = self.cursor.execute(
                "SELECT COLUMN_NAME as column_name FROM information_schema.columns "
                + f"WHERE TABLE_NAME = '{self.convert_table(table)}'"
            )
            source_insp_cols = [col[0] for col in q.fetchall()]
        return source_insp_cols

    def get_columns_is_typeof_str(self, table: str) -> list[bool]:
        """Per column of *table*: True when the column is a character type."""
        source_insp_cols = [
            col.data_type in [pyodbc.SQL_CHAR, pyodbc.SQL_VARCHAR] for col in self.cursor.columns(table=table)
        ]
        if len(source_insp_cols) == 0:
            q = self.cursor.execute(
                "SELECT COLLATION_NAME as column_collation FROM information_schema.columns "
                + f"WHERE TABLE_NAME = '{self.convert_table(table)}'"
            )
            # FIX: COLLATION_NAME is NULL for non-character columns; the old
            # len(col[0]) > 0 raised TypeError on None.
            source_insp_cols = [col[0] is not None and len(col[0]) > 0 for col in q.fetchall()]
        return source_insp_cols

    def get_pkey(self, table: str, catalog: str) -> list[str]:
        """Primary-key column names of *table* in database *catalog*."""
        source_insp_cols = [col.column_name for col in self.cursor.primaryKeys(table=table, catalog=catalog)]
        if len(source_insp_cols) == 0:
            # Fallback: query the key-usage view directly; requires switching
            # databases, then switching back to the configured one.
            self.cursor.execute(f"USE {catalog}")
            q = self.cursor.execute(
                "SELECT COLUMN_NAME "
                "FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE "
                "WHERE OBJECTPROPERTY(OBJECT_ID(CONSTRAINT_SCHEMA + '.' + QUOTENAME(CONSTRAINT_NAME)), 'IsPrimaryKey') = 1 "
                f"AND TABLE_NAME = '{self.convert_table(table)}' "
                # AND TABLE_SCHEMA = 'dbo'"
            )
            source_insp_cols = [col[0] for col in q.fetchall()]
            self.cursor.execute(f"USE {self.dsn.database}")
        return source_insp_cols

    def convert_table(self, table: str) -> str:
        """Strip a schema qualifier and surrounding brackets from a table name."""
        if "." in table:
            table = table.split(".")[-1]
        if "[" in table:
            table = table[1:-1]
        return table


@dataclass
class DbCreateConfig:
    """Top-level job configuration, usually loaded from a JSON file."""

    name: str = "CARLO"
    csv_file: str = "..\\config\\CARLO.csv"
    clients: dict[str, str] = field(default_factory=lambda: {"1": "M und S Fahrzeughandel GmbH"})
    # FIX: the old default was (["..."],) — a stray trailing comma made it a
    # 1-tuple wrapping the list, so filter[1] raised IndexError. A mutable
    # default also needs default_factory in a dataclass.
    filter: list[str] = field(default_factory=lambda: ["2018-01-01T00:00:00", "2022-01-01T00:00:00"])
    source_dsn: DsnConfig = None
    dest_dsn: DsnConfig = None
    temp_db: str = "CARLOX"
    stage_dir: str = "..\\temp"
    batch_dir: str = "..\\batch"
    logs_dir: str = "..\\logs"
    scripts_dir: str = "C:\\GlobalCube\\Tasks\\scripts"
    source_inspect: DatabaseInspect = None
    dest_inspect: DatabaseInspect = None
    _sqlalchemy_engine: Engine = None

    @property
    def conn_ini(self) -> str:
        """Directory settings rendered as ``KEY="value"`` lines for DB.ini."""
        return "\n".join(
            [
                f'SQL_TEMP="{self.stage_dir}"',
                f'SQL_BATCH="{self.batch_dir}"',
                f'SQL_LOGS="{self.logs_dir}"',
            ]
        )

    @property
    def temp_db_sqlalchemy_engine(self) -> Engine:
        """Engine for the temp database; reflects the ``temp`` schema once."""
        if not self._sqlalchemy_engine:
            self._sqlalchemy_engine = create_engine(
                f"mssql+pyodbc://{self.dest_dsn.user}:{self.dest_dsn.password}@{self.dest_dsn.server}/{self.temp_db}?"
                "driver=SQL+Server+Native+Client+11.0"
            )
            source_meta = MetaData()
            source_meta.reflect(bind=self._sqlalchemy_engine, schema="temp")
        return self._sqlalchemy_engine

    @staticmethod
    def load_config(config_file: str):
        """Build a :class:`DbCreateConfig` from a JSON file.

        Normalises legacy keys, resolves relative directories against the
        config file's location, creates the working directories, and stores
        the result as the process-wide singleton (see :meth:`get_instance`).
        """
        # FIX: close the file handle deterministically.
        with open(config_file, "r", encoding="latin-1") as frh:
            cfg_import = json.load(frh)
        base_dir = Path(config_file).resolve().parent
        cfg_import["name"] = Path(config_file).stem
        if "logs_dir" not in cfg_import:
            cfg_import["logs_dir"] = "..\\logs"
        if "scripts_dir" not in cfg_import:
            cfg_import["scripts_dir"] = "C:\\GlobalCube\\Tasks\\scripts"
        if "target_dsn" in cfg_import:
            # Legacy key name.
            cfg_import["dest_dsn"] = cfg_import["target_dsn"]
            del cfg_import["target_dsn"]
        for folder in ["stage_dir", "batch_dir", "logs_dir", "scripts_dir"]:
            if cfg_import[folder].startswith(".."):
                cfg_import[folder] = str(base_dir.joinpath(cfg_import[folder]).resolve())
            os.makedirs(cfg_import[folder], exist_ok=True)
        for folder in ["source", "dest", "diff"]:
            os.makedirs(cfg_import["stage_dir"] + "\\" + folder, exist_ok=True)
        if ":" not in cfg_import["csv_file"]:
            # No drive letter -> relative path, resolve against the config dir.
            cfg_import["csv_file"] = str((base_dir / cfg_import["csv_file"]).resolve())
        cfg_import["source_dsn"] = DsnConfig(**cfg_import["source_dsn"])
        cfg_import["dest_dsn"] = DsnConfig(**cfg_import["dest_dsn"])
        cfg = DbCreateConfig(**cfg_import)
        cfg.source_inspect = DatabaseInspect(cfg.source_dsn, source=True)
        cfg.dest_inspect = DatabaseInspect(cfg.dest_dsn, source=False)
        DbCreateConfig._cfg = cfg
        return cfg

    @staticmethod
    def get_instance():
        """Return the config loaded by the last :meth:`load_config` call.

        NOTE(review): raises AttributeError if called before load_config.
        """
        return DbCreateConfig._cfg

    def create_db_ini(self) -> None:
        """Write DB.ini (paths + both DSNs) next to the scripts directory."""
        with open(self.scripts_dir + "/../DB.ini", "w", encoding="cp850") as fwh:
            fwh.write(self.conn_ini)
            fwh.write("\n\n")
            fwh.write(self.source_dsn.conn_ini("SOURCE"))
            fwh.write("\n\n")
            fwh.write(self.dest_dsn.conn_ini("DEST"))
            fwh.write("\n")


@dataclass
class SourceTable:
    """Base record for a source table; subclasses fill in the properties."""

    source: str
    client_db: str
    prefix: str

    @property
    def stage_csv(self) -> str:
        return ""

    @property
    def table_client(self) -> str:
        return ""

    @property
    def table_name(self) -> str:
        return ""

    @property
    def select_query(self) -> str:
        return ""


@dataclass
class DestTable:
    """One destination table plus the SQL needed to fill it from temp."""

    source: str
    dest: str
    dest_db: str
    filter_: str
    query: str
    iterative: str
    cols: str
    source_tables: list[SourceTable] = None
    dest_inspect: DatabaseInspect = None
    cfg: DbCreateConfig = None

    @property
    def table_batch_file(self) -> str:
        return f"{self.cfg.batch_dir}/{self.dest}.bat"

    @property
    def full_table_name(self) -> str:
        return f"[{self.dest_db}].[{self.cfg.dest_dsn.schema}].[{self.dest}]"

    @property
    def temp_table_name(self) -> str:
        return f"[{self.cfg.temp_db}].[temp].[{self.dest}]"

    @cached_property
    def columns_list(self) -> list[str]:
        """Destination columns, with CLIENT_DB normalised to Client_DB (moved last)."""
        res = self.dest_inspect.get_columns(self.dest)
        if "CLIENT_DB" in res:
            res.remove("CLIENT_DB")
            res.append("Client_DB")
        return res

    @cached_property
    def column_types(self) -> list[bool]:
        """True per column when it is a character type (parallels columns_list)."""
        return self.dest_inspect.get_columns_is_typeof_str(self.dest)

    @cached_property
    def primary_key(self) -> list[str]:
        return self.dest_inspect.get_pkey(self.dest, self.dest_db)

    @property
    def insert_query(self) -> str:
        """Bulk insert from the temp table into the destination table."""
        return f"INSERT INTO {self.full_table_name} with (TABLOCK) SELECT * FROM {self.temp_table_name} T1"

    @property
    def delete_query(self) -> str:
        """DELETE of rows that will be re-imported; empty string when no PK."""
        if len(self.primary_key) == 0:
            return ""
        pkey_join_list = [f"T1.[{col}] = T2.[{col}]" for col in self.primary_key]
        pkey_join = " AND ".join(pkey_join_list)
        return f"DELETE T1 FROM {self.full_table_name} T1 INNER JOIN {self.temp_table_name} T2 ON {pkey_join}"


class SourceTable2(SourceTable):
    """Concrete source table bound to a destination table and a config.

    NOTE(review): ``dest_table``, ``cfg`` and ``source_inspect`` are expected
    to be assigned externally after construction; this class is deliberately
    not a dataclass, so it inherits SourceTable's generated __init__.
    """

    dest_table: DestTable
    cfg: DbCreateConfig
    _select_query: str = None
    source_inspect: DatabaseInspect = None
    info: str = ""

    @property
    def table_client(self) -> str:
        return f"{self.dest_table.dest}_{self.client_db}"

    @property
    def stage_csv(self) -> str:
        return f"{self.cfg.stage_dir}\\{self.table_client}.csv"

    @property
    def table_name(self) -> str:
        """Source table name with the company prefix substituted in."""
        return self.source.format(self.prefix)

    @cached_property
    def source_columns(self) -> set[str]:
        return set(self.source_inspect.get_columns(self.table_name))

    @cached_property
    def select_query(self):
        """Build the SELECT used to export this table, or None on a fatal mismatch.

        Column differences between source and destination are collected into
        ``self.info`` as batch-file comment lines.
        """
        f = io.StringIO()
        diff1 = self.source_columns.difference(self.dest_table.columns_list)
        if len(diff1) > 0:
            f.write("rem Nur in Quelle: " + ";".join(diff1) + "\n")
        diff2 = set(self.dest_table.columns_list).difference(self.source_columns)
        if "Client_DB" not in diff2:
            # Destination lacks the mandatory Client_DB column -> abort.
            f.write("echo Spalte 'Client_DB' fehlt!\n")
            # FIX: previously the diagnostic was discarded on this path.
            self.info = f.getvalue()
            return None
        diff2.remove("Client_DB")
        if len(diff2) > 0:
            f.write("rem Nur in Ziel: " + ";".join(diff2) + "\n")
        if not pd.isnull(self.dest_table.query):
            # Explicit query template from the config takes precedence.
            select_query = self.dest_table.query.format(self.prefix, self.cfg.filter[0], self.cfg.filter[1])
        elif "." in self.table_name or self.cfg.source_dsn.schema == "":
            # FIX: the original assigned to self.table_name, which is a
            # read-only property and raised AttributeError; use a local.
            table = self.table_name
            if table[0] != "[":
                table = f"[{table}]"
            select_query = f"SELECT T1.* FROM {table} T1 "
        else:
            select_query = f"SELECT T1.* FROM [{self.cfg.source_dsn.schema}].[{self.table_name}] T1 "
        if not pd.isnull(self.dest_table.filter_):
            select_query += " WHERE " + self.dest_table.filter_.format("", self.cfg.filter[0], self.cfg.filter[1])
        elif "WHERE" not in select_query:
            # Keep a trivial predicate so callers can always append "AND ...".
            select_query += " WHERE 1 = 1"
        if "timestamp" not in self.source_columns:
            print(self.dest_table.dest + " hat kein timestamp-Feld")
        self.info = f.getvalue()
        return select_query

    @property
    def select_query_with_columns(self) -> str:
        """The select query with T1.* expanded to an explicit column list."""
        res = self.select_query.replace("T1.*", self.select_columns)
        if "timestamp" in self.source_columns:
            res += " ORDER BY T1.[timestamp] "
        return res

    @property
    def select_columns(self):
        """Comma-separated select list in destination column order.

        Missing source columns become '' literals; Client_DB is injected
        as a constant with the client's database name.
        """
        intersect = self.source_columns.intersection(self.dest_table.columns_list)
        res = ""
        for col, is_char_type in zip(self.dest_table.columns_list, self.dest_table.column_types):
            if col in intersect:
                if False and is_char_type:  # disabled for now
                    res += f"dbo.cln(T1.[{col}]), "
                else:
                    res += f"T1.[{col}], "
            elif col == "Client_DB":
                res += f"'{self.client_db}' as [Client_DB], "
            else:
                res += "'' as [" + col + "], "
        res = res[:-2]  # drop trailing ", "
        return res