db_create.py

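"""Generate bcp batch files that copy tables from a source database into a
staging target database.

For every row of the control CSV referenced by the JSON config, the script
writes one '<target>.bat' that exports the source table via 'bcp queryout'
and reimports the staged CSV into the target schema, plus a master
'_<name>.bat' that calls all per-table batches.
"""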
import json
from collections import namedtuple
from pathlib import Path
from re import escape

import pandas as pd
import plac
import pyodbc

DbCreateConfig = namedtuple('DbCreateConfig', 'name csv_file clients filter source_dsn target_dsn stage_dir batch_dir')
DsnConfig = namedtuple('DsnConfig', 'user password server database driver schema')

cfg = DbCreateConfig(**{
    'name': 'CARLO',
    'csv_file': 'CARLO.csv',
    'clients': {'1': 'M und S Fahrzeughandel GmbH'},
    'filter': ['01.01.2018', '01.01.2019'],
    'source_dsn': {'user': 'sa', 'password': 'Mffu3011#', 'server': 'GC-SERVER1\\GLOBALCUBE',
                   'database': 'DE0017', 'driver': 'mssql', 'schema': 'dbo'},
    'target_dsn': {'user': 'sa', 'password': 'Mffu3011#', 'server': 'GC-SERVER1\\GLOBALCUBE',
                   'database': 'CARLO2', 'driver': 'mssql', 'schema': 'import'},
    'stage_dir': '..\\temp',
    'batch_dir': '..\\batch'
})
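
# NOTE: this inline config documents the expected structure and serves as a
# default; create() below builds its own DbCreateConfig from the JSON file
# passed on the command line, shadowing this module-level value.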


class database_inspect:
    """Thin wrapper around a pyodbc connection for catalog lookups."""
    tables = []

    def __init__(self, dsn):
        self.dsn = DsnConfig(**dsn)
        self.cursor = self.connect()

    def conn_string(self):
        if self.dsn.driver == 'mssql':
            return ('Driver={SQL Server Native Client 11.0};'
                    f"Server={self.dsn.server};Database={self.dsn.database};"
                    f"Uid={self.dsn.user};Pwd={self.dsn.password}")
        if self.dsn.driver == 'mysql':
            # Note: this is a SQLAlchemy-style URL, not an ODBC connection
            # string, so it cannot be consumed by pyodbc.connect() below.
            return f"mysql+pymysql://{self.dsn.user}:{self.dsn.password}@{self.dsn.server}/{self.dsn.database}?charset=utf8mb4"
        # Fallback: a preconfigured ODBC DSN whose name is stored in 'server'.
        return f"DSN={self.dsn.server};UID={self.dsn.user};PWD={self.dsn.password}"

    def bcp_conn_params(self):
        return f"-S {self.dsn.server} -d {self.dsn.database} -U {self.dsn.user} -P {self.dsn.password}"

    def connect(self):
        c = pyodbc.connect(self.conn_string())
        return c.cursor()

    def get_tables(self):
        tables = [x[2] for x in self.cursor.tables(tableType='TABLE')]
        views = [x[2] for x in self.cursor.tables(tableType='VIEW')]
        self.tables = tables + views
        return self.tables

    def get_prefix(self):
        if len(self.tables) == 0:
            self.get_tables()
        # Collect the distinct '<prefix>' parts of table names like
        # '<prefix>$<table>' and number them starting at 1.
        source_tables_prefix = dict(enumerate(sorted({t.split('$')[0] for t in self.tables if '$' in t}), 1))
        if len(source_tables_prefix) == 0:
            q = self.cursor.execute('SELECT name FROM sys.databases')
            source_tables_prefix = [x[0] for x in q.fetchall()]
        return source_tables_prefix
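
    # get_prefix() returns e.g. {1: 'M und S Fahrzeughandel GmbH'} when table
    # names contain '$'; otherwise it falls back to the list of database
    # names on the server.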

    def get_columns(self, table):
        source_insp_cols = [col.column_name for col in self.cursor.columns(table=table)]
        if len(source_insp_cols) == 0:
            q = self.cursor.execute('SELECT COLUMN_NAME as column_name FROM information_schema.columns '
                                    f"WHERE TABLE_NAME = '{self.convert_table(table)}'")
            source_insp_cols = [col[0] for col in q.fetchall()]
        return source_insp_cols

    def convert_table(self, table):
        # Strip a schema qualifier and surrounding brackets: '[dbo].[T 1]' -> 'T 1'
        if '.' in table:
            table = table.split('.')[-1]
        if '[' in table:
            table = table[1:-1]
        return table
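

# Usage sketch (illustrative; requires a reachable server):
#   src = database_inspect(cfg.source_dsn)
#   print(src.get_tables())   # table and view names from the ODBC catalog
#   print(src.get_prefix())   # company prefixes, see get_prefix() above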


@plac.pos('config_file', '', type=str)
def create(config_file='dbtools/OPTIMA.json'):
    with open(config_file, 'r', encoding='latin-1') as config_fh:
        cfg_import = json.load(config_fh)
    base_dir = Path(config_file).resolve().parent
    cfg_import['name'] = Path(config_file).stem
    # Resolve relative stage/batch directories against the config location.
    if cfg_import['stage_dir'][:2] == '..':
        cfg_import['stage_dir'] = str(base_dir.joinpath(cfg_import['stage_dir']).resolve())
    if cfg_import['batch_dir'][:2] == '..':
        cfg_import['batch_dir'] = str(base_dir.joinpath(cfg_import['batch_dir']).resolve())
    cfg = DbCreateConfig(**cfg_import)

    df = pd.read_csv(f"{base_dir}/{cfg.csv_file}", sep=';', encoding='latin-1')
    config = df[df['target'].notnull()]
    print(config.head())
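    # The control CSV is expected to provide at least the columns 'target',
    # 'source', 'query' and 'filter' (the latter two may be empty).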

    source_db = database_inspect(cfg.source_dsn)
    source_tables = source_db.get_tables()
    print(source_db.get_prefix())
    target_db = database_inspect(cfg.target_dsn)
    target_tables = target_db.get_tables()

    for index, current_table in config.iterrows():
        # One batch file per target table, written in the OEM codepage (cp850)
        # so cmd.exe displays the messages correctly.
        with open(f"{cfg.batch_dir}/{current_table['target']}.bat", 'w', encoding='cp850') as f:
            f.write('@echo off \n')
            f.write('rem ==' + current_table['target'] + '==\n')

            if current_table['target'] not in target_tables:
                f.write(f"echo Ziel-Tabelle '{current_table['target']}' existiert nicht!\n")
                print(f"Ziel-Tabelle '{current_table['target']}' existiert nicht!")
                continue

            f.write(f"del {cfg.stage_dir}\\{current_table['target']}*.* /Q /F >nul 2>nul \n")
            f.write(f"sqlcmd.exe {target_db.bcp_conn_params()} -p "
                    f"-Q \"TRUNCATE TABLE [{cfg.target_dsn['schema']}].[{current_table['target']}]\" \n")

            target_columns_list = target_db.get_columns(current_table['target'])
            # Normalize the casing of the client column so the membership
            # tests below always see 'Client_DB'.
            if 'CLIENT_DB' in target_columns_list:
                target_columns_list.remove('CLIENT_DB')
                target_columns_list.append('Client_DB')
            target_columns = set(target_columns_list)

            for client_db, prefix in cfg.clients.items():
                source_table = current_table['source'].format(prefix)
                if source_table not in source_tables:
                    source_table2 = source_db.convert_table(source_table)
                    if source_table2 not in source_tables:
                        f.write(f"echo Quell-Tabelle '{source_table}' existiert nicht!\n")
                        print(f"Quell-Tabelle '{source_table}' existiert nicht!")
                        continue

                # Compare the column sets of source and target table.
                source_columns = set(source_db.get_columns(source_table))
                intersect = source_columns.intersection(target_columns)
                # print("Auf beiden Seiten: " + ";".join(intersect))
                diff1 = source_columns.difference(target_columns)
                if len(diff1) > 0:
                    f.write("rem Nur in Quelle: " + ";".join(diff1) + "\n")
                diff2 = target_columns.difference(source_columns)
                if 'Client_DB' not in diff2:
                    f.write("echo Spalte 'Client_DB' fehlt!\n")
                    print(f"Ziel-Tabelle '{current_table['target']}' Spalte 'Client_DB' fehlt!")
                    continue
                diff2.remove('Client_DB')
                if len(diff2) > 0:
                    f.write("rem Nur in Ziel: " + ";".join(diff2) + "\n")

                # Build the SELECT that exports the source data: a custom
                # query from the CSV wins, otherwise select everything.
                if not pd.isnull(current_table['query']):
                    select_query = current_table['query'].format(prefix, cfg.filter[0], cfg.filter[1])
                elif '.' in source_table or cfg.source_dsn['schema'] == '':
                    select_query = f"SELECT T1.* FROM \\\"{source_table}\\\" T1 "
                else:
                    select_query = f"SELECT T1.* FROM [{cfg.source_dsn['schema']}].[{source_table}] T1 "

                if not pd.isnull(current_table['filter']):
                    select_query += " WHERE " + current_table['filter'].format("", cfg.filter[0], cfg.filter[1])

                # Expand T1.* into an explicit column list in target order:
                # shared columns come from T1, Client_DB is injected as a
                # constant, and target-only columns become empty strings.
                # select_columns = "T1.[" + "], T1.[".join(intersect) + "],"
                select_columns = ''
                for col in target_columns_list:
                    if col in intersect:
                        select_columns += f"T1.\"{col}\", "
                    elif col == 'Client_DB':
                        select_columns += "'" + client_db + "' as \"Client_DB\", "
                    else:
                        select_columns += "'' as \"" + col + "\", "
                select_query = select_query.replace("T1.*", select_columns[:-2])

                select_query_file = f"{cfg.batch_dir}/query/{current_table['target']}.sql"
                with open(select_query_file, 'w', encoding='cp65001') as fwh:
                    fwh.write(select_query)
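                # Illustrative content of the generated .sql file (table and
                # column names are hypothetical):
                #   SELECT T1."No_", '1' as "Client_DB" FROM [dbo].[prefix$Invoice] T1  WHERE ...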

                select_query = select_query.replace("%", "%%")  # '%' must be doubled inside batch files
                select_query = escape(select_query)
                stage_csv = f"{cfg.stage_dir}\\{current_table['target']}_{client_db}.csv"
                # insert_query = f"LOAD DATA INFILE '{stage_csv}' INTO TABLE {current_table['target']}
                #                  FIELDS TERMINATED BY ',' ENCLOSED BY '\"' LINES TERMINATED BY '\n';"
                # print(select_query)
                bulk_copy = 'bcp' if cfg.source_dsn['driver'] == 'mssql' else 'cet'
                # Export from the source (queryout) and import into the target,
                # both as UTF-8 (-C 65001); 'findstr -v "1000"' filters the
                # batch-progress lines out of the logs shown on screen.
                f.write(f"{bulk_copy} \"{select_query}\" queryout \"{stage_csv}\" {source_db.bcp_conn_params()} -c -C 65001 -m 1000 "
                        f"-e \"{stage_csv[:-4]}.queryout.log\" > \"{stage_csv[:-4]}.bcp1.log\" \n")
                f.write(f"type \"{stage_csv[:-4]}.bcp1.log\" | findstr -v \"1000\" \n")
                f.write(f"bcp [{cfg.target_dsn['schema']}].[{current_table['target']}] in \"{stage_csv}\" {target_db.bcp_conn_params()} "
                        f"-c -C 65001 -m 1000 -e \"{stage_csv[:-4]}.in.log\" > \"{stage_csv[:-4]}.bcp2.log\" \n")
                f.write(f"type \"{stage_csv[:-4]}.bcp2.log\" | findstr -v \"1000\" \n")

    # Master batch file that clears the stage directory and runs all
    # per-table batches.
    with open(f"{cfg.batch_dir}/_{cfg.name}.bat", 'w', encoding='cp850') as f:
        f.write("@echo off & cd /d %~dp0 \n")
        f.write(f"del {cfg.stage_dir}\\*.* /Q /F >nul 2>nul \n\n")
        for index, current_table in config.iterrows():
            f.write(f"echo =={current_table['target']}==\n")
            f.write(f"echo {current_table['target']} >CON \n")
            f.write(f"call {current_table['target']}.bat\n\n")


if __name__ == '__main__':
    plac.call(create)
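
# Usage (plac maps the single positional argument onto create):
#   python db_create.py dbtools/OPTIMA.json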