# bcp_log.py — parse SQL Server bcp (bulk copy) log files and summarise results
  1. import re
  2. from dataclasses import dataclass
  3. from pathlib import Path
  4. from datetime import datetime
  5. @dataclass
  6. class BulkcopyResult:
  7. filename: str
  8. timestamp: datetime
  9. imported: int = 0
  10. exported: int = 0
  11. ignored: int = 0
  12. import_duration: float = 0.0
  13. export_duration: float = 0.0
  14. @property
  15. def missing(self) -> int:
  16. return self.exported - self.imported - self.ignored
  17. def to_csv(self):
  18. return (
  19. f"{self.filename};{self.timestamp.strftime('%d.%m.%Y %H:%M:%S')};"
  20. + f"{self.exported};{self.imported};{self.ignored};{self.missing};"
  21. + f"{self.export_duration};{self.import_duration}"
  22. )
  23. def __str__(self) -> str:
  24. return "\n".join(
  25. [
  26. f"Filename: {self.filename}",
  27. f"Last run: {self.timestamp.strftime('%d.%m.%Y %H:%M')}",
  28. "",
  29. f"Exported: {self.exported:>7}",
  30. f"Imported: {self.imported:>7}",
  31. f"Ignored: {self.ignored:>7}",
  32. f"Missing: {self.missing:>7}",
  33. "",
  34. f"Duration: {self.export_duration:>11} s",
  35. f" {self.import_duration:>11} s",
  36. ]
  37. )
  38. def check_logfiles(prefix, base_dir):
  39. ts = datetime.fromtimestamp(Path(f"{base_dir}/{prefix}.in.log").stat().st_mtime)
  40. result = BulkcopyResult(filename=prefix, timestamp=ts)
  41. with open(f"{base_dir}/{prefix}.in.log", "r") as frh:
  42. result.ignored = len(frh.readlines())
  43. # info output of export
  44. with open(
  45. f"{base_dir}/{prefix}.bcp1.log", "r", encoding="cp850", errors="ignore"
  46. ) as frh:
  47. raw_logs = frh.read()
  48. match = re.search(r"(\d+) Zeilen kopiert.", raw_logs)
  49. result.exported = int(match.group(1)) if match else 0
  50. match2 = re.search(r"Zeit .* gesamt: (\d+)", raw_logs)
  51. result.export_duration = int(match2.group(1)) / 1000 if match2 else 0
  52. # info output of import
  53. with open(
  54. f"{base_dir}/{prefix}.bcp2.log", "r", encoding="cp850", errors="ignore"
  55. ) as frh:
  56. raw_logs = frh.read()
  57. match = re.search(r"(\d+) Zeilen kopiert.", raw_logs)
  58. result.imported = int(match.group(1)) if match else 0
  59. match2 = re.search(r"Zeit .* gesamt: (\d+)", raw_logs)
  60. result.import_duration = int(match2.group(1)) / 1000 if match2 else 0
  61. return result
  62. def check_directory(base_dir):
  63. res = []
  64. for filename in Path(base_dir).glob("*.bcp1.log"):
  65. stem = filename.name[:-9]
  66. res.append(check_logfiles(stem, base_dir).to_csv())
  67. with open(base_dir + "/info.log", "w") as fwh:
  68. fwh.write(
  69. "filename;timestamp;imported;exported;ignored;import_duration;export_duration\n"
  70. )
  71. fwh.write("\n".join(res))
  72. if __name__ == "__main__":
  73. check_directory("/home/robert/projekte/python/dbtools/SQL/temp")
  74. # check_logfiles('ORDER_LINE_1')