# bcp_log.py — parse SQL Server bcp export/import log files and aggregate
# the copy statistics (row counts, durations, file sizes) into a CSV report.
import re
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
  5. @dataclass
  6. class BulkcopyResult:
  7. file_name: str
  8. timestamp: datetime
  9. imported: int = -1
  10. exported: int = -1
  11. ignored: int = 0
  12. import_duration: float = 0.0
  13. export_duration: float = 0.0
  14. file_size: int = -1
  15. @property
  16. def missing(self) -> int:
  17. return self.exported - self.imported - self.ignored
  18. def to_csv(self) -> str:
  19. return (
  20. f"{self.file_name};{self.timestamp.strftime('%Y-%m-%dT%H:%M:%S')};"
  21. + f"{self.exported};{self.imported};{self.ignored};{self.missing};"
  22. + f"{self.export_duration};{self.import_duration};"
  23. + f"{self.file_size}"
  24. )
  25. def __str__(self) -> str:
  26. return "\n".join(
  27. [
  28. f"Filename: {self.file_name}",
  29. f"Last run: {self.timestamp.strftime('%d.%m.%Y %H:%M')}",
  30. "",
  31. f"Exported: {self.exported:>7}",
  32. f"Imported: {self.imported:>7}",
  33. f"Ignored: {self.ignored:>7}",
  34. f"Missing: {self.missing:>7}",
  35. "",
  36. f"Duration: {self.export_duration:>11} s",
  37. f" {self.import_duration:>11} s",
  38. "",
  39. f"Filesize: {self.file_size:>7}",
  40. ]
  41. )
  42. def check_logfiles(prefix: str, base_dir: str) -> BulkcopyResult:
  43. ts = datetime.fromtimestamp(Path(f"{base_dir}/{prefix}.bcp2.log").stat().st_mtime)
  44. result = BulkcopyResult(file_name=prefix, timestamp=ts)
  45. in_log = Path(f"{base_dir}\\{prefix}.in.log")
  46. if in_log.exists():
  47. with in_log.open("r") as frh:
  48. result.ignored = len(frh.readlines()) // 2
  49. # info output of export
  50. bcp1_log = Path(f"{base_dir}\\{prefix}.bcp1.log")
  51. if bcp1_log.exists():
  52. with bcp1_log.open("r", encoding="cp850", errors="ignore") as frh:
  53. raw_logs = frh.read()
  54. result.exported = rows_copied(raw_logs)
  55. result.export_duration = total_time(raw_logs)
  56. # info output of import
  57. bcp2_log = Path(f"{base_dir}\\{prefix}.bcp2.log")
  58. if bcp2_log.exists():
  59. with bcp2_log.open("r", encoding="cp850", errors="ignore") as frh:
  60. raw_logs = frh.read()
  61. result.imported = rows_copied(raw_logs)
  62. result.import_duration = total_time(raw_logs)
  63. csv_file = Path(f"{base_dir}\\{prefix}.csv")
  64. if csv_file.exists():
  65. result.file_size = csv_file.stat().st_size
  66. return result
  67. def rows_copied(raw_logs: str) -> int:
  68. match = re.search(r"(\d+) Zeilen kopiert.", raw_logs)
  69. if match:
  70. return int(match.group(1))
  71. match = re.search(r"(\d+) rows copied.", raw_logs)
  72. if match:
  73. return int(match.group(1))
  74. return -1
  75. def total_time(raw_logs: str) -> float:
  76. match = re.search(r"Zeit .* gesamt\s*: (\d+)", raw_logs)
  77. if match:
  78. return int(match.group(1)) / 1000
  79. match = re.search(r"Clock Time .* Total\s*: (\d+)", raw_logs)
  80. if match:
  81. return int(match.group(1)) / 1000
  82. return 0.0
  83. def check_directory(base_dir: str, res: list[BulkcopyResult] | None = None) -> list[BulkcopyResult]:
  84. if res is None:
  85. res = []
  86. for folder in Path(base_dir).glob("*"):
  87. if not folder.is_dir():
  88. continue
  89. res = check_directory(str(folder), res)
  90. for filename in Path(base_dir).glob("*.bcp2.log"):
  91. stem = filename.name[:-9]
  92. res.append(check_logfiles(stem, base_dir))
  93. return res
  94. def export_log_csv(res: list[BulkcopyResult], output_file: str):
  95. with open(output_file, "w") as fwh:
  96. fwh.write("filename;timestamp;imported;exported;ignored;missing;import_duration;export_duration;file_size\n")
  97. for log in res:
  98. fwh.write(log.to_csv() + "\n")
  99. def bcp_log(logs_dir: str, output_file: str):
  100. res = check_directory(logs_dir)
  101. export_log_csv(res, output_file)
  102. if __name__ == "__main__":
  103. base_dir = str(Path(__file__).parent)
  104. bcp_log(base_dir + "/SQL/temp", base_dir + "/SQL/bcp.csv.log")
  105. # check_logfiles('ORDER_LINE_1')