db_create.py

import pandas as pd
import json
from pathlib import Path
from collections import namedtuple
import pyodbc

DbCreateConfig = namedtuple(
    "DbCreateConfig",
    "name csv_file clients filter source_dsn target_dsn stage_dir batch_dir logs_dir",
)
DsnConfig = namedtuple("DsnConfig", "user password server database driver schema")
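
# Built-in sample configuration (the CARLO setup); create() below builds its
# own config from a JSON file of the same shape via load_config().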
cfg = DbCreateConfig(
    **{
        "name": "CARLO",
        "csv_file": "CARLO.csv",
        "clients": {"1": "M und S Fahrzeughandel GmbH"},
        "filter": ["01.01.2018", "01.01.2019"],
        "source_dsn": {
            "user": "sa",
            "password": "Mffu3011#",
            "server": "GC-SERVER1\\GLOBALCUBE",
            "database": "DE0017",
            "driver": "mssql",
            "schema": "dbo",
        },
        "target_dsn": {
            "user": "sa",
            "password": "Mffu3011#",
            "server": "GC-SERVER1\\GLOBALCUBE",
            "database": "CARLO2",
            "driver": "mssql",
            "schema": "import",
        },
        "stage_dir": "..\\temp",
        "batch_dir": "..\\batch",
        "logs_dir": "..\\logs",
    }
)
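

# Thin wrapper around a pyodbc connection that answers schema questions
# (tables, columns, column types) for either the source or the target DSN.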
class database_inspect:
    def __init__(self, dsn, source=False):
        self.dsn = DsnConfig(**dsn)
        self.type = "SOURCE" if source else "DEST"
        self.tables = []  # filled lazily by get_tables()
        self.cursor = self.connect()

    def conn_string(self):
        if self.dsn.driver == "mssql":
            return ";".join(
                [
                    "Driver={SQL Server Native Client 11.0}",
                    f"Server={self.dsn.server}",
                    f"Database={self.dsn.database}",
                    f"Uid={self.dsn.user}",
                    f"Pwd={self.dsn.password}",
                ]
            )
        if self.dsn.driver == "mysql":
            # Note: this is an SQLAlchemy-style URL; pyodbc.connect() cannot use it directly.
            return f"mysql+pymysql://{self.dsn.user}:{self.dsn.password}@{self.dsn.server}/{self.dsn.database}?charset=utf8mb4"
        return f"DSN={self.dsn.server};UID={self.dsn.user};PWD={self.dsn.password}"

    def conn_ini(self):
        return "\r\n".join(
            [
                f'{self.type}_SERVER="{self.dsn.server}"',
                f'{self.type}_USER="{self.dsn.user}"',
                f'{self.type}_PASSWORD="{self.dsn.password}"',
                f'{self.type}_DATABASE="{self.dsn.database}"',
            ]
        )

    def bcp_conn_params(self):
        return f"-S {self.dsn.server} -d {self.dsn.database} -U {self.dsn.user} -P {self.dsn.password}"

    def connect(self):
        c = pyodbc.connect(self.conn_string())
        return c.cursor()

    def get_tables(self):
        tables = [x[2] for x in self.cursor.tables(tableType="TABLE")]
        views = [x[2] for x in self.cursor.tables(tableType="VIEW")]
        self.tables = tables + views
        return self.tables

    def get_prefix(self):
        if len(self.tables) == 0:
            self.get_tables()
        source_tables_prefix = dict(
            enumerate(sorted({t.split("$")[0] for t in self.tables if "$" in t}), 1)
        )
        if len(source_tables_prefix) == 0:
            q = self.cursor.execute("SELECT name FROM sys.databases")
            source_tables_prefix = [x[0] for x in q.fetchall()]
        return source_tables_prefix

    def get_columns(self, table):
        source_insp_cols = [col.column_name for col in self.cursor.columns(table=table)]
        if len(source_insp_cols) == 0:
            q = self.cursor.execute(
                "SELECT COLUMN_NAME as column_name FROM information_schema.columns "
                + f"WHERE TABLE_NAME = '{self.convert_table(table)}'"
            )
            source_insp_cols = [col[0] for col in q.fetchall()]
        return source_insp_cols

    def get_columns_is_typeof_str(self, table):
        source_insp_cols = [
            col.data_type in [pyodbc.SQL_CHAR, pyodbc.SQL_VARCHAR]
            for col in self.cursor.columns(table=table)
        ]
        if len(source_insp_cols) == 0:
            q = self.cursor.execute(
                "SELECT COLLATION_NAME as column_collation FROM information_schema.columns "
                + f"WHERE TABLE_NAME = '{self.convert_table(table)}'"
            )
            # COLLATION_NAME is NULL for non-string columns, so test for
            # presence instead of len(), which would raise on None.
            source_insp_cols = [col[0] is not None for col in q.fetchall()]
        return source_insp_cols

    def convert_table(self, table):
        if "." in table:
            table = table.split(".")[-1]
        if "[" in table:
            table = table[1:-1]
        return table
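

# Resolve a JSON config file into a DbCreateConfig, normalising the relative
# stage/batch/logs directories against the config file's location.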
def load_config(config_file: str):
    with open(config_file, "r", encoding="latin-1") as f:
        cfg_import = json.load(f)
    base_dir = Path(config_file).resolve().parent
    cfg_import["name"] = Path(config_file).stem
    if cfg_import["stage_dir"][:2] == "..":
        cfg_import["stage_dir"] = str(base_dir.joinpath(cfg_import["stage_dir"]).resolve())
    if cfg_import["batch_dir"][:2] == "..":
        cfg_import["batch_dir"] = str(base_dir.joinpath(cfg_import["batch_dir"]).resolve())
    if "logs_dir" not in cfg_import:
        cfg_import["logs_dir"] = "..\\logs"
    if cfg_import["logs_dir"][:2] == "..":
        cfg_import["logs_dir"] = str(base_dir.joinpath(cfg_import["logs_dir"]).resolve())
    return DbCreateConfig(**cfg_import)
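

# For every row of the CSV mapping that has a "target", write one batch file
# that exports the source table per client via bcp and bulk-loads it into the
# target schema.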
def create(config_file="dbtools/OPTIMA.json"):
    cfg = load_config(config_file)
    base_dir = str(Path(cfg.batch_dir).parent)
    df = pd.read_csv(f"{base_dir}/{cfg.csv_file}", sep=";", encoding="latin-1")
    if "cols" not in df.columns:
        df["target_db"] = ""
        df["cols"] = ""
        # index=False keeps the re-written CSV readable on the next run
        df.to_csv(f"{base_dir}/{cfg.csv_file}", sep=";", encoding="latin-1", index=False)
    config = df[df["target"].notnull()]
    # print(config.head())

    source_db = database_inspect(cfg.source_dsn, source=True)
    source_tables = source_db.get_tables()
    print(source_db.get_prefix())
    target_db = database_inspect(cfg.target_dsn)
    target_tables = target_db.get_tables()
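
    # One batch file per target table: truncate the target, then export and
    # re-import the data for every configured client database.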
    for _, current_table in config.iterrows():
        with open(
            f"{cfg.batch_dir}/{current_table['target']}.bat", "w", encoding="cp850"
        ) as f:
            f.write("@echo off \n")
            f.write("rem ==" + current_table["target"] + "==\n")

            if current_table["target"] not in target_tables:
                f.write(
                    f"echo Ziel-Tabelle '{current_table['target']}' existiert nicht!\n"
                )
                print(f"Ziel-Tabelle '{current_table['target']}' existiert nicht!")
                continue

            f.write(
                f"del {cfg.stage_dir}\\{current_table['target']}*.* /Q /F >nul 2>nul \n"
            )
            f.write(
                f"sqlcmd.exe {target_db.bcp_conn_params()} -p "
                + f"-Q \"TRUNCATE TABLE [{cfg.target_dsn['schema']}].[{current_table['target']}]\" \n"
            )

            target_columns_list = target_db.get_columns(current_table["target"])
            target_column_types = target_db.get_columns_is_typeof_str(
                current_table["target"]
            )
            # Normalise the name so the "Client_DB" checks below match.
            if "CLIENT_DB" in target_columns_list:
                target_columns_list.remove("CLIENT_DB")
                target_columns_list.append("Client_DB")
            target_columns = set(target_columns_list)
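
            # One bcp export/import round trip per client database.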
            for client_db, prefix in cfg.clients.items():
                source_table = current_table["source"].format(prefix)
                if source_table not in source_tables:
                    source_table2 = source_db.convert_table(source_table)
                    if source_table2 not in source_tables:
                        f.write(
                            f"echo Quell-Tabelle '{source_table}' existiert nicht!\n"
                        )
                        print(f"Quell-Tabelle '{source_table}' existiert nicht!")
                        continue

                source_columns = set(source_db.get_columns(source_table))
                intersect = source_columns.intersection(target_columns)
                # print("Auf beiden Seiten: " + ";".join(intersect))
                diff1 = source_columns.difference(target_columns)
                if len(diff1) > 0:
                    f.write("rem Nur in Quelle: " + ";".join(diff1) + "\n")
                diff2 = target_columns.difference(source_columns)
                if "Client_DB" not in diff2:
                    f.write("echo Spalte 'Client_DB' fehlt!\n")
                    print(
                        f"Ziel-Tabelle '{current_table['target']}' Spalte 'Client_DB' fehlt!"
                    )
                    continue
                diff2.remove("Client_DB")
                if len(diff2) > 0:
                    f.write("rem Nur in Ziel: " + ";".join(diff2) + "\n")

                if not pd.isnull(current_table["query"]):
                    select_query = current_table["query"].format(
                        prefix, cfg.filter[0], cfg.filter[1]
                    )
                elif "." in source_table or cfg.source_dsn["schema"] == "":
                    if source_table[0] != "[":
                        source_table = f"[{source_table}]"
                    select_query = f"SELECT T1.* FROM {source_table} T1 "
                else:
                    select_query = f"SELECT T1.* FROM [{cfg.source_dsn['schema']}].[{source_table}] T1 "

                if not pd.isnull(current_table["filter"]):
                    select_query += " WHERE " + current_table["filter"].format(
                        "", cfg.filter[0], cfg.filter[1]
                    )

                # Shared string columns go through dbo.cln() (a scalar cleanup
                # function expected in the source database), Client_DB becomes a
                # constant, and target-only columns are filled with ''.
                select_columns = ""
                for col, col_type in zip(target_columns_list, target_column_types):
                    if col in intersect:
                        if col_type:
                            select_columns += f"dbo.cln(T1.[{col}]), "
                        else:
                            select_columns += f"T1.[{col}], "
                    elif col == "Client_DB":
                        select_columns += f"'{client_db}' as \\\"Client_DB\\\", "
                    else:
                        select_columns += "'' as \\\"" + col + '\\", '

                select_query = select_query.replace("T1.*", select_columns[:-2])
                select_query = select_query.replace("%", "%%")  # escape % for the batch file

                stage_csv = (
                    f"{cfg.stage_dir}\\{current_table['target']}_{client_db}.csv"
                )
                # insert_query = f"LOAD DATA INFILE '{stage_csv}' INTO TABLE {current_table['target']}
                #     FIELDS TERMINATED BY ',' ENCLOSED BY '\"' LINES TERMINATED BY '\n';"
                # print(select_query)
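
                # Export with "bcp ... queryout", then bulk-load the CSV into
                # the target table; *.queryout.log / *.in.log capture row errors.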
  240. bulk_copy = "bcp" if cfg.source_dsn["driver"] == "mssql" else "cet"
  241. f.write(
  242. f'{bulk_copy} "{select_query}" queryout "{stage_csv}" {source_db.bcp_conn_params()} -c -C 65001 -m 1000 '
  243. + f'-e "{stage_csv[:-4]}.queryout.log" > "{stage_csv[:-4]}.bcp1.log" \n'
  244. )
  245. f.write(f'type "{stage_csv[:-4]}.bcp1.log" | findstr -v "1000" \n')
  246. f.write(
  247. f"bcp [{cfg.target_dsn['schema']}].[{current_table['target']}] in \"{stage_csv}\" {target_db.bcp_conn_params()} "
  248. + f'-c -C 65001 -m 1000 -e "{stage_csv[:-4]}.in.log" > "{stage_csv[:-4]}.bcp2.log" \n'
  249. )
  250. f.write(f'type "{stage_csv[:-4]}.bcp2.log" | findstr -v "1000" \n')
  251. f.write(f'del "{stage_csv}" /F >nul 2>nul \n')
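
    # Master batch file: clear the stage directory, then call every per-table
    # batch file in CSV order.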
    with open(f"{cfg.batch_dir}/_{cfg.name}.bat", "w", encoding="cp850") as f:
        f.write("@echo off & cd /d %~dp0 \n")
        f.write(f"del {cfg.stage_dir}\\*.* /Q /F >nul 2>nul \n\n")
        for _, current_table in config.iterrows():
            f.write(f"echo =={current_table['target']}==\n")
            f.write(f"echo {current_table['target']} >CON \n")
            f.write(f"call {current_table['target']}.bat\n\n")


if __name__ == "__main__":
    create()