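"""Summarise bcp (bulk copy) log files.

Walks a directory tree for '<prefix>.bcp1.log' / '<prefix>.bcp2.log'
export/import logs, extracts row counts and timings per prefix, and
writes the collected results to a semicolon-separated CSV file.
"""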
import re
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path


@dataclass
class BulkcopyResult:
    file_name: str
    timestamp: datetime
    imported: int = -1
    exported: int = -1
    ignored: int = 0
    import_duration: float = 0.0
    export_duration: float = 0.0
    file_size: int = -1

    @property
    def missing(self) -> int:
        # rows that were exported but neither imported nor ignored
        return self.exported - self.imported - self.ignored

    def to_csv(self) -> str:
        # field order must stay in sync with the header in export_log_csv()
        return (
            f"{self.file_name};{self.timestamp.strftime('%Y-%m-%dT%H:%M:%S')};"
            + f"{self.exported};{self.imported};{self.ignored};{self.missing};"
            + f"{self.export_duration};{self.import_duration};"
            + f"{self.file_size}"
        )

    def __str__(self) -> str:
        return "\n".join(
            [
                f"Filename: {self.file_name}",
                f"Last run: {self.timestamp.strftime('%d.%m.%Y %H:%M')}",
                "",
                f"Exported: {self.exported:>7}",
                f"Imported: {self.imported:>7}",
                f"Ignored:  {self.ignored:>7}",
                f"Missing:  {self.missing:>7}",
                "",
                f"Duration: {self.export_duration:>11} s",
                f"          {self.import_duration:>11} s",
                "",
                f"Filesize: {self.file_size:>7}",
            ]
        )
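
# Example of the report produced by BulkcopyResult.__str__ (all numbers are
# illustrative, not from a real run):
#
#   Filename: ORDER_LINE_1
#   Last run: 01.01.2024 12:00
#
#   Exported:    1000
#   Imported:     998
#   Ignored:        2
#   Missing:        0
#
#   Duration:         1.5 s
#                     2.0 s
#
#   Filesize:   52428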


def check_logfiles(prefix: str, base_dir: str) -> BulkcopyResult:
    """Collect export/import statistics for a single table prefix."""
    ts = datetime.fromtimestamp(Path(f"{base_dir}/{prefix}.bcp2.log").stat().st_mtime)
    result = BulkcopyResult(file_name=prefix, timestamp=ts)

    # rejected rows; the log presumably holds two lines per ignored row
    with open(f"{base_dir}/{prefix}.in.log", "r") as frh:
        result.ignored = len(frh.readlines()) // 2

    # info output of the export step
    with open(f"{base_dir}/{prefix}.bcp1.log", "r", encoding="cp850", errors="ignore") as frh:
        raw_logs = frh.read()
        result.exported = rows_copied(raw_logs)
        result.export_duration = total_time(raw_logs)

    # info output of the import step
    with open(f"{base_dir}/{prefix}.bcp2.log", "r", encoding="cp850", errors="ignore") as frh:
        raw_logs = frh.read()
        result.imported = rows_copied(raw_logs)
        result.import_duration = total_time(raw_logs)

    # size of the exported data file, if it is still present
    csv_file = Path(f"{base_dir}/{prefix}.csv")
    if csv_file.exists():
        result.file_size = csv_file.stat().st_size
    return result


def rows_copied(raw_logs: str) -> int:
    """Extract the row count from bcp's summary line, or -1 if not found."""
    # German and English localisations of the same message
    match = re.search(r"(\d+) Zeilen kopiert\.", raw_logs)
    if match:
        return int(match.group(1))
    match = re.search(r"(\d+) rows copied\.", raw_logs)
    if match:
        return int(match.group(1))
    return -1


def total_time(raw_logs: str) -> float:
    """Extract the total clock time from bcp's summary output, in seconds."""
    # German and English localisations; bcp reports milliseconds
    match = re.search(r"Zeit .* gesamt\s*: (\d+)", raw_logs)
    if match:
        return int(match.group(1)) / 1000
    match = re.search(r"Clock Time .* Total\s*: (\d+)", raw_logs)
    if match:
        return int(match.group(1)) / 1000
    return 0.0
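
# Illustrative inputs (mocked-up fragments; real bcp logs contain more text
# around these lines, which re.search tolerates):
#
#   >>> rows_copied("1000 rows copied.")
#   1000
#   >>> total_time("Clock Time (ms.) Total     : 1500")
#   1.5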


def check_directory(base_dir: str, res: list[BulkcopyResult] | None = None) -> list[BulkcopyResult]:
    """Recursively collect a result for every '*.bcp2.log' below base_dir."""
    if res is None:
        res = []
    for folder in Path(base_dir).glob("*"):
        if not folder.is_dir():
            continue
        res = check_directory(str(folder), res)
    for filename in Path(base_dir).glob("*.bcp2.log"):
        stem = filename.name[:-9]  # strip the '.bcp2.log' suffix
        res.append(check_logfiles(stem, base_dir))
    return res


def export_log_csv(res: list[BulkcopyResult], output_file: str):
    with open(output_file, "w") as fwh:
        # header order matches BulkcopyResult.to_csv()
        fwh.write("filename;timestamp;exported;imported;ignored;missing;export_duration;import_duration;file_size\n")
        for log in res:
            fwh.write(log.to_csv() + "\n")


def bcp_log(logs_dir: str, output_file: str):
    res = check_directory(logs_dir)
    export_log_csv(res, output_file)
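
# Example of a resulting CSV row (values are illustrative only):
#   ORDER_LINE_1;2024-01-01T12:00:00;1000;998;2;0;1.5;2.0;52428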


if __name__ == "__main__":
    base_dir = str(Path(__file__).parent)
    bcp_log(base_dir + "/SQL/temp", base_dir + "/SQL/bcp.csv.log")
    # check_logfiles('ORDER_LINE_1')