
db_create with incremental approach

gc-server3 10 months ago
parent commit 9eeef239eb
2 changed files with 119 additions and 71 deletions
  1. database/db_create.py (+80, -57)
  2. database/model.py (+39, -14)
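
For context on the "incremental approach": each per-table batch file that db_create.py generates now has two branches. Called without an argument, it truncates the destination table and does a full bcp reload per client; called with any argument, it exports only rows newer than a per-client timestamp (obtained via the sql_timestamp.bat helper, presumably the destination's current maximum) into a temp database and then merges them into the destination by primary key. A minimal Python sketch of that merge, with placeholder names rather than values from a real config:

    # Sketch only: mirrors the delete+insert merge that db_create.py builds below.
    # full_table_name, temp_table_name and pkey are made-up placeholders here.
    full_table_name = "[CARLO].[import].[Kunden]"
    temp_table_name = "[CARLOX].[temp].[Kunden]"
    pkey = ["ID", "Client_DB"]

    pkey_join = " AND ".join(f"T1.[{col}] = T2.[{col}]" for col in pkey)
    delete_query = (
        f"DELETE T1 FROM {full_table_name} T1 "
        f"INNER JOIN {temp_table_name} T2 ON {pkey_join}"
    )
    insert_query = f"INSERT INTO {full_table_name} SELECT * FROM {temp_table_name} T1"
    # The generated batch first runs delete_query (remove rows that were re-exported),
    # then insert_query (append the fresh rows staged in the temp table).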

+ 80 - 57
database/db_create.py

@@ -2,57 +2,61 @@ import json
 from pathlib import Path
 
 import pandas as pd
-from model import DatabaseInspect, load_config
+from model import DatabaseInspect, create_db_ini, load_config
 
 
 def get_import_config(filename: str, db_name: str):
     df = pd.read_csv(filename, sep=";", encoding="latin-1")
-    if "cols" not in df.columns:
-        df["target_db"] = db_name
+    if "dest" not in df.columns:
+        df["dest"] = df["target"]
+        df["dest_db"] = db_name
         df["cols"] = ""
-        df[["source", "target", "target_db", "filter", "query", "iterative", "cols"]].to_csv(
+        df[["source", "dest", "dest_db", "filter", "query", "iterative", "cols"]].to_csv(
             filename, sep=";", encoding="latin-1", index=False
         )
-    return df[df["target"].notnull()]
+    return df[df["dest"].notnull()]
 
 
 def create(config_file: str = "database/CARLO.json"):
     cfg = load_config(config_file)
+    create_db_ini(cfg)
     base_dir = str(Path(cfg.batch_dir).parent)
-    config = get_import_config(f"{base_dir}/config/{cfg.csv_file}", cfg.target_dsn.database)
+    config = get_import_config(f"{base_dir}/config/{cfg.csv_file}", cfg.dest_dsn.database)
 
     source_db = DatabaseInspect(cfg.source_dsn, source=True)
     source_tables = source_db.get_tables()
     print(json.dumps(source_db.get_prefix(), indent=2))
 
-    target_db = DatabaseInspect(cfg.target_dsn)
-    target_tables = target_db.get_tables()
+    dest_db = DatabaseInspect(cfg.dest_dsn)
+    dest_tables = dest_db.get_tables()
 
     for _, current_table in config.iterrows():
-        with open(f"{cfg.batch_dir}/{current_table['target']}.bat", "w", encoding="cp850") as f:
-            f.write("@echo off \n")
-            f.write("rem ==" + current_table["target"] + "==\n")
-
-            if not current_table["target"] in target_tables:
-                f.write(f"echo Ziel-Tabelle '{current_table['target']}' existiert nicht!\n")
-                print(f"Ziel-Tabelle '{current_table['target']}' existiert nicht!")
+        with open(f"{cfg.batch_dir}/{current_table['dest']}.bat", "w", encoding="cp850") as f:
+            full_table_name = f'[{current_table["dest_db"]}].[{cfg.dest_dsn.schema}].[{current_table["dest"]}]'
+            f.write("@echo off\n")
+            f.write(f'call "{cfg.scripts_dir}\\config2.bat"\n')
+            f.write("rem ==" + current_table["dest"] + "==\n")
+
+            if not current_table["dest"] in dest_tables:
+                f.write(f"echo Ziel-Tabelle '{current_table['dest']}' existiert nicht!\n")
+                print(f"Ziel-Tabelle '{current_table['dest']}' existiert nicht!")
                 continue
 
-            f.write(f"del {cfg.stage_dir}\\{current_table['target']}*.* /Q /F >nul 2>nul \n")
-            f.write(
-                f"sqlcmd.exe {target_db.bcp_conn_params} -p "
-                + f"-Q \"TRUNCATE TABLE [{cfg.target_dsn.schema}].[{current_table['target']}]\" \n"
-            )
+            f.write(f"del {cfg.logs_dir}\\{current_table['dest']}*.* /Q /F >nul 2>nul\n\n")
+            f.write('if "%1"=="" (\n')
+            f.write(f'  call sql_query.bat "TRUNCATE TABLE {full_table_name}"\n')
 
-            target_columns_list = target_db.get_columns(current_table["target"])
-            target_column_types = target_db.get_columns_is_typeof_str(current_table["target"])
+            dest_columns_list = dest_db.get_columns(current_table["dest"])
+            dest_column_types = dest_db.get_columns_is_typeof_str(current_table["dest"])
 
-            if "CLIENT_DB" in target_columns_list:
-                target_columns_list.remove("CLIENT_DB")
-                target_columns_list.append("Client_DB")
-            target_columns = set(target_columns_list)
+            if "CLIENT_DB" in dest_columns_list:
+                dest_columns_list.remove("CLIENT_DB")
+                dest_columns_list.append("Client_DB")
+            dest_columns = set(dest_columns_list)
+            select_queries: dict[str, str] = {}
 
             for client_db, prefix in cfg.clients.items():
+                table_client = f'{current_table["dest"]}_{client_db}'
                 source_table = current_table["source"].format(prefix)
                 if source_table not in source_tables:
                     source_table2 = source_db.convert_table(source_table)
@@ -63,15 +67,15 @@ def create(config_file: str = "database/CARLO.json"):
 
                 source_columns = set(source_db.get_columns(source_table))
 
-                intersect = source_columns.intersection(target_columns)
+                intersect = source_columns.intersection(dest_columns)
                 # print("Auf beiden Seiten: " + ";".join(intersect))
-                diff1 = source_columns.difference(target_columns)
+                diff1 = source_columns.difference(dest_columns)
                 if len(diff1) > 0:
                     f.write("rem Nur in Quelle: " + ";".join(diff1) + "\n")
-                diff2 = target_columns.difference(source_columns)
+                diff2 = dest_columns.difference(source_columns)
                 if "Client_DB" not in diff2:
                     f.write("echo Spalte 'Client_DB' fehlt!\n")
-                    print(f"Ziel-Tabelle '{current_table['target']}' Spalte 'Client_DB' fehlt!")
+                    print(f"Ziel-Tabelle '{current_table['dest']}' Spalte 'Client_DB' fehlt!")
                     continue
                 diff2.remove("Client_DB")
                 if len(diff2) > 0:
@@ -90,9 +94,9 @@ def create(config_file: str = "database/CARLO.json"):
                     select_query += " WHERE " + current_table["filter"].format("", cfg.filter[0], cfg.filter[1])
                 # select_columns = "T1.[" + "], T1.[".join(intersect) + "],"
                 select_columns = ""
-                for col, is_char_type in zip(target_columns_list, target_column_types):
+                for col, is_char_type in zip(dest_columns_list, dest_column_types):
                     if col in intersect:
-                        if is_char_type:
+                        if False and is_char_type:  # vorerst deaktiviert
                             select_columns += f"dbo.cln(T1.[{col}]), "
                         else:
                             select_columns += f"T1.[{col}], "
@@ -105,39 +109,58 @@ def create(config_file: str = "database/CARLO.json"):
                 if "timestamp" in source_columns:
                     select_query += " ORDER BY T1.[timestamp] "
                 else:
-                    print(current_table["target"] + " hat kein timestamp-Feld")
-                pkey = target_db.get_pkey(current_table["target"])
-                if len(pkey) == 0:
-                    print(current_table["target"] + " hat keinen Primaerschluessel")
-                select_query = select_query.replace("%", "%%")  # batch-Problem
+                    print(current_table["dest"] + " hat kein timestamp-Feld")
 
-                stage_csv = f"{cfg.stage_dir}\\{current_table['target']}_{client_db}.csv"
-                logfile = f"{cfg.logs_dir}\\{current_table['target']}_{client_db}"
-                # insert_query = f"LOAD DATA INFILE '{stage_csv}' INTO TABLE {current_table['target']} FIELDS TERMINATED BY ','
-                # ENCLOSED BY '\"' LINES TERMINATED BY '\n';"
+                select_query = select_query.replace("%", "%%")  # batch-Problem
+                select_queries[table_client] = select_query
 
-                # print(select_query)
-                bulk_copy = "bcp" if cfg.source_dsn.driver == "mssql" else "cet"
-                f.write(
-                    f'{bulk_copy} "{select_query}" queryout "{stage_csv}" {source_db.bcp_conn_params} -c -C 65001 -m 1000 '
-                    + f'-e "{logfile}.queryout.log" > "{logfile}.bcp1.log" \n'
-                )
-                f.write(f'type "{logfile}.bcp1.log" | findstr -v "1000" \n')
+                f.write(f'  call bcp_queryout.bat "{table_client}" "{select_query}"\n')
                 f.write(
-                    f"bcp [{cfg.target_dsn.schema}].[{current_table['target']}] in \"{stage_csv}\" {target_db.bcp_conn_params} "
-                    + f'-c -C 65001 -m 1000 -e "{logfile}.in.log" > "{logfile}.bcp2.log" \n'
+                    f'  call bcp_in.bat "{table_client}" '
+                    f'"[{cfg.dest_dsn.schema}].[{current_table["dest"]}]" "{current_table["dest_db"]}"\n'
                 )
-                f.write(f'type "{logfile}.bcp2.log" | findstr -v "1000" \n')
 
-                f.write(f'del "{stage_csv}" /F >nul 2>nul \n')
+            f.write(") else (\n")
+            temp_table_name = f"[{cfg.temp_db}].[temp].[{current_table['dest']}]"
+            f.write(f'  call sql_query.bat "TRUNCATE TABLE {temp_table_name}"\n\n')
+            for client_db, prefix in cfg.clients.items():
+                table_client = f'{current_table["dest"]}_{client_db}'
+                select_query = select_queries[table_client]
+                if "WHERE" in select_query:
+                    select_query = select_query.replace("WHERE", "WHERE T1.[timestamp] > '%TS%' AND")
+                elif "ORDER" in select_query:
+                    select_query = select_query.replace("ORDER", "WHERE T1.[timestamp] > '%TS%' ORDER")
+                else:
+                    print("Dont know where to put WHERE")
+                f.write(f'  call sql_timestamp.bat "{full_table_name}" "{client_db}"\n')
+                f.write(f'  call bcp_queryout.bat "{table_client}" "{select_query}"\n')
+                f.write(f'  call bcp_in.bat "{table_client}" "[temp].[{current_table["dest"]}]" "{cfg.temp_db}"\n\n')
+
+            insert_query = f"INSERT INTO {full_table_name} SELECT * FROM {temp_table_name} T1"
+            pkey = dest_db.get_pkey(current_table["dest"])
+            if len(pkey) == 0:
+                print(current_table["dest"] + " hat keinen Primaerschluessel")
+                f.write(f"  rem {current_table['dest']} hat keinen Primaerschluessel")
+            else:
+                pkey_join_list = [f"T1.[{col}] = T2.[{col}]" for col in pkey]
+                pkey_join = " AND ".join(pkey_join_list)
+                delete_query = f"DELETE T1 FROM {full_table_name} T1 INNER JOIN {temp_table_name} T2 ON {pkey_join}"
+                f.write(f'  call sql_query.bat "{delete_query}"\n')
+
+            f.write(f'  call sql_query.bat "{insert_query}"\n')
+
+            f.write(")\n\n")
+            for client_db, prefix in cfg.clients.items():
+                stage_csv = f"{cfg.stage_dir}\\{current_table['dest']}_{client_db}.csv"
+                f.write(f'del "{stage_csv}" /F >nul 2>nul\n')
 
     with open(f"{cfg.batch_dir}/_{cfg.name}.bat", "w", encoding="cp850") as f:
-        f.write("@echo off & cd /d %~dp0 \n")
-        f.write(f"del {cfg.stage_dir}\\*.* /Q /F >nul 2>nul \n\n")
+        f.write("@echo off & cd /d %~dp0\n")
+        f.write(f"del {cfg.stage_dir}\\*.* /Q /F >nul 2>nul\n\n")
         for index, current_table in config.iterrows():
-            f.write(f"echo =={current_table['target']}==\n")
-            f.write(f"echo {current_table['target']} >CON \n")
-            f.write(f"call {current_table['target']}.bat\n\n")
+            f.write(f"echo =={current_table['dest']}==\n")
+            f.write(f"echo {current_table['dest']} >CON\n")
+            f.write(f"call {current_table['dest']}.bat\n\n")
 
 
 if __name__ == "__main__":
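
One detail worth noting in the incremental branch above: the per-client select query built for the full load is reused, and the timestamp filter is spliced in by string replacement, either ahead of an existing WHERE or in place of the ORDER BY keyword. A small standalone example of that rewrite (query text invented for illustration; %TS% is the placeholder the batch helpers fill in):

    # Illustrative only: how the incremental branch rewrites a per-client select
    # query, following the replace() calls in db_create.py above. The table and
    # columns are invented for this example.
    select_query = (
        "SELECT T1.[ID], T1.[Client_DB], T1.[timestamp] "
        "FROM [M1].[dbo].[Kunden] T1 ORDER BY T1.[timestamp]"
    )
    if "WHERE" in select_query:
        select_query = select_query.replace("WHERE", "WHERE T1.[timestamp] > '%TS%' AND")
    elif "ORDER" in select_query:
        select_query = select_query.replace("ORDER", "WHERE T1.[timestamp] > '%TS%' ORDER")
    print(select_query)
    # -> SELECT T1.[ID], T1.[Client_DB], T1.[timestamp] FROM [M1].[dbo].[Kunden] T1
    #    WHERE T1.[timestamp] > '%TS%' ORDER BY T1.[timestamp]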

+ 39 - 14
database/model.py

@@ -15,6 +15,16 @@ class DsnConfig:
     driver: str = "mssql"
     schema: str = "import"
 
+    def conn_ini(self, db_type: str):
+        return "\n".join(
+            [
+                f'{db_type}_SERVER="{self.server}"',
+                f'{db_type}_USER="{self.user}"',
+                f'{db_type}_PASSWORD="{self.password}"',
+                f'{db_type}_DATABASE="{self.database}"',
+            ]
+        )
+
 
 @dataclass
 class DbCreateConfig:
@@ -23,10 +33,21 @@ class DbCreateConfig:
     clients: dict[str, str] = field(default_factory=lambda: {"1": "M und S Fahrzeughandel GmbH"})
     filter: list[str] = (["2018-01-01T00:00:00", "2022-01-01T00:00:00"],)
     source_dsn: DsnConfig = None
-    target_dsn: DsnConfig = None
+    dest_dsn: DsnConfig = None
+    temp_db: str = "CARLOX"
     stage_dir: str = "..\\temp"
     batch_dir: str = "..\\batch"
     logs_dir: str = "..\\logs"
+    scripts_dir: str = "C:\\GlobalCube\\Tasks\\scripts"
+
+    def conn_ini(self):
+        return "\n".join(
+            [
+                f'SQL_TEMP="{self.stage_dir}"',
+                f'SQL_BATCH="{self.batch_dir}"',
+                f'SQL_LOGS="{self.logs_dir}"',
+            ]
+        )
 
 
 class DatabaseInspect:
@@ -63,17 +84,6 @@ class DatabaseInspect:
         )
         # f"DSN={self.dsn.server};UID={self.dsn.user};PWD={self.dsn.password}"
 
-    @property
-    def conn_ini(self):
-        return "\r\n".join(
-            [
-                f'{self.type}_SERVER="{self.dsn.server}"',
-                f'{self.type}_USER="{self.dsn.user}"',
-                f'{self.type}_PASSWORD="{self.dsn.password}"',
-                f'{self.type}_DATABASE="{self.dsn.database}"',
-            ]
-        )
-
     @property
     def bcp_conn_params(self):
         return f"-S {self.dsn.server} -d {self.dsn.database} -U {self.dsn.user} -P {self.dsn.password}"
@@ -145,11 +155,26 @@ def load_config(config_file: str):
     cfg_import["name"] = Path(config_file).stem
     if "logs_dir" not in cfg_import:
         cfg_import["logs_dir"] = "..\\logs"
+    if "scripts_dir" not in cfg_import:
+        cfg_import["scripts_dir"] = "C:\\GlobalCube\\Tasks\\scripts"
+    if "target_dsn" in cfg_import:
+        cfg_import["dest_dsn"] = cfg_import["target_dsn"]
+        del cfg_import["target_dsn"]
 
-    for folder in ["stage_dir", "batch_dir", "logs_dir"]:
+    for folder in ["stage_dir", "batch_dir", "logs_dir", "scripts_dir"]:
         if cfg_import[folder].startswith(".."):
             cfg_import[folder] = str(base_dir.joinpath(cfg_import[folder]).resolve())
         os.makedirs(cfg_import[folder], exist_ok=True)
     cfg_import["source_dsn"] = DsnConfig(**cfg_import["source_dsn"])
-    cfg_import["target_dsn"] = DsnConfig(**cfg_import["target_dsn"])
+    cfg_import["dest_dsn"] = DsnConfig(**cfg_import["dest_dsn"])
     return DbCreateConfig(**cfg_import)
+
+
+def create_db_ini(cfg: DbCreateConfig):
+    with open(cfg.scripts_dir + "/../DB.ini", "w", encoding="cp850") as fwh:
+        fwh.write(cfg.conn_ini())
+        fwh.write("\n\n")
+        fwh.write(cfg.source_dsn.conn_ini("SOURCE"))
+        fwh.write("\n\n")
+        fwh.write(cfg.dest_dsn.conn_ini("DEST"))
+        fwh.write("\n")
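
The new create_db_ini() helper replaces the conn_ini property removed from DatabaseInspect: connection settings are now written once to a DB.ini next to the scripts directory, presumably so the config2.bat / sql_query.bat / bcp_*.bat helpers called from the generated batches can read them. A short usage sketch, assuming the database/CARLO.json path that create() already uses as its default:

    from model import create_db_ini, load_config

    cfg = load_config("database/CARLO.json")
    create_db_ini(cfg)
    # Writes <scripts_dir>/../DB.ini containing, in order:
    #   SQL_TEMP / SQL_BATCH / SQL_LOGS                (from DbCreateConfig.conn_ini)
    #   SOURCE_SERVER / _USER / _PASSWORD / _DATABASE  (from source_dsn.conn_ini("SOURCE"))
    #   DEST_SERVER / _USER / _PASSWORD / _DATABASE    (from dest_dsn.conn_ini("DEST"))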