浏览代码

dataclasses in model ausgelagert

gc-server3 10 月之前
父节点
当前提交
dc857bbfb8
共有 2 个文件被更改,包括 156 次插入152 次删除
  1. 1 152
      database/db_create.py
  2. 155 0
      database/model.py

+ 1 - 152
database/db_create.py

@@ -1,159 +1,8 @@
 import json
-import os
-from dataclasses import dataclass, field
 from pathlib import Path
 
 import pandas as pd
-import pyodbc
-
-
-@dataclass
-class DsnConfig:
-    user: str = "sa"
-    password: str = "Mffu3011#"
-    server: str = "LOCALHOST\\GLOBALCUBE"
-    database: str = "CARLO"
-    driver: str = "mssql"
-    schema: str = "import"
-
-
-@dataclass
-class DbCreateConfig:
-    name: str = "CARLO"
-    csv_file: str = "CARLO.csv"
-    clients: dict[str, str] = field(default_factory=lambda: {"1": "M und S Fahrzeughandel GmbH"})
-    filter: list[str] = (["2018-01-01T00:00:00", "2022-01-01T00:00:00"],)
-    source_dsn: DsnConfig = None
-    target_dsn: DsnConfig = None
-    stage_dir: str = "..\\temp"
-    batch_dir: str = "..\\batch"
-    logs_dir: str = "..\\logs"
-
-
-class DatabaseInspect:
-    tables = []
-
-    def __init__(self, dsn: DsnConfig, source=False):
-        self.dsn = dsn
-        self.type = "SOURCE" if source else "DEST"
-        self.cursor = self.connect()
-
-    @property
-    def conn_string(self):
-        if self.dsn.driver == "mssql":
-            return ";".join(
-                [
-                    "Driver={SQL Server Native Client 11.0}",
-                    f"Server={self.dsn.server}",
-                    f"Database={self.dsn.database}",
-                    f"Uid={self.dsn.user}",
-                    f"Pwd={self.dsn.password}",
-                ]
-            )
-        if self.dsn.driver == "mysql":
-            return f"mysql+pymysql://{self.dsn.user}:{self.dsn.password}@{self.dsn.server}/{self.dsn.database}?charset=utf8mb4"
-        return ";".join(
-            [
-                "Driver={PostgreSQL Unicode}",
-                f"Server={self.dsn.server}",
-                "Port=5432",
-                f"Database={self.dsn.database}",
-                f"Uid={self.dsn.user}",
-                f"Pwd={self.dsn.password}",
-            ]
-        )
-        # f"DSN={self.dsn.server};UID={self.dsn.user};PWD={self.dsn.password}"
-
-    @property
-    def conn_ini(self):
-        return "\r\n".join(
-            [
-                f'{self.type}_SERVER="{self.dsn.server}"',
-                f'{self.type}_USER="{self.dsn.user}"',
-                f'{self.type}_PASSWORD="{self.dsn.password}"',
-                f'{self.type}_DATABASE="{self.dsn.database}"',
-            ]
-        )
-
-    @property
-    def bcp_conn_params(self):
-        return f"-S {self.dsn.server} -d {self.dsn.database} -U {self.dsn.user} -P {self.dsn.password}"
-
-    def connect(self):
-        c = pyodbc.connect(self.conn_string)
-        return c.cursor()
-
-    def get_tables(self):
-        tables = [x[2] for x in self.cursor.tables(tableType="TABLE")]
-        views = [x[2] for x in self.cursor.tables(tableType="VIEW")]
-        self.tables = tables + views
-        return self.tables
-
-    def get_prefix(self):
-        if (len(self.tables)) == 0:
-            self.get_tables()
-        source_tables_prefix = dict(enumerate(sorted(list(set([t.split("$")[0] for t in self.tables if "$" in t]))), 1))
-        if len(source_tables_prefix) == 0:
-            q = self.cursor.execute("select name FROM sys.databases")
-            source_tables_prefix = [x[0] for x in q.fetchall()]
-        return source_tables_prefix
-
-    def get_columns(self, table):
-        source_insp_cols = [col.column_name for col in self.cursor.columns(table=table)]
-        if len(source_insp_cols) == 0:
-            q = self.cursor.execute(
-                "SELECT COLUMN_NAME as column_name FROM information_schema.columns "
-                + f"WHERE TABLE_NAME = '{self.convert_table(table)}'"
-            )
-            source_insp_cols = [col[0] for col in q.fetchall()]
-        return source_insp_cols
-
-    def get_columns_is_typeof_str(self, table):
-        source_insp_cols = [
-            col.data_type in [pyodbc.SQL_CHAR, pyodbc.SQL_VARCHAR] for col in self.cursor.columns(table=table)
-        ]
-        if len(source_insp_cols) == 0:
-            q = self.cursor.execute(
-                "SELECT COLLATION_NAME as column_collation FROM information_schema.columns "
-                + f"WHERE TABLE_NAME = '{self.convert_table(table)}'"
-            )
-            source_insp_cols = [len(col[0]) > 0 for col in q.fetchall()]
-        return source_insp_cols
-
-    def get_pkey(self, table):
-        source_insp_cols = [col.column_name for col in self.cursor.primaryKeys(table=table)]
-        if len(source_insp_cols) == 0:
-            q = self.cursor.execute(
-                "SELECT COLUMN_NAME "
-                "FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE "
-                "WHERE OBJECTPROPERTY(OBJECT_ID(CONSTRAINT_SCHEMA + '.' + QUOTENAME(CONSTRAINT_NAME)), 'IsPrimaryKey') = 1 "
-                f"AND TABLE_NAME = '{self.convert_table(table)}' "  # AND TABLE_SCHEMA = 'dbo'"
-            )
-            source_insp_cols = [col[0] for col in q.fetchall()]
-        return source_insp_cols
-
-    def convert_table(self, table):
-        if "." in table:
-            table = table.split(".")[-1]
-        if "[" in table:
-            table = table[1:-1]
-        return table
-
-
-def load_config(config_file: str):
-    cfg_import = json.load(open(config_file, "r", encoding="latin-1"))
-    base_dir = Path(config_file).resolve().parent
-    cfg_import["name"] = Path(config_file).stem
-    if "logs_dir" not in cfg_import:
-        cfg_import["logs_dir"] = "..\\logs"
-
-    for folder in ["stage_dir", "batch_dir", "logs_dir"]:
-        if cfg_import[folder].startswith(".."):
-            cfg_import[folder] = str(base_dir.joinpath(cfg_import[folder]).resolve())
-        os.makedirs(cfg_import[folder], exist_ok=True)
-    cfg_import["source_dsn"] = DsnConfig(**cfg_import["source_dsn"])
-    cfg_import["target_dsn"] = DsnConfig(**cfg_import["target_dsn"])
-    return DbCreateConfig(**cfg_import)
+from model import DatabaseInspect, load_config
 
 
 def get_import_config(filename: str, db_name: str):

+ 155 - 0
database/model.py

@@ -0,0 +1,155 @@
+import json
+import os
+from dataclasses import dataclass, field
+from pathlib import Path
+
+import pyodbc
+
+
+@dataclass
+class DsnConfig:
+    user: str = "sa"
+    password: str = "Mffu3011#"
+    server: str = "LOCALHOST\\GLOBALCUBE"
+    database: str = "CARLO"
+    driver: str = "mssql"
+    schema: str = "import"
+
+
+@dataclass
+class DbCreateConfig:
+    name: str = "CARLO"
+    csv_file: str = "CARLO.csv"
+    clients: dict[str, str] = field(default_factory=lambda: {"1": "M und S Fahrzeughandel GmbH"})
+    filter: list[str] = (["2018-01-01T00:00:00", "2022-01-01T00:00:00"],)
+    source_dsn: DsnConfig = None
+    target_dsn: DsnConfig = None
+    stage_dir: str = "..\\temp"
+    batch_dir: str = "..\\batch"
+    logs_dir: str = "..\\logs"
+
+
+class DatabaseInspect:
+    tables = []
+
+    def __init__(self, dsn: DsnConfig, source=False):
+        self.dsn = dsn
+        self.type = "SOURCE" if source else "DEST"
+        self.cursor = self.connect()
+
+    @property
+    def conn_string(self):
+        if self.dsn.driver == "mssql":
+            return ";".join(
+                [
+                    "Driver={SQL Server Native Client 11.0}",
+                    f"Server={self.dsn.server}",
+                    f"Database={self.dsn.database}",
+                    f"Uid={self.dsn.user}",
+                    f"Pwd={self.dsn.password}",
+                ]
+            )
+        if self.dsn.driver == "mysql":
+            return f"mysql+pymysql://{self.dsn.user}:{self.dsn.password}@{self.dsn.server}/{self.dsn.database}?charset=utf8mb4"
+        return ";".join(
+            [
+                "Driver={PostgreSQL Unicode}",
+                f"Server={self.dsn.server}",
+                "Port=5432",
+                f"Database={self.dsn.database}",
+                f"Uid={self.dsn.user}",
+                f"Pwd={self.dsn.password}",
+            ]
+        )
+        # f"DSN={self.dsn.server};UID={self.dsn.user};PWD={self.dsn.password}"
+
+    @property
+    def conn_ini(self):
+        return "\r\n".join(
+            [
+                f'{self.type}_SERVER="{self.dsn.server}"',
+                f'{self.type}_USER="{self.dsn.user}"',
+                f'{self.type}_PASSWORD="{self.dsn.password}"',
+                f'{self.type}_DATABASE="{self.dsn.database}"',
+            ]
+        )
+
+    @property
+    def bcp_conn_params(self):
+        return f"-S {self.dsn.server} -d {self.dsn.database} -U {self.dsn.user} -P {self.dsn.password}"
+
+    def connect(self):
+        c = pyodbc.connect(self.conn_string)
+        return c.cursor()
+
+    def get_tables(self):
+        tables = [x[2] for x in self.cursor.tables(tableType="TABLE")]
+        views = [x[2] for x in self.cursor.tables(tableType="VIEW")]
+        self.tables = tables + views
+        return self.tables
+
+    def get_prefix(self):
+        if (len(self.tables)) == 0:
+            self.get_tables()
+        source_tables_prefix = dict(enumerate(sorted(list(set([t.split("$")[0] for t in self.tables if "$" in t]))), 1))
+        if len(source_tables_prefix) == 0:
+            q = self.cursor.execute("select name FROM sys.databases")
+            source_tables_prefix = [x[0] for x in q.fetchall()]
+        return source_tables_prefix
+
+    def get_columns(self, table):
+        source_insp_cols = [col.column_name for col in self.cursor.columns(table=table)]
+        if len(source_insp_cols) == 0:
+            q = self.cursor.execute(
+                "SELECT COLUMN_NAME as column_name FROM information_schema.columns "
+                + f"WHERE TABLE_NAME = '{self.convert_table(table)}'"
+            )
+            source_insp_cols = [col[0] for col in q.fetchall()]
+        return source_insp_cols
+
+    def get_columns_is_typeof_str(self, table):
+        source_insp_cols = [
+            col.data_type in [pyodbc.SQL_CHAR, pyodbc.SQL_VARCHAR] for col in self.cursor.columns(table=table)
+        ]
+        if len(source_insp_cols) == 0:
+            q = self.cursor.execute(
+                "SELECT COLLATION_NAME as column_collation FROM information_schema.columns "
+                + f"WHERE TABLE_NAME = '{self.convert_table(table)}'"
+            )
+            source_insp_cols = [len(col[0]) > 0 for col in q.fetchall()]
+        return source_insp_cols
+
+    def get_pkey(self, table):
+        source_insp_cols = [col.column_name for col in self.cursor.primaryKeys(table=table)]
+        if len(source_insp_cols) == 0:
+            q = self.cursor.execute(
+                "SELECT COLUMN_NAME "
+                "FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE "
+                "WHERE OBJECTPROPERTY(OBJECT_ID(CONSTRAINT_SCHEMA + '.' + QUOTENAME(CONSTRAINT_NAME)), 'IsPrimaryKey') = 1 "
+                f"AND TABLE_NAME = '{self.convert_table(table)}' "  # AND TABLE_SCHEMA = 'dbo'"
+            )
+            source_insp_cols = [col[0] for col in q.fetchall()]
+        return source_insp_cols
+
+    def convert_table(self, table):
+        if "." in table:
+            table = table.split(".")[-1]
+        if "[" in table:
+            table = table[1:-1]
+        return table
+
+
+def load_config(config_file: str):
+    cfg_import = json.load(open(config_file, "r", encoding="latin-1"))
+    base_dir = Path(config_file).resolve().parent
+    cfg_import["name"] = Path(config_file).stem
+    if "logs_dir" not in cfg_import:
+        cfg_import["logs_dir"] = "..\\logs"
+
+    for folder in ["stage_dir", "batch_dir", "logs_dir"]:
+        if cfg_import[folder].startswith(".."):
+            cfg_import[folder] = str(base_dir.joinpath(cfg_import[folder]).resolve())
+        os.makedirs(cfg_import[folder], exist_ok=True)
+    cfg_import["source_dsn"] = DsnConfig(**cfg_import["source_dsn"])
+    cfg_import["target_dsn"] = DsnConfig(**cfg_import["target_dsn"])
+    return DbCreateConfig(**cfg_import)