|
@@ -1,48 +1,41 @@
|
|
|
import pandas as pd
|
|
|
from sqlalchemy import create_engine, inspect
|
|
|
+from database import bcp_conn_params, conn_string
|
|
|
|
|
|
-
|
|
|
-csv_file = 'CARLO.csv'
|
|
|
-clients = {'1': 'M und S Fahrzeughandel GmbH'}
|
|
|
-client_db = '1'
|
|
|
-date_filter = "'2018-01-01'"
|
|
|
-source_dsn = {'user': 'sa', 'pass': 'Mffu3011#', 'server': 'GC-SERVER1\\GLOBALCUBE', 'database': 'DE0017'}
|
|
|
-source_schema = 'dbo'
|
|
|
-target_dsn = {'user': 'sa', 'pass': 'Mffu3011#', 'server': 'GC-SERVER1\\GLOBALCUBE', 'database': 'CARLO2'}
|
|
|
-target_schema = 'import'
|
|
|
-# stage_dir = "\\\\gc-server1\Austausch\\stage"
|
|
|
-stage_dir = 'C:\\GlobalCube\\System\\CARLO\\Export\\stage'
|
|
|
+cfg = {
|
|
|
+ 'csv_file': 'CARLO.csv',
|
|
|
+ 'clients': {'1': 'M und S Fahrzeughandel GmbH'},
|
|
|
+ 'date_filter': "'2018-01-01'",
|
|
|
+ 'source_dsn': {'user': 'sa', 'pass': 'Mffu3011#', 'server': 'GC-SERVER1\\GLOBALCUBE', 'database': 'DE0017', 'driver': 'mssql'},
|
|
|
+ 'source_schema': 'dbo',
|
|
|
+ 'target_dsn': {'user': 'sa', 'pass': 'Mffu3011#', 'server': 'GC-SERVER1\\GLOBALCUBE', 'database': 'CARLO2', 'driver': 'mssql'},
|
|
|
+ 'target_schema': 'import',
|
|
|
+    # 'stage_dir': "\\\\gc-server1\\Austausch\\stage",
|
|
|
+ 'stage_dir': 'C:\\GlobalCube\\System\\CARLO\\Export\\stage'
|
|
|
+}
|
|
|
|
|
|
|
|
|
def db_import(select_query, source_db, current_table, target_db, target_schema):
|
|
|
pd.read_sql(select_query, source_db).to_sql(current_table['target'], target_db, schema=target_schema, index=False, if_exists='append')
|
|
|
|
|
|
|
|
|
-def conn_string(dsn):
|
|
|
- return f"mssql+pyodbc://{dsn['user']}:{dsn['pass']}@{dsn['server']}/{dsn['database']}?driver=SQL+Server+Native+Client+11.0"
|
|
|
-
|
|
|
-
|
|
|
-def conn_params(dsn):
|
|
|
- return f"-S {dsn['server']} -d {dsn['database']} -U {dsn['user']} -P {dsn['pass']}"
|
|
|
-
|
|
|
-
|
|
|
-df = pd.read_csv(csv_file, sep=';', encoding='ansi')
|
|
|
+df = pd.read_csv(cfg['csv_file'], sep=';', encoding='ansi')
|
|
|
config = df[df['target'].notnull()]
|
|
|
print(config.head())
|
|
|
|
|
|
-source_db = create_engine(conn_string(source_dsn))
|
|
|
+source_db = create_engine(conn_string(cfg['source_dsn']))
|
|
|
source_insp = inspect(source_db)
|
|
|
-source_tables = source_insp.get_table_names(schema=source_schema)
|
|
|
+source_tables = source_insp.get_table_names(schema=cfg['source_schema'])
|
|
|
source_tables_prefix = set([t.split('$')[0] for t in source_tables if '$' in t])
|
|
|
print(source_tables_prefix)
|
|
|
|
|
|
-target_db = create_engine(conn_string(target_dsn))
|
|
|
+target_db = create_engine(conn_string(cfg['target_dsn']))
|
|
|
target_insp = inspect(target_db)
|
|
|
-target_tables = target_insp.get_table_names(schema=target_schema)
|
|
|
+target_tables = target_insp.get_table_names(schema=cfg['target_schema'])
|
|
|
|
|
|
|
|
|
for index, current_table in config.iterrows():
|
|
|
- with open(stage_dir + "\\batch\\" + current_table['target'] + '.bat', 'w', encoding='cp850') as f:
|
|
|
+ with open(cfg['stage_dir'] + "\\batch\\" + current_table['target'] + '.bat', 'w', encoding='cp850') as f:
|
|
|
f.write('@echo off \n')
|
|
|
f.write('rem ==' + current_table['target'] + '==\n')
|
|
|
|
|
@@ -50,28 +43,28 @@ for index, current_table in config.iterrows():
|
|
|
f.write(f"echo Ziel-Tabelle '{current_table['target']}' existiert nicht!\n")
|
|
|
continue
|
|
|
|
|
|
- f.write(f"del {stage_dir}\\{current_table['target']}*.* /Q /F >nul 2>nul \n")
|
|
|
- f.write(f"sqlcmd.exe {conn_params(target_dsn)} -p -Q \"TRUNCATE TABLE [{target_schema}].[{current_table['target']}]\" \n")
|
|
|
+ f.write(f"del {cfg['stage_dir']}\\{current_table['target']}*.* /Q /F >nul 2>nul \n")
|
|
|
+ f.write(f"sqlcmd.exe {bcp_conn_params(cfg['target_dsn'])} -p -Q \"TRUNCATE TABLE [{cfg['target_schema']}].[{current_table['target']}]\" \n")
|
|
|
|
|
|
- target_insp_cols = target_insp.get_columns(current_table['target'], schema=target_schema)
|
|
|
+ target_insp_cols = target_insp.get_columns(current_table['target'], schema=cfg['target_schema'])
|
|
|
target_columns_list = [col['name'] for col in target_insp_cols]
|
|
|
target_columns = set(target_columns_list)
|
|
|
|
|
|
- for client_db, prefix in clients.items():
|
|
|
+ for client_db, prefix in cfg['clients'].items():
|
|
|
source_table = current_table['source'].format(prefix)
|
|
|
|
|
|
if source_table not in source_tables:
|
|
|
f.write(f"echo Quell-Tabelle '{source_table}' existiert nicht!\n")
|
|
|
continue
|
|
|
- stage_csv = f"{stage_dir}\\{current_table['target']}_{client_db}.csv"
|
|
|
+ stage_csv = f"{cfg['stage_dir']}\\{current_table['target']}_{client_db}.csv"
|
|
|
|
|
|
if not pd.isnull(current_table['query']):
|
|
|
- select_query = current_table['query'].format(prefix, date_filter)
|
|
|
+ select_query = current_table['query'].format(prefix, cfg['date_filter'])
|
|
|
else:
|
|
|
- select_query = f"SELECT T1.* FROM [{source_schema}].[{source_table}] T1 "
|
|
|
+ select_query = f"SELECT T1.* FROM [{cfg['source_schema']}].[{source_table}] T1 "
|
|
|
|
|
|
if not pd.isnull(current_table['filter']):
|
|
|
- select_query += " WHERE " + current_table['filter'].format("", date_filter)
|
|
|
+ select_query += " WHERE " + current_table['filter'].format("", cfg['date_filter'])
|
|
|
|
|
|
source_insp_cols = source_insp.get_columns(source_table)
|
|
|
source_columns = set([col['name'] for col in source_insp_cols])
|
|
@@ -101,16 +94,19 @@ for index, current_table in config.iterrows():
|
|
|
|
|
|
select_query = select_query.replace("T1.*", select_columns[:-2])
|
|
|
select_query = select_query.replace("%", "%%") # batch-Problem
|
|
|
+
|
|
|
+    insert_query = f"LOAD DATA INFILE '{stage_csv}' INTO TABLE {current_table['target']} FIELDS TERMINATED BY ',' ENCLOSED BY '\"' LINES TERMINATED BY '\\n';"  # NOTE(review): MySQL-only syntax in an MSSQL script, and apparently unused — confirm intent
|
|
|
+
|
|
|
# print(select_query)
|
|
|
- f.write(f"bcp \"{select_query}\" queryout \"{stage_csv}\" {conn_params(source_dsn)} -c -C 65001 -e \"{stage_csv[:-4]}.queryout.log\" > \"{stage_csv[:-4]}.bcp1.log\" \n")
|
|
|
+ f.write(f"bcp \"{select_query}\" queryout \"{stage_csv}\" {bcp_conn_params(cfg['source_dsn'])} -c -C 65001 -e \"{stage_csv[:-4]}.queryout.log\" > \"{stage_csv[:-4]}.bcp1.log\" \n")
|
|
|
f.write(f"type \"{stage_csv[:-4]}.bcp1.log\" | findstr -v \"1000\" \n")
|
|
|
- f.write(f"bcp [{target_schema}].[{current_table['target']}] in \"{stage_csv}\" {conn_params(target_dsn)} -c -C 65001 -e \"{stage_csv[:-4]}.in.log\" > \"{stage_csv[:-4]}.bcp2.log\" \n")
|
|
|
+ f.write(f"bcp [{cfg['target_schema']}].[{current_table['target']}] in \"{stage_csv}\" {bcp_conn_params(cfg['target_dsn'])} -c -C 65001 -e \"{stage_csv[:-4]}.in.log\" > \"{stage_csv[:-4]}.bcp2.log\" \n")
|
|
|
f.write(f"type \"{stage_csv[:-4]}.bcp2.log\" | findstr -v \"1000\" \n")
|
|
|
|
|
|
|
|
|
-with open(stage_dir + "\\batch\\_all.bat", "w", encoding="cp850") as f:
|
|
|
+with open(cfg['stage_dir'] + "\\batch\\_all.bat", "w", encoding="cp850") as f:
|
|
|
f.write("@echo off & cd /d %~dp0 \n")
|
|
|
- f.write(f"del {stage_dir}\\*.* /Q /F >nul 2>nul \n\n")
|
|
|
+ f.write(f"del {cfg['stage_dir']}\\*.* /Q /F >nul 2>nul \n\n")
|
|
|
for index, current_table in config.iterrows():
|
|
|
f.write("echo ==" + current_table['target'] + "==\n")
|
|
|
f.write("echo " + current_table['target'] + " >CON \n")
|