12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152 |
- from config import DSN
- from model import StatusMeldung
- from logfiles.systeminfo_log import SysteminfoLog
- import json
- import os
- from sqlalchemy import create_engine, select
- from sqlalchemy.orm import Session
- class PrepareLogfiles:
- def prepare_logfiles(self):
- with Session(create_engine(DSN)) as db_session:
- query = select(StatusMeldung).where(StatusMeldung.fehlerbericht.is_(None))
- for sm in db_session.scalars(query):
- # whitelist = self.get_whitelist(sm.kunde_ref.whitelist)
- fehlerbericht = dict(
- [(f["Name"], f) for f in json.loads(sm.fehlerbericht_import)]
- )
- # self.export_log_files(sm, fehlerbericht)
- # batch_log = [f for f in fehlerbericht if ".bat" in f["Name"]].pop(0)
- systeminfo_log = SysteminfoLog(
- fehlerbericht.get("system.info", {"Errors": [""]})
- )
- if systeminfo_log.ip_address == "0.0.0.0":
- print(sm.kunde)
- def export_log_files(self, status: StatusMeldung, fehlerbericht):
- kunde = status.kunde.replace(" ", "-").lower()
- path = f"fehlerbericht/temp/{status.datum}_{kunde}_{status.aufgabe}"
- os.makedirs(path, exist_ok=True)
- for file_name in os.listdir(path):
- file = path + file_name
- if os.path.isfile(file):
- os.remove(file)
- for name, f in fehlerbericht.items():
- if f["Type"] != "Workflow":
- continue
- f["Errors"][0] = f["Errors"][0].replace("\r\n", "\n")
- with open(f"{path}/{name}.log", "w", encoding="latin-1") as fwh:
- fwh.write(f["Errors"][0])
- def get_whitelist(self, whitelist2):
- whitelist = {"Layer": [], "Report": [], "User": []}
- if whitelist2 is not None and whitelist2 != "":
- whitelist.update(json.loads(whitelist2))
- return whitelist
- if __name__ == "__main__":
- mi = PrepareLogfiles()
- mi.prepare_logfiles()
|