# model.py

import json
import os
from dataclasses import dataclass, field
from functools import cached_property
from pathlib import Path

import pyodbc
from sqlalchemy import Engine, create_engine
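

# Credentials and address of one ODBC data source; conn_ini() renders the
# SOURCE_*/DEST_*-prefixed entries of DB.ini for this DSN.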
@dataclass
class DsnConfig:
    user: str = "sa"
    password: str = "Mffu3011#"
    server: str = "LOCALHOST\\GLOBALCUBE"
    database: str = "CARLO"
    driver: str = "mssql"
    schema: str = "import"

    def conn_ini(self, db_type: str) -> str:
        return "\n".join(
            [
                f'{db_type}_SERVER="{self.server}"',
                f'{db_type}_USER="{self.user}"',
                f'{db_type}_PASSWORD="{self.password}"',
                f'{db_type}_DATABASE="{self.database}"',
            ]
        )
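

# Top-level job configuration, normally populated from <name>.json via load_config().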
@dataclass
class DbCreateConfig:
    name: str = "CARLO"
    csv_file: str = "CARLO.csv"
    clients: dict[str, str] = field(default_factory=lambda: {"1": "M und S Fahrzeughandel GmbH"})
    # ISO timestamps bounding the export period
    filter: list[str] = field(default_factory=lambda: ["2018-01-01T00:00:00", "2022-01-01T00:00:00"])
    source_dsn: DsnConfig | None = None
    dest_dsn: DsnConfig | None = None
    temp_db: str = "CARLOX"
    stage_dir: str = "..\\temp"
    batch_dir: str = "..\\batch"
    logs_dir: str = "..\\logs"
    scripts_dir: str = "C:\\GlobalCube\\Tasks\\scripts"

    def conn_ini(self) -> str:
        return "\n".join(
            [
                f'SQL_TEMP="{self.stage_dir}"',
                f'SQL_BATCH="{self.batch_dir}"',
                f'SQL_LOGS="{self.logs_dir}"',
            ]
        )
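

# Thin wrapper around a pyodbc connection (plus a lazily created SQLAlchemy
# engine) that answers schema questions about one DSN.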
class DatabaseInspect:
    _cursor: pyodbc.Cursor | None = None
    _sqlalchemy_engine: Engine | None = None

    def __init__(self, dsn: DsnConfig, source=False):
        self.dsn = dsn
        self.type = "SOURCE" if source else "DEST"

    @property
    def conn_string(self) -> str:
        if self.dsn.driver == "mssql":
            return ";".join(
                [
                    "Driver={SQL Server Native Client 11.0}",
                    f"Server={self.dsn.server}",
                    f"Database={self.dsn.database}",
                    f"Uid={self.dsn.user}",
                    f"Pwd={self.dsn.password}",
                ]
            )
        if self.dsn.driver == "mysql":
            return f"mysql+pymysql://{self.dsn.user}:{self.dsn.password}@{self.dsn.server}/{self.dsn.database}?charset=utf8mb4"
        # fallback: PostgreSQL via ODBC
        return ";".join(
            [
                "Driver={PostgreSQL Unicode}",
                f"Server={self.dsn.server}",
                "Port=5432",
                f"Database={self.dsn.database}",
                f"Uid={self.dsn.user}",
                f"Pwd={self.dsn.password}",
            ]
        )
        # f"DSN={self.dsn.server};UID={self.dsn.user};PWD={self.dsn.password}"
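
    # The same DSN rendered as a SQLAlchemy URL rather than a raw ODBC string.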
    @property
    def conn_string_sqlalchemy(self) -> str:
        if self.dsn.driver == "mssql":
            return (
                f"mssql+pyodbc://{self.dsn.user}:{self.dsn.password}@{self.dsn.server}/{self.dsn.database}?"
                "driver=SQL+Server+Native+Client+11.0"
            )
        if self.dsn.driver == "mysql":
            return f"mysql+pymysql://{self.dsn.user}:{self.dsn.password}@{self.dsn.server}/{self.dsn.database}?charset=utf8mb4"
        return f"pyodbc://{self.dsn.user}:{self.dsn.password}@{self.dsn.server}/{self.dsn.database}?driver={self.dsn.driver}"

    @property
    def bcp_conn_params(self) -> str:
        # command-line switches for the bcp bulk-copy utility
        return f"-S {self.dsn.server} -d {self.dsn.database} -U {self.dsn.user} -P {self.dsn.password}"

    @property
    def cursor(self) -> pyodbc.Cursor:
        if not self._cursor:
            self._cursor = self.connect()
        return self._cursor

    @property
    def sqlalchemy_engine(self) -> Engine:
        if not self._sqlalchemy_engine:
            self._sqlalchemy_engine = create_engine(self.conn_string_sqlalchemy)
        return self._sqlalchemy_engine

    def connect(self) -> pyodbc.Cursor:
        c = pyodbc.connect(self.conn_string)
        return c.cursor()
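
    # Schema inspection: prefer ODBC catalog metadata, fall back to
    # INFORMATION_SCHEMA queries where the driver returns nothing.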
    @cached_property
    def tables_list(self) -> list[str]:
        tables = [x[2] for x in self.cursor.tables(tableType="TABLE")]
        views = [x[2] for x in self.cursor.tables(tableType="VIEW")]
        return tables + views

    def get_prefix(self) -> dict[int, str] | list[str]:
        # tables may be named "<prefix>$<table>"; collect the distinct prefixes
        source_tables_prefix = dict(enumerate(sorted({t.split("$")[0] for t in self.tables_list if "$" in t}), 1))
        if len(source_tables_prefix) == 0:
            q = self.cursor.execute("SELECT name FROM sys.databases")
            source_tables_prefix = [x[0] for x in q.fetchall()]
        return source_tables_prefix
    def get_columns(self, table: str) -> list[str]:
        source_insp_cols = [col.column_name for col in self.cursor.columns(table=table)]
        if len(source_insp_cols) == 0:
            q = self.cursor.execute(
                "SELECT COLUMN_NAME as column_name FROM information_schema.columns "
                + f"WHERE TABLE_NAME = '{self.convert_table(table)}'"
            )
            source_insp_cols = [col[0] for col in q.fetchall()]
        return source_insp_cols

    def get_columns_is_typeof_str(self, table: str) -> list[bool]:
        # one flag per column: True if the column holds character data
        source_insp_cols = [
            col.data_type in [pyodbc.SQL_CHAR, pyodbc.SQL_VARCHAR] for col in self.cursor.columns(table=table)
        ]
        if len(source_insp_cols) == 0:
            q = self.cursor.execute(
                "SELECT COLLATION_NAME as column_collation FROM information_schema.columns "
                + f"WHERE TABLE_NAME = '{self.convert_table(table)}'"
            )
            # COLLATION_NAME is NULL for non-character columns
            source_insp_cols = [col[0] is not None and len(col[0]) > 0 for col in q.fetchall()]
        return source_insp_cols
    def get_pkey(self, table: str, catalog: str) -> list[str]:
        source_insp_cols = [col.column_name for col in self.cursor.primaryKeys(table=table, catalog=catalog)]
        if len(source_insp_cols) == 0:
            self.cursor.execute(f"USE {catalog}")
            q = self.cursor.execute(
                "SELECT COLUMN_NAME "
                "FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE "
                "WHERE OBJECTPROPERTY(OBJECT_ID(CONSTRAINT_SCHEMA + '.' + QUOTENAME(CONSTRAINT_NAME)), 'IsPrimaryKey') = 1 "
                f"AND TABLE_NAME = '{self.convert_table(table)}' "  # AND TABLE_SCHEMA = 'dbo'"
            )
            source_insp_cols = [col[0] for col in q.fetchall()]
            self.cursor.execute(f"USE {self.dsn.database}")
        return source_insp_cols

    def convert_table(self, table: str) -> str:
        # strip a schema qualifier and surrounding brackets: "[dbo].[tab]" -> "tab"
        if "." in table:
            table = table.split(".")[-1]
        if "[" in table:
            table = table[1:-1]
        return table
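

# Read a job description from JSON, normalize relative directories against the
# config file's location, create the working directories, and build the config.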
def load_config(config_file: str) -> DbCreateConfig:
    with open(config_file, "r", encoding="latin-1") as frh:
        cfg_import = json.load(frh)
    base_dir = Path(config_file).resolve().parent
    cfg_import["name"] = Path(config_file).stem
    if "logs_dir" not in cfg_import:
        cfg_import["logs_dir"] = "..\\logs"
    if "scripts_dir" not in cfg_import:
        cfg_import["scripts_dir"] = "C:\\GlobalCube\\Tasks\\scripts"
    if "target_dsn" in cfg_import:
        # accept "target_dsn" as an alias for "dest_dsn"
        cfg_import["dest_dsn"] = cfg_import["target_dsn"]
        del cfg_import["target_dsn"]
    for folder in ["stage_dir", "batch_dir", "logs_dir", "scripts_dir"]:
        if cfg_import[folder].startswith(".."):
            cfg_import[folder] = str(base_dir.joinpath(cfg_import[folder]).resolve())
        os.makedirs(cfg_import[folder], exist_ok=True)
    for folder in ["source", "dest", "diff"]:
        os.makedirs(cfg_import["stage_dir"] + "\\" + folder, exist_ok=True)
    cfg_import["source_dsn"] = DsnConfig(**cfg_import["source_dsn"])
    cfg_import["dest_dsn"] = DsnConfig(**cfg_import["dest_dsn"])
    return DbCreateConfig(**cfg_import)
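

# Write DB.ini one level above scripts_dir; cp850 is the OEM codepage that
# Windows batch scripts typically read.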
def create_db_ini(cfg: DbCreateConfig) -> None:
    with open(cfg.scripts_dir + "/../DB.ini", "w", encoding="cp850") as fwh:
        fwh.write(cfg.conn_ini())
        fwh.write("\n\n")
        fwh.write(cfg.source_dsn.conn_ini("SOURCE"))
        fwh.write("\n\n")
        fwh.write(cfg.dest_dsn.conn_ini("DEST"))
        fwh.write("\n")
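

# Usage sketch; "CARLO.json" is an illustrative config path, not part of the
# original module.
if __name__ == "__main__":
    cfg = load_config("CARLO.json")
    create_db_ini(cfg)
    source_db = DatabaseInspect(cfg.source_dsn, source=True)
    print(source_db.tables_list)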