"""Import status-report e-mails from an IMAP folder into the `status_meldung` table."""

import json
from datetime import date, time
from email.message import EmailMessage

from imap_tools import MailBox
from sqlalchemy import Date, Integer, String, Time, create_engine
from sqlalchemy.dialects.mysql.base import LONGTEXT
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


class Base(DeclarativeBase):
    """Declarative base class for the ORM models in this module."""


class StatusMeldung(Base):
    """ORM model for one row of the `status_meldung` table.

    The primary key is the combination (datum, kunde, start).
    """

    __tablename__ = "status_meldung"

    datum: Mapped[date] = mapped_column(Date, primary_key=True)
    kunde: Mapped[str] = mapped_column(String(50), primary_key=True)
    aufgabe: Mapped[str] = mapped_column(String(30))
    start: Mapped[time] = mapped_column(Time, primary_key=True)
    ende: Mapped[time] = mapped_column(Time)
    # Pretty-printed JSON text of the "fehlerbericht.json" attachment.
    fehlerbericht: Mapped[str] = mapped_column(LONGTEXT)
    anzahl: Mapped[int] = mapped_column(Integer, default=0)
    bearbeitet: Mapped[int] = mapped_column(Integer, default=0)
    kommentar_id: Mapped[int] = mapped_column(Integer, default=0)


class MailImport:
    """Read status mails from the IMAP mailbox and store them in the database."""

    # NOTE(review): credentials are hard-coded in source. They should be moved
    # to configuration / environment variables and rotated; values are left
    # unchanged here so existing behaviour is preserved.
    database = "mysql+pymysql://gaps:Gcbs12ma@192.168.2.41/tasks"
    # (IMAP host, login name, password)
    mailbox = ("imap.1und1.de", "status@global-cube.de", "(6QNLU=7m3R?f2?6]JO4WH(K")

    def __init__(self):
        """Create the SQLAlchemy engine for the configured database URL."""
        self.engine = create_engine(self.database)

    def open_mailbox(self) -> MailBox:
        """Log in to the configured IMAP server and return the mailbox."""
        host, user, password = self.mailbox
        return MailBox(host).login(user, password)

    def mail_import(self):
        """Fetch up to 100 mails from the "Test" folder and insert one row per mail.

        Each message is committed on its own so that one failing insert does
        not discard the others. Messages whose subject cannot be interpreted
        (``get_status_message`` returns ``None``) are skipped.
        """
        with (
            Session(self.engine) as self.session,
            self.open_mailbox() as mb,
        ):
            mb.folder.set("Test")
            messages = mb.fetch(mark_seen=True, limit=100, headers_only=False)
            imported = []
            for msg in messages:
                status = self.get_status_message(msg)
                if status is None:
                    # Subject was rejected; passing None to Session.add()
                    # would raise and abort the whole import.
                    continue
                self.session.add(status)
                try:
                    self.session.commit()
                except IntegrityError as e:
                    # The insert violated a database constraint (e.g. a row
                    # with the same primary key already exists); report it
                    # and carry on with the next message.
                    self.session.rollback()
                    print(e.args[0])
                else:
                    imported.append(msg.uid)
            # Deleting the imported messages is currently disabled:
            # mb.delete(imported)

    def get_status_message(self, msg: EmailMessage) -> StatusMeldung | None:
        """Build a ``StatusMeldung`` from a status mail.

        The subject is split on ";" and read positionally:
        ``kunde;start;ende;datum;anzahl[;extra]``. ``datum`` and the times are
        parsed with ``fromisoformat``. The optional sixth field is either a
        ``.bat`` file name (used as the task) or a value containing ":" (task
        becomes "manuell").

        Args:
            msg: A fetched message. Annotated as ``EmailMessage``, but the
                attributes used here (``subject``, ``attachments``) are those
                of the objects yielded by ``MailBox.fetch``.

        Returns:
            The populated ``StatusMeldung``, or ``None`` (after printing the
            subject) if the subject has fewer than five fields.

        Raises:
            ValueError: If a date/time/count field or the JSON attachment
                cannot be parsed.
        """
        subject = msg.subject.split(";")
        if len(subject) < 5:
            print(msg.subject)
            return None

        attachments = {att.filename: att.payload for att in msg.attachments}
        raw_report = attachments.get("fehlerbericht.json")
        # A missing or empty attachment is treated as an empty report list;
        # json.loads("") would raise JSONDecodeError.
        fehlerbericht_json = json.loads(raw_report) if raw_report else []
        fehlerbericht = json.dumps(fehlerbericht_json, indent=2)

        aufgabe = ""
        if len(subject) > 5:
            if subject[5][-4:] == ".bat":
                aufgabe = subject[5]
            elif ":" in subject[5]:
                aufgabe = "manuell"

        if ":" not in subject[1]:
            # The second field holds a task name instead of a start time:
            # use it as the task and fall back to midnight as start.
            aufgabe = subject[1]
            subject[1] = "00:00"

        if not aufgabe:
            # Last resort: first report entry whose "Name" mentions ".bat".
            files = [f["Name"] for f in fehlerbericht_json if ".bat" in f["Name"]]
            if files:
                aufgabe = files[0]

        return StatusMeldung(
            datum=date.fromisoformat(subject[3]),
            kunde=subject[0],
            aufgabe=aufgabe,
            start=time.fromisoformat(subject[1]),
            ende=time.fromisoformat(subject[2]),
            fehlerbericht=fehlerbericht,
            anzahl=int(subject[4]),
        )

    def cleanup(self):
        """Placeholder for post-import cleanup; currently does nothing."""
        pass


if __name__ == "__main__":
    mi = MailImport()
    mi.mail_import()
    mi.cleanup()