model.py

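"""Data model for the GlobalCube/CARLO database import pipeline.

Defines the DSN and run configuration, database inspection helpers, and the
source/destination table abstractions used to generate import queries.
"""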
import io
import json
import os
from collections import Counter
from dataclasses import dataclass, field
from functools import cached_property
from pathlib import Path

import pandas as pd
import pyodbc
from sqlalchemy import Engine, MetaData, create_engine


@dataclass
class DsnConfig:
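    """Connection settings (DSN) for a source or destination database."""
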
    user: str = "sa"
    password: str = "Mffu3011#"
    server: str = "LOCALHOST\\GLOBALCUBE"
    database: str = "CARLO"
    driver: str = "mssql"
    schema: str = "import"

    def conn_ini(self, db_type: str) -> str:
        return "\n".join(
            [
                f'{db_type}_SERVER="{self.server}"',
                f'{db_type}_USER="{self.user}"',
                f'{db_type}_PASSWORD="{self.password}"',
                f'{db_type}_DATABASE="{self.database}"',
            ]
        )

    @property
    def conn_string_pyodbc(self) -> str:
        # Alternative DSN form: f"DSN={self.server};UID={self.user};PWD={self.password}"
        if self.driver == "mssql":
            return ";".join(
                [
                    "Driver={SQL Server Native Client 11.0}",
                    f"Server={self.server}",
                    f"Database={self.database}",
                    f"Uid={self.user}",
                    f"Pwd={self.password}",
                ]
            )
        if self.driver == "mysql":
            return f"mysql+pymysql://{self.user}:{self.password}@{self.server}/{self.database}?charset=utf8mb4"
        return ";".join(
            [
                "Driver={PostgreSQL Unicode}",
                f"Server={self.server}",
                "Port=5432",
                f"Database={self.database}",
                f"Uid={self.user}",
                f"Pwd={self.password}",
            ]
        )

    @property
    def conn_string_sqlalchemy(self) -> str:
        if self.driver == "mssql":
            return (
                f"mssql+pyodbc://{self.user}:{self.password}@{self.server}/{self.database}?"
                "driver=SQL+Server+Native+Client+11.0"
            )
        if self.driver == "mysql":
            return f"mysql+pymysql://{self.user}:{self.password}@{self.server}/{self.database}?charset=utf8mb4"
        return f"pyodbc://{self.user}:{self.password}@{self.server}/{self.database}?driver={self.driver}"

    @property
    def bcp_conn_params(self) -> str:
        return f"-S {self.server} -d {self.database} -U {self.user} -P {self.password}"


class DatabaseInspect:
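    """Inspect tables, columns, and primary keys of one side (SOURCE or DEST)."""
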
    _cursor: pyodbc.Cursor = None
    _sqlalchemy_engine: Engine = None

    def __init__(self, dsn: DsnConfig, source=False):
        self.dsn = dsn
        self.type = "SOURCE" if source else "DEST"

    @property
    def conn_string(self) -> str:
        return self.dsn.conn_string_pyodbc

    @property
    def conn_string_sqlalchemy(self) -> str:
        return self.dsn.conn_string_sqlalchemy

    @property
    def bcp_conn_params(self) -> str:
        return self.dsn.bcp_conn_params

    @property
    def cursor(self) -> pyodbc.Cursor:
        if not self._cursor:
            self._cursor = self.connect()
        return self._cursor

    @property
    def sqlalchemy_engine(self) -> Engine:
        if not self._sqlalchemy_engine:
            self._sqlalchemy_engine = create_engine(self.conn_string_sqlalchemy)
        return self._sqlalchemy_engine

    def connect(self) -> pyodbc.Cursor:
        c = pyodbc.connect(self.conn_string)
        return c.cursor()

    @cached_property
    def tables_list(self) -> list[str]:
        tables = [x[2] for x in self.cursor.tables(tableType="TABLE")]
        views = [x[2] for x in self.cursor.tables(tableType="VIEW")]
        return tables + views

    def get_prefix(self) -> dict[str, str]:
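        """Derive client prefixes for source tables.

        Prefixes are taken from table names containing "$" when a prefix
        occurs more than ten times; otherwise database names starting with
        "deop"/"de" are used as a fallback.
        """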
        prefix_count = Counter([t.split("$")[0] for t in self.tables_list if "$" in t])
        source_tables_prefix = {}
        for i, p in enumerate(prefix_count.most_common(), 1):
            if p[1] > 10 and len(p[0]) > 0:
                source_tables_prefix[str(i)] = p[0]
        if len(source_tables_prefix) > 0:
            return source_tables_prefix
        q = self.cursor.execute("select name FROM sys.databases")
        databases = list(sorted([x[0] for x in q.fetchall()]))
        if "deop00" in databases:
            # Special case for OPTIMA
            databases.remove("deop00")
        for i, p in enumerate(databases, 1):
            if p.startswith("deop"):
                source_tables_prefix[str(i)] = p
        for i, p in enumerate(databases, len(source_tables_prefix.keys()) + 1):
            if p.startswith("de") and p not in source_tables_prefix.values():
                source_tables_prefix[str(i)] = p
        return source_tables_prefix

    def get_columns(self, table: str) -> list[str]:
        source_insp_cols = [col.column_name for col in self.cursor.columns(table=table)]
        if len(source_insp_cols) == 0:
            q = self.cursor.execute(
                "SELECT COLUMN_NAME as column_name FROM information_schema.columns "
                + f"WHERE TABLE_NAME = '{self.convert_table(table)}'"
            )
            source_insp_cols = [col[0] for col in q.fetchall()]
        return source_insp_cols

    def get_columns_is_typeof_str(self, table: str) -> list[bool]:
        source_insp_cols = [
            col.data_type in [pyodbc.SQL_CHAR, pyodbc.SQL_VARCHAR] for col in self.cursor.columns(table=table)
        ]
        if len(source_insp_cols) == 0:
            q = self.cursor.execute(
                "SELECT COLLATION_NAME as column_collation FROM information_schema.columns "
                + f"WHERE TABLE_NAME = '{self.convert_table(table)}'"
            )
            # COLLATION_NAME is NULL for non-character columns
            source_insp_cols = [col[0] is not None and len(col[0]) > 0 for col in q.fetchall()]
        return source_insp_cols

    def get_pkey(self, table: str, catalog: str) -> list[str]:
        source_insp_cols = [col.column_name for col in self.cursor.primaryKeys(table=table, catalog=catalog)]
        if len(source_insp_cols) == 0:
            self.cursor.execute(f"USE {catalog}")
            q = self.cursor.execute(
                "SELECT COLUMN_NAME "
                "FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE "
                "WHERE OBJECTPROPERTY(OBJECT_ID(CONSTRAINT_SCHEMA + '.' + QUOTENAME(CONSTRAINT_NAME)), 'IsPrimaryKey') = 1 "
                f"AND TABLE_NAME = '{self.convert_table(table)}' "  # AND TABLE_SCHEMA = 'dbo'"
            )
            source_insp_cols = [col[0] for col in q.fetchall()]
            self.cursor.execute(f"USE {self.dsn.database}")
        return source_insp_cols

    def convert_table(self, table: str) -> str:
        if "." in table:
            table = table.split(".")[-1]
        if "[" in table:
            table = table[1:-1]
        return table


@dataclass
class DbCreateConfig:
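    """Top-level configuration for an import run, loaded from a JSON file."""
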
    name: str = "CARLO"
    csv_file: str = "..\\config\\CARLO.csv"
    clients: dict[str, str] = field(default_factory=lambda: {"1": "M und S Fahrzeughandel GmbH"})
    filter: list[str] = field(default_factory=lambda: ["2018-01-01T00:00:00", "2022-01-01T00:00:00"])
    source_dsn: DsnConfig = None
    dest_dsn: DsnConfig = None
    temp_db: str = "CARLOX"
    stage_dir: str = "..\\temp"
    batch_dir: str = "..\\batch"
    logs_dir: str = "..\\logs"
    sql_import_full_dir: str = "..\\exec\\import_full"
    sql_import_inc_dir: str = "..\\exec\\import_inc"
    scripts_dir: str = "C:\\GlobalCube\\Tasks\\scripts"
    source_inspect: DatabaseInspect = None
    dest_inspect: DatabaseInspect = None
    _sqlalchemy_engine: Engine = None
    _cfg = None  # singleton instance, set by load_config()

    @property
    def conn_ini(self) -> str:
        return "\n".join(
            [
                f'SQL_TEMP="{self.stage_dir}"',
                f'SQL_BATCH="{self.batch_dir}"',
                f'SQL_LOGS="{self.logs_dir}"',
            ]
        )

    @property
    def temp_db_sqlalchemy_engine(self) -> Engine:
        if not self._sqlalchemy_engine:
            self._sqlalchemy_engine = create_engine(
                f"mssql+pyodbc://{self.dest_dsn.user}:{self.dest_dsn.password}@{self.dest_dsn.server}/{self.temp_db}?"
                "driver=SQL+Server+Native+Client+11.0"
            )
            source_meta = MetaData()
            source_meta.reflect(bind=self._sqlalchemy_engine, schema="temp")
        return self._sqlalchemy_engine

    @staticmethod
    def load_config(config_file: str) -> "DbCreateConfig":
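        """Read a JSON config file, normalize legacy keys and relative paths,
        create the working directories, and build the config singleton.
        """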
        with open(config_file, "r", encoding="latin-1") as frh:
            cfg_import = json.load(frh)
        base_dir = Path(config_file).resolve().parent
        cfg_import["name"] = Path(config_file).stem
        if "logs_dir" not in cfg_import:
            cfg_import["logs_dir"] = "..\\logs"
        if "scripts_dir" not in cfg_import:
            cfg_import["scripts_dir"] = "C:\\GlobalCube\\Tasks\\scripts"
        if "target_dsn" in cfg_import:
            # Legacy key: "target_dsn" was renamed to "dest_dsn"
            cfg_import["dest_dsn"] = cfg_import["target_dsn"]
            del cfg_import["target_dsn"]
        if "sql_import_full_dir" not in cfg_import:
            cfg_import["sql_import_full_dir"] = "..\\exec\\import_full"
        if "sql_import_inc_dir" not in cfg_import:
            cfg_import["sql_import_inc_dir"] = "..\\exec\\import_inc"
        for folder in [
            "stage_dir",
            "batch_dir",
            "logs_dir",
            "scripts_dir",
            "sql_import_full_dir",
            "sql_import_inc_dir",
        ]:
            if cfg_import[folder].startswith(".."):
                cfg_import[folder] = str(base_dir.joinpath(cfg_import[folder]).resolve())
            os.makedirs(cfg_import[folder], exist_ok=True)
        for folder in ["source", "dest", "diff"]:
            os.makedirs(cfg_import["stage_dir"] + "\\" + folder, exist_ok=True)
        if ":" not in cfg_import["csv_file"]:
            cfg_import["csv_file"] = str((base_dir / cfg_import["csv_file"]).resolve())
        cfg_import["source_dsn"] = DsnConfig(**cfg_import["source_dsn"])
        cfg_import["dest_dsn"] = DsnConfig(**cfg_import["dest_dsn"])
        cfg = DbCreateConfig(**cfg_import)
        cfg.source_inspect = DatabaseInspect(cfg.source_dsn, source=True)
        cfg.dest_inspect = DatabaseInspect(cfg.dest_dsn, source=False)
        DbCreateConfig._cfg = cfg
        return cfg

    @staticmethod
    def get_instance() -> "DbCreateConfig":
        return DbCreateConfig._cfg

    def create_db_ini(self) -> None:
        with open(self.scripts_dir + "/../DB.ini", "w", encoding="cp850") as fwh:
            fwh.write(self.conn_ini)
            fwh.write("\n\n")
            fwh.write(self.source_dsn.conn_ini("SOURCE"))
            fwh.write("\n\n")
            fwh.write(self.dest_dsn.conn_ini("DEST"))
            fwh.write("\n")


@dataclass
class SourceTable:
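    """Base description of one source table for a single client database."""
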
    source: str
    client_db: str
    prefix: str

    @property
    def stage_csv(self) -> str:
        return ""

    @property
    def table_client(self) -> str:
        return ""

    @property
    def table_name(self) -> str:
        return ""

    @property
    def select_query(self) -> str:
        return ""


@dataclass
class DestTable:
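    """A destination table and the merge queries generated for it."""
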
    source: str
    dest: str
    dest_db: str
    filter_: str
    query: str
    iterative: str
    cols: str
    source_tables: list[SourceTable] = None
    dest_inspect: DatabaseInspect = None
    cfg: DbCreateConfig = None

    @property
    def table_batch_file(self) -> str:
        return f"{self.cfg.batch_dir}/{self.dest}.bat"

    @property
    def full_table_name(self) -> str:
        return f"[{self.dest_db}].[{self.cfg.dest_dsn.schema}].[{self.dest}]"

    @property
    def temp_table_name(self) -> str:
        return f"[{self.cfg.temp_db}].[temp].[{self.dest}]"

    @cached_property
    def columns_list(self) -> list[str]:
        res = self.dest_inspect.get_columns(self.dest)
        if "CLIENT_DB" in res:
            res.remove("CLIENT_DB")
            res.append("Client_DB")
        return res

    @cached_property
    def column_types(self) -> list[bool]:
        return self.dest_inspect.get_columns_is_typeof_str(self.dest)

    @cached_property
    def primary_key(self) -> list[str]:
        return self.dest_inspect.get_pkey(self.dest, self.dest_db)

    @property
    def insert_query(self) -> str:
        return f"INSERT INTO {self.full_table_name} with (TABLOCK) SELECT * FROM {self.temp_table_name} T1"

    @property
    def delete_query(self) -> str:
        if len(self.primary_key) == 0:
            return ""
        pkey_join_list = [f"T1.[{col}] = T2.[{col}]" for col in self.primary_key]
        pkey_join = " AND ".join(pkey_join_list)
        return f"DELETE T1 FROM {self.full_table_name} T1 INNER JOIN {self.temp_table_name} T2 ON {pkey_join}"


class SourceTable2(SourceTable):
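    """Source table bound to its DestTable and config; builds the real queries.

    Note: dest_table, cfg, and source_inspect are plain class attributes here
    and are expected to be assigned after construction.
    """
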
    dest_table: DestTable
    cfg: DbCreateConfig
    _select_query: str = None
    source_inspect: DatabaseInspect = None

    @cached_property
    def info(self) -> str:
        f = io.StringIO()
        # print("On both sides: " + ";".join(intersect))
        diff1 = self.source_columns.difference(self.dest_table.columns_list)
        if len(diff1) > 0:
            f.write("rem Nur in Quelle: " + ";".join(diff1) + "\n")
        diff2 = set(self.dest_table.columns_list).difference(self.source_columns)
        if "Client_DB" not in diff2:
            f.write("echo Spalte 'Client_DB' fehlt!\n")
            return f.getvalue()
        diff2.remove("Client_DB")
        if len(diff2) > 0:
            f.write("rem Nur in Ziel: " + ";".join(diff2) + "\n")
        return f.getvalue()

    @property
    def table_client(self) -> str:
        return f"{self.dest_table.dest}_{self.client_db}"

    @property
    def stage_csv(self) -> str:
        return f"{self.cfg.stage_dir}\\{self.table_client}.csv"

    @property
    def table_name(self) -> str:
        return self.source.format(self.prefix)

    @cached_property
    def source_columns(self) -> set[str]:
        return set(self.source_inspect.get_columns(self.table_name))

    @cached_property
    def select_query(self) -> str:
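        """Assemble the SELECT statement for this table.

        Uses an explicit query template when configured; otherwise selects
        all columns from the (schema-qualified) table and appends the
        configured filter.
        """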
        if not pd.isnull(self.dest_table.query):
            select_query = self.dest_table.query.format(self.prefix, self.cfg.filter[0], self.cfg.filter[1])
        elif "." in self.table_name or self.cfg.source_dsn.schema == "":
            table_name = self.table_name
            if table_name[0] != "[":
                table_name = f"[{table_name}]"
            select_query = f"SELECT T1.* FROM {table_name} T1 "
        else:
            select_query = f"SELECT T1.* FROM [{self.cfg.source_dsn.schema}].[{self.table_name}] T1 "
        if not pd.isnull(self.dest_table.filter_):
            select_query += " WHERE " + self.dest_table.filter_.format("", self.cfg.filter[0], self.cfg.filter[1])
        elif "WHERE" not in select_query:
            select_query += " WHERE 1 = 1"
        if "timestamp" not in self.source_columns:
            print(self.dest_table.dest + " hat kein timestamp-Feld")
        return select_query

    @property
    def select_query_with_columns(self) -> str:
        res = self.select_query.replace("T1.*", self.select_columns)
        # if "timestamp" in self.source_columns:
        #     res += " ORDER BY T1.[timestamp] "
        return res

    @property
    def select_columns(self) -> str:
        intersect = self.source_columns.intersection(self.dest_table.columns_list)
        res = ""
        for col, is_char_type in zip(self.dest_table.columns_list, self.dest_table.column_types):
            if col in intersect:
                if False and is_char_type:  # disabled for now
                    res += f"dbo.cln(T1.[{col}]), "
                else:
                    res += f"T1.[{col}], "
            elif col == "Client_DB":
                res += f"'{self.client_db}' as [Client_DB], "
            else:
                res += "'' as [" + col + "], "
        res = res[:-2]
        return res
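

# Usage sketch (the JSON file name below is hypothetical, not part of this
# module): load a run configuration, write DB.ini, and list source tables.
#
#     cfg = DbCreateConfig.load_config("..\\config\\CARLO.json")
#     cfg.create_db_ini()
#     print(cfg.source_inspect.tables_list[:10])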