# db_create.py

  1. import plac
  2. import pandas as pd
  3. from sqlalchemy import create_engine, inspect
  4. from database import bcp_conn_params, conn_string
  5. import json
  6. from pathlib import Path
  7. from collections import namedtuple
  8. DbCreateConfig = namedtuple('DbCreateConfig', 'name csv_file clients filter source_dsn target_dsn stage_dir batch_dir')
  9. cfg = DbCreateConfig(**{
  10. 'name': 'CARLO',
  11. 'csv_file': 'CARLO.csv',
  12. 'clients': {'1': 'M und S Fahrzeughandel GmbH'},
  13. 'filter': ['01.01.2018', '01.01.2019'],
  14. 'source_dsn': {'user': 'sa', 'pass': 'Mffu3011#', 'server': 'GC-SERVER1\\GLOBALCUBE', 'database': 'DE0017', 'driver': 'mssql', 'schema': 'dbo'},
  15. 'target_dsn': {'user': 'sa', 'pass': 'Mffu3011#', 'server': 'GC-SERVER1\\GLOBALCUBE', 'database': 'CARLO2', 'driver': 'mssql', 'schema': 'import'},
  16. 'stage_dir': '..\\temp',
  17. 'batch_dir': '..\\batch'
  18. })
  19. def db_import(select_query, source_db, current_table, target_db, target_schema):
  20. pd.read_sql(select_query, source_db).to_sql(current_table['target'], target_db, schema=target_schema, index=False, if_exists='append')
  21. @plac.pos('config_file', '', type=str)
  22. def create(config_file='CARLO.json'):
  23. cfg_import = json.load(open(config_file, 'r', encoding='ansi'))
  24. base_dir = Path(config_file).resolve().parent
  25. cfg_import['name'] = Path(config_file).stem
  26. if cfg_import['stage_dir'][:2] == '..':
  27. cfg_import['stage_dir'] = str(base_dir.joinpath(cfg_import['stage_dir']).resolve())
  28. if cfg_import['batch_dir'][:2] == '..':
  29. cfg_import['batch_dir'] = str(base_dir.joinpath(cfg_import['batch_dir']).resolve())
  30. cfg = DbCreateConfig(**cfg_import)
  31. df = pd.read_csv(str(base_dir) + "\\" + cfg.csv_file, sep=';', encoding='ansi')
  32. config = df[df['target'].notnull()]
  33. print(config.head())
  34. source_db = create_engine(conn_string(cfg.source_dsn))
  35. source_insp = inspect(source_db)
  36. source_tables = source_insp.get_table_names(schema=cfg.source_dsn['schema'])
  37. source_tables_prefix = dict(enumerate(sorted(list(set([t.split('$')[0] for t in source_tables if '$' in t]))), 1))
  38. print(source_tables_prefix)
  39. target_db = create_engine(conn_string(cfg.target_dsn))
  40. target_insp = inspect(target_db)
  41. target_tables = target_insp.get_table_names(schema=cfg.target_dsn['schema'])
  42. for index, current_table in config.iterrows():
  43. with open(cfg.batch_dir + "\\" + current_table['target'] + '.bat', 'w', encoding='cp850') as f:
  44. f.write('@echo off \n')
  45. f.write('rem ==' + current_table['target'] + '==\n')
  46. if not current_table['target'] in target_tables:
  47. f.write(f"echo Ziel-Tabelle '{current_table['target']}' existiert nicht!\n")
  48. continue
  49. f.write(f"del {cfg.stage_dir}\\{current_table['target']}*.* /Q /F >nul 2>nul \n")
  50. f.write(f"sqlcmd.exe {bcp_conn_params(cfg.target_dsn)} -p -Q \"TRUNCATE TABLE [{cfg.target_dsn['schema']}].[{current_table['target']}]\" \n")
  51. target_insp_cols = target_insp.get_columns(current_table['target'], schema=cfg.target_dsn['schema'])
  52. target_columns_list = [col['name'] for col in target_insp_cols]
  53. target_columns = set(target_columns_list)
  54. for client_db, prefix in cfg.clients.items():
  55. source_table = current_table['source'].format(prefix)
  56. if source_table not in source_tables:
  57. f.write(f"echo Quell-Tabelle '{source_table}' existiert nicht!\n")
  58. continue
  59. stage_csv = f"{cfg.stage_dir}\\{current_table['target']}_{client_db}.csv"
  60. if not pd.isnull(current_table['query']):
  61. select_query = current_table['query'].format(prefix, cfg.filter[0], cfg.filter[1])
  62. elif '.' in source_table:
  63. select_query = f"SELECT T1.* FROM {source_table} T1 "
  64. else:
  65. select_query = f"SELECT T1.* FROM [{cfg.source_dsn['schema']}].[{source_table}] T1 "
  66. if not pd.isnull(current_table['filter']):
  67. select_query += " WHERE " + current_table['filter'].format("", cfg.filter[0], cfg.filter[1])
  68. source_insp_cols = source_insp.get_columns(source_table)
  69. source_columns = set([col['name'] for col in source_insp_cols])
  70. intersect = source_columns.intersection(target_columns)
  71. # print("Auf beiden Seiten: " + ";".join(intersect))
  72. diff1 = source_columns.difference(target_columns)
  73. if len(diff1) > 0:
  74. f.write("rem Nur in Quelle: " + ";".join(diff1) + "\n")
  75. diff2 = target_columns.difference(source_columns)
  76. if 'Client_DB' not in diff2:
  77. f.write("echo Spalte 'Client_DB' fehlt!\n")
  78. continue
  79. diff2.remove('Client_DB')
  80. if len(diff2) > 0:
  81. f.write("rem Nur in Ziel: " + ";".join(diff2) + "\n")
  82. # select_columns = "T1.[" + "], T1.[".join(intersect) + "],"
  83. select_columns = ''
  84. for col in target_columns_list:
  85. if col in intersect:
  86. select_columns += "T1.[" + col + "], "
  87. elif col == 'Client_DB':
  88. select_columns += "'" + client_db + "' as \\\"Client_DB\\\", "
  89. else:
  90. select_columns += "'' as \\\"" + col + "\\\", "
  91. select_query = select_query.replace("T1.*", select_columns[:-2])
  92. select_query = select_query.replace("%", "%%") # batch-Problem
  93. # insert_query = f"LOAD DATA INFILE '{stage_csv}' INTO TABLE {current_table['target']} FIELDS TERMINATED BY ',' ENCLOSED BY '\"' LINES TERMINATED BY '\n';"
  94. # print(select_query)
  95. f.write(f"bcp \"{select_query}\" queryout \"{stage_csv}\" {bcp_conn_params(cfg.source_dsn)} -c -C 65001 -e \"{stage_csv[:-4]}.queryout.log\" > \"{stage_csv[:-4]}.bcp1.log\" \n")
  96. f.write(f"type \"{stage_csv[:-4]}.bcp1.log\" | findstr -v \"1000\" \n")
  97. f.write(f"bcp [{cfg.target_dsn['schema']}].[{current_table['target']}] in \"{stage_csv}\" {bcp_conn_params(cfg.target_dsn)} -c -C 65001 -e \"{stage_csv[:-4]}.in.log\" > \"{stage_csv[:-4]}.bcp2.log\" \n")
  98. f.write(f"type \"{stage_csv[:-4]}.bcp2.log\" | findstr -v \"1000\" \n")
  99. with open(cfg.batch_dir + "\\_" + cfg.name + ".bat", "w", encoding="cp850") as f:
  100. f.write("@echo off & cd /d %~dp0 \n")
  101. f.write(f"del {cfg.stage_dir}\\*.* /Q /F >nul 2>nul \n\n")
  102. for index, current_table in config.iterrows():
  103. f.write("echo ==" + current_table['target'] + "==\n")
  104. f.write("echo " + current_table['target'] + " >CON \n")
  105. f.write("call " + current_table['target'] + ".bat\n\n")
  106. if __name__ == '__main__':
  107. plac.call(create)