# bcp_log.py — parse SQL Server BCP export/import log files into a CSV summary.
  1. import re
  2. from dataclasses import dataclass
  3. from datetime import datetime
  4. from pathlib import Path
  5. @dataclass
  6. class BulkcopyResult:
  7. file_name: str
  8. timestamp: datetime
  9. exported: int = -1
  10. imported: int = -1
  11. ignored: int = 0
  12. export_duration: float = 0.0
  13. import_duration: float = 0.0
  14. export_errors: str = ""
  15. import_errors: str = ""
  16. file_size: int = -1
  17. @property
  18. def missing(self) -> int:
  19. return self.exported - self.imported - self.ignored
  20. def to_csv(self) -> str:
  21. return (
  22. f"{self.file_name};{self.timestamp.strftime('%Y-%m-%dT%H:%M:%S')};"
  23. + f"{self.exported};{self.imported};{self.ignored};{self.missing};"
  24. + f"{self.export_duration};{self.import_duration};"
  25. + f"{self.export_errors};{self.import_errors};{self.file_size}"
  26. )
  27. def __str__(self) -> str:
  28. return "\n".join(
  29. [
  30. f"Filename: {self.file_name}",
  31. f"Last run: {self.timestamp.strftime('%d.%m.%Y %H:%M')}",
  32. "",
  33. f"Exported: {self.exported:>7}",
  34. f"Imported: {self.imported:>7}",
  35. f"Ignored: {self.ignored:>7}",
  36. f"Missing: {self.missing:>7}",
  37. "",
  38. f"Duration: {self.export_duration:>11} s",
  39. f" {self.import_duration:>11} s",
  40. "",
  41. f"Filesize: {self.file_size:>7}",
  42. ]
  43. )
  44. def check_logfiles(prefix: str, base_dir: str) -> BulkcopyResult:
  45. ts = datetime.fromtimestamp(Path(f"{base_dir}/{prefix}.bcp2.log").stat().st_mtime)
  46. result = BulkcopyResult(file_name=prefix, timestamp=ts)
  47. in_log = Path(f"{base_dir}\\{prefix}.in.log")
  48. if in_log.exists():
  49. with in_log.open("r") as frh:
  50. result.ignored = len(frh.readlines()) // 2
  51. # info output of export
  52. bcp1_log = Path(f"{base_dir}\\{prefix}.bcp1.log")
  53. if bcp1_log.exists():
  54. with bcp1_log.open("r", encoding="cp850", errors="ignore") as frh:
  55. raw_logs = frh.read()
  56. result.exported = rows_copied(raw_logs)
  57. result.export_duration = total_time(raw_logs)
  58. result.export_errors = sql_errors(raw_logs)
  59. # info output of import
  60. bcp2_log = Path(f"{base_dir}\\{prefix}.bcp2.log")
  61. if bcp2_log.exists():
  62. with bcp2_log.open("r", encoding="cp850", errors="ignore") as frh:
  63. raw_logs = frh.read()
  64. result.imported = rows_copied(raw_logs)
  65. result.import_duration = total_time(raw_logs)
  66. result.import_errors = sql_errors(raw_logs)
  67. csv_file = Path(f"{base_dir}\\{prefix}.csv")
  68. if csv_file.exists():
  69. result.file_size = csv_file.stat().st_size
  70. return result
  71. def rows_copied(raw_logs: str) -> int:
  72. match = re.search(r"(\d+) Zeilen kopiert.", raw_logs)
  73. if match:
  74. return int(match.group(1))
  75. match = re.search(r"(\d+) rows copied.", raw_logs)
  76. if match:
  77. return int(match.group(1))
  78. return -1
  79. def total_time(raw_logs: str) -> float:
  80. match = re.search(r"Zeit .* gesamt\s*: (\d+)", raw_logs)
  81. if match:
  82. return int(match.group(1)) / 1000
  83. match = re.search(r"Clock Time .* Total\s*: (\d+)", raw_logs)
  84. if match:
  85. return int(match.group(1)) / 1000
  86. return 0.0
  87. def sql_errors(raw_logs: str) -> str:
  88. match = re.findall(r"SQLState = (\w+),", raw_logs)
  89. if match:
  90. return ",".join(set(match).difference({"S1000", "22001", "22003", "22005"}))
  91. return ""
  92. def check_directory(base_dir: str, res: list[BulkcopyResult] | None = None) -> list[BulkcopyResult]:
  93. if res is None:
  94. res = []
  95. for folder in Path(base_dir).glob("*"):
  96. if not folder.is_dir():
  97. continue
  98. res = check_directory(str(folder), res)
  99. for filename in Path(base_dir).glob("*.bcp2.log"):
  100. stem = filename.name[:-9]
  101. res.append(check_logfiles(stem, base_dir))
  102. return res
  103. def export_log_csv(res: list[BulkcopyResult], output_file: str):
  104. with open(output_file, "w") as fwh:
  105. fwh.write(
  106. "filename;timestamp;exported;imported;ignored;missing;export_duration;import_duration;export_errors;import_errors;file_size\n"
  107. )
  108. for log in res:
  109. fwh.write(log.to_csv() + "\n")
  110. def bcp_log(logs_dir: str, output_file: str):
  111. res = check_directory(logs_dir)
  112. export_log_csv(res, output_file)
  113. if __name__ == "__main__":
  114. base_dir = str(Path(__file__).parent)
  115. bcp_log(base_dir + "/SQL/temp", base_dir + "/SQL/bcp.csv.log")
  116. # check_logfiles('ORDER_LINE_1')