from config import DSN from model import StatusMeldung from logfiles.systeminfo_log import SysteminfoLog import json import os from sqlalchemy import create_engine, select from sqlalchemy.orm import Session class PrepareLogfiles: def prepare_logfiles(self): with Session(create_engine(DSN)) as db_session: query = select(StatusMeldung).where(StatusMeldung.fehlerbericht.is_(None)) for sm in db_session.scalars(query): # whitelist = self.get_whitelist(sm.kunde_ref.whitelist) fehlerbericht = dict( [(f["Name"], f) for f in json.loads(sm.fehlerbericht_import)] ) # self.export_log_files(sm, fehlerbericht) # batch_log = [f for f in fehlerbericht if ".bat" in f["Name"]].pop(0) systeminfo_log = SysteminfoLog( fehlerbericht.get("system.info", {"Errors": [""]}) ) if systeminfo_log.ip_address == "0.0.0.0": print(sm.kunde) def export_log_files(self, status: StatusMeldung, fehlerbericht): kunde = status.kunde.replace(" ", "-").lower() path = f"fehlerbericht/temp/{status.datum}_{kunde}_{status.aufgabe}" os.makedirs(path, exist_ok=True) for file_name in os.listdir(path): file = path + file_name if os.path.isfile(file): os.remove(file) for name, f in fehlerbericht.items(): if f["Type"] != "Workflow": continue f["Errors"][0] = f["Errors"][0].replace("\r\n", "\n") with open(f"{path}/{name}.log", "w", encoding="latin-1") as fwh: fwh.write(f["Errors"][0]) def get_whitelist(self, whitelist2): whitelist = {"Layer": [], "Report": [], "User": []} if whitelist2 is not None and whitelist2 != "": whitelist.update(json.loads(whitelist2)) return whitelist if __name__ == "__main__": mi = PrepareLogfiles() mi.prepare_logfiles()