# model.py
  1. import json
  2. import os
  3. from dataclasses import dataclass, field
  4. from pathlib import Path
  5. import pyodbc
  6. @dataclass
  7. class DsnConfig:
  8. user: str = "sa"
  9. password: str = "Mffu3011#"
  10. server: str = "LOCALHOST\\GLOBALCUBE"
  11. database: str = "CARLO"
  12. driver: str = "mssql"
  13. schema: str = "import"
  14. @dataclass
  15. class DbCreateConfig:
  16. name: str = "CARLO"
  17. csv_file: str = "CARLO.csv"
  18. clients: dict[str, str] = field(default_factory=lambda: {"1": "M und S Fahrzeughandel GmbH"})
  19. filter: list[str] = (["2018-01-01T00:00:00", "2022-01-01T00:00:00"],)
  20. source_dsn: DsnConfig = None
  21. target_dsn: DsnConfig = None
  22. stage_dir: str = "..\\temp"
  23. batch_dir: str = "..\\batch"
  24. logs_dir: str = "..\\logs"
  25. class DatabaseInspect:
  26. tables = []
  27. def __init__(self, dsn: DsnConfig, source=False):
  28. self.dsn = dsn
  29. self.type = "SOURCE" if source else "DEST"
  30. self.cursor = self.connect()
  31. @property
  32. def conn_string(self):
  33. if self.dsn.driver == "mssql":
  34. return ";".join(
  35. [
  36. "Driver={SQL Server Native Client 11.0}",
  37. f"Server={self.dsn.server}",
  38. f"Database={self.dsn.database}",
  39. f"Uid={self.dsn.user}",
  40. f"Pwd={self.dsn.password}",
  41. ]
  42. )
  43. if self.dsn.driver == "mysql":
  44. return f"mysql+pymysql://{self.dsn.user}:{self.dsn.password}@{self.dsn.server}/{self.dsn.database}?charset=utf8mb4"
  45. return ";".join(
  46. [
  47. "Driver={PostgreSQL Unicode}",
  48. f"Server={self.dsn.server}",
  49. "Port=5432",
  50. f"Database={self.dsn.database}",
  51. f"Uid={self.dsn.user}",
  52. f"Pwd={self.dsn.password}",
  53. ]
  54. )
  55. # f"DSN={self.dsn.server};UID={self.dsn.user};PWD={self.dsn.password}"
  56. @property
  57. def conn_ini(self):
  58. return "\r\n".join(
  59. [
  60. f'{self.type}_SERVER="{self.dsn.server}"',
  61. f'{self.type}_USER="{self.dsn.user}"',
  62. f'{self.type}_PASSWORD="{self.dsn.password}"',
  63. f'{self.type}_DATABASE="{self.dsn.database}"',
  64. ]
  65. )
  66. @property
  67. def bcp_conn_params(self):
  68. return f"-S {self.dsn.server} -d {self.dsn.database} -U {self.dsn.user} -P {self.dsn.password}"
  69. def connect(self):
  70. c = pyodbc.connect(self.conn_string)
  71. return c.cursor()
  72. def get_tables(self):
  73. tables = [x[2] for x in self.cursor.tables(tableType="TABLE")]
  74. views = [x[2] for x in self.cursor.tables(tableType="VIEW")]
  75. self.tables = tables + views
  76. return self.tables
  77. def get_prefix(self):
  78. if (len(self.tables)) == 0:
  79. self.get_tables()
  80. source_tables_prefix = dict(enumerate(sorted(list(set([t.split("$")[0] for t in self.tables if "$" in t]))), 1))
  81. if len(source_tables_prefix) == 0:
  82. q = self.cursor.execute("select name FROM sys.databases")
  83. source_tables_prefix = [x[0] for x in q.fetchall()]
  84. return source_tables_prefix
  85. def get_columns(self, table):
  86. source_insp_cols = [col.column_name for col in self.cursor.columns(table=table)]
  87. if len(source_insp_cols) == 0:
  88. q = self.cursor.execute(
  89. "SELECT COLUMN_NAME as column_name FROM information_schema.columns "
  90. + f"WHERE TABLE_NAME = '{self.convert_table(table)}'"
  91. )
  92. source_insp_cols = [col[0] for col in q.fetchall()]
  93. return source_insp_cols
  94. def get_columns_is_typeof_str(self, table):
  95. source_insp_cols = [
  96. col.data_type in [pyodbc.SQL_CHAR, pyodbc.SQL_VARCHAR] for col in self.cursor.columns(table=table)
  97. ]
  98. if len(source_insp_cols) == 0:
  99. q = self.cursor.execute(
  100. "SELECT COLLATION_NAME as column_collation FROM information_schema.columns "
  101. + f"WHERE TABLE_NAME = '{self.convert_table(table)}'"
  102. )
  103. source_insp_cols = [len(col[0]) > 0 for col in q.fetchall()]
  104. return source_insp_cols
  105. def get_pkey(self, table):
  106. source_insp_cols = [col.column_name for col in self.cursor.primaryKeys(table=table)]
  107. if len(source_insp_cols) == 0:
  108. q = self.cursor.execute(
  109. "SELECT COLUMN_NAME "
  110. "FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE "
  111. "WHERE OBJECTPROPERTY(OBJECT_ID(CONSTRAINT_SCHEMA + '.' + QUOTENAME(CONSTRAINT_NAME)), 'IsPrimaryKey') = 1 "
  112. f"AND TABLE_NAME = '{self.convert_table(table)}' " # AND TABLE_SCHEMA = 'dbo'"
  113. )
  114. source_insp_cols = [col[0] for col in q.fetchall()]
  115. return source_insp_cols
  116. def convert_table(self, table):
  117. if "." in table:
  118. table = table.split(".")[-1]
  119. if "[" in table:
  120. table = table[1:-1]
  121. return table
  122. def load_config(config_file: str):
  123. cfg_import = json.load(open(config_file, "r", encoding="latin-1"))
  124. base_dir = Path(config_file).resolve().parent
  125. cfg_import["name"] = Path(config_file).stem
  126. if "logs_dir" not in cfg_import:
  127. cfg_import["logs_dir"] = "..\\logs"
  128. for folder in ["stage_dir", "batch_dir", "logs_dir"]:
  129. if cfg_import[folder].startswith(".."):
  130. cfg_import[folder] = str(base_dir.joinpath(cfg_import[folder]).resolve())
  131. os.makedirs(cfg_import[folder], exist_ok=True)
  132. cfg_import["source_dsn"] = DsnConfig(**cfg_import["source_dsn"])
  133. cfg_import["target_dsn"] = DsnConfig(**cfg_import["target_dsn"])
  134. return DbCreateConfig(**cfg_import)