|
@@ -1,31 +1,49 @@
|
|
|
import json
|
|
|
from collections import namedtuple
|
|
|
from pathlib import Path
|
|
|
-from re import escape
|
|
|
-from numpy import select
|
|
|
import pandas as pd
|
|
|
import pyodbc
|
|
|
-from dataclasses import dataclass
|
|
|
|
|
|
-
|
|
|
-DbCreateConfig = namedtuple('DbCreateConfig', 'name csv_file clients filter source_dsn target_dsn stage_dir batch_dir')
|
|
|
-DsnConfig = namedtuple('DsnConfig', 'user password server database driver schema')
|
|
|
-
|
|
|
-cfg = DbCreateConfig(**{
|
|
|
- 'name': 'CARLO',
|
|
|
- 'csv_file': 'CARLO.csv',
|
|
|
- 'clients': {'1': 'M und S Fahrzeughandel GmbH'},
|
|
|
- 'filter': ['01.01.2018', '01.01.2019'],
|
|
|
- 'source_dsn': {'user': 'sa', 'password': 'Mffu3011#', 'server': 'GC-SERVER1\\GLOBALCUBE',
|
|
|
- 'database': 'DE0017', 'driver': 'mssql', 'schema': 'dbo'},
|
|
|
- 'target_dsn': {'user': 'sa', 'password': 'Mffu3011#', 'server': 'GC-SERVER1\\GLOBALCUBE',
|
|
|
- 'database': 'CARLO2', 'driver': 'mssql', 'schema': 'import'},
|
|
|
- 'stage_dir': '..\\temp',
|
|
|
- 'batch_dir': '..\\batch'
|
|
|
-})
|
|
|
-
|
|
|
-
|
|
|
-class database_inspect():
|
|
|
+# from re import escape
|
|
|
+# from numpy import select
|
|
|
+# from dataclasses import dataclass
|
|
|
+
|
|
|
+
|
|
|
+DbCreateConfig = namedtuple(
|
|
|
+ "DbCreateConfig",
|
|
|
+ "name csv_file clients filter source_dsn target_dsn stage_dir batch_dir",
|
|
|
+)
|
|
|
+DsnConfig = namedtuple("DsnConfig", "user password server database driver schema")
|
|
|
+
|
|
|
+cfg = DbCreateConfig(
|
|
|
+ **{
|
|
|
+ "name": "CARLO",
|
|
|
+ "csv_file": "CARLO.csv",
|
|
|
+ "clients": {"1": "M und S Fahrzeughandel GmbH"},
|
|
|
+ "filter": ["01.01.2018", "01.01.2019"],
|
|
|
+ "source_dsn": {
|
|
|
+ "user": "sa",
|
|
|
+ "password": "Mffu3011#",
|
|
|
+ "server": "GC-SERVER1\\GLOBALCUBE",
|
|
|
+ "database": "DE0017",
|
|
|
+ "driver": "mssql",
|
|
|
+ "schema": "dbo",
|
|
|
+ },
|
|
|
+ "target_dsn": {
|
|
|
+ "user": "sa",
|
|
|
+ "password": "Mffu3011#",
|
|
|
+ "server": "GC-SERVER1\\GLOBALCUBE",
|
|
|
+ "database": "CARLO2",
|
|
|
+ "driver": "mssql",
|
|
|
+ "schema": "import",
|
|
|
+ },
|
|
|
+ "stage_dir": "..\\temp",
|
|
|
+ "batch_dir": "..\\batch",
|
|
|
+ }
|
|
|
+)
|
|
|
+
|
|
|
+
|
|
|
+class database_inspect:
|
|
|
tables = []
|
|
|
|
|
|
def __init__(self, dsn):
|
|
@@ -33,10 +51,12 @@ class database_inspect():
|
|
|
self.cursor = self.connect()
|
|
|
|
|
|
def conn_string(self):
|
|
|
- if self.dsn.driver == 'mssql':
|
|
|
- return 'Driver={SQL Server Native Client 11.0};' + \
|
|
|
- f"Server={self.dsn.server};Database={self.dsn.database};Uid={self.dsn.user};Pwd={self.dsn.password}"
|
|
|
- if self.dsn.driver == 'mysql':
|
|
|
+ if self.dsn.driver == "mssql":
|
|
|
+ return (
|
|
|
+ "Driver={SQL Server Native Client 11.0};"
|
|
|
+ + f"Server={self.dsn.server};Database={self.dsn.database};Uid={self.dsn.user};Pwd={self.dsn.password}"
|
|
|
+ )
|
|
|
+ if self.dsn.driver == "mysql":
|
|
|
return f"mysql+pymysql://{self.dsn.user}:{self.dsn.password}@{self.dsn.server}/{self.dsn.database}?charset=utf8mb4"
|
|
|
return f"DSN={self.dsn.server};UID={self.dsn.user};PWD={self.dsn.password}"
|
|
|
|
|
@@ -48,48 +68,58 @@ class database_inspect():
|
|
|
return c.cursor()
|
|
|
|
|
|
def get_tables(self):
|
|
|
- tables = [x[2] for x in self.cursor.tables(tableType='TABLE')]
|
|
|
- views = [x[2] for x in self.cursor.tables(tableType='VIEW')]
|
|
|
+ tables = [x[2] for x in self.cursor.tables(tableType="TABLE")]
|
|
|
+ views = [x[2] for x in self.cursor.tables(tableType="VIEW")]
|
|
|
self.tables = tables + views
|
|
|
return self.tables
|
|
|
|
|
|
def get_prefix(self):
|
|
|
if (len(self.tables)) == 0:
|
|
|
self.get_tables()
|
|
|
- source_tables_prefix = dict(enumerate(sorted(list(set([t.split('$')[0] for t in self.tables if '$' in t]))), 1))
|
|
|
+ source_tables_prefix = dict(
|
|
|
+ enumerate(
|
|
|
+ sorted(list(set([t.split("$")[0] for t in self.tables if "$" in t]))), 1
|
|
|
+ )
|
|
|
+ )
|
|
|
if len(source_tables_prefix) == 0:
|
|
|
- q = self.cursor.execute('select name FROM sys.databases')
|
|
|
+ q = self.cursor.execute("select name FROM sys.databases")
|
|
|
source_tables_prefix = [x[0] for x in q.fetchall()]
|
|
|
return source_tables_prefix
|
|
|
|
|
|
def get_columns(self, table):
|
|
|
source_insp_cols = [col.column_name for col in self.cursor.columns(table=table)]
|
|
|
if len(source_insp_cols) == 0:
|
|
|
- q = self.cursor.execute('SELECT COLUMN_NAME as column_name FROM information_schema.columns ' +
|
|
|
- f"WHERE TABLE_NAME = '{self.convert_table(table)}'")
|
|
|
+ q = self.cursor.execute(
|
|
|
+ "SELECT COLUMN_NAME as column_name FROM information_schema.columns "
|
|
|
+ + f"WHERE TABLE_NAME = '{self.convert_table(table)}'"
|
|
|
+ )
|
|
|
source_insp_cols = [col[0] for col in q.fetchall()]
|
|
|
return source_insp_cols
|
|
|
|
|
|
def convert_table(self, table):
|
|
|
- if '.' in table:
|
|
|
- table = table.split('.')[-1]
|
|
|
- if '[' in table:
|
|
|
+ if "." in table:
|
|
|
+ table = table.split(".")[-1]
|
|
|
+ if "[" in table:
|
|
|
table = table[1:-1]
|
|
|
return table
|
|
|
|
|
|
|
|
|
-def create(config_file='dbtools/OPTIMA.json'):
|
|
|
- cfg_import = json.load(open(config_file, 'r', encoding='latin-1'))
|
|
|
+def create(config_file="dbtools/OPTIMA.json"):
|
|
|
+ cfg_import = json.load(open(config_file, "r", encoding="latin-1"))
|
|
|
base_dir = Path(config_file).resolve().parent
|
|
|
- cfg_import['name'] = Path(config_file).stem
|
|
|
- if cfg_import['stage_dir'][:2] == '..':
|
|
|
- cfg_import['stage_dir'] = str(base_dir.joinpath(cfg_import['stage_dir']).resolve())
|
|
|
- if cfg_import['batch_dir'][:2] == '..':
|
|
|
- cfg_import['batch_dir'] = str(base_dir.joinpath(cfg_import['batch_dir']).resolve())
|
|
|
+ cfg_import["name"] = Path(config_file).stem
|
|
|
+ if cfg_import["stage_dir"][:2] == "..":
|
|
|
+ cfg_import["stage_dir"] = str(
|
|
|
+ base_dir.joinpath(cfg_import["stage_dir"]).resolve()
|
|
|
+ )
|
|
|
+ if cfg_import["batch_dir"][:2] == "..":
|
|
|
+ cfg_import["batch_dir"] = str(
|
|
|
+ base_dir.joinpath(cfg_import["batch_dir"]).resolve()
|
|
|
+ )
|
|
|
cfg = DbCreateConfig(**cfg_import)
|
|
|
|
|
|
- df = pd.read_csv(f"{base_dir}/{cfg.csv_file}", sep=';', encoding='latin-1')
|
|
|
- config = df[df['target'].notnull()]
|
|
|
+ df = pd.read_csv(f"{base_dir}/{cfg.csv_file}", sep=";", encoding="latin-1")
|
|
|
+ config = df[df["target"].notnull()]
|
|
|
print(config.head())
|
|
|
|
|
|
source_db = database_inspect(cfg.source_dsn)
|
|
@@ -100,31 +130,41 @@ def create(config_file='dbtools/OPTIMA.json'):
|
|
|
target_tables = target_db.get_tables()
|
|
|
|
|
|
for index, current_table in config.iterrows():
|
|
|
- with open(f"{cfg.batch_dir}/{current_table['target']}.bat", 'w', encoding='cp850') as f:
|
|
|
- f.write('@echo off \n')
|
|
|
- f.write('rem ==' + current_table['target'] + '==\n')
|
|
|
-
|
|
|
- if not current_table['target'] in target_tables:
|
|
|
- f.write(f"echo Ziel-Tabelle '{current_table['target']}' existiert nicht!\n")
|
|
|
+ with open(
|
|
|
+ f"{cfg.batch_dir}/{current_table['target']}.bat", "w", encoding="cp850"
|
|
|
+ ) as f:
|
|
|
+ f.write("@echo off \n")
|
|
|
+ f.write("rem ==" + current_table["target"] + "==\n")
|
|
|
+
|
|
|
+ if not current_table["target"] in target_tables:
|
|
|
+ f.write(
|
|
|
+ f"echo Ziel-Tabelle '{current_table['target']}' existiert nicht!\n"
|
|
|
+ )
|
|
|
print(f"Ziel-Tabelle '{current_table['target']}' existiert nicht!")
|
|
|
continue
|
|
|
|
|
|
- f.write(f"del {cfg.stage_dir}\\{current_table['target']}*.* /Q /F >nul 2>nul \n")
|
|
|
- f.write(f"sqlcmd.exe {target_db.bcp_conn_params()} -p " +
|
|
|
- f"-Q \"TRUNCATE TABLE [{cfg.target_dsn['schema']}].[{current_table['target']}]\" \n")
|
|
|
-
|
|
|
- target_columns_list = target_db.get_columns(current_table['target'])
|
|
|
- if 'CLIENT_DB' in target_columns_list:
|
|
|
- target_columns_list.remove('CLIENT_DB')
|
|
|
- target_columns_list.append('Client_DB')
|
|
|
+ f.write(
|
|
|
+ f"del {cfg.stage_dir}\\{current_table['target']}*.* /Q /F >nul 2>nul \n"
|
|
|
+ )
|
|
|
+ f.write(
|
|
|
+ f"sqlcmd.exe {target_db.bcp_conn_params()} -p "
|
|
|
+ + f"-Q \"TRUNCATE TABLE [{cfg.target_dsn['schema']}].[{current_table['target']}]\" \n"
|
|
|
+ )
|
|
|
+
|
|
|
+ target_columns_list = target_db.get_columns(current_table["target"])
|
|
|
+ if "CLIENT_DB" in target_columns_list:
|
|
|
+ target_columns_list.remove("CLIENT_DB")
|
|
|
+ target_columns_list.append("Client_DB")
|
|
|
target_columns = set(target_columns_list)
|
|
|
|
|
|
for client_db, prefix in cfg.clients.items():
|
|
|
- source_table = current_table['source'].format(prefix)
|
|
|
+ source_table = current_table["source"].format(prefix)
|
|
|
if source_table not in source_tables:
|
|
|
source_table2 = source_db.convert_table(source_table)
|
|
|
if source_table2 not in source_tables:
|
|
|
- f.write(f"echo Quell-Tabelle '{source_table}' existiert nicht!\n")
|
|
|
+ f.write(
|
|
|
+ f"echo Quell-Tabelle '{source_table}' existiert nicht!\n"
|
|
|
+ )
|
|
|
print(f"Quell-Tabelle '{source_table}' existiert nicht!")
|
|
|
continue
|
|
|
|
|
@@ -136,52 +176,64 @@ def create(config_file='dbtools/OPTIMA.json'):
|
|
|
if len(diff1) > 0:
|
|
|
f.write("rem Nur in Quelle: " + ";".join(diff1) + "\n")
|
|
|
diff2 = target_columns.difference(source_columns)
|
|
|
- if 'Client_DB' not in diff2:
|
|
|
+ if "Client_DB" not in diff2:
|
|
|
f.write("echo Spalte 'Client_DB' fehlt!\n")
|
|
|
- print(f"Ziel-Tabelle '{current_table['target']}' Spalte 'Client_DB' fehlt!")
|
|
|
+ print(
|
|
|
+ f"Ziel-Tabelle '{current_table['target']}' Spalte 'Client_DB' fehlt!"
|
|
|
+ )
|
|
|
continue
|
|
|
- diff2.remove('Client_DB')
|
|
|
+ diff2.remove("Client_DB")
|
|
|
if len(diff2) > 0:
|
|
|
f.write("rem Nur in Ziel: " + ";".join(diff2) + "\n")
|
|
|
|
|
|
- if not pd.isnull(current_table['query']):
|
|
|
- select_query = current_table['query'].format(prefix, cfg.filter[0], cfg.filter[1])
|
|
|
- elif '.' in source_table or cfg.source_dsn['schema'] == '':
|
|
|
- select_query = f"SELECT T1.* FROM \\\"{source_table}\\\" T1 "
|
|
|
+ if not pd.isnull(current_table["query"]):
|
|
|
+ select_query = current_table["query"].format(
|
|
|
+ prefix, cfg.filter[0], cfg.filter[1]
|
|
|
+ )
|
|
|
+ elif "." in source_table or cfg.source_dsn["schema"] == "":
|
|
|
+ select_query = f'SELECT T1.* FROM \\"{source_table}\\" T1 '
|
|
|
else:
|
|
|
select_query = f"SELECT T1.* FROM [{cfg.source_dsn['schema']}].[{source_table}] T1 "
|
|
|
|
|
|
- if not pd.isnull(current_table['filter']):
|
|
|
- select_query += " WHERE " + current_table['filter'].format("", cfg.filter[0], cfg.filter[1])
|
|
|
+ if not pd.isnull(current_table["filter"]):
|
|
|
+ select_query += " WHERE " + current_table["filter"].format(
|
|
|
+ "", cfg.filter[0], cfg.filter[1]
|
|
|
+ )
|
|
|
# select_columns = "T1.[" + "], T1.[".join(intersect) + "],"
|
|
|
- select_columns = ''
|
|
|
+ select_columns = ""
|
|
|
for col in target_columns_list:
|
|
|
if col in intersect:
|
|
|
select_columns += f"T1.[{col}], "
|
|
|
- elif col == 'Client_DB':
|
|
|
- select_columns += "'" + client_db + "' as \\\"Client_DB\\\", "
|
|
|
+ elif col == "Client_DB":
|
|
|
+ select_columns += "'" + client_db + '\' as \\"Client_DB\\", '
|
|
|
else:
|
|
|
- select_columns += "'' as \\\"" + col + "\\\", "
|
|
|
+ select_columns += "'' as \\\"" + col + '\\", '
|
|
|
|
|
|
select_query = select_query.replace("T1.*", select_columns[:-2])
|
|
|
- select_query = select_query.replace("%", "%%") # batch-Problem
|
|
|
+ select_query = select_query.replace("%", "%%") # batch-Problem
|
|
|
|
|
|
- stage_csv = f"{cfg.stage_dir}\\{current_table['target']}_{client_db}.csv"
|
|
|
+ stage_csv = (
|
|
|
+ f"{cfg.stage_dir}\\{current_table['target']}_{client_db}.csv"
|
|
|
+ )
|
|
|
# insert_query = f"LOAD DATA INFILE '{stage_csv}' INTO TABLE {current_table['target']} FIELDS TERMINATED BY ','
|
|
|
# ENCLOSED BY '\"' LINES TERMINATED BY '\n';"
|
|
|
|
|
|
# print(select_query)
|
|
|
- bulk_copy = 'bcp' if cfg.source_dsn['driver'] == 'mssql' else 'cet'
|
|
|
- f.write(f"{bulk_copy} \"{select_query}\" queryout \"{stage_csv}\" {source_db.bcp_conn_params()} -c -C 65001 -m 1000 " +
|
|
|
- f"-e \"{stage_csv[:-4]}.queryout.log\" > \"{stage_csv[:-4]}.bcp1.log\" \n")
|
|
|
- f.write(f"type \"{stage_csv[:-4]}.bcp1.log\" | findstr -v \"1000\" \n")
|
|
|
- f.write(f"bcp [{cfg.target_dsn['schema']}].[{current_table['target']}] in \"{stage_csv}\" {target_db.bcp_conn_params()} " +
|
|
|
- f"-c -C 65001 -m 1000 -e \"{stage_csv[:-4]}.in.log\" > \"{stage_csv[:-4]}.bcp2.log\" \n")
|
|
|
- f.write(f"type \"{stage_csv[:-4]}.bcp2.log\" | findstr -v \"1000\" \n")
|
|
|
-
|
|
|
- f.write(f"del \"{stage_csv}\" /F >nul 2>nul \n")
|
|
|
-
|
|
|
- with open(f"{cfg.batch_dir}/_{cfg.name}.bat", 'w', encoding='cp850') as f:
|
|
|
+ bulk_copy = "bcp" if cfg.source_dsn["driver"] == "mssql" else "cet"
|
|
|
+ f.write(
|
|
|
+ f'{bulk_copy} "{select_query}" queryout "{stage_csv}" {source_db.bcp_conn_params()} -c -C 65001 -m 1000 '
|
|
|
+ + f'-e "{stage_csv[:-4]}.queryout.log" > "{stage_csv[:-4]}.bcp1.log" \n'
|
|
|
+ )
|
|
|
+ f.write(f'type "{stage_csv[:-4]}.bcp1.log" | findstr -v "1000" \n')
|
|
|
+ f.write(
|
|
|
+ f"bcp [{cfg.target_dsn['schema']}].[{current_table['target']}] in \"{stage_csv}\" {target_db.bcp_conn_params()} "
|
|
|
+ + f'-c -C 65001 -m 1000 -e "{stage_csv[:-4]}.in.log" > "{stage_csv[:-4]}.bcp2.log" \n'
|
|
|
+ )
|
|
|
+ f.write(f'type "{stage_csv[:-4]}.bcp2.log" | findstr -v "1000" \n')
|
|
|
+
|
|
|
+ f.write(f'del "{stage_csv}" /F >nul 2>nul \n')
|
|
|
+
|
|
|
+ with open(f"{cfg.batch_dir}/_{cfg.name}.bat", "w", encoding="cp850") as f:
|
|
|
f.write("@echo off & cd /d %~dp0 \n")
|
|
|
f.write(f"del {cfg.stage_dir}\\*.* /Q /F >nul 2>nul \n\n")
|
|
|
for index, current_table in config.iterrows():
|
|
@@ -190,5 +242,5 @@ def create(config_file='dbtools/OPTIMA.json'):
|
|
|
f.write(f"call {current_table['target']}.bat\n\n")
|
|
|
|
|
|
|
|
|
-if __name__ == '__main__':
|
|
|
+if __name__ == "__main__":
|
|
|
create()
|