|
|
@@ -1,6 +1,7 @@
|
|
|
import io
|
|
|
import json
|
|
|
import os
|
|
|
+from collections import Counter
|
|
|
from dataclasses import dataclass, field
|
|
|
from functools import cached_property
|
|
|
from pathlib import Path
|
|
|
@@ -114,12 +115,26 @@ class DatabaseInspect:
|
|
|
return tables + views
|
|
|
|
|
|
def get_prefix(self) -> dict[str, str]:
|
|
|
- source_tables_prefix = dict(
|
|
|
- enumerate(sorted(list(set([t.split("$")[0] for t in self.tables_list if "$" in t]))), 1)
|
|
|
- )
|
|
|
- if len(source_tables_prefix) == 0:
|
|
|
- q = self.cursor.execute("select name FROM sys.databases")
|
|
|
- source_tables_prefix = [x[0] for x in q.fetchall()]
|
|
|
+ prefix_count = Counter([t.split("$")[0] for t in self.tables_list if "$" in t])
|
|
|
+ source_tables_prefix = {}
|
|
|
+ for i, p in enumerate(prefix_count.most_common(), 1):
|
|
|
+ if p[1] > 10 and len(p[0]) > 0:
|
|
|
+ source_tables_prefix[str(i)] = p[0]
|
|
|
+
|
|
|
+ if len(source_tables_prefix) > 0:
|
|
|
+ return source_tables_prefix
|
|
|
+
|
|
|
+ q = self.cursor.execute("select name FROM sys.databases")
|
|
|
+ databases = list(sorted([x[0] for x in q.fetchall()]))
|
|
|
+ if "deop00" in databases:
|
|
|
+ # Special case for OPTIMA
|
|
|
+ databases.remove("deop00")
|
|
|
+ for i, p in enumerate(databases, 1):
|
|
|
+ if p.startswith("deop"):
|
|
|
+ source_tables_prefix[str(i)] = p
|
|
|
+ for i, p in enumerate(databases, len(source_tables_prefix.keys()) + 1):
|
|
|
+ if p.startswith("de") and p not in source_tables_prefix.values():
|
|
|
+ source_tables_prefix[str(i)] = p
|
|
|
return source_tables_prefix
|
|
|
|
|
|
def get_columns(self, table: str) -> list[str]:
|
|
|
@@ -178,6 +193,8 @@ class DbCreateConfig:
|
|
|
stage_dir: str = "..\\temp"
|
|
|
batch_dir: str = "..\\batch"
|
|
|
logs_dir: str = "..\\logs"
|
|
|
+ sql_import_full_dir: str = "..\\exec\\import_full"
|
|
|
+ sql_import_inc_dir: str = "..\\exec\\import_inc"
|
|
|
scripts_dir: str = "C:\\GlobalCube\\Tasks\\scripts"
|
|
|
|
|
|
source_inspect: DatabaseInspect = None
|
|
|
@@ -218,8 +235,19 @@ class DbCreateConfig:
|
|
|
if "target_dsn" in cfg_import:
|
|
|
cfg_import["dest_dsn"] = cfg_import["target_dsn"]
|
|
|
del cfg_import["target_dsn"]
|
|
|
-
|
|
|
- for folder in ["stage_dir", "batch_dir", "logs_dir", "scripts_dir"]:
|
|
|
+ if "sql_import_full_dir" not in cfg_import:
|
|
|
+ cfg_import["sql_import_full_dir"] = "..\\exec\\import_full"
|
|
|
+ if "sql_import_inc_dir" not in cfg_import:
|
|
|
+ cfg_import["sql_import_inc_dir"] = "..\\exec\\import_inc"
|
|
|
+
|
|
|
+ for folder in [
|
|
|
+ "stage_dir",
|
|
|
+ "batch_dir",
|
|
|
+ "logs_dir",
|
|
|
+ "scripts_dir",
|
|
|
+ "sql_import_full_dir",
|
|
|
+ "sql_import_inc_dir",
|
|
|
+ ]:
|
|
|
if cfg_import[folder].startswith(".."):
|
|
|
cfg_import[folder] = str(base_dir.joinpath(cfg_import[folder]).resolve())
|
|
|
os.makedirs(cfg_import[folder], exist_ok=True)
|
|
|
@@ -333,7 +361,22 @@ class SourceTable2(SourceTable):
|
|
|
|
|
|
_select_query: str = None
|
|
|
source_inspect: DatabaseInspect = None
|
|
|
- info: str = ""
|
|
|
+
|
|
|
+ @cached_property
|
|
|
+ def info(self) -> str:
|
|
|
+ f = io.StringIO()
|
|
|
+ # print("Auf beiden Seiten: " + ";".join(intersect))
|
|
|
+ diff1 = self.source_columns.difference(self.dest_table.columns_list)
|
|
|
+ if len(diff1) > 0:
|
|
|
+ f.write("rem Nur in Quelle: " + ";".join(diff1) + "\n")
|
|
|
+ diff2 = set(self.dest_table.columns_list).difference(self.source_columns)
|
|
|
+ if "Client_DB" not in diff2:
|
|
|
+ f.write("echo Spalte 'Client_DB' fehlt!\n")
|
|
|
+ return
|
|
|
+ diff2.remove("Client_DB")
|
|
|
+ if len(diff2) > 0:
|
|
|
+ f.write("rem Nur in Ziel: " + ";".join(diff2) + "\n")
|
|
|
+ return f.getvalue()
|
|
|
|
|
|
@property
|
|
|
def table_client(self) -> str:
|
|
|
@@ -353,19 +396,6 @@ class SourceTable2(SourceTable):
|
|
|
|
|
|
@cached_property
|
|
|
def select_query(self):
|
|
|
- f = io.StringIO()
|
|
|
- # print("Auf beiden Seiten: " + ";".join(intersect))
|
|
|
- diff1 = self.source_columns.difference(self.dest_table.columns_list)
|
|
|
- if len(diff1) > 0:
|
|
|
- f.write("rem Nur in Quelle: " + ";".join(diff1) + "\n")
|
|
|
- diff2 = set(self.dest_table.columns_list).difference(self.source_columns)
|
|
|
- if "Client_DB" not in diff2:
|
|
|
- f.write("echo Spalte 'Client_DB' fehlt!\n")
|
|
|
- return
|
|
|
- diff2.remove("Client_DB")
|
|
|
- if len(diff2) > 0:
|
|
|
- f.write("rem Nur in Ziel: " + ";".join(diff2) + "\n")
|
|
|
-
|
|
|
if not pd.isnull(self.dest_table.query):
|
|
|
select_query = self.dest_table.query.format(self.prefix, self.cfg.filter[0], self.cfg.filter[1])
|
|
|
elif "." in self.table_name or self.cfg.source_dsn.schema == "":
|
|
|
@@ -382,14 +412,13 @@ class SourceTable2(SourceTable):
|
|
|
|
|
|
if "timestamp" not in self.source_columns:
|
|
|
print(self.dest_table.dest + " hat kein timestamp-Feld")
|
|
|
- self.info = f.getvalue()
|
|
|
return select_query
|
|
|
|
|
|
@property
|
|
|
def select_query_with_columns(self) -> str:
|
|
|
res = self.select_query.replace("T1.*", self.select_columns)
|
|
|
- if "timestamp" in self.source_columns:
|
|
|
- res += " ORDER BY T1.[timestamp] "
|
|
|
+ # if "timestamp" in self.source_columns:
|
|
|
+ # res += " ORDER BY T1.[timestamp] "
|
|
|
return res
|
|
|
|
|
|
@property
|