123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285 |
- import io
- import json
- import sys
- from dataclasses import dataclass
- from functools import cached_property
- from pathlib import Path
- import pandas as pd
- sys.path.insert(0, "C:\\Projekte\\tools")
- from database.model import ( # noqa:E402
- DatabaseInspect,
- DbCreateConfig,
- create_db_ini,
- load_config,
- )
def get_import_config(filename: str, db_name: str):
    """Load the table import configuration from a semicolon-separated CSV file.

    Legacy config files name the destination column ``target`` instead of
    ``dest``. Such a file is migrated in place:
    - ``dest`` is copied from ``target``;
    - ``dest_db`` is set to *db_name*;
    - an empty ``cols`` column is added;
    - the file is rewritten in the current column layout.

    Args:
        filename: Path of the CSV file (``;``-separated, latin-1 encoded).
        db_name: Destination database name written into ``dest_db`` when a
            legacy file is migrated.

    Returns:
        A DataFrame with exactly the columns source, dest, dest_db, filter,
        query, iterative and cols (in that order). Only rows that have a
        ``dest`` value are included.
    """
    columns = ["source", "dest", "dest_db", "filter", "query", "iterative", "cols"]
    df = pd.read_csv(filename, sep=";", encoding="latin-1")
    if "dest" not in df.columns:
        # Legacy layout: derive the new columns and persist the migrated file.
        df["dest"] = df["target"]
        df["dest_db"] = db_name
        df["cols"] = ""
        df[columns].to_csv(filename, sep=";", encoding="latin-1", index=False)
    # Callers build DestTable objects positionally from each row. Projecting
    # onto the canonical layout stops leftover columns (e.g. "target") or a
    # different column order from shifting those arguments.
    df = df[columns]
    return df[df["dest"].notnull()]
@dataclass
class SourceTable:
    """Minimal description of one source table for a single client database.

    Only the three constructor fields are stored. The derived properties below
    are empty-string placeholders; ``SourceTable2`` overrides them.
    """

    source: str  # table-name template; SourceTable2 formats it with ``prefix``
    client_db: str  # client key (taken from cfg.clients by create())
    prefix: str  # client-specific value substituted into ``source``

    @property
    def table_name(self) -> str:
        """Placeholder for the resolved source table name."""
        return ""

    @property
    def table_client(self) -> str:
        """Placeholder for the per-client staging identifier."""
        return ""

    @property
    def stage_csv(self) -> str:
        """Placeholder for the staging CSV path."""
        return ""

    @property
    def select_query(self) -> str:
        """Placeholder for the export SELECT statement."""
        return ""
@dataclass
class DestTable:
    """Destination table of the import plus the per-client source tables feeding it.

    The first seven fields are filled, in order, from the columns of the import
    configuration CSV: source, dest, dest_db, filter, query, iterative, cols.
    ``source_tables`` and ``dest_inspect`` are assigned after construction.
    """

    source: str  # source table-name template, formatted per client by SourceTable2
    dest: str  # destination table name
    dest_db: str  # destination database name
    filter_: str  # optional WHERE condition from the "filter" column (NaN if the cell is empty)
    query: str  # optional custom SELECT from the "query" column (NaN if the cell is empty)
    iterative: str  # value of the "iterative" column
    cols: str  # value of the "cols" column
    source_tables: list[SourceTable] = None  # per-client source tables feeding this table
    dest_inspect: DatabaseInspect = None  # inspector for the destination database

    def table_batch_file(self, batch_dir: str) -> str:
        """Return the path of the batch file that loads this table."""
        return f"{batch_dir}/{self.dest}.bat"

    def full_table_name(self, schema_name: str) -> str:
        """Return the bracketed three-part name ``[dest_db].[schema_name].[dest]``."""
        return f"[{self.dest_db}].[{schema_name}].[{self.dest}]"

    def temp_table_name(self, temp_db: str) -> str:
        """Return the bracketed name of this table in schema ``temp`` of *temp_db*."""
        return f"[{temp_db}].[temp].[{self.dest}]"

    @cached_property
    def columns_list(self) -> list[str]:
        """Return the destination column names, with ``CLIENT_DB`` spelled ``Client_DB``.

        The name is replaced in place rather than moved to the end. This keeps
        the list aligned with ``column_types`` (the two are zipped when the
        SELECT is built) and keeps the table's column order. That order is
        presumably what the positional bcp import relies on -- confirm.

        A copy is taken so the list returned by the inspector is not mutated.
        """
        res = list(self.dest_inspect.get_columns(self.dest))
        if "CLIENT_DB" in res:
            res[res.index("CLIENT_DB")] = "Client_DB"
        return res

    @cached_property
    def column_types(self) -> list[str]:
        """Per-column flags from ``dest_inspect.get_columns_is_typeof_str``.

        The result is zipped with ``columns_list``. Its values are used as
        booleans ("is a character type") -- NOTE(review): confirm the element type.
        """
        return self.dest_inspect.get_columns_is_typeof_str(self.dest)

    @cached_property
    def primary_key(self) -> list[str]:
        """Return the primary-key column names as reported by ``dest_inspect.get_pkey``."""
        return self.dest_inspect.get_pkey(self.dest, self.dest_db)

    @property
    def insert_query(self) -> str:
        """INSERT ... SELECT copying the temp table into the destination table.

        NOTE(review): ``full_table_name`` and ``temp_table_name`` are methods
        that need a schema / temp-database argument. They are interpolated here
        without being called, so the text contains bound-method reprs instead
        of table names. Build the statement from ``full_table_name(schema)``
        and ``temp_table_name(temp_db)`` instead.
        """
        return f"INSERT INTO {self.full_table_name} SELECT * FROM {self.temp_table_name} T1"

    @property
    def delete_query(self) -> str:
        """DELETE of destination rows whose primary key also appears in the temp table.

        Returns "" when the table has no primary key.
        NOTE(review): this has the same defect as ``insert_query``: the
        table-name methods are not called, so the text contains bound-method
        reprs.
        """
        if len(self.primary_key) == 0:
            return ""
        pkey_join_list = [f"T1.[{col}] = T2.[{col}]" for col in self.primary_key]
        pkey_join = " AND ".join(pkey_join_list)
        return f"DELETE T1 FROM {self.full_table_name} T1 INNER JOIN {self.temp_table_name} T2 ON {pkey_join}"
class SourceTable2(SourceTable):
    """Concrete per-client source table that feeds one ``DestTable``.

    It is constructed with the inherited ``(source, client_db, prefix)``
    dataclass constructor. ``dest_table`` is assigned per instance after
    construction. ``cfg`` and ``source_inspect`` are read from class
    attributes that ``create()`` sets once for all instances.
    """

    dest_table: DestTable
    cfg: DbCreateConfig
    _select_query: str = None
    source_inspect: DatabaseInspect = None
    info: str = ""  # batch-file lines (rem/echo) produced while building select_query
    # Wrapping character columns in dbo.cln() is switched off for now
    # (originally marked "vorerst deaktiviert").
    _CLEAN_CHAR_COLUMNS = False

    @property
    def table_client(self) -> str:
        """Identifier ``<dest>_<client_db>``, used for the staging CSV and the bcp calls."""
        return f"{self.dest_table.dest}_{self.client_db}"

    @property
    def stage_csv(self) -> str:
        """Path of this client's staging CSV inside ``cfg.stage_dir``."""
        return f"{self.cfg.stage_dir}\\{self.table_client}.csv"

    @property
    def table_name(self) -> str:
        """Source table name: the ``source`` template formatted with this client's prefix."""
        return self.source.format(self.prefix)

    @cached_property
    def select_query(self) -> str:
        """Build the SELECT that exports this client's rows for the destination table.

        The select list follows ``dest_table.columns_list``:
        - columns present in the source are read from ``T1``;
        - ``Client_DB`` is filled with this client's ``client_db``;
        - destination-only columns are filled with ``''``.

        Diagnostic ``rem``/``echo`` lines for the batch file are stored in
        ``info``.

        Returns:
            The query text, or "" when ``Client_DB`` is not a destination-only
            column. In that case ``info`` contains an ``echo`` explaining the
            problem.
        """
        notes = io.StringIO()
        dest_columns = self.dest_table.columns_list
        source_columns = set(self.source_inspect.get_columns(self.table_name))
        intersect = source_columns.intersection(dest_columns)

        only_source = source_columns.difference(dest_columns)
        if only_source:
            notes.write("rem Nur in Quelle: " + ";".join(sorted(only_source)) + "\n")
        only_dest = set(dest_columns).difference(source_columns)
        if "Client_DB" not in only_dest:
            notes.write("echo Spalte 'Client_DB' fehlt!\n")
            # Return "" (not None) so callers can test for an empty query, and
            # keep the note so it reaches the batch file.
            self.info = notes.getvalue()
            return ""
        only_dest.remove("Client_DB")
        if only_dest:
            notes.write("rem Nur in Ziel: " + ";".join(sorted(only_dest)) + "\n")

        # table_name is a read-only property, so bracketing works on a local copy.
        table_name = self.table_name
        if not pd.isnull(self.dest_table.query):
            select_query = self.dest_table.query.format(self.prefix, self.cfg.filter[0], self.cfg.filter[1])
        elif "." in table_name or self.cfg.source_dsn.schema == "":
            if not table_name.startswith("["):
                table_name = f"[{table_name}]"
            select_query = f"SELECT T1.* FROM {table_name} T1 "
        else:
            select_query = f"SELECT T1.* FROM [{self.cfg.source_dsn.schema}].[{table_name}] T1 "

        if not pd.isnull(self.dest_table.filter_):
            select_query += " WHERE " + self.dest_table.filter_.format("", self.cfg.filter[0], self.cfg.filter[1])
        elif "WHERE" not in select_query:
            # Guarantee a WHERE so the incremental load can splice in its timestamp condition.
            select_query += " WHERE 1 = 1"

        select_columns = []
        for col, is_char_type in zip(dest_columns, self.dest_table.column_types):
            if col in intersect:
                if self._CLEAN_CHAR_COLUMNS and is_char_type:
                    select_columns.append(f"dbo.cln(T1.[{col}])")
                else:
                    select_columns.append(f"T1.[{col}]")
            elif col == "Client_DB":
                select_columns.append(f"'{self.client_db}' as [Client_DB]")
            else:
                select_columns.append(f"'' as [{col}]")
        select_query = select_query.replace("T1.*", ", ".join(select_columns))

        if "timestamp" in source_columns:
            select_query += " ORDER BY T1.[timestamp] "
        else:
            print(self.dest_table.dest + " hat kein timestamp-Feld")
        self.info = notes.getvalue()
        return select_query
def _escape_batch_percent(text: str) -> str:
    """Escape ``%`` in text written into a generated batch command line.

    Every ``%`` becomes ``%%%%``, which is what the full-load section has
    always done. NOTE(review): the quadrupling presumably compensates for the
    batch-file parse plus the extra percent expansion done by ``call``;
    confirm against bcp_queryout.bat.
    """
    return text.replace("%", "%%%%")


def _insert_query(dest_table, schema_name: str, temp_db: str) -> str:
    """Return the SQL that copies every temp-table row into the destination table."""
    return (
        f"INSERT INTO {dest_table.full_table_name(schema_name)} "
        f"SELECT * FROM {dest_table.temp_table_name(temp_db)} T1"
    )


def _delete_query(dest_table, schema_name: str, temp_db: str) -> str:
    """Return SQL deleting destination rows whose primary key appears in the temp table.

    Returns "" when the destination table has no primary key.
    """
    pkey = dest_table.primary_key
    if not pkey:
        return ""
    pkey_join = " AND ".join(f"T1.[{col}] = T2.[{col}]" for col in pkey)
    return (
        f"DELETE T1 FROM {dest_table.full_table_name(schema_name)} T1 "
        f"INNER JOIN {dest_table.temp_table_name(temp_db)} T2 ON {pkey_join}"
    )


def _source_table_exists(source_table, source_inspect) -> bool:
    """Return True if the table name, or its ``convert_table`` form, is in ``tables_list``."""
    if source_table.table_name in source_inspect.tables_list:
        return True
    return source_inspect.convert_table(source_table.table_name) in source_inspect.tables_list


def _dest_table_from_row(row: dict):
    """Build a DestTable from one import-config row, mapping CSV columns by name."""
    return DestTable(
        source=row["source"],
        dest=row["dest"],
        dest_db=row["dest_db"],
        filter_=row["filter"],
        query=row["query"],
        iterative=row["iterative"],
        cols=row["cols"],
    )


def _write_table_batch(dest_table, cfg, source_inspect, dest_inspect) -> None:
    """Write the batch file that loads one destination table.

    Called without arguments, the batch runs the ``:full`` section. It
    truncates the destination table, then calls bcp_queryout.bat and
    bcp_in.bat for each usable source table.

    Called with any first argument, it jumps to ``:increment``:
    - truncate the temp table;
    - export each source's rows whose ``timestamp`` exceeds ``%TS%`` into it;
    - delete matching destination rows by primary key (when one exists);
    - insert the temp rows into the destination table.

    Both paths end in ``:cleanup``, which calls delete.bat for every staging CSV.
    """
    with open(dest_table.table_batch_file(cfg.batch_dir), "w", encoding="cp850") as f:
        f.write("@echo off\n")
        f.write(f'call "{cfg.scripts_dir}\\config2.bat"\n')
        f.write("rem ==" + dest_table.dest + "==\n")
        if dest_table.dest not in dest_inspect.tables_list:
            f.write(f"echo Ziel-Tabelle '{dest_table.dest}' existiert nicht!\n")
            print(f"Ziel-Tabelle '{dest_table.dest}' existiert nicht!")
            return

        schema = cfg.dest_dsn.schema
        full_table_name = dest_table.full_table_name(schema)
        temp_table_name = dest_table.temp_table_name(cfg.temp_db)

        f.write(f"del {cfg.logs_dir}\\{dest_table.dest}*.* /Q /F >nul 2>nul\n\n")
        f.write('if not "%1"=="" goto :increment\n')

        f.write("\n:full\n")
        f.write(f' call sql_query.bat "TRUNCATE TABLE {full_table_name}"\n')
        # (source_table, escaped query) for every source that can be exported;
        # the incremental section reuses exactly this set.
        exports = []
        for source_table in dest_table.source_tables:
            if not _source_table_exists(source_table, source_inspect):
                f.write(f"echo Quell-Tabelle '{source_table.table_name}' existiert nicht!\n")
                print(f"Quell-Tabelle '{source_table.table_name}' existiert nicht!")
                continue
            # "or ''" also tolerates a select_query that yields None on error.
            select_query = _escape_batch_percent(source_table.select_query or "")
            f.write(source_table.info)
            if select_query == "":
                print(f"Ziel-Tabelle '{dest_table.dest}' Spalte 'Client_DB' fehlt!")
                continue
            exports.append((source_table, select_query))
            f.write(f' call bcp_queryout.bat "{source_table.table_client}" "{select_query}"\n')
            f.write(
                f' call bcp_in.bat "{source_table.table_client}" '
                f'"[{schema}].[{dest_table.dest}]" "{dest_table.dest_db}"\n'
            )
        f.write(" goto :cleanup\n\n")

        f.write(":increment\n")
        f.write(f' call sql_query.bat "TRUNCATE TABLE {temp_table_name}"\n\n')
        # Spliced in after escaping: %TS% must stay a live batch variable
        # (presumably set by sql_timestamp.bat on the preceding line).
        convert_timestamp = "T1.[timestamp] > convert(binary(8), '%TS%', 1)"
        for source_table, select_query in exports:
            if "WHERE" in select_query:
                select_query = select_query.replace("WHERE", f"WHERE {convert_timestamp} AND")
            else:
                print("Dont know where to put WHERE")
            f.write(
                f' call sql_timestamp.bat "{source_table.table_client}" '
                f'"{full_table_name}" "{source_table.client_db}"\n'
            )
            f.write(f' call bcp_queryout.bat "{source_table.table_client}" "{select_query}"\n')
            f.write(
                f' call bcp_in.bat "{source_table.table_client}" "[temp].[{dest_table.dest}]" "{cfg.temp_db}"\n\n'
            )
        delete_query = _delete_query(dest_table, schema, cfg.temp_db)
        if delete_query == "":
            print(dest_table.dest + " hat keinen Primaerschluessel")
            # The trailing newline matters. Without it, the following "call" line
            # would become part of this rem comment and the INSERT would never run.
            f.write(f" rem {dest_table.dest} hat keinen Primaerschluessel\n")
        else:
            f.write(f' call sql_query.bat "{delete_query}"\n')
        f.write(f' call sql_query.bat "{_insert_query(dest_table, schema, cfg.temp_db)}"\n')

        f.write("\n:cleanup\n")
        for source_table in dest_table.source_tables:
            f.write(f' call delete.bat "{source_table.stage_csv}"\n')


def _write_run_all_batch(filename: str, cfg, table_config, table_args: str) -> None:
    """Write a batch file that calls every per-table batch file in turn.

    *table_args* is appended to each call: " 1" selects the incremental
    section, "" the full load.
    """
    with open(filename, "w", encoding="cp850") as f:
        f.write("@echo off & cd /d %~dp0\n")
        f.write(f"del {cfg.stage_dir}\\*.* /Q /F >nul 2>nul\n\n")
        for dest_table in table_config:
            f.write(f"echo =={dest_table.dest}==\n")
            f.write(f"echo {dest_table.dest} >CON\n")
            f.write(f"call {cfg.batch_dir}\\{dest_table.dest}.bat{table_args}\n\n")


def create(config_file: str = "database/CARLO.json"):
    """Generate the import batch files described by *config_file*.

    Steps:
    - load the configuration and call ``create_db_ini(cfg)``;
    - print the source prefixes as JSON;
    - read ``<base_dir>/config/<cfg.csv_file>``, where *base_dir* is two
      levels above *config_file*;
    - write one batch file per destination table into ``cfg.batch_dir``;
    - write ``_<name>.bat`` (incremental run of every table) and
      ``_<name>_full_load.bat`` (full run of every table).
    """
    cfg = load_config(config_file)
    create_db_ini(cfg)
    base_dir = str(Path(config_file).parent.parent.resolve())

    source_inspect = DatabaseInspect(cfg.source_dsn, source=True)
    print(json.dumps(source_inspect.get_prefix(), indent=2))
    # SourceTable2 reads the config and the source inspector from class attributes.
    SourceTable2.source_inspect = source_inspect
    SourceTable2.cfg = cfg
    dest_inspect = DatabaseInspect(cfg.dest_dsn)

    config = get_import_config(f"{base_dir}/config/{cfg.csv_file}", cfg.dest_dsn.database)
    table_config = [_dest_table_from_row(row) for row in config.to_dict(orient="records")]
    for dest_table in table_config:
        dest_table.dest_inspect = dest_inspect
        dest_table.source_tables = []
        for client_db, prefix in cfg.clients.items():
            st = SourceTable2(dest_table.source, client_db, prefix)
            st.dest_table = dest_table
            dest_table.source_tables.append(st)

    for dest_table in table_config:
        _write_table_batch(dest_table, cfg, source_inspect, dest_inspect)

    _write_run_all_batch(f"{cfg.batch_dir}/_{cfg.name}.bat", cfg, table_config, " 1")
    _write_run_all_batch(f"{cfg.batch_dir}/_{cfg.name}_full_load.bat", cfg, table_config, "")
if __name__ == "__main__":
    # Script entry point: generate all batch files from create()'s default config file.
    create()
|