prepare_logfiles.py 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
  1. from config import DSN
  2. from model import StatusMeldung
  3. from logfiles.systeminfo_log import SysteminfoLog
  4. import json
  5. import os
  6. from sqlalchemy import create_engine, select
  7. from sqlalchemy.orm import Session
  8. class PrepareLogfiles:
  9. def prepare_logfiles(self):
  10. with Session(create_engine(DSN)) as db_session:
  11. query = select(StatusMeldung).where(StatusMeldung.fehlerbericht.is_(None))
  12. for sm in db_session.scalars(query):
  13. # whitelist = self.get_whitelist(sm.kunde_ref.whitelist)
  14. fehlerbericht = dict(
  15. [(f["Name"], f) for f in json.loads(sm.fehlerbericht_import)]
  16. )
  17. # self.export_log_files(sm, fehlerbericht)
  18. # batch_log = [f for f in fehlerbericht if ".bat" in f["Name"]].pop(0)
  19. systeminfo_log = SysteminfoLog(
  20. fehlerbericht.get("system.info", {"Errors": [""]})
  21. )
  22. if systeminfo_log.ip_address == "0.0.0.0":
  23. print(sm.kunde)
  24. def export_log_files(self, status: StatusMeldung, fehlerbericht):
  25. kunde = status.kunde.replace(" ", "-").lower()
  26. path = f"fehlerbericht/temp/{status.datum}_{kunde}_{status.aufgabe}"
  27. os.makedirs(path, exist_ok=True)
  28. for file_name in os.listdir(path):
  29. file = path + file_name
  30. if os.path.isfile(file):
  31. os.remove(file)
  32. for name, f in fehlerbericht.items():
  33. if f["Type"] != "Workflow":
  34. continue
  35. f["Errors"][0] = f["Errors"][0].replace("\r\n", "\n")
  36. with open(f"{path}/{name}.log", "w", encoding="latin-1") as fwh:
  37. fwh.write(f["Errors"][0])
  38. def get_whitelist(self, whitelist2):
  39. whitelist = {"Layer": [], "Report": [], "User": []}
  40. if whitelist2 is not None and whitelist2 != "":
  41. whitelist.update(json.loads(whitelist2))
  42. return whitelist
  43. if __name__ == "__main__":
  44. mi = PrepareLogfiles()
  45. mi.prepare_logfiles()