123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 |
- import re
- from dataclasses import dataclass
- from pathlib import Path
- from datetime import datetime
@dataclass
class BulkcopyResult:
    """Row counts and durations collected for one bulk-copy run.

    ``filename`` identifies the run and ``timestamp`` records when it last
    ran. The counters default to zero and the durations to ``0.0``.
    """

    filename: str
    timestamp: datetime
    imported: int = 0
    exported: int = 0
    ignored: int = 0
    import_duration: float = 0.0
    export_duration: float = 0.0

    @property
    def missing(self) -> int:
        """Exported rows that were neither imported nor ignored."""
        not_imported = self.exported - self.imported
        return not_imported - self.ignored

    def to_csv(self):
        """Render the result as one semicolon-separated record.

        Column order: filename, timestamp, exported, imported, ignored,
        missing, export duration, import duration.
        """
        stamp = self.timestamp.strftime("%d.%m.%Y %H:%M:%S")
        columns = (
            self.filename,
            stamp,
            self.exported,
            self.imported,
            self.ignored,
            self.missing,
            self.export_duration,
            self.import_duration,
        )
        return ";".join(str(column) for column in columns)

    def __str__(self) -> str:
        """Render a multi-line, human-readable summary of the run."""
        last_run = self.timestamp.strftime("%d.%m.%Y %H:%M")
        heading = [
            f"Filename: {self.filename}",
            f"Last run: {last_run}",
        ]
        counters = [
            f"Exported: {self.exported:>7}",
            f"Imported: {self.imported:>7}",
            f"Ignored: {self.ignored:>7}",
            f"Missing: {self.missing:>7}",
        ]
        durations = [
            f"Duration: {self.export_duration:>11} s",
            f" {self.import_duration:>11} s",
        ]
        # Sections are separated by a single empty line.
        return "\n".join([*heading, "", *counters, "", *durations])
def _parse_bcp_log(log_path):
    """Return ``(rows_copied, duration_seconds)`` parsed from one bcp log.

    The file is read as cp850 with undecodable bytes dropped. A value whose
    pattern is not found in the log is reported as zero.
    """
    with open(log_path, "r", encoding="cp850", errors="ignore") as frh:
        raw_logs = frh.read()

    # German bcp output: "<n> Zeilen kopiert." means "<n> rows copied."
    rows_match = re.search(r"(\d+) Zeilen kopiert.", raw_logs)
    rows_copied = int(rows_match.group(1)) if rows_match else 0

    # "Zeit ... gesamt: <n>" is the total time; the division by 1000
    # presumably converts milliseconds to seconds -- TODO confirm the unit.
    time_match = re.search(r"Zeit .* gesamt: (\d+)", raw_logs)
    duration = int(time_match.group(1)) / 1000 if time_match else 0.0

    return rows_copied, duration


def check_logfiles(prefix, base_dir):
    """Collect the bulk-copy statistics for one log prefix.

    Three files inside ``base_dir`` are read:

    * ``<prefix>.in.log``   -- its modification time becomes the result's
      timestamp and its line count becomes ``ignored``;
    * ``<prefix>.bcp1.log`` -- export log (rows copied, duration);
    * ``<prefix>.bcp2.log`` -- import log (rows copied, duration).

    Args:
        prefix: Common file-name stem of the three log files.
        base_dir: Directory containing the log files (``str`` or ``Path``).

    Returns:
        A ``BulkcopyResult`` populated from the three logs.

    Raises:
        FileNotFoundError: If any of the three log files does not exist.
    """
    log_dir = Path(base_dir)
    in_log = log_dir / f"{prefix}.in.log"

    ts = datetime.fromtimestamp(in_log.stat().st_mtime)
    result = BulkcopyResult(filename=prefix, timestamp=ts)

    # Decode like the sibling bcp logs so a stray non-ASCII byte cannot raise
    # UnicodeDecodeError while the lines are merely being counted.
    with open(in_log, "r", encoding="cp850", errors="ignore") as frh:
        result.ignored = sum(1 for _ in frh)

    # info output of export
    result.exported, result.export_duration = _parse_bcp_log(
        log_dir / f"{prefix}.bcp1.log"
    )

    # info output of import
    result.imported, result.import_duration = _parse_bcp_log(
        log_dir / f"{prefix}.bcp2.log"
    )

    return result
def check_directory(base_dir):
    """Summarise every bulk-copy run found in ``base_dir`` into ``info.log``.

    Each ``*.bcp1.log`` file marks one run. Its name without the
    ``.bcp1.log`` suffix is passed to ``check_logfiles`` and the resulting
    CSV record is collected. The records are written, below a header line,
    to ``<base_dir>/info.log``, overwriting any existing file.

    Args:
        base_dir: Directory holding the log files (``str`` or ``Path``).
    """
    suffix = ".bcp1.log"
    directory = Path(base_dir)

    # glob() yields entries in arbitrary order; sort for reproducible output.
    rows = [
        check_logfiles(log_path.name[: -len(suffix)], base_dir).to_csv()
        for log_path in sorted(directory.glob(f"*{suffix}"))
    ]

    # The header must list the columns in the order to_csv() emits them.
    header = (
        "filename;timestamp;exported;imported;ignored;missing;"
        "export_duration;import_duration"
    )
    with open(directory / "info.log", "w") as fwh:
        fwh.write(header + "\n")
        fwh.write("\n".join(rows))
if __name__ == "__main__":
    # Run the check on the "SQL/temp" folder located next to this script.
    script_dir = Path(__file__).parent
    check_directory(f"{script_dir}/SQL/temp")
    # To inspect a single run, call check_logfiles(<prefix>, <directory>).
|