db_create.py
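"""Generate Windows batch scripts that export tables from a source database
into staging CSV files via bcp and bulk-load them into a target database.

Driven by a JSON configuration (see load_config) and a CSV mapping of source
to target tables (cfg.csv_file)."""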
import json
from collections import namedtuple
from pathlib import Path

import pandas as pd
import pyodbc
DbCreateConfig = namedtuple(
    "DbCreateConfig",
    # "logs_dir" appears in the config dict below and in load_config(), so it
    # must be declared as a field here as well, or unpacking raises TypeError.
    "name csv_file clients filter source_dsn target_dsn stage_dir batch_dir logs_dir",
)
DsnConfig = namedtuple("DsnConfig", "user password server database driver schema")
cfg = DbCreateConfig(
    **{
        "name": "CARLO",
        "csv_file": "CARLO.csv",
        "clients": {"1": "M und S Fahrzeughandel GmbH"},
        "filter": ["01.01.2018", "01.01.2019"],
        "source_dsn": {
            "user": "sa",
            "password": "Mffu3011#",
            "server": "GC-SERVER1\\GLOBALCUBE",
            "database": "DE0017",
            "driver": "mssql",
            "schema": "dbo",
        },
        "target_dsn": {
            "user": "sa",
            "password": "Mffu3011#",
            "server": "GC-SERVER1\\GLOBALCUBE",
            "database": "CARLO2",
            "driver": "mssql",
            "schema": "import",
        },
        "stage_dir": "..\\temp",
        "batch_dir": "..\\batch",
        "logs_dir": "..\\logs",
    }
)
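
# Note: this module-level cfg is a built-in example/default; create() loads its
# own configuration from the JSON file via load_config() and does not use it.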


class database_inspect:
    def __init__(self, dsn):
        self.dsn = DsnConfig(**dsn)
        self.tables = []  # filled lazily by get_tables()
        self.cursor = self.connect()

    def conn_string(self):
        if self.dsn.driver == "mssql":
            return (
                "Driver={SQL Server Native Client 11.0};"
                + f"Server={self.dsn.server};Database={self.dsn.database};"
                + f"Uid={self.dsn.user};Pwd={self.dsn.password}"
            )
        if self.dsn.driver == "mysql":
            # Caution: this is an SQLAlchemy-style URL, not an ODBC connection
            # string, and will not work with pyodbc.connect() as-is.
            return f"mysql+pymysql://{self.dsn.user}:{self.dsn.password}@{self.dsn.server}/{self.dsn.database}?charset=utf8mb4"
        return f"DSN={self.dsn.server};UID={self.dsn.user};PWD={self.dsn.password}"
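
    # conn_string() example for the "mssql" branch (server/database/credentials
    # are placeholders):
    #   Driver={SQL Server Native Client 11.0};Server=HOST\INSTANCE;Database=DB1;Uid=sa;Pwd=...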

    def bcp_conn_params(self):
        return f"-S {self.dsn.server} -d {self.dsn.database} -U {self.dsn.user} -P {self.dsn.password}"

    def connect(self):
        c = pyodbc.connect(self.conn_string())
        return c.cursor()

    def get_tables(self):
        tables = [x[2] for x in self.cursor.tables(tableType="TABLE")]
        views = [x[2] for x in self.cursor.tables(tableType="VIEW")]
        self.tables = tables + views
        return self.tables

    def get_prefix(self):
        if len(self.tables) == 0:
            self.get_tables()
        source_tables_prefix = dict(
            enumerate(sorted({t.split("$")[0] for t in self.tables if "$" in t}), 1)
        )
        if len(source_tables_prefix) == 0:
            q = self.cursor.execute("SELECT name FROM sys.databases")
            source_tables_prefix = [x[0] for x in q.fetchall()]
        return source_tables_prefix

    def get_columns(self, table):
        source_insp_cols = [col.column_name for col in self.cursor.columns(table=table)]
        if len(source_insp_cols) == 0:
            q = self.cursor.execute(
                "SELECT COLUMN_NAME as column_name FROM information_schema.columns "
                + f"WHERE TABLE_NAME = '{self.convert_table(table)}'"
            )
            source_insp_cols = [col[0] for col in q.fetchall()]
        return source_insp_cols

    def convert_table(self, table):
        # Strip a schema qualifier ("schema.table") and surrounding brackets.
        if "." in table:
            table = table.split(".")[-1]
        if "[" in table:
            table = table[1:-1]
        return table
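
# Usage sketch (connection details are illustrative, not a live DSN):
#   db = database_inspect({"user": "sa", "password": "...", "server": "HOST\\INSTANCE",
#                          "database": "DB1", "driver": "mssql", "schema": "dbo"})
#   print(db.get_tables())
#   print(db.get_prefix())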


def load_config(config_file: str):
    with open(config_file, "r", encoding="latin-1") as frh:
        cfg_import = json.load(frh)
    base_dir = Path(config_file).resolve().parent
    cfg_import["name"] = Path(config_file).stem
    if cfg_import["stage_dir"][:2] == "..":
        cfg_import["stage_dir"] = str(
            base_dir.joinpath(cfg_import["stage_dir"]).resolve()
        )
    if cfg_import["batch_dir"][:2] == "..":
        cfg_import["batch_dir"] = str(
            base_dir.joinpath(cfg_import["batch_dir"]).resolve()
        )
    if "logs_dir" not in cfg_import:
        cfg_import["logs_dir"] = "..\\logs"
    # Resolve logs_dir relative to the config file, like stage_dir and batch_dir.
    if cfg_import["logs_dir"][:2] == "..":
        cfg_import["logs_dir"] = str(
            base_dir.joinpath(cfg_import["logs_dir"]).resolve()
        )
    return DbCreateConfig(**cfg_import)
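
# A minimal sketch of the JSON layout load_config() expects (keys taken from
# DbCreateConfig above; all values are placeholders, and "name" is derived
# from the file name, so it need not appear in the file):
# {
#     "csv_file": "CARLO.csv",
#     "clients": {"1": "Client 1"},
#     "filter": ["01.01.2018", "01.01.2019"],
#     "source_dsn": {"user": "sa", "password": "...", "server": "HOST\\INSTANCE",
#                    "database": "DB1", "driver": "mssql", "schema": "dbo"},
#     "target_dsn": {"user": "sa", "password": "...", "server": "HOST\\INSTANCE",
#                    "database": "DB2", "driver": "mssql", "schema": "import"},
#     "stage_dir": "..\\temp",
#     "batch_dir": "..\\batch",
#     "logs_dir": "..\\logs"
# }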


def create(config_file="dbtools/OPTIMA.json"):
    cfg = load_config(config_file)
    base_dir = str(Path(cfg.batch_dir).parent)
    df = pd.read_csv(f"{base_dir}/{cfg.csv_file}", sep=";", encoding="latin-1")
    # Only rows with a target table are processed.
    config = df[df["target"].notnull()]
    source_db = database_inspect(cfg.source_dsn)
    source_tables = source_db.get_tables()
    print(source_db.get_prefix())
    target_db = database_inspect(cfg.target_dsn)
    target_tables = target_db.get_tables()
    for index, current_table in config.iterrows():
        with open(
            f"{cfg.batch_dir}/{current_table['target']}.bat", "w", encoding="cp850"
        ) as f:
            f.write("@echo off \n")
            f.write("rem ==" + current_table["target"] + "==\n")

            if current_table["target"] not in target_tables:
                f.write(
                    f"echo Ziel-Tabelle '{current_table['target']}' existiert nicht!\n"
                )
                print(f"Ziel-Tabelle '{current_table['target']}' existiert nicht!")
                continue

            f.write(
                f"del {cfg.stage_dir}\\{current_table['target']}*.* /Q /F >nul 2>nul \n"
            )
            f.write(
                f"sqlcmd.exe {target_db.bcp_conn_params()} -p "
                + f"-Q \"TRUNCATE TABLE [{cfg.target_dsn['schema']}].[{current_table['target']}]\" \n"
            )
            target_columns_list = target_db.get_columns(current_table["target"])
            # Normalize the casing of the client column to "Client_DB".
            if "CLIENT_DB" in target_columns_list:
                target_columns_list.remove("CLIENT_DB")
                target_columns_list.append("Client_DB")
            target_columns = set(target_columns_list)

            for client_db, prefix in cfg.clients.items():
                source_table = current_table["source"].format(prefix)
                if source_table not in source_tables:
                    source_table2 = source_db.convert_table(source_table)
                    if source_table2 not in source_tables:
                        f.write(
                            f"echo Quell-Tabelle '{source_table}' existiert nicht!\n"
                        )
                        print(f"Quell-Tabelle '{source_table}' existiert nicht!")
                        continue

                source_columns = set(source_db.get_columns(source_table))
                intersect = source_columns.intersection(target_columns)
                diff1 = source_columns.difference(target_columns)
                if len(diff1) > 0:
                    f.write("rem Nur in Quelle: " + ";".join(diff1) + "\n")
                diff2 = target_columns.difference(source_columns)
                if "Client_DB" not in diff2:
                    f.write("echo Spalte 'Client_DB' fehlt!\n")
                    print(
                        f"Ziel-Tabelle '{current_table['target']}' Spalte 'Client_DB' fehlt!"
                    )
                    continue
                diff2.remove("Client_DB")
                if len(diff2) > 0:
                    f.write("rem Nur in Ziel: " + ";".join(diff2) + "\n")

                if not pd.isnull(current_table["query"]):
                    select_query = current_table["query"].format(
                        prefix, cfg.filter[0], cfg.filter[1]
                    )
                elif "." in source_table or cfg.source_dsn["schema"] == "":
                    if source_table[0] != "[":
                        source_table = f"[{source_table}]"
                    select_query = f"SELECT T1.* FROM {source_table} T1 "
                else:
                    select_query = f"SELECT T1.* FROM [{cfg.source_dsn['schema']}].[{source_table}] T1 "

                if not pd.isnull(current_table["filter"]):
                    select_query += " WHERE " + current_table["filter"].format(
                        "", cfg.filter[0], cfg.filter[1]
                    )

                # Build the column list in target order: shared columns come
                # from the source, "Client_DB" is filled with the client key,
                # and target-only columns are padded with empty strings.
                select_columns = ""
                for col in target_columns_list:
                    if col in intersect:
                        select_columns += f"T1.[{col}], "
                    elif col == "Client_DB":
                        select_columns += "'" + client_db + '\' as \\"Client_DB\\", '
                    else:
                        select_columns += "'' as \\\"" + col + '\\", '

                select_query = select_query.replace("T1.*", select_columns[:-2])
                # Escape percent signs so cmd.exe does not expand them as
                # batch variables.
                select_query = select_query.replace("%", "%%")
                stage_csv = f"{cfg.stage_dir}\\{current_table['target']}_{client_db}.csv"
                # MySQL alternative (unused):
                # insert_query = (f"LOAD DATA INFILE '{stage_csv}' INTO TABLE {current_table['target']} "
                #                 "FIELDS TERMINATED BY ',' ENCLOSED BY '\"' LINES TERMINATED BY '\\n';")
                bulk_copy = "bcp" if cfg.source_dsn["driver"] == "mssql" else "cet"
                f.write(
                    f'{bulk_copy} "{select_query}" queryout "{stage_csv}" {source_db.bcp_conn_params()} -c -C 65001 -m 1000 '
                    + f'-e "{stage_csv[:-4]}.queryout.log" > "{stage_csv[:-4]}.bcp1.log" \n'
                )
                f.write(f'type "{stage_csv[:-4]}.bcp1.log" | findstr -v "1000" \n')
                f.write(
                    f"bcp [{cfg.target_dsn['schema']}].[{current_table['target']}] in \"{stage_csv}\" {target_db.bcp_conn_params()} "
                    + f'-c -C 65001 -m 1000 -e "{stage_csv[:-4]}.in.log" > "{stage_csv[:-4]}.bcp2.log" \n'
                )
                f.write(f'type "{stage_csv[:-4]}.bcp2.log" | findstr -v "1000" \n')
                f.write(f'del "{stage_csv}" /F >nul 2>nul \n')
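
    # The per-table batch produced above looks roughly like this (paths and
    # connection flags are illustrative):
    #   @echo off
    #   rem ==Target==
    #   del ..\temp\Target*.* /Q /F >nul 2>nul
    #   sqlcmd.exe -S HOST\INSTANCE -d DB2 -U sa -P ... -p -Q "TRUNCATE TABLE [import].[Target]"
    #   bcp "SELECT ..." queryout "..\temp\Target_1.csv" ... -c -C 65001 -m 1000
    #   bcp [import].[Target] in "..\temp\Target_1.csv" ... -c -C 65001 -m 1000
    #   del "..\temp\Target_1.csv" /F >nul 2>nul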

    # Master batch: clear the stage directory, then call every table batch.
    with open(f"{cfg.batch_dir}/_{cfg.name}.bat", "w", encoding="cp850") as f:
        f.write("@echo off & cd /d %~dp0 \n")
        f.write(f"del {cfg.stage_dir}\\*.* /Q /F >nul 2>nul \n\n")
        for index, current_table in config.iterrows():
            f.write(f"echo =={current_table['target']}==\n")
            f.write(f"echo {current_table['target']} >CON \n")
            f.write(f"call {current_table['target']}.bat\n\n")


if __name__ == "__main__":
    create()
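
# Invocation sketch (the config path is illustrative):
#   python db_create.py                          # uses the default dbtools/OPTIMA.json
#   python -c "import db_create; db_create.create('dbtools/CARLO.json')"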