浏览代码

compare weiter angepasst

gc-server3 9 月之前
父节点
当前提交
125e3c0f0a
共有 4 个文件被更改,包括 91 次插入9 次删除
  1. 76 7
      database/db_compare.py
  2. 2 2
      database/db_create.py
  3. 13 0
      database/model.py
  4. 二进制
      dist/gctools.exe

+ 76 - 7
database/db_compare.py

@@ -4,9 +4,14 @@ from pathlib import Path
 
 import pandas as pd
 import pyodbc
+from sqlalchemy import create_engine
 
 from database.db_create import get_import_config
-from database.model import DatabaseInspect, create_db_ini, load_config
+from database.model import DatabaseInspect, DbCreateConfig, create_db_ini, load_config
+
+
+def decode_ts(ts_binary):
+    return "0x" + codecs.encode(ts_binary, "hex_codec").decode()
 
 
 def compare(config_file: str = "database/CARLO.json"):
@@ -27,8 +32,6 @@ def compare(config_file: str = "database/CARLO.json"):
         dest_timestamp = {}
         if current_table["dest"] in dest_tables:
             full_table_name = f'[{current_table["dest_db"]}].[{cfg.dest_dsn.schema}].[{current_table["dest"]}]'
-            # pkey = dest_db.get_pkey(current_table["dest"], current_table["dest_db"])
-            # cols = pkey + ["timestamp"]
             query_count_dest = f"SELECT [Client_DB], COUNT(*) as [Rows] FROM {full_table_name} GROUP BY [Client_DB]"
             q = dest_db.cursor.execute(query_count_dest)
             dest_row_count = dict([(col[0], col[1]) for col in q.fetchall()])
@@ -37,15 +40,12 @@ def compare(config_file: str = "database/CARLO.json"):
                 f"SELECT [Client_DB], max(timestamp) as [TS] FROM {full_table_name} GROUP BY [Client_DB]"
             )
             q = dest_db.cursor.execute(query_timestamp_dest)
-            dest_timestamp = dict(
-                [(col[0], "0x" + codecs.encode(col[1], "hex_codec").decode()) for col in q.fetchall()]
-            )
+            dest_timestamp = dict([(col[0], decode_ts(col[1])) for col in q.fetchall()])
 
         source_row_count = {}
         source_row_count_ts = {}
 
         for client_db, prefix in cfg.clients.items():
-            # table_client = f'{current_table["dest"]}_{client_db}'
             source_table = current_table["source"].format(prefix)
             source_table2 = source_table.split(".")[-1][1:-1]
 
@@ -61,6 +61,8 @@ def compare(config_file: str = "database/CARLO.json"):
 
                 if not pd.isnull(current_table["filter"]):
                     select_query += " WHERE " + current_table["filter"].format("", cfg.filter[0], cfg.filter[1])
+                elif "WHERE" not in select_query:
+                    select_query += " WHERE 1 = 1"
                 query_count_source = select_query.replace("T1.*", "COUNT(*) as [Rows]")
                 # print(query_count_source)
 
@@ -85,3 +87,70 @@ def compare(config_file: str = "database/CARLO.json"):
                 print(f"  Quelle:          {source_row_count.get(client_db, 0):>8}")
                 print(f"  Quelle (bis ts): {source_row_count_ts.get(client_db, 0):>8}")
                 print(f"  dest:            {dest_row_count.get(client_db, 0):>8}")
+                compare_details(current_table, client_db, source_db, dest_db, query_count_source, full_table_name, cfg)
+
+
+def compare_details(
+    current_table: pd.Series,
+    client_db: str,
+    source_db: DatabaseInspect,
+    dest_db: DatabaseInspect,
+    query_count_source: str,
+    full_table_name: str,
+    cfg: DbCreateConfig,
+):
+    table_client = f'{current_table["dest"]}_{client_db}'
+    pkey = dest_db.get_pkey(current_table["dest"], current_table["dest_db"])
+    cols = pkey + ["timestamp"]
+    if "Client_DB" in cols:
+        cols.remove("Client_DB")
+    if "CLIENT_DB" in cols:
+        cols.remove("CLIENT_DB")
+
+    query_cols = ", ".join([f"T1.[{c}]" for c in cols])
+    query_source = query_count_source.replace("COUNT(*) as [Rows]", query_cols)
+    query_source += f" ORDER BY {query_cols}"
+    query_dest = (
+        f"SELECT {query_cols} FROM {full_table_name} T1 WHERE T1.[Client_DB] = '{client_db}' ORDER BY {query_cols}"
+    )
+
+    source_file = f"{cfg.stage_dir}\\source\\{table_client}.csv"
+    source_data = pd.read_sql(query_source, create_engine(source_db.conn_string_sqlalchemy))
+    source_data["timestamp"] = source_data["timestamp"].apply(decode_ts)
+    source_data.to_csv(source_file, index=False)
+
+    dest_file = f"{cfg.stage_dir}\\dest\\{table_client}.csv"
+    dest_data = pd.read_sql(query_dest, create_engine(dest_db.conn_string_sqlalchemy))
+    dest_data["timestamp"] = dest_data["timestamp"].apply(decode_ts)
+    dest_data.to_csv(dest_file, index=False)
+
+    cols_without_ts = cols[:-1]
+
+    only_in_source_file = f"{cfg.stage_dir}\\diff\\{table_client}_only_in_source.sql"
+    only_in_source = pd.merge(source_data, dest_data, how="left", on=cols_without_ts)
+    only_in_source = only_in_source[pd.isna(only_in_source["timestamp_y"])]
+    if only_in_source.shape[0] > 0:
+        ts_list = ", ".join(only_in_source.to_dict(orient="list")["timestamp_x"])
+        query = query_count_source.replace("COUNT(*) as [Rows]", "T1.*")
+        query += f" AND T1.[timestamp] IN ({ts_list})"
+        with open(only_in_source_file, "w") as fwh:
+            fwh.write(query)
+
+    only_in_dest_file = f"{cfg.stage_dir}\\diff\\{table_client}_only_in_dest.sql"
+    only_in_dest = pd.merge(dest_data, source_data, how="left", on=cols_without_ts)
+    only_in_dest = only_in_dest[pd.isna(only_in_dest["timestamp_y"])]
+    if only_in_dest.shape[0] > 0:
+        ts_list = ", ".join(only_in_dest.to_dict(orient="list")["timestamp_x"])
+        query = f"SELECT T1.* FROM {full_table_name} T1 WHERE T1.[Client_DB] = '{client_db}' AND T1.[timestamp] IN ({ts_list})"
+        with open(only_in_dest_file, "w") as fwh:
+            fwh.write(query)
+
+    not_updated_file = f"{cfg.stage_dir}\\diff\\{table_client}_not_updated.sql"
+    not_updated = pd.merge(source_data, dest_data, how="inner", on=cols_without_ts)
+    not_updated = not_updated[not_updated["timestamp_x"] != not_updated["timestamp_y"]]
+    if not_updated.shape[0] > 0:
+        ts_list = ", ".join(not_updated.to_dict(orient="list")["timestamp_x"])
+        query = query_count_source.replace("COUNT(*) as [Rows]", "T1.*")
+        query += f" AND T1.[timestamp] IN ({ts_list})"
+        with open(not_updated_file, "w") as fwh:
+            fwh.write(query)

+ 2 - 2
database/db_create.py

@@ -94,6 +94,8 @@ def create(config_file: str = "database/CARLO.json"):
 
                 if not pd.isnull(current_table["filter"]):
                     select_query += " WHERE " + current_table["filter"].format("", cfg.filter[0], cfg.filter[1])
+                elif "WHERE" not in select_query:
+                    select_query += " WHERE 1 = 1"
                 # select_columns = "T1.[" + "], T1.[".join(intersect) + "],"
                 select_columns = ""
                 for col, is_char_type in zip(dest_columns_list, dest_column_types):
@@ -132,8 +134,6 @@ def create(config_file: str = "database/CARLO.json"):
                 convert_timestamp = "T1.[timestamp] > convert(binary(8), '%TS%', 1)"
                 if "WHERE" in select_query:
                     select_query = select_query.replace("WHERE", f"WHERE {convert_timestamp} AND")
-                elif "ORDER" in select_query:
-                    select_query = select_query.replace("ORDER", f"WHERE {convert_timestamp} ORDER")
                 else:
                     print("Dont know where to put WHERE")
                 f.write(f'  call sql_timestamp.bat "{table_client}" "{full_table_name}" "{client_db}"\n')

+ 13 - 0
database/model.py

@@ -84,6 +84,17 @@ class DatabaseInspect:
         )
         # f"DSN={self.dsn.server};UID={self.dsn.user};PWD={self.dsn.password}"
 
+    @property
+    def conn_string_sqlalchemy(self):
+        if self.dsn.driver == "mssql":
+            return (
+                f"mssql+pyodbc://{self.dsn.user}:{self.dsn.password}@{self.dsn.server}/{self.dsn.database}?"
+                "driver=SQL+Server+Native+Client+11.0"
+            )
+        if self.dsn.driver == "mysql":
+            return f"mysql+pymysql://{self.dsn.user}:{self.dsn.password}@{self.dsn.server}/{self.dsn.database}?charset=utf8mb4"
+        return f"pyodbc://{self.dsn.user}:{self.dsn.password}@{self.dsn.server}/{self.dsn.database}?driver={self.dsn.driver}"
+
     @property
     def bcp_conn_params(self):
         return f"-S {self.dsn.server} -d {self.dsn.database} -U {self.dsn.user} -P {self.dsn.password}"
@@ -167,6 +178,8 @@ def load_config(config_file: str):
         if cfg_import[folder].startswith(".."):
             cfg_import[folder] = str(base_dir.joinpath(cfg_import[folder]).resolve())
         os.makedirs(cfg_import[folder], exist_ok=True)
+    for folder in ["source", "dest", "diff"]:
+        os.makedirs(cfg_import["stage_dir"] + "\\" + folder, exist_ok=True)
     cfg_import["source_dsn"] = DsnConfig(**cfg_import["source_dsn"])
     cfg_import["dest_dsn"] = DsnConfig(**cfg_import["dest_dsn"])
     return DbCreateConfig(**cfg_import)

二进制
dist/gctools.exe