@@ -1,57 +1,44 @@
-import pandas as pd
 import json
+import os
+from dataclasses import dataclass, field
 from pathlib import Path
-from collections import namedtuple
+
+import pandas as pd
 import pyodbc
 
-# from re import escape
-# from numpy import select
-# from dataclasses import dataclass
-
-
-DbCreateConfig = namedtuple(
-    "DbCreateConfig",
-    "name csv_file clients filter source_dsn target_dsn stage_dir batch_dir logs_dir",
-)
-DsnConfig = namedtuple("DsnConfig", "user password server database driver schema")
-
-cfg = DbCreateConfig(
-    **{
-        "name": "CARLO",
-        "csv_file": "CARLO.csv",
-        "clients": {"1": "M und S Fahrzeughandel GmbH"},
-        "filter": ["01.01.2018", "01.01.2019"],
-        "source_dsn": {
-            "user": "sa",
-            "password": "Mffu3011#",
-            "server": "GC-SERVER1\\GLOBALCUBE",
-            "database": "DE0017",
-            "driver": "mssql",
-            "schema": "dbo",
-        },
-        "target_dsn": {
-            "user": "sa",
-            "password": "Mffu3011#",
-            "server": "GC-SERVER1\\GLOBALCUBE",
-            "database": "CARLO2",
-            "driver": "mssql",
-            "schema": "import",
-        },
-        "stage_dir": "..\\temp",
-        "batch_dir": "..\\batch",
-        "logs_dir": "..\\logs",
-    }
-)
+
+@dataclass
+class DsnConfig:
+    user: str = "sa"
+    password: str = "Mffu3011#"
+    server: str = "LOCALHOST\\GLOBALCUBE"
+    database: str = "CARLO"
+    driver: str = "mssql"
+    schema: str = "import"
+
+
+@dataclass
+class DbCreateConfig:
+    name: str = "CARLO"
+    csv_file: str = "CARLO.csv"
+    clients: dict[str, str] = field(default_factory=lambda: {"1": "M und S Fahrzeughandel GmbH"})
+    filter: list[str] = field(default_factory=lambda: ["2018-01-01T00:00:00", "2022-01-01T00:00:00"])
+    source_dsn: DsnConfig = None
+    target_dsn: DsnConfig = None
+    stage_dir: str = "..\\temp"
+    batch_dir: str = "..\\batch"
+    logs_dir: str = "..\\logs"
 
 
 class database_inspect:
     tables = []
 
-    def __init__(self, dsn, source=False):
-        self.dsn = DsnConfig(**dsn)
+    def __init__(self, dsn: DsnConfig, source=False):
+        self.dsn = dsn
         self.type = "SOURCE" if source else "DEST"
         self.cursor = self.connect()
 
+    @property
     def conn_string(self):
         if self.dsn.driver == "mssql":
             return ";".join(
@@ -77,6 +64,7 @@ class database_inspect:
             )
         # f"DSN={self.dsn.server};UID={self.dsn.user};PWD={self.dsn.password}"
 
+    @property
     def conn_ini(self):
         return "\r\n".join(
             [
@@ -87,11 +75,12 @@ class database_inspect:
             ]
         )
 
+    @property
     def bcp_conn_params(self):
         return f"-S {self.dsn.server} -d {self.dsn.database} -U {self.dsn.user} -P {self.dsn.password}"
 
     def connect(self):
-        c = pyodbc.connect(self.conn_string())
+        c = pyodbc.connect(self.conn_string)
         return c.cursor()
 
     def get_tables(self):
@@ -131,6 +120,18 @@ class database_inspect:
         source_insp_cols = [len(col[0]) > 0 for col in q.fetchall()]
         return source_insp_cols
 
+    def get_pkey(self, table):
+        source_insp_cols = [col.column_name for col in self.cursor.primaryKeys(table=table)]
+        if len(source_insp_cols) == 0:
+            q = self.cursor.execute(
+                "SELECT COLUMN_NAME "
+                "FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE "
+                "WHERE OBJECTPROPERTY(OBJECT_ID(CONSTRAINT_SCHEMA + '.' + QUOTENAME(CONSTRAINT_NAME)), 'IsPrimaryKey') = 1 "
+                f"AND TABLE_NAME = '{self.convert_table(table)}' " # AND TABLE_SCHEMA = 'dbo'"
+            )
+            source_insp_cols = [col[0] for col in q.fetchall()]
+        return source_insp_cols
+
     def convert_table(self, table):
         if "." in table:
             table = table.split(".")[-1]
@@ -143,32 +144,37 @@ def load_config(config_file: str):
     cfg_import = json.load(open(config_file, "r", encoding="latin-1"))
     base_dir = Path(config_file).resolve().parent
     cfg_import["name"] = Path(config_file).stem
-    if cfg_import["stage_dir"][:2] == "..":
-        cfg_import["stage_dir"] = str(base_dir.joinpath(cfg_import["stage_dir"]).resolve())
-    if cfg_import["batch_dir"][:2] == "..":
-        cfg_import["batch_dir"] = str(base_dir.joinpath(cfg_import["batch_dir"]).resolve())
     if "logs_dir" not in cfg_import:
         cfg_import["logs_dir"] = "..\\logs"
-    if cfg_import["batch_dir"][:2] == "..":
-        cfg_import["batch_dir"] = str(base_dir.joinpath(cfg_import["logs_dir"]).resolve())
-    return DbCreateConfig(**cfg_import)
 
+    for folder in ["stage_dir", "batch_dir", "logs_dir"]:
+        if cfg_import[folder].startswith(".."):
+            cfg_import[folder] = str(base_dir.joinpath(cfg_import[folder]).resolve())
+        os.makedirs(cfg_import[folder], exist_ok=True)
+    cfg_import["source_dsn"] = DsnConfig(**cfg_import["source_dsn"])
+    cfg_import["target_dsn"] = DsnConfig(**cfg_import["target_dsn"])
+    return DbCreateConfig(**cfg_import)
 
-def create(config_file="dbtools/OPTIMA.json"): #
-    cfg = load_config(config_file)
-    base_dir = str(Path(cfg.batch_dir).parent)
 
-    df = pd.read_csv(f"{base_dir}/{cfg.csv_file}", sep=";", encoding="latin-1")
+def get_import_config(filename: str, db_name: str):
+    df = pd.read_csv(filename, sep=";", encoding="latin-1")
     if "cols" not in df.columns:
-        df["target_db"] = ""
+        df["target_db"] = db_name
         df["cols"] = ""
-        df.to_csv(f"{base_dir}/{cfg.csv_file}", sep=";", encoding="latin-1")
-    config = df[df["target"].notnull()]
-    # print(config.head())
+        df[["source", "target", "target_db", "filter", "query", "iterative", "cols"]].to_csv(
+            filename, sep=";", encoding="latin-1", index=False
+        )
+    return df[df["target"].notnull()]
+
+
+def create(config_file: str = "database/CARLO.json"):
+    cfg = load_config(config_file)
+    base_dir = str(Path(cfg.batch_dir).parent)
+    config = get_import_config(f"{base_dir}/config/{cfg.csv_file}", cfg.target_dsn.database)
 
     source_db = database_inspect(cfg.source_dsn, source=True)
     source_tables = source_db.get_tables()
-    print(source_db.get_prefix())
+    print(json.dumps(source_db.get_prefix(), indent=2))
 
     target_db = database_inspect(cfg.target_dsn)
     target_tables = target_db.get_tables()
@@ -185,8 +191,8 @@ def create(config_file="dbtools/OPTIMA.json"): #
 
         f.write(f"del {cfg.stage_dir}\\{current_table['target']}*.* /Q /F >nul 2>nul \n")
         f.write(
-            f"sqlcmd.exe {target_db.bcp_conn_params()} -p "
-            + f"-Q \"TRUNCATE TABLE [{cfg.target_dsn['schema']}].[{current_table['target']}]\" \n"
+            f"sqlcmd.exe {target_db.bcp_conn_params} -p "
+            + f"-Q \"TRUNCATE TABLE [{cfg.target_dsn.schema}].[{current_table['target']}]\" \n"
         )
 
         target_columns_list = target_db.get_columns(current_table["target"])
@@ -224,20 +230,20 @@ def create(config_file="dbtools/OPTIMA.json"): #
 
         if not pd.isnull(current_table["query"]):
             select_query = current_table["query"].format(prefix, cfg.filter[0], cfg.filter[1])
-        elif "." in source_table or cfg.source_dsn["schema"] == "":
+        elif "." in source_table or cfg.source_dsn.schema == "":
             if source_table[0] != "[":
                 source_table = f"[{source_table}]"
             select_query = f"SELECT T1.* FROM {source_table} T1 "
         else:
-            select_query = f"SELECT T1.* FROM [{cfg.source_dsn['schema']}].[{source_table}] T1 "
+            select_query = f"SELECT T1.* FROM [{cfg.source_dsn.schema}].[{source_table}] T1 "
 
         if not pd.isnull(current_table["filter"]):
            select_query += " WHERE " + current_table["filter"].format("", cfg.filter[0], cfg.filter[1])
         # select_columns = "T1.[" + "], T1.[".join(intersect) + "],"
         select_columns = ""
-        for col, col_type in zip(target_columns_list, target_column_types):
+        for col, is_char_type in zip(target_columns_list, target_column_types):
             if col in intersect:
-                if col_type:
+                if is_char_type:
                     select_columns += f"dbo.cln(T1.[{col}]), "
                 else:
                     select_columns += f"T1.[{col}], "
@@ -247,24 +253,32 @@ def create(config_file="dbtools/OPTIMA.json"): #
                 select_columns += "'' as \\\"" + col + '\\", '
 
         select_query = select_query.replace("T1.*", select_columns[:-2])
+        if "timestamp" in source_columns:
+            select_query += " ORDER BY T1.[timestamp] "
+        else:
+            print(current_table["target"] + " hat kein timestamp-Feld")
+        pkey = target_db.get_pkey(current_table["target"])
+        if len(pkey) == 0:
+            print(current_table["target"] + " hat keinen Primaerschluessel")
         select_query = select_query.replace("%", "%%") # batch-Problem
 
         stage_csv = f"{cfg.stage_dir}\\{current_table['target']}_{client_db}.csv"
+        logfile = f"{cfg.logs_dir}\\{current_table['target']}_{client_db}"
         # insert_query = f"LOAD DATA INFILE '{stage_csv}' INTO TABLE {current_table['target']} FIELDS TERMINATED BY ','
         # ENCLOSED BY '\"' LINES TERMINATED BY '\n';"
 
         # print(select_query)
-        bulk_copy = "bcp" if cfg.source_dsn["driver"] == "mssql" else "cet"
+        bulk_copy = "bcp" if cfg.source_dsn.driver == "mssql" else "cet"
         f.write(
-            f'{bulk_copy} "{select_query}" queryout "{stage_csv}" {source_db.bcp_conn_params()} -c -C 65001 -m 1000 '
-            + f'-e "{stage_csv[:-4]}.queryout.log" > "{stage_csv[:-4]}.bcp1.log" \n'
+            f'{bulk_copy} "{select_query}" queryout "{stage_csv}" {source_db.bcp_conn_params} -c -C 65001 -m 1000 '
+            + f'-e "{logfile}.queryout.log" > "{logfile}.bcp1.log" \n'
         )
-        f.write(f'type "{stage_csv[:-4]}.bcp1.log" | findstr -v "1000" \n')
+        f.write(f'type "{logfile}.bcp1.log" | findstr -v "1000" \n')
         f.write(
-            f"bcp [{cfg.target_dsn['schema']}].[{current_table['target']}] in \"{stage_csv}\" {target_db.bcp_conn_params()} "
-            + f'-c -C 65001 -m 1000 -e "{stage_csv[:-4]}.in.log" > "{stage_csv[:-4]}.bcp2.log" \n'
+            f"bcp [{cfg.target_dsn.schema}].[{current_table['target']}] in \"{stage_csv}\" {target_db.bcp_conn_params} "
+            + f'-c -C 65001 -m 1000 -e "{logfile}.in.log" > "{logfile}.bcp2.log" \n'
         )
-        f.write(f'type "{stage_csv[:-4]}.bcp2.log" | findstr -v "1000" \n')
+        f.write(f'type "{logfile}.bcp2.log" | findstr -v "1000" \n')
 
         f.write(f'del "{stage_csv}" /F >nul 2>nul \n')
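
Note: load_config() above expects a JSON file whose top-level keys mirror DbCreateConfig, with source_dsn and target_dsn as nested objects shaped like DsnConfig. A minimal sketch of how such a file could be produced follows; the file path, server name, databases and passwords are placeholders taken from the defaults visible in this patch, not a real deployment.

import json

# Hypothetical helper: writes a CARLO.json that load_config() can read back.
# "name" is derived from the file stem by load_config(), so it is not stored here.
example_cfg = {
    "csv_file": "CARLO.csv",
    "clients": {"1": "M und S Fahrzeughandel GmbH"},
    "filter": ["2018-01-01T00:00:00", "2022-01-01T00:00:00"],
    "source_dsn": {"user": "sa", "password": "<secret>", "server": "LOCALHOST\\GLOBALCUBE",
                   "database": "DE0017", "driver": "mssql", "schema": "dbo"},
    "target_dsn": {"user": "sa", "password": "<secret>", "server": "LOCALHOST\\GLOBALCUBE",
                   "database": "CARLO2", "driver": "mssql", "schema": "import"},
    "stage_dir": "..\\temp",
    "batch_dir": "..\\batch",
    "logs_dir": "..\\logs",
}
with open("database/CARLO.json", "w", encoding="latin-1") as fh:
    json.dump(example_cfg, fh, indent=2)

create("database/CARLO.json") would then read this file, compare source and target schemas, and emit the bcp/sqlcmd batch script shown in the hunks above.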