# bcp_log.py — parse bcp import/export log files and summarize the results.
import re
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
  5. @dataclass
  6. class BulkcopyResult:
  7. filename: str
  8. timestamp: datetime
  9. imported: int = 0
  10. exported: int = 0
  11. ignored: int = 0
  12. import_duration: float = 0.0
  13. export_duration: float = 0.0
  14. @property
  15. def missing(self) -> int:
  16. return self.exported - self.imported - self.ignored
  17. def to_csv(self):
  18. return (
  19. f"{self.filename};{self.timestamp.strftime('%d.%m.%Y %H:%M:%S')};"
  20. + f"{self.exported};{self.imported};{self.ignored};{self.missing};"
  21. + f"{self.export_duration};{self.import_duration}"
  22. )
  23. def __str__(self) -> str:
  24. return "\n".join(
  25. [
  26. f"Filename: {self.filename}",
  27. f"Last run: {self.timestamp.strftime('%d.%m.%Y %H:%M')}",
  28. "",
  29. f"Exported: {self.exported:>7}",
  30. f"Imported: {self.imported:>7}",
  31. f"Ignored: {self.ignored:>7}",
  32. f"Missing: {self.missing:>7}",
  33. "",
  34. f"Duration: {self.export_duration:>11} s",
  35. f" {self.import_duration:>11} s",
  36. ]
  37. )
  38. def check_logfiles(prefix, base_dir):
  39. ts = datetime.fromtimestamp(Path(f"{base_dir}/{prefix}.in.log").stat().st_mtime)
  40. result = BulkcopyResult(filename=prefix, timestamp=ts)
  41. with open(f"{base_dir}/{prefix}.in.log", "r") as frh:
  42. result.ignored = len(frh.readlines())
  43. # info output of export
  44. with open(f"{base_dir}/{prefix}.bcp1.log", "r", encoding="cp850", errors="ignore") as frh:
  45. raw_logs = frh.read()
  46. match = re.search(r"(\d+) Zeilen kopiert.", raw_logs)
  47. result.exported = int(match.group(1)) if match else 0
  48. match2 = re.search(r"Zeit .* gesamt: (\d+)", raw_logs)
  49. result.export_duration = int(match2.group(1)) / 1000 if match2 else 0
  50. # info output of import
  51. with open(f"{base_dir}/{prefix}.bcp2.log", "r", encoding="cp850", errors="ignore") as frh:
  52. raw_logs = frh.read()
  53. match = re.search(r"(\d+) Zeilen kopiert.", raw_logs)
  54. result.imported = int(match.group(1)) if match else 0
  55. match2 = re.search(r"Zeit .* gesamt: (\d+)", raw_logs)
  56. result.import_duration = int(match2.group(1)) / 1000 if match2 else 0
  57. return result
  58. def check_directory(base_dir):
  59. res = []
  60. for filename in Path(base_dir).glob("*.bcp1.log"):
  61. stem = filename.name[:-9]
  62. res.append(check_logfiles(stem, base_dir).to_csv())
  63. with open(base_dir + "/info.log", "w") as fwh:
  64. fwh.write("filename;timestamp;imported;exported;ignored;import_duration;export_duration\n")
  65. fwh.write("\n".join(res))
  66. if __name__ == "__main__":
  67. base_dir = str(Path(__file__).parent)
  68. check_directory(base_dir + "/SQL/temp")
  69. # check_logfiles('ORDER_LINE_1')