|
@@ -1,49 +1,70 @@
|
|
|
-import plac
|
|
|
-import pandas as pd
|
|
|
-from sqlalchemy import create_engine, inspect
|
|
|
-from database import bcp_conn_params, conn_string
|
|
|
import json
|
|
|
-from pathlib import Path
|
|
|
from collections import namedtuple
|
|
|
+from pathlib import Path
|
|
|
+
|
|
|
+import pandas as pd
|
|
|
+import plac
|
|
|
+import pyodbc
|
|
|
+
|
|
|
# Settings for one batch-generation run; field order matches the keys used in ``cfg`` below.
DbCreateConfig = namedtuple(
    'DbCreateConfig',
    'name csv_file clients filter source_dsn target_dsn stage_dir batch_dir',
)

# Connection settings for a single database; ``database_inspect`` builds one
# from a DSN dict via ``DsnConfig(**dsn)``, so the dict keys must match these
# field names exactly (``password``, not ``pass``).
DsnConfig = namedtuple('DsnConfig', 'user password server database driver schema')

# Default configuration.
# NOTE(review): the credentials below are hard-coded in the source file;
# consider loading them from the environment or an untracked config file.
cfg = DbCreateConfig(**{
    'name': 'CARLO',
    'csv_file': 'CARLO.csv',
    'clients': {'1': 'M und S Fahrzeughandel GmbH'},
    'filter': ['01.01.2018', '01.01.2019'],
    'source_dsn': {
        'user': 'sa',
        'password': 'Mffu3011#',
        'server': 'GC-SERVER1\\GLOBALCUBE',
        'database': 'DE0017',
        'driver': 'mssql',
        'schema': 'dbo',
    },
    'target_dsn': {
        'user': 'sa',
        'password': 'Mffu3011#',
        'server': 'GC-SERVER1\\GLOBALCUBE',
        'database': 'CARLO2',
        'driver': 'mssql',
        'schema': 'import',
    },
    'stage_dir': '..\\temp',
    'batch_dir': '..\\batch',
})
|
|
|
|
|
|
|
|
|
class database_inspect():
|
|
|
+ tables = []
|
|
|
+
|
|
|
def __init__(self, dsn):
    """Store the connection settings and open a cursor on the database.

    Args:
        dsn: Mapping whose keys match the ``DsnConfig`` fields
            (user, password, server, database, driver, schema).
    """
    # Unpacking into the namedtuple fails early on missing or unexpected keys.
    self.dsn = DsnConfig(**dsn)
    self.cursor = self.connect()
|
|
|
+
|
|
|
def conn_string(self):
    """Return the connection string for ``self.dsn``, chosen by its driver.

    Returns:
        str: An ODBC connection string for ``mssql``, a URL-style string for
        ``mysql``, and a generic ``DSN=...`` string for any other driver
        (the server field is used as the ODBC data source name).
    """
    dsn = self.dsn
    if dsn.driver == 'mssql':
        # The driver name contains literal braces, so it is kept outside the f-string.
        return 'Driver={SQL Server Native Client 11.0};' + (
            f"Server={dsn.server};Database={dsn.database};Uid={dsn.user};Pwd={dsn.password}"
        )
    if dsn.driver == 'mysql':
        # NOTE(review): this is a SQLAlchemy-style URL, but connect() passes the
        # result to pyodbc.connect -- confirm this branch is actually usable.
        return f"mysql+pymysql://{dsn.user}:{dsn.password}@{dsn.server}/{dsn.database}?charset=utf8mb4"
    return f"DSN={dsn.server};UID={dsn.user};PWD={dsn.password}"
|
|
|
+
|
|
|
def bcp_conn_params(self):
    """Return the ``-S/-d/-U/-P`` command-line arguments for ``self.dsn``.

    Returns:
        str: Server, database, user and password switches joined by spaces.
    """
    dsn = self.dsn
    # NOTE(review): values are inserted unquoted; a value containing spaces or
    # shell metacharacters would need quoting wherever this string is used.
    return f"-S {dsn.server} -d {dsn.database} -U {dsn.user} -P {dsn.password}"
|
|
|
+
|
|
|
def connect(self):
    """Open a pyodbc connection using ``conn_string()`` and return a cursor on it.

    Returns:
        A cursor created from the new connection.
    """
    connection = pyodbc.connect(self.conn_string())
    return connection.cursor()
|
|
|
|
|
|
def get_tables(self):
    """Collect the names of all tables and views visible through the cursor.

    The result is cached on ``self.tables`` (tables first, then views) and
    also returned.

    Returns:
        list: Table names followed by view names.
    """
    # Index 2 of each catalog row is read as the object name.
    # NOTE(review): no schema filter is passed, so objects from every schema
    # are listed -- confirm this is intended.
    table_names = [row[2] for row in self.cursor.tables(tableType='TABLE')]
    view_names = [row[2] for row in self.cursor.tables(tableType='VIEW')]
    self.tables = table_names + view_names
    return self.tables
|
|
|
|
|
|
def get_prefix(self):
    """Return the distinct ``$`` prefixes found in the table names.

    Table names are loaded via ``get_tables()`` if none are cached yet.

    Returns:
        dict | list: A dict mapping 1-based positions to the sorted, distinct
        prefixes (the text before the first ``$``). When no table name
        contains ``$``, a list of names read from ``sys.databases`` is
        returned instead -- callers must handle both shapes.
    """
    if len(self.tables) == 0:
        self.get_tables()

    prefixes = sorted({name.split('$')[0] for name in self.tables if '$' in name})
    source_tables_prefix = dict(enumerate(prefixes, 1))
    if len(source_tables_prefix) == 0:
        # No prefixed tables: fall back to the names listed in sys.databases.
        result = self.cursor.execute('select name FROM sys.databases')
        source_tables_prefix = [row[0] for row in result.fetchall()]
    return source_tables_prefix
|
|
|
|
|
|
def get_columns(self, table):
    """Return the column names of ``table``.

    The catalog lookup ``cursor.columns`` is tried first; if it yields
    nothing, ``information_schema.columns`` is queried using the table name
    produced by ``convert_table``.

    Args:
        table: Table name as passed by the caller.

    Returns:
        list: Column names; empty if neither lookup finds any.
    """
    column_names = [col.column_name for col in self.cursor.columns(table=table)]
    if len(column_names) == 0:
        # Bind the table name as a parameter instead of interpolating it into
        # the SQL text, so names containing quotes cannot break the statement.
        result = self.cursor.execute(
            "SELECT COLUMN_NAME as column_name FROM information_schema.columns WHERE TABLE_NAME = ?",
            self.convert_table(table),
        )
        column_names = [row[0] for row in result.fetchall()]
    return column_names
|
|
|
|
|
|
- def covert_table(self, table):
|
|
|
+ def convert_table(self, table):
|
|
|
if '.' in table:
|
|
|
table = table.split('.')[-1]
|
|
|
if '[' in table:
|
|
@@ -84,7 +105,7 @@ def create(config_file='dbtools/OPTIMA.json'):
|
|
|
continue
|
|
|
|
|
|
f.write(f"del {cfg.stage_dir}\\{current_table['target']}*.* /Q /F >nul 2>nul \n")
|
|
|
- f.write(f"sqlcmd.exe {bcp_conn_params(cfg.target_dsn)} -p -Q \"TRUNCATE TABLE [{cfg.target_dsn['schema']}].[{current_table['target']}]\" \n")
|
|
|
+ f.write(f"sqlcmd.exe {target_db.bcp_conn_params()} -p -Q \"TRUNCATE TABLE [{cfg.target_dsn['schema']}].[{current_table['target']}]\" \n")
|
|
|
|
|
|
target_columns_list = target_db.get_columns(current_table['target'])
|
|
|
if 'CLIENT_DB' in target_columns_list:
|
|
@@ -101,17 +122,7 @@ def create(config_file='dbtools/OPTIMA.json'):
|
|
|
print(f"Quell-Tabelle '{source_table}' existiert nicht!")
|
|
|
continue
|
|
|
|
|
|
- source_columns = source_db.get_columns(source_table)
|
|
|
-
|
|
|
- if not pd.isnull(current_table['query']):
|
|
|
- select_query = current_table['query'].format(prefix, cfg.filter[0], cfg.filter[1])
|
|
|
- elif '.' in source_table:
|
|
|
- select_query = f"SELECT T1.* FROM {source_table} T1 "
|
|
|
- else:
|
|
|
- select_query = f"SELECT T1.* FROM [{cfg.source_dsn['schema']}].[{source_table}] T1 "
|
|
|
-
|
|
|
- if not pd.isnull(current_table['filter']):
|
|
|
- select_query += " WHERE " + current_table['filter'].format("", cfg.filter[0], cfg.filter[1])
|
|
|
+ source_columns = set(source_db.get_columns(source_table))
|
|
|
|
|
|
intersect = source_columns.intersection(target_columns)
|
|
|
# print("Auf beiden Seiten: " + ";".join(intersect))
|
|
@@ -127,11 +138,20 @@ def create(config_file='dbtools/OPTIMA.json'):
|
|
|
if len(diff2) > 0:
|
|
|
f.write("rem Nur in Ziel: " + ";".join(diff2) + "\n")
|
|
|
|
|
|
+ if not pd.isnull(current_table['query']):
|
|
|
+ select_query = current_table['query'].format(prefix, cfg.filter[0], cfg.filter[1])
|
|
|
+ elif '.' in source_table or cfg.source_dsn['schema'] == '':
|
|
|
+ select_query = f"SELECT T1.* FROM \\\"{source_table}\\\" T1 "
|
|
|
+ else:
|
|
|
+ select_query = f"SELECT T1.* FROM [{cfg.source_dsn['schema']}].[{source_table}] T1 "
|
|
|
+
|
|
|
+ if not pd.isnull(current_table['filter']):
|
|
|
+ select_query += " WHERE " + current_table['filter'].format("", cfg.filter[0], cfg.filter[1])
|
|
|
# select_columns = "T1.[" + "], T1.[".join(intersect) + "],"
|
|
|
select_columns = ''
|
|
|
for col in target_columns_list:
|
|
|
if col in intersect:
|
|
|
- select_columns += f"T1.[{col}], "
|
|
|
+ select_columns += f"T1.\\\"{col}\\\", "
|
|
|
elif col == 'Client_DB':
|
|
|
select_columns += "'" + client_db + "' as \\\"Client_DB\\\", "
|
|
|
else:
|
|
@@ -144,9 +164,10 @@ def create(config_file='dbtools/OPTIMA.json'):
|
|
|
# insert_query = f"LOAD DATA INFILE '{stage_csv}' INTO TABLE {current_table['target']} FIELDS TERMINATED BY ',' ENCLOSED BY '\"' LINES TERMINATED BY '\n';"
|
|
|
|
|
|
# print(select_query)
|
|
|
- f.write(f"bcp \"{select_query}\" queryout \"{stage_csv}\" {bcp_conn_params(cfg.source_dsn)} -c -C 65001 -e \"{stage_csv[:-4]}.queryout.log\" > \"{stage_csv[:-4]}.bcp1.log\" \n")
|
|
|
+ bulk_copy = 'bcp' if cfg.source_dsn['driver'] == 'mssql' else 'cet'
|
|
|
+ f.write(f"{bulk_copy} \"{select_query}\" queryout \"{stage_csv}\" {source_db.bcp_conn_params()} -c -C 65001 -e \"{stage_csv[:-4]}.queryout.log\" > \"{stage_csv[:-4]}.bcp1.log\" \n")
|
|
|
f.write(f"type \"{stage_csv[:-4]}.bcp1.log\" | findstr -v \"1000\" \n")
|
|
|
- f.write(f"bcp [{cfg.target_dsn['schema']}].[{current_table['target']}] in \"{stage_csv}\" {bcp_conn_params(cfg.target_dsn)} -c -C 65001 -e \"{stage_csv[:-4]}.in.log\" > \"{stage_csv[:-4]}.bcp2.log\" \n")
|
|
|
+ f.write(f"bcp [{cfg.target_dsn['schema']}].[{current_table['target']}] in \"{stage_csv}\" {target_db.bcp_conn_params()} -c -C 65001 -e \"{stage_csv[:-4]}.in.log\" > \"{stage_csv[:-4]}.bcp2.log\" \n")
|
|
|
f.write(f"type \"{stage_csv[:-4]}.bcp2.log\" | findstr -v \"1000\" \n")
|
|
|
|
|
|
with open(f"{cfg.batch_dir}/_{cfg.name}.bat", 'w', encoding='cp850') as f:
|