import re from dataclasses import dataclass from datetime import datetime from pathlib import Path @dataclass class BulkcopyResult: file_name: str timestamp: datetime imported: int = -1 exported: int = -1 ignored: int = 0 import_duration: float = 0.0 export_duration: float = 0.0 file_size: int = -1 @property def missing(self) -> int: return self.exported - self.imported - self.ignored def to_csv(self) -> str: return ( f"{self.file_name};{self.timestamp.strftime('%Y-%m-%dT%H:%M:%S')};" + f"{self.exported};{self.imported};{self.ignored};{self.missing};" + f"{self.export_duration};{self.import_duration};" + f"{self.file_size}" ) def __str__(self) -> str: return "\n".join( [ f"Filename: {self.file_name}", f"Last run: {self.timestamp.strftime('%d.%m.%Y %H:%M')}", "", f"Exported: {self.exported:>7}", f"Imported: {self.imported:>7}", f"Ignored: {self.ignored:>7}", f"Missing: {self.missing:>7}", "", f"Duration: {self.export_duration:>11} s", f" {self.import_duration:>11} s", "", f"Filesize: {self.file_size:>7}", ] ) def check_logfiles(prefix: str, base_dir: str) -> BulkcopyResult: ts = datetime.fromtimestamp(Path(f"{base_dir}/{prefix}.bcp2.log").stat().st_mtime) result = BulkcopyResult(file_name=prefix, timestamp=ts) with open(f"{base_dir}/{prefix}.in.log", "r") as frh: result.ignored = len(frh.readlines()) // 2 # info output of export with open(f"{base_dir}/{prefix}.bcp1.log", "r", encoding="cp850", errors="ignore") as frh: raw_logs = frh.read() result.exported = rows_copied(raw_logs) result.export_duration = total_time(raw_logs) # info output of import with open(f"{base_dir}/{prefix}.bcp2.log", "r", encoding="cp850", errors="ignore") as frh: raw_logs = frh.read() result.imported = rows_copied(raw_logs) result.import_duration = total_time(raw_logs) csv_file = Path(f"{base_dir}/{prefix}.csv") if csv_file.exists(): result.file_size = csv_file.stat().st_size return result def rows_copied(raw_logs: str) -> int: match = re.search(r"(\d+) Zeilen kopiert.", raw_logs) if match: return int(match.group(1)) match = re.search(r"(\d+) rows copied.", raw_logs) if match: return int(match.group(1)) return -1 def total_time(raw_logs: str) -> float: match = re.search(r"Zeit .* gesamt\s*: (\d+)", raw_logs) if match: return int(match.group(1)) / 1000 match = re.search(r"Clock Time .* Total\s*: (\d+)", raw_logs) if match: return int(match.group(1)) / 1000 return 0.0 def check_directory(base_dir: str, res: list[BulkcopyResult] | None = None) -> list[BulkcopyResult]: if res is None: res = [] for folder in Path(base_dir).glob("*"): if not folder.is_dir(): continue res = check_directory(str(folder), res) for filename in Path(base_dir).glob("*.bcp2.log"): stem = filename.name[:-9] res.append(check_logfiles(stem, base_dir)) return res def export_log_csv(res: list[BulkcopyResult], output_file: str): with open(output_file, "w") as fwh: fwh.write("filename;timestamp;imported;exported;ignored;missing;import_duration;export_duration;file_size\n") for log in res: fwh.write(log.to_csv() + "\n") def bcp_log(logs_dir: str, output_file: str): res = check_directory(logs_dir) export_log_csv(res, output_file) if __name__ == "__main__": base_dir = str(Path(__file__).parent) bcp_log(base_dir + "/SQL/temp", base_dir + "/SQL/bcp.csv.log") # check_logfiles('ORDER_LINE_1')