|
@@ -1,9 +1,11 @@
|
|
|
+import io
|
|
|
import json
|
|
|
import os
|
|
|
from dataclasses import dataclass, field
|
|
|
from functools import cached_property
|
|
|
from pathlib import Path
|
|
|
|
|
|
+import pandas as pd
|
|
|
import pyodbc
|
|
|
from sqlalchemy import Engine, create_engine
|
|
|
|
|
@@ -28,30 +30,6 @@ class DsnConfig:
|
|
|
)
|
|
|
|
|
|
|
|
|
-@dataclass
|
|
|
-class DbCreateConfig:
|
|
|
- name: str = "CARLO"
|
|
|
- csv_file: str = "CARLO.csv"
|
|
|
- clients: dict[str, str] = field(default_factory=lambda: {"1": "M und S Fahrzeughandel GmbH"})
|
|
|
- filter: list[str] = (["2018-01-01T00:00:00", "2022-01-01T00:00:00"],)
|
|
|
- source_dsn: DsnConfig = None
|
|
|
- dest_dsn: DsnConfig = None
|
|
|
- temp_db: str = "CARLOX"
|
|
|
- stage_dir: str = "..\\temp"
|
|
|
- batch_dir: str = "..\\batch"
|
|
|
- logs_dir: str = "..\\logs"
|
|
|
- scripts_dir: str = "C:\\GlobalCube\\Tasks\\scripts"
|
|
|
-
|
|
|
- def conn_ini(self) -> str:
|
|
|
- return "\n".join(
|
|
|
- [
|
|
|
- f'SQL_TEMP="{self.stage_dir}"',
|
|
|
- f'SQL_BATCH="{self.batch_dir}"',
|
|
|
- f'SQL_LOGS="{self.logs_dir}"',
|
|
|
- ]
|
|
|
- )
|
|
|
-
|
|
|
-
|
|
|
class DatabaseInspect:
|
|
|
_cursor: pyodbc.Cursor = None
|
|
|
_sqlalchemy_engine: Engine = None
|
|
@@ -176,34 +154,222 @@ class DatabaseInspect:
|
|
|
return table
|
|
|
|
|
|
|
|
|
-def load_config(config_file: str) -> DbCreateConfig:
|
|
|
- cfg_import = json.load(open(config_file, "r", encoding="latin-1"))
|
|
|
- base_dir = Path(config_file).resolve().parent
|
|
|
- cfg_import["name"] = Path(config_file).stem
|
|
|
- if "logs_dir" not in cfg_import:
|
|
|
- cfg_import["logs_dir"] = "..\\logs"
|
|
|
- if "scripts_dir" not in cfg_import:
|
|
|
- cfg_import["scripts_dir"] = "C:\\GlobalCube\\Tasks\\scripts"
|
|
|
- if "target_dsn" in cfg_import:
|
|
|
- cfg_import["dest_dsn"] = cfg_import["target_dsn"]
|
|
|
- del cfg_import["target_dsn"]
|
|
|
-
|
|
|
- for folder in ["stage_dir", "batch_dir", "logs_dir", "scripts_dir"]:
|
|
|
- if cfg_import[folder].startswith(".."):
|
|
|
- cfg_import[folder] = str(base_dir.joinpath(cfg_import[folder]).resolve())
|
|
|
- os.makedirs(cfg_import[folder], exist_ok=True)
|
|
|
- for folder in ["source", "dest", "diff"]:
|
|
|
- os.makedirs(cfg_import["stage_dir"] + "\\" + folder, exist_ok=True)
|
|
|
- cfg_import["source_dsn"] = DsnConfig(**cfg_import["source_dsn"])
|
|
|
- cfg_import["dest_dsn"] = DsnConfig(**cfg_import["dest_dsn"])
|
|
|
- return DbCreateConfig(**cfg_import)
|
|
|
-
|
|
|
-
|
|
|
-def create_db_ini(cfg: DbCreateConfig) -> None:
|
|
|
- with open(cfg.scripts_dir + "/../DB.ini", "w", encoding="cp850") as fwh:
|
|
|
- fwh.write(cfg.conn_ini())
|
|
|
- fwh.write("\n\n")
|
|
|
- fwh.write(cfg.source_dsn.conn_ini("SOURCE"))
|
|
|
- fwh.write("\n\n")
|
|
|
- fwh.write(cfg.dest_dsn.conn_ini("DEST"))
|
|
|
- fwh.write("\n")
|
|
|
+@dataclass
|
|
|
+class DbCreateConfig:
|
|
|
+ name: str = "CARLO"
|
|
|
+ csv_file: str = "..\\config\\CARLO.csv"
|
|
|
+ clients: dict[str, str] = field(default_factory=lambda: {"1": "M und S Fahrzeughandel GmbH"})
|
|
|
+ filter: list[str] = (["2018-01-01T00:00:00", "2022-01-01T00:00:00"],)
|
|
|
+ source_dsn: DsnConfig = None
|
|
|
+ dest_dsn: DsnConfig = None
|
|
|
+ temp_db: str = "CARLOX"
|
|
|
+ stage_dir: str = "..\\temp"
|
|
|
+ batch_dir: str = "..\\batch"
|
|
|
+ logs_dir: str = "..\\logs"
|
|
|
+ scripts_dir: str = "C:\\GlobalCube\\Tasks\\scripts"
|
|
|
+
|
|
|
+ source_inspect: DatabaseInspect = None
|
|
|
+ dest_inspect: DatabaseInspect = None
|
|
|
+
|
|
|
+ @property
|
|
|
+ def conn_ini(self) -> str:
|
|
|
+ return "\n".join(
|
|
|
+ [
|
|
|
+ f'SQL_TEMP="{self.stage_dir}"',
|
|
|
+ f'SQL_BATCH="{self.batch_dir}"',
|
|
|
+ f'SQL_LOGS="{self.logs_dir}"',
|
|
|
+ ]
|
|
|
+ )
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def load_config(config_file: str):
|
|
|
+ cfg_import = json.load(open(config_file, "r", encoding="latin-1"))
|
|
|
+ base_dir = Path(config_file).resolve().parent
|
|
|
+ cfg_import["name"] = Path(config_file).stem
|
|
|
+ if "logs_dir" not in cfg_import:
|
|
|
+ cfg_import["logs_dir"] = "..\\logs"
|
|
|
+ if "scripts_dir" not in cfg_import:
|
|
|
+ cfg_import["scripts_dir"] = "C:\\GlobalCube\\Tasks\\scripts"
|
|
|
+ if "target_dsn" in cfg_import:
|
|
|
+ cfg_import["dest_dsn"] = cfg_import["target_dsn"]
|
|
|
+ del cfg_import["target_dsn"]
|
|
|
+
|
|
|
+ for folder in ["stage_dir", "batch_dir", "logs_dir", "scripts_dir"]:
|
|
|
+ if cfg_import[folder].startswith(".."):
|
|
|
+ cfg_import[folder] = str(base_dir.joinpath(cfg_import[folder]).resolve())
|
|
|
+ os.makedirs(cfg_import[folder], exist_ok=True)
|
|
|
+ for folder in ["source", "dest", "diff"]:
|
|
|
+ os.makedirs(cfg_import["stage_dir"] + "\\" + folder, exist_ok=True)
|
|
|
+ if ":" not in cfg_import["csv_file"]:
|
|
|
+ cfg_import["csv_file"] = str((base_dir / cfg_import["csv_file"]).resolve())
|
|
|
+ cfg_import["source_dsn"] = DsnConfig(**cfg_import["source_dsn"])
|
|
|
+ cfg_import["dest_dsn"] = DsnConfig(**cfg_import["dest_dsn"])
|
|
|
+ cfg = DbCreateConfig(**cfg_import)
|
|
|
+ cfg.source_inspect = DatabaseInspect(cfg.source_dsn, source=True)
|
|
|
+ cfg.dest_inspect = DatabaseInspect(cfg.dest_dsn, source=False)
|
|
|
+ DbCreateConfig._cfg = cfg
|
|
|
+ return cfg
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def get_instance():
|
|
|
+ return DbCreateConfig._cfg
|
|
|
+
|
|
|
+ def create_db_ini(self) -> None:
|
|
|
+ with open(self.scripts_dir + "/../DB.ini", "w", encoding="cp850") as fwh:
|
|
|
+ fwh.write(self.conn_ini)
|
|
|
+ fwh.write("\n\n")
|
|
|
+ fwh.write(self.source_dsn.conn_ini("SOURCE"))
|
|
|
+ fwh.write("\n\n")
|
|
|
+ fwh.write(self.dest_dsn.conn_ini("DEST"))
|
|
|
+ fwh.write("\n")
|
|
|
+
|
|
|
+
|
|
|
+@dataclass
|
|
|
+class SourceTable:
|
|
|
+ source: str
|
|
|
+ client_db: str
|
|
|
+ prefix: str
|
|
|
+
|
|
|
+ @property
|
|
|
+ def stage_csv(self) -> str:
|
|
|
+ return ""
|
|
|
+
|
|
|
+ @property
|
|
|
+ def table_client(self) -> str:
|
|
|
+ return ""
|
|
|
+
|
|
|
+ @property
|
|
|
+ def table_name(self) -> str:
|
|
|
+ return ""
|
|
|
+
|
|
|
+ @property
|
|
|
+ def select_query(self) -> str:
|
|
|
+ return ""
|
|
|
+
|
|
|
+
|
|
|
+@dataclass
|
|
|
+class DestTable:
|
|
|
+ source: str
|
|
|
+ dest: str
|
|
|
+ dest_db: str
|
|
|
+ filter_: str
|
|
|
+ query: str
|
|
|
+ iterative: str
|
|
|
+ cols: str
|
|
|
+ source_tables: list[SourceTable] = None
|
|
|
+ dest_inspect: DatabaseInspect = None
|
|
|
+ cfg: DbCreateConfig = None
|
|
|
+
|
|
|
+ @property
|
|
|
+ def table_batch_file(self) -> str:
|
|
|
+ return f"{self.cfg.batch_dir}/{self.dest}.bat"
|
|
|
+
|
|
|
+ @property
|
|
|
+ def full_table_name(self) -> str:
|
|
|
+ return f"[{self.dest_db}].[{self.cfg.dest_dsn.schema}].[{self.dest}]"
|
|
|
+
|
|
|
+ @property
|
|
|
+ def temp_table_name(self) -> str:
|
|
|
+ return f"[{self.cfg.temp_db}].[temp].[{self.dest}]"
|
|
|
+
|
|
|
+ @cached_property
|
|
|
+ def columns_list(self) -> list[str]:
|
|
|
+ res = self.dest_inspect.get_columns(self.dest)
|
|
|
+ if "CLIENT_DB" in res:
|
|
|
+ res.remove("CLIENT_DB")
|
|
|
+ res.append("Client_DB")
|
|
|
+ return res
|
|
|
+
|
|
|
+ @cached_property
|
|
|
+ def column_types(self) -> list[str]:
|
|
|
+ return self.dest_inspect.get_columns_is_typeof_str(self.dest)
|
|
|
+
|
|
|
+ @cached_property
|
|
|
+ def primary_key(self) -> list[str]:
|
|
|
+ return self.dest_inspect.get_pkey(self.dest, self.dest_db)
|
|
|
+
|
|
|
+ @property
|
|
|
+ def insert_query(self) -> str:
|
|
|
+ return f"INSERT INTO {self.full_table_name} SELECT * FROM {self.temp_table_name} T1"
|
|
|
+
|
|
|
+ @property
|
|
|
+ def delete_query(self) -> str:
|
|
|
+
|
|
|
+ if len(self.primary_key) == 0:
|
|
|
+ return ""
|
|
|
+ pkey_join_list = [f"T1.[{col}] = T2.[{col}]" for col in self.primary_key]
|
|
|
+ pkey_join = " AND ".join(pkey_join_list)
|
|
|
+ return f"DELETE T1 FROM {self.full_table_name} T1 INNER JOIN {self.temp_table_name} T2 ON {pkey_join}"
|
|
|
+
|
|
|
+
|
|
|
+class SourceTable2(SourceTable):
|
|
|
+ dest_table: DestTable
|
|
|
+ cfg: DbCreateConfig
|
|
|
+
|
|
|
+ _select_query: str = None
|
|
|
+ source_inspect: DatabaseInspect = None
|
|
|
+ info: str = ""
|
|
|
+
|
|
|
+ @property
|
|
|
+ def table_client(self) -> str:
|
|
|
+ return f"{self.dest_table.dest}_{self.client_db}"
|
|
|
+
|
|
|
+ @property
|
|
|
+ def stage_csv(self) -> str:
|
|
|
+ return f"{self.cfg.stage_dir}\\{self.table_client}.csv"
|
|
|
+
|
|
|
+ @property
|
|
|
+ def table_name(self) -> str:
|
|
|
+ return self.source.format(self.prefix)
|
|
|
+
|
|
|
+ @cached_property
|
|
|
+ def select_query(self):
|
|
|
+ f = io.StringIO()
|
|
|
+ source_columns = set(self.source_inspect.get_columns(self.table_name))
|
|
|
+
|
|
|
+ intersect = source_columns.intersection(self.dest_table.columns_list)
|
|
|
+
|
|
|
+ diff1 = source_columns.difference(self.dest_table.columns_list)
|
|
|
+ if len(diff1) > 0:
|
|
|
+ f.write("rem Nur in Quelle: " + ";".join(diff1) + "\n")
|
|
|
+ diff2 = set(self.dest_table.columns_list).difference(source_columns)
|
|
|
+ if "Client_DB" not in diff2:
|
|
|
+ f.write("echo Spalte 'Client_DB' fehlt!\n")
|
|
|
+ return
|
|
|
+ diff2.remove("Client_DB")
|
|
|
+ if len(diff2) > 0:
|
|
|
+ f.write("rem Nur in Ziel: " + ";".join(diff2) + "\n")
|
|
|
+
|
|
|
+ if not pd.isnull(self.dest_table.query):
|
|
|
+ select_query = self.dest_table.query.format(self.prefix, self.cfg.filter[0], self.cfg.filter[1])
|
|
|
+ elif "." in self.table_name or self.cfg.source_dsn.schema == "":
|
|
|
+ if self.table_name[0] != "[":
|
|
|
+ self.table_name = f"[{self.table_name}]"
|
|
|
+ select_query = f"SELECT T1.* FROM {self.table_name} T1 "
|
|
|
+ else:
|
|
|
+ select_query = f"SELECT T1.* FROM [{self.cfg.source_dsn.schema}].[{self.table_name}] T1 "
|
|
|
+
|
|
|
+ if not pd.isnull(self.dest_table.filter_):
|
|
|
+ select_query += " WHERE " + self.dest_table.filter_.format("", self.cfg.filter[0], self.cfg.filter[1])
|
|
|
+ elif "WHERE" not in select_query:
|
|
|
+ select_query += " WHERE 1 = 1"
|
|
|
+
|
|
|
+ select_columns = ""
|
|
|
+ for col, is_char_type in zip(self.dest_table.columns_list, self.dest_table.column_types):
|
|
|
+ if col in intersect:
|
|
|
+ if False and is_char_type:
|
|
|
+ select_columns += f"dbo.cln(T1.[{col}]), "
|
|
|
+ else:
|
|
|
+ select_columns += f"T1.[{col}], "
|
|
|
+ elif col == "Client_DB":
|
|
|
+ select_columns += f"'{self.client_db}' as [Client_DB], "
|
|
|
+ else:
|
|
|
+ select_columns += "'' as [" + col + "], "
|
|
|
+
|
|
|
+ select_query = select_query.replace("T1.*", select_columns[:-2])
|
|
|
+ if "timestamp" in source_columns:
|
|
|
+ select_query += " ORDER BY T1.[timestamp] "
|
|
|
+ else:
|
|
|
+ print(self.dest_table.dest + " hat kein timestamp-Feld")
|
|
|
+ self.info = f.getvalue()
|
|
|
+ return select_query
|