# model.py
  1. import io
  2. import json
  3. import os
  4. from dataclasses import dataclass, field
  5. from functools import cached_property
  6. from pathlib import Path
  7. import pandas as pd
  8. import pyodbc
  9. from sqlalchemy import Engine, MetaData, create_engine
  10. @dataclass
  11. class DsnConfig:
  12. user: str = "sa"
  13. password: str = "Mffu3011#"
  14. server: str = "LOCALHOST\\GLOBALCUBE"
  15. database: str = "CARLO"
  16. driver: str = "mssql"
  17. schema: str = "import"
  18. def conn_ini(self, db_type: str) -> str:
  19. return "\n".join(
  20. [
  21. f'{db_type}_SERVER="{self.server}"',
  22. f'{db_type}_USER="{self.user}"',
  23. f'{db_type}_PASSWORD="{self.password}"',
  24. f'{db_type}_DATABASE="{self.database}"',
  25. ]
  26. )
  27. class DatabaseInspect:
  28. _cursor: pyodbc.Cursor = None
  29. _sqlalchemy_engine: Engine = None
  30. def __init__(self, dsn: DsnConfig, source=False):
  31. self.dsn = dsn
  32. self.type = "SOURCE" if source else "DEST"
  33. @property
  34. def conn_string(self) -> str:
  35. if self.dsn.driver == "mssql":
  36. return ";".join(
  37. [
  38. "Driver={SQL Server Native Client 11.0}",
  39. f"Server={self.dsn.server}",
  40. f"Database={self.dsn.database}",
  41. f"Uid={self.dsn.user}",
  42. f"Pwd={self.dsn.password}",
  43. ]
  44. )
  45. if self.dsn.driver == "mysql":
  46. return f"mysql+pymysql://{self.dsn.user}:{self.dsn.password}@{self.dsn.server}/{self.dsn.database}?charset=utf8mb4"
  47. return ";".join(
  48. [
  49. "Driver={PostgreSQL Unicode}",
  50. f"Server={self.dsn.server}",
  51. "Port=5432",
  52. f"Database={self.dsn.database}",
  53. f"Uid={self.dsn.user}",
  54. f"Pwd={self.dsn.password}",
  55. ]
  56. )
  57. # f"DSN={self.dsn.server};UID={self.dsn.user};PWD={self.dsn.password}"
  58. @property
  59. def conn_string_sqlalchemy(self) -> str:
  60. if self.dsn.driver == "mssql":
  61. return (
  62. f"mssql+pyodbc://{self.dsn.user}:{self.dsn.password}@{self.dsn.server}/{self.dsn.database}?"
  63. "driver=SQL+Server+Native+Client+11.0"
  64. )
  65. if self.dsn.driver == "mysql":
  66. return f"mysql+pymysql://{self.dsn.user}:{self.dsn.password}@{self.dsn.server}/{self.dsn.database}?charset=utf8mb4"
  67. return f"pyodbc://{self.dsn.user}:{self.dsn.password}@{self.dsn.server}/{self.dsn.database}?driver={self.dsn.driver}"
  68. @property
  69. def bcp_conn_params(self) -> str:
  70. return f"-S {self.dsn.server} -d {self.dsn.database} -U {self.dsn.user} -P {self.dsn.password}"
  71. @property
  72. def cursor(self) -> pyodbc.Cursor:
  73. if not self._cursor:
  74. self._cursor = self.connect()
  75. return self._cursor
  76. @property
  77. def sqlalchemy_engine(self) -> Engine:
  78. if not self._sqlalchemy_engine:
  79. self._sqlalchemy_engine = create_engine(self.conn_string_sqlalchemy)
  80. return self._sqlalchemy_engine
  81. def connect(self) -> pyodbc.Cursor:
  82. c = pyodbc.connect(self.conn_string)
  83. return c.cursor()
  84. @cached_property
  85. def tables_list(self) -> list[str]:
  86. tables = [x[2] for x in self.cursor.tables(tableType="TABLE")]
  87. views = [x[2] for x in self.cursor.tables(tableType="VIEW")]
  88. return tables + views
  89. def get_prefix(self) -> dict[str, str]:
  90. source_tables_prefix = dict(
  91. enumerate(sorted(list(set([t.split("$")[0] for t in self.tables_list if "$" in t]))), 1)
  92. )
  93. if len(source_tables_prefix) == 0:
  94. q = self.cursor.execute("select name FROM sys.databases")
  95. source_tables_prefix = [x[0] for x in q.fetchall()]
  96. return source_tables_prefix
  97. def get_columns(self, table: str) -> list[str]:
  98. source_insp_cols = [col.column_name for col in self.cursor.columns(table=table)]
  99. if len(source_insp_cols) == 0:
  100. q = self.cursor.execute(
  101. "SELECT COLUMN_NAME as column_name FROM information_schema.columns "
  102. + f"WHERE TABLE_NAME = '{self.convert_table(table)}'"
  103. )
  104. source_insp_cols = [col[0] for col in q.fetchall()]
  105. return source_insp_cols
  106. def get_columns_is_typeof_str(self, table: str) -> list[str]:
  107. source_insp_cols = [
  108. col.data_type in [pyodbc.SQL_CHAR, pyodbc.SQL_VARCHAR] for col in self.cursor.columns(table=table)
  109. ]
  110. if len(source_insp_cols) == 0:
  111. q = self.cursor.execute(
  112. "SELECT COLLATION_NAME as column_collation FROM information_schema.columns "
  113. + f"WHERE TABLE_NAME = '{self.convert_table(table)}'"
  114. )
  115. source_insp_cols = [len(col[0]) > 0 for col in q.fetchall()]
  116. return source_insp_cols
  117. def get_pkey(self, table: str, catalog: str) -> list[str]:
  118. source_insp_cols = [col.column_name for col in self.cursor.primaryKeys(table=table, catalog=catalog)]
  119. if len(source_insp_cols) == 0:
  120. self.cursor.execute(f"USE {catalog}")
  121. q = self.cursor.execute(
  122. "SELECT COLUMN_NAME "
  123. "FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE "
  124. "WHERE OBJECTPROPERTY(OBJECT_ID(CONSTRAINT_SCHEMA + '.' + QUOTENAME(CONSTRAINT_NAME)), 'IsPrimaryKey') = 1 "
  125. f"AND TABLE_NAME = '{self.convert_table(table)}' " # AND TABLE_SCHEMA = 'dbo'"
  126. )
  127. source_insp_cols = [col[0] for col in q.fetchall()]
  128. self.cursor.execute(f"USE {self.dsn.database}")
  129. return source_insp_cols
  130. def convert_table(self, table: str) -> str:
  131. if "." in table:
  132. table = table.split(".")[-1]
  133. if "[" in table:
  134. table = table[1:-1]
  135. return table
  136. @dataclass
  137. class DbCreateConfig:
  138. name: str = "CARLO"
  139. csv_file: str = "..\\config\\CARLO.csv"
  140. clients: dict[str, str] = field(default_factory=lambda: {"1": "M und S Fahrzeughandel GmbH"})
  141. filter: list[str] = (["2018-01-01T00:00:00", "2022-01-01T00:00:00"],)
  142. source_dsn: DsnConfig = None
  143. dest_dsn: DsnConfig = None
  144. temp_db: str = "CARLOX"
  145. stage_dir: str = "..\\temp"
  146. batch_dir: str = "..\\batch"
  147. logs_dir: str = "..\\logs"
  148. scripts_dir: str = "C:\\GlobalCube\\Tasks\\scripts"
  149. source_inspect: DatabaseInspect = None
  150. dest_inspect: DatabaseInspect = None
  151. _sqlalchemy_engine: Engine = None
  152. @property
  153. def conn_ini(self) -> str:
  154. return "\n".join(
  155. [
  156. f'SQL_TEMP="{self.stage_dir}"',
  157. f'SQL_BATCH="{self.batch_dir}"',
  158. f'SQL_LOGS="{self.logs_dir}"',
  159. ]
  160. )
  161. @property
  162. def temp_db_sqlalchemy_engine(self) -> Engine:
  163. if not self._sqlalchemy_engine:
  164. self._sqlalchemy_engine = create_engine(
  165. f"mssql+pyodbc://{self.dest_dsn.user}:{self.dest_dsn.password}@{self.dest_dsn.server}/{self.temp_db}?"
  166. "driver=SQL+Server+Native+Client+11.0"
  167. )
  168. source_meta = MetaData()
  169. source_meta.reflect(bind=self._sqlalchemy_engine, schema="temp")
  170. return self._sqlalchemy_engine
  171. @staticmethod
  172. def load_config(config_file: str):
  173. cfg_import = json.load(open(config_file, "r", encoding="latin-1"))
  174. base_dir = Path(config_file).resolve().parent
  175. cfg_import["name"] = Path(config_file).stem
  176. if "logs_dir" not in cfg_import:
  177. cfg_import["logs_dir"] = "..\\logs"
  178. if "scripts_dir" not in cfg_import:
  179. cfg_import["scripts_dir"] = "C:\\GlobalCube\\Tasks\\scripts"
  180. if "target_dsn" in cfg_import:
  181. cfg_import["dest_dsn"] = cfg_import["target_dsn"]
  182. del cfg_import["target_dsn"]
  183. for folder in ["stage_dir", "batch_dir", "logs_dir", "scripts_dir"]:
  184. if cfg_import[folder].startswith(".."):
  185. cfg_import[folder] = str(base_dir.joinpath(cfg_import[folder]).resolve())
  186. os.makedirs(cfg_import[folder], exist_ok=True)
  187. for folder in ["source", "dest", "diff"]:
  188. os.makedirs(cfg_import["stage_dir"] + "\\" + folder, exist_ok=True)
  189. if ":" not in cfg_import["csv_file"]:
  190. cfg_import["csv_file"] = str((base_dir / cfg_import["csv_file"]).resolve())
  191. cfg_import["source_dsn"] = DsnConfig(**cfg_import["source_dsn"])
  192. cfg_import["dest_dsn"] = DsnConfig(**cfg_import["dest_dsn"])
  193. cfg = DbCreateConfig(**cfg_import)
  194. cfg.source_inspect = DatabaseInspect(cfg.source_dsn, source=True)
  195. cfg.dest_inspect = DatabaseInspect(cfg.dest_dsn, source=False)
  196. DbCreateConfig._cfg = cfg
  197. return cfg
  198. @staticmethod
  199. def get_instance():
  200. return DbCreateConfig._cfg
  201. def create_db_ini(self) -> None:
  202. with open(self.scripts_dir + "/../DB.ini", "w", encoding="cp850") as fwh:
  203. fwh.write(self.conn_ini)
  204. fwh.write("\n\n")
  205. fwh.write(self.source_dsn.conn_ini("SOURCE"))
  206. fwh.write("\n\n")
  207. fwh.write(self.dest_dsn.conn_ini("DEST"))
  208. fwh.write("\n")
  209. @dataclass
  210. class SourceTable:
  211. source: str
  212. client_db: str
  213. prefix: str
  214. @property
  215. def stage_csv(self) -> str:
  216. return ""
  217. @property
  218. def table_client(self) -> str:
  219. return ""
  220. @property
  221. def table_name(self) -> str:
  222. return ""
  223. @property
  224. def select_query(self) -> str:
  225. return ""
  226. @dataclass
  227. class DestTable:
  228. source: str
  229. dest: str
  230. dest_db: str
  231. filter_: str
  232. query: str
  233. iterative: str
  234. cols: str
  235. source_tables: list[SourceTable] = None
  236. dest_inspect: DatabaseInspect = None
  237. cfg: DbCreateConfig = None
  238. @property
  239. def table_batch_file(self) -> str:
  240. return f"{self.cfg.batch_dir}/{self.dest}.bat"
  241. @property
  242. def full_table_name(self) -> str:
  243. return f"[{self.dest_db}].[{self.cfg.dest_dsn.schema}].[{self.dest}]"
  244. @property
  245. def temp_table_name(self) -> str:
  246. return f"[{self.cfg.temp_db}].[temp].[{self.dest}]"
  247. @cached_property
  248. def columns_list(self) -> list[str]:
  249. res = self.dest_inspect.get_columns(self.dest)
  250. if "CLIENT_DB" in res:
  251. res.remove("CLIENT_DB")
  252. res.append("Client_DB")
  253. return res
  254. @cached_property
  255. def column_types(self) -> list[str]:
  256. return self.dest_inspect.get_columns_is_typeof_str(self.dest)
  257. @cached_property
  258. def primary_key(self) -> list[str]:
  259. return self.dest_inspect.get_pkey(self.dest, self.dest_db)
  260. @property
  261. def insert_query(self) -> str:
  262. return f"INSERT INTO {self.full_table_name} with (TABLOCK) SELECT * FROM {self.temp_table_name} T1"
  263. @property
  264. def delete_query(self) -> str:
  265. # pkey = self.primary_key
  266. if len(self.primary_key) == 0:
  267. return ""
  268. pkey_join_list = [f"T1.[{col}] = T2.[{col}]" for col in self.primary_key]
  269. pkey_join = " AND ".join(pkey_join_list)
  270. return f"DELETE T1 FROM {self.full_table_name} T1 INNER JOIN {self.temp_table_name} T2 ON {pkey_join}"
  271. class SourceTable2(SourceTable):
  272. dest_table: DestTable
  273. cfg: DbCreateConfig
  274. _select_query: str = None
  275. source_inspect: DatabaseInspect = None
  276. info: str = ""
  277. @property
  278. def table_client(self) -> str:
  279. return f"{self.dest_table.dest}_{self.client_db}"
  280. @property
  281. def stage_csv(self) -> str:
  282. return f"{self.cfg.stage_dir}\\{self.table_client}.csv"
  283. @property
  284. def table_name(self) -> str:
  285. return self.source.format(self.prefix)
  286. @cached_property
  287. def source_columns(self) -> set[str]:
  288. return set(self.source_inspect.get_columns(self.table_name))
  289. @cached_property
  290. def select_query(self):
  291. f = io.StringIO()
  292. # print("Auf beiden Seiten: " + ";".join(intersect))
  293. diff1 = self.source_columns.difference(self.dest_table.columns_list)
  294. if len(diff1) > 0:
  295. f.write("rem Nur in Quelle: " + ";".join(diff1) + "\n")
  296. diff2 = set(self.dest_table.columns_list).difference(self.source_columns)
  297. if "Client_DB" not in diff2:
  298. f.write("echo Spalte 'Client_DB' fehlt!\n")
  299. return
  300. diff2.remove("Client_DB")
  301. if len(diff2) > 0:
  302. f.write("rem Nur in Ziel: " + ";".join(diff2) + "\n")
  303. if not pd.isnull(self.dest_table.query):
  304. select_query = self.dest_table.query.format(self.prefix, self.cfg.filter[0], self.cfg.filter[1])
  305. elif "." in self.table_name or self.cfg.source_dsn.schema == "":
  306. if self.table_name[0] != "[":
  307. self.table_name = f"[{self.table_name}]"
  308. select_query = f"SELECT T1.* FROM {self.table_name} T1 "
  309. else:
  310. select_query = f"SELECT T1.* FROM [{self.cfg.source_dsn.schema}].[{self.table_name}] T1 "
  311. if not pd.isnull(self.dest_table.filter_):
  312. select_query += " WHERE " + self.dest_table.filter_.format("", self.cfg.filter[0], self.cfg.filter[1])
  313. elif "WHERE" not in select_query:
  314. select_query += " WHERE 1 = 1"
  315. if "timestamp" not in self.source_columns:
  316. print(self.dest_table.dest + " hat kein timestamp-Feld")
  317. self.info = f.getvalue()
  318. return select_query
  319. @property
  320. def select_query_with_columns(self) -> str:
  321. res = self.select_query.replace("T1.*", self.select_columns)
  322. if "timestamp" in self.source_columns:
  323. res += " ORDER BY T1.[timestamp] "
  324. return res
  325. @property
  326. def select_columns(self):
  327. intersect = self.source_columns.intersection(self.dest_table.columns_list)
  328. res = ""
  329. for col, is_char_type in zip(self.dest_table.columns_list, self.dest_table.column_types):
  330. if col in intersect:
  331. if False and is_char_type: # vorerst deaktiviert
  332. res += f"dbo.cln(T1.[{col}]), "
  333. else:
  334. res += f"T1.[{col}], "
  335. elif col == "Client_DB":
  336. res += f"'{self.client_db}' as [Client_DB], "
  337. else:
  338. res += "'' as [" + col + "], "
  339. res = res[:-2]
  340. return res