123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- from datetime import date, time
- from email.message import EmailMessage
- from sqlalchemy import Integer, create_engine, String, Date, Time
- from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, Session
- from sqlalchemy.dialects.mysql.base import LONGTEXT
- from sqlalchemy.exc import IntegrityError
- from imap_tools import MailBox
- import json
class Base(DeclarativeBase):
    """Declarative base class shared by the ORM models defined in this file."""
class StatusMeldung(Base):
    """ORM mapping for one status report row in the ``status_meldung`` table.

    The primary key is the combination of ``datum``, ``kunde`` and ``start``.
    """

    __tablename__ = "status_meldung"

    datum: Mapped[date] = mapped_column(Date, primary_key=True)
    kunde: Mapped[str] = mapped_column(String(50), primary_key=True)
    aufgabe: Mapped[str] = mapped_column(String(30))
    start: Mapped[time] = mapped_column(Time, primary_key=True)
    ende: Mapped[time] = mapped_column(Time)
    # The annotation names the Python value type (str); LONGTEXT is only the
    # SQL column type and therefore belongs in mapped_column(), not Mapped[].
    fehlerbericht: Mapped[str] = mapped_column(LONGTEXT)
    anzahl: Mapped[int] = mapped_column(Integer, default=0)
    bearbeitet: Mapped[int] = mapped_column(Integer, default=0)
    kommentar_id: Mapped[int] = mapped_column(Integer, default=0)
class MailImport:
    """Import status mails from an IMAP folder into the database.

    Each mail's subject is split on ``;`` and, together with the optional
    ``fehlerbericht.json`` attachment, turned into one ``StatusMeldung`` row.
    """

    # SECURITY NOTE(review): the database URL and the mailbox password are
    # stored in plain text in the source. They are kept here as defaults so
    # existing callers keep working; prefer passing them to __init__ from a
    # secret store or the environment, and rotate these values.
    database = "mysql+pymysql://gaps:Gcbs12ma@192.168.2.41/tasks"
    mailbox = ("imap.1und1.de", "status@global-cube.de", "(6QNLU=7m3R?f2?6]JO4WH(K")

    def __init__(self, database: "str | None" = None, mailbox: "tuple | None" = None):
        """Create the database engine.

        Args:
            database: Optional SQLAlchemy URL overriding the class default.
            mailbox: Optional ``(host, user, password)`` tuple overriding the
                class default.
        """
        if database is not None:
            self.database = database
        if mailbox is not None:
            self.mailbox = mailbox
        self.engine = create_engine(self.database)

    def open_mailbox(self) -> MailBox:
        """Return a mailbox logged in with the configured host, user and password."""
        host, user, password = self.mailbox[0], self.mailbox[1], self.mailbox[2]
        return MailBox(host).login(user, password)

    def mail_import(self):
        """Fetch up to 100 mails from the "Test" folder and store them.

        Every mail is committed on its own. A mail that cannot be parsed or
        that violates a database constraint (for example a duplicate primary
        key) is reported on stdout and skipped, so it does not abort the rest
        of the batch.
        """
        with (
            Session(self.engine) as self.session,
            self.open_mailbox() as mb,
        ):
            mb.folder.set("Test")
            messages = mb.fetch(mark_seen=True, limit=100, headers_only=False)
            imported = []
            for msg in messages:
                try:
                    status = self.get_status_message(msg)
                except ValueError as e:
                    # Malformed date/time/count or invalid JSON attachment:
                    # report the offending mail and carry on with the next one.
                    print(msg.subject)
                    print(e)
                    continue
                if status is None:
                    # The subject did not have enough fields; it was already
                    # printed by get_status_message. Adding None to the
                    # session would raise.
                    continue
                self.session.add(status)
                try:
                    self.session.commit()
                except IntegrityError as e:
                    self.session.rollback()
                    print(e.args[0])
                else:
                    imported.append(msg.uid)
            # Deleting the imported mails is currently disabled:
            # mb.delete(imported)

    def get_status_message(self, msg: EmailMessage) -> "StatusMeldung | None":
        """Build a ``StatusMeldung`` from one mail.

        The subject is expected to hold at least five ``;``-separated fields:
        ``kunde;start;ende;datum;anzahl`` plus an optional sixth field.

        NOTE(review): ``msg`` is annotated as ``EmailMessage`` but is used via
        ``.subject`` and ``.attachments``; presumably it is the message object
        yielded by ``MailBox.fetch`` - confirm and adjust the annotation.

        Args:
            msg: The fetched mail.

        Returns:
            The populated ``StatusMeldung``, or ``None`` (after printing the
            subject) when the subject has fewer than five fields.

        Raises:
            ValueError: If the date, time or count fields cannot be parsed or
                the attachment is not valid JSON.
        """
        subject = msg.subject.split(";")
        if len(subject) < 5:
            print(msg.subject)
            return None

        attachments = {att.filename: att.payload for att in msg.attachments}
        raw_report = attachments.get("fehlerbericht.json")
        # A mail without the attachment yields an empty report rather than
        # failing on json.loads("").
        fehlerbericht_json = json.loads(raw_report) if raw_report else []
        fehlerbericht = json.dumps(fehlerbericht_json, indent=2)

        aufgabe = ""
        if len(subject) > 5:
            if subject[5].endswith(".bat"):
                aufgabe = subject[5]
            elif ":" in subject[5]:
                aufgabe = "manuell"
        if ":" not in subject[1]:
            # The second field holds a task name instead of a start time.
            aufgabe = subject[1]
            subject[1] = "00:00"
        if aufgabe == "":
            # Fall back to the first .bat file named in the error report.
            aufgabe = next(
                (f["Name"] for f in fehlerbericht_json if ".bat" in f["Name"]),
                "",
            )

        return StatusMeldung(
            datum=date.fromisoformat(subject[3]),
            kunde=subject[0],
            aufgabe=aufgabe,
            start=time.fromisoformat(subject[1]),
            ende=time.fromisoformat(subject[2]),
            fehlerbericht=fehlerbericht,
            anzahl=int(subject[4]),
        )

    def cleanup(self):
        """Placeholder hook; currently does nothing."""
if __name__ == "__main__":
    # Script entry point: run one import pass, then the cleanup hook.
    importer = MailImport()
    importer.mail_import()
    importer.cleanup()
|