import re from config import DSN from model import StatusMeldung from logfiles.systeminfo_log import SysteminfoLog import json import os from sqlalchemy import create_engine, select from sqlalchemy.orm import Session class PrepareLogfiles: def prepare_logfiles(self): with Session(create_engine(DSN)) as db_session: query = select(StatusMeldung).where(StatusMeldung.fehlerbericht.is_(None)) for sm in db_session.scalars(query): self.convert_logfile(sm) db_session.commit() def convert_logfile(self, sm: StatusMeldung): whitelist_str = sm.kunde_ref.whitelist if sm.kunde_ref is not None else "" whitelist = self.get_whitelist(whitelist_str) fehlerbericht = dict( [(f["Name"], f) for f in json.loads(sm.fehlerbericht_import)] ) for name, report in fehlerbericht.items(): if ".bat" in name: e = report["Errors"][0] e = e.replace( "!! Bitte E-Mailadresse fuer Statusbericht angeben !!", "! Bitte E-Mailadresse fuer Statusbericht angeben !", ) e = e.replace( "!! SMTP-Konfiguration bitte anpassen !!", "! SMTP-Konfiguration bitte anpassen !", ) e, err_count = re.subn(r"(!![^\n]*!!)", r"\1", e) e, err_count2 = re.subn( r"\((\w{2}\d{4})\)( [^\r]*)", r"(\1)\2", e, ) e, _ = re.subn( r"Error \= \[Micro.* NULL.*\.", "", e, ) err_count3 = 0 e, err_count3 = re.subn( r"(Error \= \[Micro[^\r]*)", r"\1", e, ) if sum([err_count, err_count2, err_count3]) > 0: report["ErrorLevel"] = 2 report["Errors"] = [e] + re.findall(r"[^\n]*", e) sm.anzahl += 1 elif ".dtsx" in name: e = re.sub(r"\r\n", "\n", report["Errors"][0]) e = re.sub(r"(\w*)ende\n", "§", e) e = re.sub(r"Copyright ", "§", e) loglevel_warn = [b for b in e.split("§") if "Status:" not in b] loglevel_error = [b for b in loglevel_warn if "Fehler:" in b] loglevel_combined = [ "" + b + "" if "Fehler:" in b else b for b in loglevel_warn ] loglevel_combined[-1] = loglevel_combined[-1].replace( "DTSER_FAILURE", "DTSER_FAILURE" ) if ( len(loglevel_error) > 0 or "DTSER_SUCCESS" not in loglevel_combined[-1] ): report["ErrorLevel"] = 2 sm.anzahl += 1 report["Errors"][0] = "\n".join(loglevel_combined) elif len(report["Errors"]) > 0 and report["Type"] in [ "Portal", "Versand", ]: report["Errors2"] = [ e for e in report["Errors"] if self.in_whitelist(e, whitelist) ] report["Errors"] = [ e for e in report["Errors"] if e not in report["Errors2"] ] if len(report["Errors"]) == 0: report["ErrorLevel"] = 3 sm.anzahl -= 1 sm.fehlerbericht = json.dumps(list(fehlerbericht.values()), indent=2) # self.export_log_files(sm, fehlerbericht) # batch_log = [f for f in fehlerbericht if ".bat" in f["Name"]].pop(0) systeminfo_log = SysteminfoLog( fehlerbericht.get("system.info", {"Errors": [""]}) ) # if systeminfo_log.ip_address == "0.0.0.0": # print(sm.kunde) def in_whitelist(self, e, whitelist): return any( [ e["Layer"] in whitelist["Layer"], e["Report"] in whitelist["Report"], e["User"] in whitelist["User"], ] ) def export_log_files(self, status: StatusMeldung, fehlerbericht): kunde = status.kunde.replace(" ", "-").lower() path = f"fehlerbericht/temp/{status.datum}_{kunde}_{status.aufgabe}" os.makedirs(path, exist_ok=True) for file_name in os.listdir(path): file = path + file_name if os.path.isfile(file): os.remove(file) for name, f in fehlerbericht.items(): if f["Type"] != "Workflow": continue f["Errors"][0] = f["Errors"][0].replace("\r\n", "\n") with open(f"{path}/{name}.log", "w", encoding="latin-1") as fwh: fwh.write(f["Errors"][0]) def get_whitelist(self, whitelist2): whitelist = {"Layer": [], "Report": [], "User": []} if whitelist2 is not None and whitelist2 != "": whitelist.update(json.loads(whitelist2)) return whitelist def main(): mi = PrepareLogfiles() mi.prepare_logfiles() if __name__ == "__main__": main() # import cProfile # cProfile.run( # "main()", # "fehlerbericht/prepare_logfiles.prof", # )