# db_create.py
  1. import json
  2. from collections import namedtuple
  3. from pathlib import Path
  4. import pandas as pd
  5. import plac
  6. import pyodbc
  7. DbCreateConfig = namedtuple('DbCreateConfig', 'name csv_file clients filter source_dsn target_dsn stage_dir batch_dir')
  8. DsnConfig = namedtuple('DsnConfig', 'user password server database driver schema')
  9. cfg = DbCreateConfig(**{
  10. 'name': 'CARLO',
  11. 'csv_file': 'CARLO.csv',
  12. 'clients': {'1': 'M und S Fahrzeughandel GmbH'},
  13. 'filter': ['01.01.2018', '01.01.2019'],
  14. 'source_dsn': {'user': 'sa', 'password': 'Mffu3011#', 'server': 'GC-SERVER1\\GLOBALCUBE',
  15. 'database': 'DE0017', 'driver': 'mssql', 'schema': 'dbo'},
  16. 'target_dsn': {'user': 'sa', 'password': 'Mffu3011#', 'server': 'GC-SERVER1\\GLOBALCUBE',
  17. 'database': 'CARLO2', 'driver': 'mssql', 'schema': 'import'},
  18. 'stage_dir': '..\\temp',
  19. 'batch_dir': '..\\batch'
  20. })
  21. class database_inspect():
  22. tables = []
  23. def __init__(self, dsn):
  24. self.dsn = DsnConfig(**dsn)
  25. self.cursor = self.connect()
  26. def conn_string(self):
  27. if self.dsn.driver == 'mssql':
  28. return 'Driver={SQL Server Native Client 11.0};' + \
  29. f"Server={self.dsn.server};Database={self.dsn.database};Uid={self.dsn.user};Pwd={self.dsn.password}"
  30. if self.dsn.driver == 'mysql':
  31. return f"mysql+pymysql://{self.dsn.user}:{self.dsn.password}@{self.dsn.server}/{self.dsn.database}?charset=utf8mb4"
  32. return f"DSN={self.dsn.server};UID={self.dsn.user};PWD={self.dsn.password}"
  33. def bcp_conn_params(self):
  34. return f"-S {self.dsn.server} -d {self.dsn.database} -U {self.dsn.user} -P {self.dsn.password}"
  35. def connect(self):
  36. c = pyodbc.connect(self.conn_string())
  37. return c.cursor()
  38. def get_tables(self):
  39. tables = [x[2] for x in self.cursor.tables(tableType='TABLE')]
  40. views = [x[2] for x in self.cursor.tables(tableType='VIEW')]
  41. self.tables = tables + views
  42. return self.tables
  43. def get_prefix(self):
  44. if (len(self.tables)) == 0:
  45. self.get_tables()
  46. source_tables_prefix = dict(enumerate(sorted(list(set([t.split('$')[0] for t in self.tables if '$' in t]))), 1))
  47. if len(source_tables_prefix) == 0:
  48. q = self.cursor.execute('select name FROM sys.databases')
  49. source_tables_prefix = [x[0] for x in q.fetchall()]
  50. return source_tables_prefix
  51. def get_columns(self, table):
  52. source_insp_cols = [col.column_name for col in self.cursor.columns(table=table)]
  53. if len(source_insp_cols) == 0:
  54. q = self.cursor.execute('SELECT COLUMN_NAME as column_name FROM information_schema.columns ' +
  55. f"WHERE TABLE_NAME = '{self.convert_table(table)}'")
  56. source_insp_cols = [col[0] for col in q.fetchall()]
  57. return source_insp_cols
  58. def convert_table(self, table):
  59. if '.' in table:
  60. table = table.split('.')[-1]
  61. if '[' in table:
  62. table = table[1:-1]
  63. return table
  64. @plac.pos('config_file', '', type=str)
  65. def create(config_file='dbtools/OPTIMA.json'):
  66. cfg_import = json.load(open(config_file, 'r', encoding='latin-1'))
  67. base_dir = Path(config_file).resolve().parent
  68. cfg_import['name'] = Path(config_file).stem
  69. if cfg_import['stage_dir'][:2] == '..':
  70. cfg_import['stage_dir'] = str(base_dir.joinpath(cfg_import['stage_dir']).resolve())
  71. if cfg_import['batch_dir'][:2] == '..':
  72. cfg_import['batch_dir'] = str(base_dir.joinpath(cfg_import['batch_dir']).resolve())
  73. cfg = DbCreateConfig(**cfg_import)
  74. df = pd.read_csv(f"{base_dir}/{cfg.csv_file}", sep=';', encoding='latin-1')
  75. config = df[df['target'].notnull()]
  76. print(config.head())
  77. source_db = database_inspect(cfg.source_dsn)
  78. source_tables = source_db.get_tables()
  79. print(source_db.get_prefix())
  80. target_db = database_inspect(cfg.target_dsn)
  81. target_tables = target_db.get_tables()
  82. for index, current_table in config.iterrows():
  83. with open(f"{cfg.batch_dir}/{current_table['target']}.bat", 'w', encoding='cp850') as f:
  84. f.write('@echo off \n')
  85. f.write('rem ==' + current_table['target'] + '==\n')
  86. if not current_table['target'] in target_tables:
  87. f.write(f"echo Ziel-Tabelle '{current_table['target']}' existiert nicht!\n")
  88. print(f"Ziel-Tabelle '{current_table['target']}' existiert nicht!")
  89. continue
  90. f.write(f"del {cfg.stage_dir}\\{current_table['target']}*.* /Q /F >nul 2>nul \n")
  91. f.write(f"sqlcmd.exe {target_db.bcp_conn_params()} -p " +
  92. f"-Q \"TRUNCATE TABLE [{cfg.target_dsn['schema']}].[{current_table['target']}]\" \n")
  93. target_columns_list = target_db.get_columns(current_table['target'])
  94. if 'CLIENT_DB' in target_columns_list:
  95. target_columns_list.remove('CLIENT_DB')
  96. target_columns_list.append('Client_DB')
  97. target_columns = set(target_columns_list)
  98. for client_db, prefix in cfg.clients.items():
  99. source_table = current_table['source'].format(prefix)
  100. if source_table not in source_tables:
  101. source_table2 = source_db.convert_table(source_table)
  102. if source_table2 not in source_tables:
  103. f.write(f"echo Quell-Tabelle '{source_table}' existiert nicht!\n")
  104. print(f"Quell-Tabelle '{source_table}' existiert nicht!")
  105. continue
  106. source_columns = set(source_db.get_columns(source_table))
  107. intersect = source_columns.intersection(target_columns)
  108. # print("Auf beiden Seiten: " + ";".join(intersect))
  109. diff1 = source_columns.difference(target_columns)
  110. if len(diff1) > 0:
  111. f.write("rem Nur in Quelle: " + ";".join(diff1) + "\n")
  112. diff2 = target_columns.difference(source_columns)
  113. if 'Client_DB' not in diff2:
  114. f.write("echo Spalte 'Client_DB' fehlt!\n")
  115. print(f"Ziel-Tabelle '{current_table['target']}' Spalte 'Client_DB' fehlt!")
  116. continue
  117. diff2.remove('Client_DB')
  118. if len(diff2) > 0:
  119. f.write("rem Nur in Ziel: " + ";".join(diff2) + "\n")
  120. if not pd.isnull(current_table['query']):
  121. select_query = current_table['query'].format(prefix, cfg.filter[0], cfg.filter[1])
  122. elif '.' in source_table or cfg.source_dsn['schema'] == '':
  123. select_query = f"SELECT T1.* FROM \\\"{source_table}\\\" T1 "
  124. else:
  125. select_query = f"SELECT T1.* FROM [{cfg.source_dsn['schema']}].[{source_table}] T1 "
  126. if not pd.isnull(current_table['filter']):
  127. select_query += " WHERE " + current_table['filter'].format("", cfg.filter[0], cfg.filter[1])
  128. # select_columns = "T1.[" + "], T1.[".join(intersect) + "],"
  129. select_columns = ''
  130. for col in target_columns_list:
  131. if col in intersect:
  132. select_columns += f"T1.\\\"{col}\\\", "
  133. elif col == 'Client_DB':
  134. select_columns += "'" + client_db + "' as \\\"Client_DB\\\", "
  135. else:
  136. select_columns += "'' as \\\"" + col + "\\\", "
  137. select_query = select_query.replace("T1.*", select_columns[:-2])
  138. select_query = select_query.replace("%", "%%") # batch-Problem
  139. stage_csv = f"{cfg.stage_dir}\\{current_table['target']}_{client_db}.csv"
  140. # insert_query = f"LOAD DATA INFILE '{stage_csv}' INTO TABLE {current_table['target']} FIELDS TERMINATED BY ','
  141. # ENCLOSED BY '\"' LINES TERMINATED BY '\n';"
  142. # print(select_query)
  143. bulk_copy = 'bcp' if cfg.source_dsn['driver'] == 'mssql' else 'cet'
  144. f.write(f"{bulk_copy} \"{select_query}\" queryout \"{stage_csv}\" {source_db.bcp_conn_params()} -c -C 65001 -m 1000 " +
  145. f"-e \"{stage_csv[:-4]}.queryout.log\" > \"{stage_csv[:-4]}.bcp1.log\" \n")
  146. f.write(f"type \"{stage_csv[:-4]}.bcp1.log\" | findstr -v \"1000\" \n")
  147. f.write(f"bcp [{cfg.target_dsn['schema']}].[{current_table['target']}] in \"{stage_csv}\" {target_db.bcp_conn_params()} " +
  148. f"-c -C 65001 -m 1000 -e \"{stage_csv[:-4]}.in.log\" > \"{stage_csv[:-4]}.bcp2.log\" \n")
  149. f.write(f"type \"{stage_csv[:-4]}.bcp2.log\" | findstr -v \"1000\" \n")
  150. with open(f"{cfg.batch_dir}/_{cfg.name}.bat", 'w', encoding='cp850') as f:
  151. f.write("@echo off & cd /d %~dp0 \n")
  152. f.write(f"del {cfg.stage_dir}\\*.* /Q /F >nul 2>nul \n\n")
  153. for index, current_table in config.iterrows():
  154. f.write(f"echo =={current_table['target']}==\n")
  155. f.write(f"echo {current_table['target']} >CON \n")
  156. f.write(f"call {current_table['target']}.bat\n\n")
# CLI entry point: plac maps argv onto create()'s annotated parameters.
if __name__ == '__main__':
    plac.call(create)