123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398 |
- import io
- import json
- import os
- from dataclasses import dataclass, field
- from functools import cached_property
- from pathlib import Path
- import pandas as pd
- import pyodbc
- from sqlalchemy import Engine, MetaData, create_engine
@dataclass
class DsnConfig:
    """Connection settings for one DSN (source or destination database).

    NOTE(review): credentials are hard-coded as defaults here — consider
    loading them from environment variables or a secrets store instead.
    """

    user: str = "sa"
    password: str = "Mffu3011#"  # see NOTE above
    server: str = "LOCALHOST\\GLOBALCUBE"
    database: str = "CARLO"
    driver: str = "mssql"
    schema: str = "import"

    def conn_ini(self, db_type: str) -> str:
        """Render the four ``<db_type>_*`` entries for a DB.ini file.

        ``db_type`` is a prefix such as ``"SOURCE"`` or ``"DEST"``.
        """
        entries = [
            ("SERVER", self.server),
            ("USER", self.user),
            ("PASSWORD", self.password),
            ("DATABASE", self.database),
        ]
        return "\n".join(f'{db_type}_{key}="{value}"' for key, value in entries)
class DatabaseInspect:
    """Lazily-connected helper for inspecting one database (tables, columns, keys).

    Wraps a raw pyodbc cursor plus an optional SQLAlchemy engine built from the
    same :class:`DsnConfig`.
    """

    _cursor: pyodbc.Cursor = None
    _sqlalchemy_engine: Engine = None

    def __init__(self, dsn: DsnConfig, source=False):
        self.dsn = dsn
        self.type = "SOURCE" if source else "DEST"

    @property
    def conn_string(self) -> str:
        """ODBC/DSN connection string for the configured driver."""
        dsn = self.dsn
        if dsn.driver == "mssql":
            parts = [
                "Driver={SQL Server Native Client 11.0}",
                f"Server={dsn.server}",
                f"Database={dsn.database}",
                f"Uid={dsn.user}",
                f"Pwd={dsn.password}",
            ]
            return ";".join(parts)
        if dsn.driver == "mysql":
            return f"mysql+pymysql://{dsn.user}:{dsn.password}@{dsn.server}/{dsn.database}?charset=utf8mb4"
        # Fallback: assume PostgreSQL via ODBC on the default port.
        parts = [
            "Driver={PostgreSQL Unicode}",
            f"Server={dsn.server}",
            "Port=5432",
            f"Database={dsn.database}",
            f"Uid={dsn.user}",
            f"Pwd={dsn.password}",
        ]
        return ";".join(parts)
        # f"DSN={self.dsn.server};UID={self.dsn.user};PWD={self.dsn.password}"

    @property
    def conn_string_sqlalchemy(self) -> str:
        """SQLAlchemy URL for the configured driver."""
        dsn = self.dsn
        if dsn.driver == "mssql":
            return (
                f"mssql+pyodbc://{dsn.user}:{dsn.password}@{dsn.server}/{dsn.database}?"
                "driver=SQL+Server+Native+Client+11.0"
            )
        if dsn.driver == "mysql":
            return f"mysql+pymysql://{dsn.user}:{dsn.password}@{dsn.server}/{dsn.database}?charset=utf8mb4"
        return f"pyodbc://{dsn.user}:{dsn.password}@{dsn.server}/{dsn.database}?driver={dsn.driver}"

    @property
    def bcp_conn_params(self) -> str:
        """Command-line parameter block for the bcp bulk-copy utility."""
        return f"-S {self.dsn.server} -d {self.dsn.database} -U {self.dsn.user} -P {self.dsn.password}"

    @property
    def cursor(self) -> pyodbc.Cursor:
        """Shared pyodbc cursor, connecting on first use."""
        if not self._cursor:
            self._cursor = self.connect()
        return self._cursor

    @property
    def sqlalchemy_engine(self) -> Engine:
        """Shared SQLAlchemy engine, created on first use."""
        if not self._sqlalchemy_engine:
            self._sqlalchemy_engine = create_engine(self.conn_string_sqlalchemy)
        return self._sqlalchemy_engine

    def connect(self) -> pyodbc.Cursor:
        """Open a fresh pyodbc connection and return its cursor."""
        connection = pyodbc.connect(self.conn_string)
        return connection.cursor()

    @cached_property
    def tables_list(self) -> list[str]:
        """Names of all tables followed by all views, via ODBC metadata."""
        tables = [row[2] for row in self.cursor.tables(tableType="TABLE")]
        views = [row[2] for row in self.cursor.tables(tableType="VIEW")]
        return tables + views

    def get_prefix(self) -> dict[str, str]:
        """Map 1-based index -> client prefix (the part before ``$`` in table names).

        NOTE(review): when no ``$``-prefixed tables exist, this falls back to
        querying sys.databases and returns a *list* of names instead of a dict
        — callers appear to handle both shapes; confirm before changing.
        """
        prefixes = {t.split("$")[0] for t in self.tables_list if "$" in t}
        result = dict(enumerate(sorted(prefixes), 1))
        if len(result) == 0:
            rows = self.cursor.execute("select name FROM sys.databases").fetchall()
            return [row[0] for row in rows]
        return result

    def get_columns(self, table: str) -> list[str]:
        """Column names of *table*, with an information_schema fallback."""
        cols = [col.column_name for col in self.cursor.columns(table=table)]
        if len(cols) == 0:
            rows = self.cursor.execute(
                "SELECT COLUMN_NAME as column_name FROM information_schema.columns "
                + f"WHERE TABLE_NAME = '{self.convert_table(table)}'"
            ).fetchall()
            cols = [row[0] for row in rows]
        return cols

    def get_columns_is_typeof_str(self, table: str) -> list[str]:
        """Per-column flags: True when the column is a character type.

        Fallback path infers "is string" from a non-empty collation name.
        """
        flags = [
            col.data_type in [pyodbc.SQL_CHAR, pyodbc.SQL_VARCHAR] for col in self.cursor.columns(table=table)
        ]
        if len(flags) == 0:
            rows = self.cursor.execute(
                "SELECT COLLATION_NAME as column_collation FROM information_schema.columns "
                + f"WHERE TABLE_NAME = '{self.convert_table(table)}'"
            ).fetchall()
            flags = [len(row[0]) > 0 for row in rows]
        return flags

    def get_pkey(self, table: str, catalog: str) -> list[str]:
        """Primary-key column names of *table* in *catalog*.

        The fallback temporarily switches the connection to *catalog* and
        restores the configured database afterwards.
        """
        pkey_cols = [col.column_name for col in self.cursor.primaryKeys(table=table, catalog=catalog)]
        if len(pkey_cols) == 0:
            self.cursor.execute(f"USE {catalog}")
            rows = self.cursor.execute(
                "SELECT COLUMN_NAME "
                "FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE "
                "WHERE OBJECTPROPERTY(OBJECT_ID(CONSTRAINT_SCHEMA + '.' + QUOTENAME(CONSTRAINT_NAME)), 'IsPrimaryKey') = 1 "
                f"AND TABLE_NAME = '{self.convert_table(table)}' "  # AND TABLE_SCHEMA = 'dbo'"
            ).fetchall()
            pkey_cols = [row[0] for row in rows]
            self.cursor.execute(f"USE {self.dsn.database}")
        return pkey_cols

    def convert_table(self, table: str) -> str:
        """Strip a leading schema qualifier and surrounding brackets from *table*."""
        if "." in table:
            table = table.split(".")[-1]
        if "[" in table:
            table = table[1:-1]
        return table
@dataclass
class DbCreateConfig:
    """Top-level configuration for the DB staging pipeline.

    Loaded from a JSON file via :meth:`load_config`; the loaded instance is
    kept as a module-wide singleton reachable through :meth:`get_instance`.
    """

    name: str = "CARLO"
    csv_file: str = "..\\config\\CARLO.csv"
    clients: dict[str, str] = field(default_factory=lambda: {"1": "M und S Fahrzeughandel GmbH"})
    # FIX: default was `(["...", "..."],)` — a 1-tuple wrapping the list — so
    # `cfg.filter[0]` yielded the whole list instead of the first timestamp.
    filter: list[str] = field(default_factory=lambda: ["2018-01-01T00:00:00", "2022-01-01T00:00:00"])
    source_dsn: DsnConfig = None
    dest_dsn: DsnConfig = None
    temp_db: str = "CARLOX"
    stage_dir: str = "..\\temp"
    batch_dir: str = "..\\batch"
    logs_dir: str = "..\\logs"
    scripts_dir: str = "C:\\GlobalCube\\Tasks\\scripts"
    source_inspect: DatabaseInspect = None
    dest_inspect: DatabaseInspect = None
    _sqlalchemy_engine: Engine = None

    # Singleton set by load_config(); plain class attribute (no annotation),
    # so the dataclass machinery does not treat it as a field.
    # FIX: previously undeclared — get_instance() raised AttributeError when
    # called before load_config(); now it returns None instead.
    _cfg = None

    @property
    def conn_ini(self) -> str:
        """Render the directory entries for a DB.ini file."""
        return "\n".join(
            [
                f'SQL_TEMP="{self.stage_dir}"',
                f'SQL_BATCH="{self.batch_dir}"',
                f'SQL_LOGS="{self.logs_dir}"',
            ]
        )

    @property
    def temp_db_sqlalchemy_engine(self) -> Engine:
        """Engine for the temp database, reflecting the ``temp`` schema on first use."""
        if not self._sqlalchemy_engine:
            self._sqlalchemy_engine = create_engine(
                f"mssql+pyodbc://{self.dest_dsn.user}:{self.dest_dsn.password}@{self.dest_dsn.server}/{self.temp_db}?"
                "driver=SQL+Server+Native+Client+11.0"
            )
            source_meta = MetaData()
            source_meta.reflect(bind=self._sqlalchemy_engine, schema="temp")
        return self._sqlalchemy_engine

    @staticmethod
    def load_config(config_file: str):
        """Load *config_file* (JSON, latin-1), normalize paths, and build the singleton.

        Relative ``..`` directories are resolved against the config file's
        location and created on disk; legacy key ``target_dsn`` is renamed
        to ``dest_dsn``.
        """
        # FIX: use a context manager instead of json.load(open(...)), which
        # leaked the file handle.
        with open(config_file, "r", encoding="latin-1") as frh:
            cfg_import = json.load(frh)
        base_dir = Path(config_file).resolve().parent
        cfg_import["name"] = Path(config_file).stem
        if "logs_dir" not in cfg_import:
            cfg_import["logs_dir"] = "..\\logs"
        if "scripts_dir" not in cfg_import:
            cfg_import["scripts_dir"] = "C:\\GlobalCube\\Tasks\\scripts"
        if "target_dsn" in cfg_import:
            cfg_import["dest_dsn"] = cfg_import["target_dsn"]
            del cfg_import["target_dsn"]
        for folder in ["stage_dir", "batch_dir", "logs_dir", "scripts_dir"]:
            if cfg_import[folder].startswith(".."):
                cfg_import[folder] = str(base_dir.joinpath(cfg_import[folder]).resolve())
            os.makedirs(cfg_import[folder], exist_ok=True)
        for folder in ["source", "dest", "diff"]:
            os.makedirs(cfg_import["stage_dir"] + "\\" + folder, exist_ok=True)
        if ":" not in cfg_import["csv_file"]:
            cfg_import["csv_file"] = str((base_dir / cfg_import["csv_file"]).resolve())
        cfg_import["source_dsn"] = DsnConfig(**cfg_import["source_dsn"])
        cfg_import["dest_dsn"] = DsnConfig(**cfg_import["dest_dsn"])
        cfg = DbCreateConfig(**cfg_import)
        cfg.source_inspect = DatabaseInspect(cfg.source_dsn, source=True)
        cfg.dest_inspect = DatabaseInspect(cfg.dest_dsn, source=False)
        DbCreateConfig._cfg = cfg
        return cfg

    @staticmethod
    def get_instance():
        """Return the singleton created by load_config(), or None if not loaded yet."""
        return DbCreateConfig._cfg

    def create_db_ini(self) -> None:
        """Write the DB.ini file (dirs + SOURCE + DEST sections) next to the scripts dir."""
        with open(self.scripts_dir + "/../DB.ini", "w", encoding="cp850") as fwh:
            fwh.write(self.conn_ini)
            fwh.write("\n\n")
            fwh.write(self.source_dsn.conn_ini("SOURCE"))
            fwh.write("\n\n")
            fwh.write(self.dest_dsn.conn_ini("DEST"))
            fwh.write("\n")
@dataclass
class SourceTable:
    """Base record for one source table.

    The computed properties are stubs returning empty strings; concrete
    behavior lives in subclasses (e.g. ``SourceTable2``).
    """

    source: str
    client_db: str
    prefix: str

    @property
    def stage_csv(self) -> str:
        """Staging CSV path — stub, overridden by subclasses."""
        return ""

    @property
    def table_client(self) -> str:
        """Client-qualified table name — stub, overridden by subclasses."""
        return ""

    @property
    def table_name(self) -> str:
        """Resolved source table name — stub, overridden by subclasses."""
        return ""

    @property
    def select_query(self) -> str:
        """SELECT statement — stub, overridden by subclasses."""
        return ""
@dataclass
class DestTable:
    """One destination table plus the SQL statements used to load it."""

    source: str
    dest: str
    dest_db: str
    filter_: str
    query: str
    iterative: str
    cols: str
    source_tables: list[SourceTable] = None
    dest_inspect: DatabaseInspect = None
    cfg: DbCreateConfig = None

    @property
    def table_batch_file(self) -> str:
        """Path of the batch file that loads this table."""
        return f"{self.cfg.batch_dir}/{self.dest}.bat"

    @property
    def full_table_name(self) -> str:
        """Fully qualified destination table: [db].[schema].[table]."""
        return f"[{self.dest_db}].[{self.cfg.dest_dsn.schema}].[{self.dest}]"

    @property
    def temp_table_name(self) -> str:
        """Fully qualified staging table in the temp database."""
        return f"[{self.cfg.temp_db}].[temp].[{self.dest}]"

    @cached_property
    def columns_list(self) -> list[str]:
        """Destination columns, with ``CLIENT_DB`` renamed to ``Client_DB`` and moved last."""
        cols = self.dest_inspect.get_columns(self.dest)
        if "CLIENT_DB" in cols:
            cols.remove("CLIENT_DB")
            cols.append("Client_DB")
        return cols

    @cached_property
    def column_types(self) -> list[str]:
        """Per-column is-character-type flags, aligned with ``columns_list``."""
        return self.dest_inspect.get_columns_is_typeof_str(self.dest)

    @cached_property
    def primary_key(self) -> list[str]:
        """Primary-key column names of the destination table."""
        return self.dest_inspect.get_pkey(self.dest, self.dest_db)

    @property
    def insert_query(self) -> str:
        """INSERT statement copying the temp table into the destination."""
        return f"INSERT INTO {self.full_table_name} with (TABLOCK) SELECT * FROM {self.temp_table_name} T1"

    @property
    def delete_query(self) -> str:
        """DELETE-by-join statement, or empty string when no primary key exists."""
        if not self.primary_key:
            return ""
        join_condition = " AND ".join(f"T1.[{col}] = T2.[{col}]" for col in self.primary_key)
        return f"DELETE T1 FROM {self.full_table_name} T1 INNER JOIN {self.temp_table_name} T2 ON {join_condition}"
class SourceTable2(SourceTable):
    """Concrete source table bound to a :class:`DestTable` and the global config.

    The class-level attributes below are assigned externally after construction.
    ``select_query`` caches the generated SELECT and records column-diff notes
    in ``self.info`` as a side effect.
    """

    dest_table: DestTable
    cfg: DbCreateConfig
    _select_query: str = None
    source_inspect: DatabaseInspect = None
    info: str = ""

    @property
    def table_client(self) -> str:
        """Destination table name suffixed with the client database."""
        return f"{self.dest_table.dest}_{self.client_db}"

    @property
    def stage_csv(self) -> str:
        """Staging CSV path for this client table."""
        return f"{self.cfg.stage_dir}\\{self.table_client}.csv"

    @property
    def table_name(self) -> str:
        """Source table name with the client prefix substituted in."""
        return self.source.format(self.prefix)

    @cached_property
    def source_columns(self) -> set[str]:
        """Column names present in the source table."""
        return set(self.source_inspect.get_columns(self.table_name))

    @cached_property
    def select_query(self):
        """Build the SELECT statement for this table.

        Records column differences between source and destination in
        ``self.info``. Returns None when the destination lacks the mandatory
        ``Client_DB`` column.
        """
        f = io.StringIO()
        only_in_source = self.source_columns.difference(self.dest_table.columns_list)
        if len(only_in_source) > 0:
            f.write("rem Nur in Quelle: " + ";".join(only_in_source) + "\n")
        only_in_dest = set(self.dest_table.columns_list).difference(self.source_columns)
        if "Client_DB" not in only_in_dest:
            f.write("echo Spalte 'Client_DB' fehlt!\n")
            # FIX: previously the message above was discarded because
            # self.info was only assigned on the success path.
            self.info = f.getvalue()
            return
        only_in_dest.remove("Client_DB")
        if len(only_in_dest) > 0:
            f.write("rem Nur in Ziel: " + ";".join(only_in_dest) + "\n")

        if not pd.isnull(self.dest_table.query):
            # Explicit query template from the config CSV.
            select_query = self.dest_table.query.format(self.prefix, self.cfg.filter[0], self.cfg.filter[1])
        elif "." in self.table_name or self.cfg.source_dsn.schema == "":
            # FIX: the original assigned to the read-only ``table_name``
            # property here, which raises AttributeError; use a local copy.
            table = self.table_name
            if table[0] != "[":
                table = f"[{table}]"
            select_query = f"SELECT T1.* FROM {table} T1 "
        else:
            select_query = f"SELECT T1.* FROM [{self.cfg.source_dsn.schema}].[{self.table_name}] T1 "

        if not pd.isnull(self.dest_table.filter_):
            select_query += " WHERE " + self.dest_table.filter_.format("", self.cfg.filter[0], self.cfg.filter[1])
        elif "WHERE" not in select_query:
            # Always provide a WHERE clause so callers can append AND terms.
            select_query += " WHERE 1 = 1"
        if "timestamp" not in self.source_columns:
            print(self.dest_table.dest + " hat kein timestamp-Feld")
        self.info = f.getvalue()
        return select_query

    @property
    def select_query_with_columns(self) -> str:
        """``select_query`` with ``T1.*`` expanded to the explicit column list."""
        res = self.select_query.replace("T1.*", self.select_columns)
        if "timestamp" in self.source_columns:
            res += " ORDER BY T1.[timestamp] "
        return res

    @property
    def select_columns(self):
        """Comma-separated projection matching the destination column order.

        Columns missing in the source are emitted as empty-string literals;
        ``Client_DB`` is filled with this table's client database id.
        """
        intersect = self.source_columns.intersection(self.dest_table.columns_list)
        parts = []
        for col, is_char_type in zip(self.dest_table.columns_list, self.dest_table.column_types):
            if col in intersect:
                if False and is_char_type:  # disabled for now ("vorerst deaktiviert")
                    parts.append(f"dbo.cln(T1.[{col}])")
                else:
                    parts.append(f"T1.[{col}]")
            elif col == "Client_DB":
                parts.append(f"'{self.client_db}' as [Client_DB]")
            else:
                parts.append("'' as [" + col + "]")
        return ", ".join(parts)
|