
Query-Erstellung in Klassen ausgelagert (query construction moved into classes)

gc-server3, 9 months ago
parent commit 54107082e9
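
The core change is in db_create.py: the per-table query construction that used to live in one long loop is pulled into dataclasses (DestTable, SourceTable2) whose derived values are exposed as property / cached_property attributes. A minimal, self-contained sketch of that pattern (illustrative names and data only, not the project code):

    from dataclasses import dataclass
    from functools import cached_property

    @dataclass
    class Table:
        dest: str
        dest_db: str
        pkey: list[str]  # primary-key columns, assumed to be known already

        def full_name(self, schema: str) -> str:
            return f"[{self.dest_db}].[{schema}].[{self.dest}]"

        @cached_property
        def delete_query(self) -> str:
            # computed on first access, then cached on the instance
            if not self.pkey:
                return ""
            join = " AND ".join(f"T1.[{c}] = T2.[{c}]" for c in self.pkey)
            return f"DELETE T1 FROM {self.full_name('import')} T1 INNER JOIN [temp].[{self.dest}] T2 ON {join}"

    t = Table("Account", "CARLO2", ["Client_DB", "No_"])
    print(t.delete_query)  # built lazily on first access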

+ 2 - 2
database/CARLO.json

@@ -1,7 +1,7 @@
 {
-    "csv_file": "CARLO.csv",
+    "csv_file": "..\\database\\CARLO.csv",
     "clients": {"1": "M und S Fahrzeughandel GmbH"},
-    "filter": ["01.01.2018", "01.01.2019"],
+    "filter": ["2018-01-01T00:00:00", "2019-01-01T00:00:00"],
     "source_dsn": {"user": "sa", "password": "Mffu3011#", "server": "192.168.2.41\\GLOBALCUBE", "database": "DE0017", "driver": "mssql", "schema": "dbo"},
     "target_dsn": {"user": "sa", "password": "Mffu3011#", "server": "192.168.2.41\\GLOBALCUBE", "database": "CARLO2", "driver": "mssql", "schema": "import"},
     "stage_dir": "C:\\GlobalCube\\System\\CARLO\\SQL\\temp",

+ 2 - 2
database/db_compare.py

@@ -20,11 +20,11 @@ def compare(config_file: str = "database/CARLO.json"):
     config = get_import_config(f"{base_dir}/config/{cfg.csv_file}", cfg.dest_dsn.database)
 
     source_db = DatabaseInspect(cfg.source_dsn, source=True)
-    source_tables = source_db.get_tables()
+    source_tables = source_db.tables_list()
     print(json.dumps(source_db.get_prefix(), indent=2))
 
     dest_db = DatabaseInspect(cfg.dest_dsn)
-    dest_tables = dest_db.get_tables()
+    dest_tables = dest_db.tables_list()
 
     for _, current_table in config.iterrows():
         dest_row_count = {}
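
These call sites follow the rename of get_tables() to tables_list in model.py (last hunk below): as a cached_property, the table list is fetched from the driver once per DatabaseInspect instance and reused afterwards, and it is normally read as a plain attribute without parentheses (as the new db_create.py code does). A small stand-in example of the caching behaviour (hypothetical class, no database involved):

    from functools import cached_property

    class Inspect:
        lookups = 0

        @cached_property
        def tables_list(self) -> list[str]:
            # stands in for the ODBC metadata query; runs only on first access
            Inspect.lookups += 1
            return ["Account", "Customer"]

    db = Inspect()
    print(db.tables_list)   # triggers the lookup
    print(db.tables_list)   # served from the per-instance cache
    print(Inspect.lookups)  # -> 1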

+ 220 - 114
database/db_create.py

@@ -1,9 +1,19 @@
+import io
 import json
+import sys
+from dataclasses import dataclass
+from functools import cached_property
 from pathlib import Path
 
 import pandas as pd
 
-from database.model import DatabaseInspect, create_db_ini, load_config
+sys.path.insert(0, "C:\\Projekte\\tools")
+from database.model import (  # noqa:E402
+    DatabaseInspect,
+    DbCreateConfig,
+    create_db_ini,
+    load_config,
+)
 
 
 def get_import_config(filename: str, db_name: str):
@@ -18,161 +28,257 @@ def get_import_config(filename: str, db_name: str):
     return df[df["dest"].notnull()]
 
 
+@dataclass
+class SourceTable:
+    source: str
+    client_db: str
+    prefix: str
+
+    @property
+    def stage_csv(self) -> str:
+        return ""
+
+    @property
+    def table_client(self) -> str:
+        return ""
+
+    @property
+    def table_name(self) -> str:
+        return ""
+
+    @property
+    def select_query(self) -> str:
+        return ""
+
+
+@dataclass
+class DestTable:
+    source: str
+    dest: str
+    dest_db: str
+    filter_: str
+    query: str
+    iterative: str
+    cols: str
+    source_tables: list[SourceTable] = None
+    dest_inspect: DatabaseInspect = None
+
+    def table_batch_file(self, batch_dir: str) -> str:
+        return f"{batch_dir}/{self.dest}.bat"
+
+    def full_table_name(self, schema_name: str) -> str:
+        return f"[{self.dest_db}].[{schema_name}].[{self.dest}]"
+
+    def temp_table_name(self, temp_db: str) -> str:
+        return f"[{temp_db}].[temp].[{self.dest}]"
+
+    @cached_property
+    def columns_list(self) -> list[str]:
+        res = self.dest_inspect.get_columns(self.dest)
+        if "CLIENT_DB" in res:
+            res.remove("CLIENT_DB")
+            res.append("Client_DB")
+        return res
+
+    @cached_property
+    def column_types(self) -> list[str]:
+        return self.dest_inspect.get_columns_is_typeof_str(self.dest)
+
+    @cached_property
+    def primary_key(self) -> list[str]:
+        return self.dest_inspect.get_pkey(self.dest, self.dest_db)
+
+    @property
+    def insert_query(self) -> str:
+        return f"INSERT INTO {self.full_table_name} SELECT * FROM {self.temp_table_name} T1"
+
+    @property
+    def delete_query(self) -> str:
+        # pkey = self.primary_key
+        if len(self.primary_key) == 0:
+            return ""
+        pkey_join_list = [f"T1.[{col}] = T2.[{col}]" for col in self.primary_key]
+        pkey_join = " AND ".join(pkey_join_list)
+        return f"DELETE T1 FROM {self.full_table_name} T1 INNER JOIN {self.temp_table_name} T2 ON {pkey_join}"
+
+
+class SourceTable2(SourceTable):
+    dest_table: DestTable
+    cfg: DbCreateConfig
+
+    _select_query: str = None
+    source_inspect: DatabaseInspect = None
+    info: str = ""
+
+    @property
+    def table_client(self) -> str:
+        return f"{self.dest_table.dest}_{self.client_db}"
+
+    @property
+    def stage_csv(self) -> str:
+        return f"{self.cfg.stage_dir}\\{self.table_client}.csv"
+
+    @property
+    def table_name(self) -> str:
+        return self.source.format(self.prefix)
+
+    @cached_property
+    def select_query(self):
+        f = io.StringIO()
+        source_columns = set(self.source_inspect.get_columns(self.table_name))
+
+        intersect = source_columns.intersection(self.dest_table.columns_list)
+        # print("Auf beiden Seiten: " + ";".join(intersect))
+        diff1 = source_columns.difference(self.dest_table.columns_list)
+        if len(diff1) > 0:
+            f.write("rem Nur in Quelle: " + ";".join(diff1) + "\n")
+        diff2 = set(self.dest_table.columns_list).difference(source_columns)
+        if "Client_DB" not in diff2:
+            f.write("echo Spalte 'Client_DB' fehlt!\n")
+            return
+        diff2.remove("Client_DB")
+        if len(diff2) > 0:
+            f.write("rem Nur in Ziel:   " + ";".join(diff2) + "\n")
+
+        if not pd.isnull(self.dest_table.query):
+            select_query = self.dest_table.query.format(self.prefix, self.cfg.filter[0], self.cfg.filter[1])
+        elif "." in self.table_name or self.cfg.source_dsn.schema == "":
+            if self.table_name[0] != "[":
+                self.table_name = f"[{self.table_name}]"
+            select_query = f"SELECT T1.* FROM {self.table_name} T1 "
+        else:
+            select_query = f"SELECT T1.* FROM [{self.cfg.source_dsn.schema}].[{self.table_name}] T1 "
+
+        if not pd.isnull(self.dest_table.filter_):
+            select_query += " WHERE " + self.dest_table.filter_.format("", self.cfg.filter[0], self.cfg.filter[1])
+        elif "WHERE" not in select_query:
+            select_query += " WHERE 1 = 1"
+        # select_columns = "T1.[" + "], T1.[".join(intersect) + "],"
+        select_columns = ""
+        for col, is_char_type in zip(self.dest_table.columns_list, self.dest_table.column_types):
+            if col in intersect:
+                if False and is_char_type:  # vorerst deaktiviert
+                    select_columns += f"dbo.cln(T1.[{col}]), "
+                else:
+                    select_columns += f"T1.[{col}], "
+            elif col == "Client_DB":
+                select_columns += f"'{self.client_db}' as [Client_DB], "
+            else:
+                select_columns += "'' as [" + col + "], "
+
+        select_query = select_query.replace("T1.*", select_columns[:-2])
+        if "timestamp" in source_columns:
+            select_query += " ORDER BY T1.[timestamp] "
+        else:
+            print(self.dest_table.dest + " hat kein timestamp-Feld")
+        self.info = f.getvalue()
+        return select_query
+
+
 def create(config_file: str = "database/CARLO.json"):
     cfg = load_config(config_file)
     create_db_ini(cfg)
     base_dir = str(Path(config_file).parent.parent.resolve())
-    config = get_import_config(f"{base_dir}/config/{cfg.csv_file}", cfg.dest_dsn.database)
 
-    source_db = DatabaseInspect(cfg.source_dsn, source=True)
-    source_tables = source_db.get_tables()
-    print(json.dumps(source_db.get_prefix(), indent=2))
+    source_inspect = DatabaseInspect(cfg.source_dsn, source=True)
+    print(json.dumps(source_inspect.get_prefix(), indent=2))
+    SourceTable2.source_inspect = source_inspect
+    SourceTable2.cfg = cfg
 
-    dest_db = DatabaseInspect(cfg.dest_dsn)
-    dest_tables = dest_db.get_tables()
+    dest_inspect = DatabaseInspect(cfg.dest_dsn)
+    # DestTable.dest_inspect = dest_inspect
 
-    for _, current_table in config.iterrows():
-        with open(f"{cfg.batch_dir}/{current_table['dest']}.bat", "w", encoding="cp850") as f:
-            full_table_name = f'[{current_table["dest_db"]}].[{cfg.dest_dsn.schema}].[{current_table["dest"]}]'
+    config = get_import_config(f"{base_dir}/config/{cfg.csv_file}", cfg.dest_dsn.database)
+    table_config = [DestTable(*row.values()) for row in config.to_dict(orient="records")]
+    for dest_table in table_config:
+        dest_table.dest_inspect = dest_inspect
+        dest_table.source_tables = []
+        for client_db, prefix in cfg.clients.items():
+            st = SourceTable2(dest_table.source, client_db, prefix)
+            st.dest_table = dest_table
+            dest_table.source_tables.append(st)
+
+    for dest_table in table_config:
+        with open(dest_table.table_batch_file(cfg.batch_dir), "w", encoding="cp850") as f:
+            full_table_name = dest_table.full_table_name(cfg.dest_dsn.schema)
             f.write("@echo off\n")
             f.write(f'call "{cfg.scripts_dir}\\config2.bat"\n')
-            f.write("rem ==" + current_table["dest"] + "==\n")
+            f.write("rem ==" + dest_table.dest + "==\n")
 
-            if not current_table["dest"] in dest_tables:
-                f.write(f"echo Ziel-Tabelle '{current_table['dest']}' existiert nicht!\n")
-                print(f"Ziel-Tabelle '{current_table['dest']}' existiert nicht!")
+            if dest_table.dest not in dest_inspect.tables_list:
+                f.write(f"echo Ziel-Tabelle '{dest_table.dest}' existiert nicht!\n")
+                print(f"Ziel-Tabelle '{dest_table.dest}' existiert nicht!")
                 continue
 
-            f.write(f"del {cfg.logs_dir}\\{current_table['dest']}*.* /Q /F >nul 2>nul\n\n")
+            f.write(f"del {cfg.logs_dir}\\{dest_table.dest}*.* /Q /F >nul 2>nul\n\n")
             f.write('if not "%1"=="" goto :increment\n')
             f.write("\n:full\n")
             f.write(f'  call sql_query.bat "TRUNCATE TABLE {full_table_name}"\n')
 
-            dest_columns_list = dest_db.get_columns(current_table["dest"])
-            dest_column_types = dest_db.get_columns_is_typeof_str(current_table["dest"])
-
-            if "CLIENT_DB" in dest_columns_list:
-                dest_columns_list.remove("CLIENT_DB")
-                dest_columns_list.append("Client_DB")
-            dest_columns = set(dest_columns_list)
-            select_queries: dict[str, str] = {}
-
-            for client_db, prefix in cfg.clients.items():
-                table_client = f'{current_table["dest"]}_{client_db}'
-                source_table = current_table["source"].format(prefix)
-                if source_table not in source_tables:
-                    source_table2 = source_db.convert_table(source_table)
-                    if source_table2 not in source_tables:
-                        f.write(f"echo Quell-Tabelle '{source_table}' existiert nicht!\n")
-                        print(f"Quell-Tabelle '{source_table}' existiert nicht!")
+            for source_table in dest_table.source_tables:
+                if source_table.table_name not in source_inspect.tables_list:
+                    source_table2 = source_inspect.convert_table(source_table.table_name)
+                    if source_table2 not in source_inspect.tables_list:
+                        f.write(f"echo Quell-Tabelle '{source_table.table_name}' existiert nicht!\n")
+                        print(f"Quell-Tabelle '{source_table.table_name}' existiert nicht!")
                         continue
 
-                source_columns = set(source_db.get_columns(source_table))
-
-                intersect = source_columns.intersection(dest_columns)
-                # print("Auf beiden Seiten: " + ";".join(intersect))
-                diff1 = source_columns.difference(dest_columns)
-                if len(diff1) > 0:
-                    f.write("rem Nur in Quelle: " + ";".join(diff1) + "\n")
-                diff2 = dest_columns.difference(source_columns)
-                if "Client_DB" not in diff2:
-                    f.write("echo Spalte 'Client_DB' fehlt!\n")
-                    print(f"Ziel-Tabelle '{current_table['dest']}' Spalte 'Client_DB' fehlt!")
+                select_query = source_table.select_query.replace("%", "%%%%")  # batch-Problem
+                f.write(source_table.info)
+                if select_query == "":
+                    print(f"Ziel-Tabelle '{dest_table.dest}' Spalte 'Client_DB' fehlt!")
                     continue
-                diff2.remove("Client_DB")
-                if len(diff2) > 0:
-                    f.write("rem Nur in Ziel:   " + ";".join(diff2) + "\n")
-
-                if not pd.isnull(current_table["query"]):
-                    select_query = current_table["query"].format(prefix, cfg.filter[0], cfg.filter[1])
-                elif "." in source_table or cfg.source_dsn.schema == "":
-                    if source_table[0] != "[":
-                        source_table = f"[{source_table}]"
-                    select_query = f"SELECT T1.* FROM {source_table} T1 "
-                else:
-                    select_query = f"SELECT T1.* FROM [{cfg.source_dsn.schema}].[{source_table}] T1 "
-
-                if not pd.isnull(current_table["filter"]):
-                    select_query += " WHERE " + current_table["filter"].format("", cfg.filter[0], cfg.filter[1])
-                elif "WHERE" not in select_query:
-                    select_query += " WHERE 1 = 1"
-                # select_columns = "T1.[" + "], T1.[".join(intersect) + "],"
-                select_columns = ""
-                for col, is_char_type in zip(dest_columns_list, dest_column_types):
-                    if col in intersect:
-                        if False and is_char_type:  # vorerst deaktiviert
-                            select_columns += f"dbo.cln(T1.[{col}]), "
-                        else:
-                            select_columns += f"T1.[{col}], "
-                    elif col == "Client_DB":
-                        select_columns += f"'{client_db}' as [Client_DB], "
-                    else:
-                        select_columns += "'' as [" + col + "], "
-
-                select_query = select_query.replace("T1.*", select_columns[:-2])
-                if "timestamp" in source_columns:
-                    select_query += " ORDER BY T1.[timestamp] "
-                else:
-                    print(current_table["dest"] + " hat kein timestamp-Feld")
-
-                select_query = select_query.replace("%", "%%%%")  # batch-Problem
-                select_queries[table_client] = select_query
 
-                f.write(f'  call bcp_queryout.bat "{table_client}" "{select_query}"\n')
+                f.write(f'  call bcp_queryout.bat "{source_table.table_client}" "{select_query}"\n')
                 f.write(
-                    f'  call bcp_in.bat "{table_client}" '
-                    f'"[{cfg.dest_dsn.schema}].[{current_table["dest"]}]" "{current_table["dest_db"]}"\n'
+                    f'  call bcp_in.bat "{source_table.table_client}" '
+                    f'"[{cfg.dest_dsn.schema}].[{dest_table.dest}]" "{dest_table.dest_db}"\n'
                 )
 
             f.write("  goto :cleanup\n\n")
             f.write(":increment\n")
-            temp_table_name = f"[{cfg.temp_db}].[temp].[{current_table['dest']}]"
-            f.write(f'  call sql_query.bat "TRUNCATE TABLE {temp_table_name}"\n\n')
-            for client_db, prefix in cfg.clients.items():
-                table_client = f'{current_table["dest"]}_{client_db}'
-                select_query = select_queries[table_client]
+
+            f.write(f'  call sql_query.bat "TRUNCATE TABLE {dest_table.temp_table_name}"\n\n')
+            for source_table in dest_table.source_tables:
+                select_query = source_table.select_query
                 convert_timestamp = "T1.[timestamp] > convert(binary(8), '%TS%', 1)"
                 if "WHERE" in select_query:
                     select_query = select_query.replace("WHERE", f"WHERE {convert_timestamp} AND")
                 else:
                     print("Dont know where to put WHERE")
-                f.write(f'  call sql_timestamp.bat "{table_client}" "{full_table_name}" "{client_db}"\n')
-                f.write(f'  call bcp_queryout.bat "{table_client}" "{select_query}"\n')
-                f.write(f'  call bcp_in.bat "{table_client}" "[temp].[{current_table["dest"]}]" "{cfg.temp_db}"\n\n')
-
-            insert_query = f"INSERT INTO {full_table_name} SELECT * FROM {temp_table_name} T1"
-            pkey = dest_db.get_pkey(current_table["dest"], current_table["dest_db"])
-            if len(pkey) == 0:
-                print(current_table["dest"] + " hat keinen Primaerschluessel")
-                f.write(f"  rem {current_table['dest']} hat keinen Primaerschluessel")
+                f.write(f'  call sql_timestamp.bat "{source_table.table_client}" "{full_table_name}" "{client_db}"\n')
+                f.write(f'  call bcp_queryout.bat "{source_table.table_client}" "{select_query}"\n')
+                f.write(
+                    f'  call bcp_in.bat "{source_table.table_client}" "[temp].[{dest_table.dest}]" "{cfg.temp_db}"\n\n'
+                )
+
+            if dest_table.delete_query == "":
+                print(dest_table.dest + " hat keinen Primaerschluessel")
+                f.write(f"  rem {dest_table.dest} hat keinen Primaerschluessel")
             else:
-                pkey_join_list = [f"T1.[{col}] = T2.[{col}]" for col in pkey]
-                pkey_join = " AND ".join(pkey_join_list)
-                delete_query = f"DELETE T1 FROM {full_table_name} T1 INNER JOIN {temp_table_name} T2 ON {pkey_join}"
-                f.write(f'  call sql_query.bat "{delete_query}"\n')
+                f.write(f'  call sql_query.bat "{dest_table.delete_query}"\n')
 
-            f.write(f'  call sql_query.bat "{insert_query}"\n')
+            f.write(f'  call sql_query.bat "{dest_table.insert_query}"\n')
 
             f.write("\n:cleanup\n")
-            for client_db, prefix in cfg.clients.items():
-                stage_csv = f"{cfg.stage_dir}\\{current_table['dest']}_{client_db}.csv"
-                f.write(f'  del "{stage_csv}" /F >nul 2>nul\n')
+            for source_table in dest_table.source_tables:
+                f.write(f'  call delete.bat "{source_table.stage_csv}"\n')
 
     with open(f"{cfg.batch_dir}/_{cfg.name}.bat", "w", encoding="cp850") as f:
         f.write("@echo off & cd /d %~dp0\n")
         f.write(f"del {cfg.stage_dir}\\*.* /Q /F >nul 2>nul\n\n")
-        for index, current_table in config.iterrows():
-            f.write(f"echo =={current_table['dest']}==\n")
-            f.write(f"echo {current_table['dest']} >CON\n")
-            f.write(f"call {cfg.batch_dir}\\{current_table['dest']}.bat 1\n\n")
+        for dest_table in table_config:
+            f.write(f"echo =={dest_table.dest}==\n")
+            f.write(f"echo {dest_table.dest} >CON\n")
+            f.write(f"call {cfg.batch_dir}\\{dest_table.dest}.bat 1\n\n")
 
     with open(f"{cfg.batch_dir}/_{cfg.name}_full_load.bat", "w", encoding="cp850") as f:
         f.write("@echo off & cd /d %~dp0\n")
         f.write(f"del {cfg.stage_dir}\\*.* /Q /F >nul 2>nul\n\n")
-        for index, current_table in config.iterrows():
-            f.write(f"echo =={current_table['dest']}==\n")
-            f.write(f"echo {current_table['dest']} >CON\n")
-            f.write(f"call {cfg.batch_dir}\\{current_table['dest']}.bat\n\n")
+        for dest_table in table_config:
+            f.write(f"echo =={dest_table.dest}==\n")
+            f.write(f"echo {dest_table.dest} >CON\n")
+            f.write(f"call {cfg.batch_dir}\\{dest_table.dest}.bat\n\n")
 
 
 if __name__ == "__main__":
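
For the incremental branch, the generated batch file stages rows into a temp table and merges them into the target with the DELETE-then-INSERT pair built by delete_query and insert_query: delete matching rows via a join on the primary key, then copy everything from the temp table. A rough sketch of the statements this yields for a hypothetical table (database, table and key names are made up):

    full_table = "[CARLO2].[import].[Account]"
    temp_table = "[CARLO2_TEMP].[temp].[Account]"
    pkey = ["Client_DB", "No_"]

    pkey_join = " AND ".join(f"T1.[{c}] = T2.[{c}]" for c in pkey)
    delete_query = f"DELETE T1 FROM {full_table} T1 INNER JOIN {temp_table} T2 ON {pkey_join}"
    insert_query = f"INSERT INTO {full_table} SELECT * FROM {temp_table} T1"

    print(delete_query)
    print(insert_query)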

+ 25 - 24
database/model.py

@@ -1,10 +1,11 @@
 import json
 import os
 from dataclasses import dataclass, field
+from functools import cached_property
 from pathlib import Path
 
 import pyodbc
-from sqlalchemy import create_engine
+from sqlalchemy import Engine, create_engine
 
 
 @dataclass
@@ -16,7 +17,7 @@ class DsnConfig:
     driver: str = "mssql"
     schema: str = "import"
 
-    def conn_ini(self, db_type: str):
+    def conn_ini(self, db_type: str) -> str:
         return "\n".join(
             [
                 f'{db_type}_SERVER="{self.server}"',
@@ -41,7 +42,7 @@ class DbCreateConfig:
     logs_dir: str = "..\\logs"
     scripts_dir: str = "C:\\GlobalCube\\Tasks\\scripts"
 
-    def conn_ini(self):
+    def conn_ini(self) -> str:
         return "\n".join(
             [
                 f'SQL_TEMP="{self.stage_dir}"',
@@ -52,15 +53,15 @@ class DbCreateConfig:
 
 
 class DatabaseInspect:
-    tables = []
+    _cursor: pyodbc.Cursor = None
+    _sqlalchemy_engine: Engine = None
 
     def __init__(self, dsn: DsnConfig, source=False):
         self.dsn = dsn
         self.type = "SOURCE" if source else "DEST"
-        self.cursor = self.connect()
 
     @property
-    def conn_string(self):
+    def conn_string(self) -> str:
         if self.dsn.driver == "mssql":
             return ";".join(
                 [
@@ -86,7 +87,7 @@ class DatabaseInspect:
         # f"DSN={self.dsn.server};UID={self.dsn.user};PWD={self.dsn.password}"
 
     @property
-    def conn_string_sqlalchemy(self):
+    def conn_string_sqlalchemy(self) -> str:
         if self.dsn.driver == "mssql":
             return (
                 f"mssql+pyodbc://{self.dsn.user}:{self.dsn.password}@{self.dsn.server}/{self.dsn.database}?"
@@ -97,41 +98,41 @@ class DatabaseInspect:
         return f"pyodbc://{self.dsn.user}:{self.dsn.password}@{self.dsn.server}/{self.dsn.database}?driver={self.dsn.driver}"
 
     @property
-    def bcp_conn_params(self):
+    def bcp_conn_params(self) -> str:
         return f"-S {self.dsn.server} -d {self.dsn.database} -U {self.dsn.user} -P {self.dsn.password}"
 
     @property
-    def cursor(self):
+    def cursor(self) -> pyodbc.Cursor:
         if not self._cursor:
             self._cursor = self.connect()
         return self._cursor
 
     @property
-    def sqlalchemy_engine(self):
+    def sqlalchemy_engine(self) -> Engine:
         if not self._sqlalchemy_engine:
             self._sqlalchemy_engine = create_engine(self.conn_string_sqlalchemy)
         return self._sqlalchemy_engine
 
-    def connect(self):
+    def connect(self) -> pyodbc.Cursor:
         c = pyodbc.connect(self.conn_string)
         return c.cursor()
 
-    def get_tables(self):
+    @cached_property
+    def tables_list(self) -> list[str]:
         tables = [x[2] for x in self.cursor.tables(tableType="TABLE")]
         views = [x[2] for x in self.cursor.tables(tableType="VIEW")]
-        self.tables = tables + views
-        return self.tables
+        return tables + views
 
-    def get_prefix(self):
-        if (len(self.tables)) == 0:
-            self.get_tables()
-        source_tables_prefix = dict(enumerate(sorted(list(set([t.split("$")[0] for t in self.tables if "$" in t]))), 1))
+    def get_prefix(self) -> dict[str, str]:
+        source_tables_prefix = dict(
+            enumerate(sorted(list(set([t.split("$")[0] for t in self.tables_list if "$" in t]))), 1)
+        )
         if len(source_tables_prefix) == 0:
             q = self.cursor.execute("select name FROM sys.databases")
             source_tables_prefix = [x[0] for x in q.fetchall()]
         return source_tables_prefix
 
-    def get_columns(self, table):
+    def get_columns(self, table: str) -> list[str]:
         source_insp_cols = [col.column_name for col in self.cursor.columns(table=table)]
         if len(source_insp_cols) == 0:
             q = self.cursor.execute(
@@ -141,7 +142,7 @@ class DatabaseInspect:
             source_insp_cols = [col[0] for col in q.fetchall()]
         return source_insp_cols
 
-    def get_columns_is_typeof_str(self, table):
+    def get_columns_is_typeof_str(self, table: str) -> list[str]:
         source_insp_cols = [
             col.data_type in [pyodbc.SQL_CHAR, pyodbc.SQL_VARCHAR] for col in self.cursor.columns(table=table)
         ]
@@ -153,7 +154,7 @@ class DatabaseInspect:
             source_insp_cols = [len(col[0]) > 0 for col in q.fetchall()]
         return source_insp_cols
 
-    def get_pkey(self, table, catalog):
+    def get_pkey(self, table: str, catalog: str) -> list[str]:
         source_insp_cols = [col.column_name for col in self.cursor.primaryKeys(table=table, catalog=catalog)]
         if len(source_insp_cols) == 0:
             self.cursor.execute(f"USE {catalog}")
@@ -167,7 +168,7 @@ class DatabaseInspect:
             self.cursor.execute(f"USE {self.dsn.database}")
         return source_insp_cols
 
-    def convert_table(self, table):
+    def convert_table(self, table: str) -> str:
         if "." in table:
             table = table.split(".")[-1]
         if "[" in table:
@@ -175,7 +176,7 @@ class DatabaseInspect:
         return table
 
 
-def load_config(config_file: str):
+def load_config(config_file: str) -> DbCreateConfig:
     cfg_import = json.load(open(config_file, "r", encoding="latin-1"))
     base_dir = Path(config_file).resolve().parent
     cfg_import["name"] = Path(config_file).stem
@@ -198,7 +199,7 @@ def load_config(config_file: str):
     return DbCreateConfig(**cfg_import)
 
 
-def create_db_ini(cfg: DbCreateConfig):
+def create_db_ini(cfg: DbCreateConfig) -> None:
     with open(cfg.scripts_dir + "/../DB.ini", "w", encoding="cp850") as fwh:
         fwh.write(cfg.conn_ini())
         fwh.write("\n\n")
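
DatabaseInspect no longer connects in __init__; the cursor and sqlalchemy_engine properties create the connection on first use and keep it in the private _cursor / _sqlalchemy_engine fields. The same lazy-initialisation idiom in isolation (dummy connect instead of pyodbc, for illustration only):

    class LazyConnection:
        _cursor = None  # backing field, filled on first access

        def connect(self):
            print("opening connection ...")  # stands in for pyodbc.connect(...).cursor()
            return object()

        @property
        def cursor(self):
            if not self._cursor:
                self._cursor = self.connect()
            return self._cursor

    db = LazyConnection()
    db.cursor  # opens the connection
    db.cursor  # reuses the cached cursor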