"""Housekeeping commands for the "archiv" and "versand" IMAP mailboxes.

Run as a script, the module exposes the methods listed in ``Imap.commands``
through ``plac.Interpreter``.
"""
import json
import os
import re
from dataclasses import dataclass
from datetime import date, datetime

import plac
from imap_tools import AND, MailBox, MailMessage

# Directory of this file; whitelist.json, blacklist.json and undelivered.csv
# live next to it. abspath() keeps this valid when __file__ has no directory part.
_BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# Captures the part between "+" and "@" of a plus-addressed sender.
_PLUS_ADDRESS = re.compile(r"\+(.*)@")

# First address in angle brackets plus everything that follows it.
_BOUNCE_RECIPIENT = re.compile(r"<([\w\-\+\.]+@[\w\-\.]+\.[\w\-\.]+)>(.*)", re.DOTALL)


def _local_path(filename: str) -> str:
    """Return the path of *filename* inside the directory of this module."""
    return os.path.join(_BASE_DIR, filename)


def _first_of_previous_month(today: date) -> date:
    """Return the first day of the month preceding *today* (handles January)."""
    if today.month == 1:
        return date(today.year - 1, 12, 1)
    return date(today.year, today.month - 1, 1)


def _sender_domain(sender: str):
    """Return "@<tag>" for a plus-addressed *sender*, or None without a tag.

    <tag> is the text between "+" and "@" in the sender address.
    """
    found = _PLUS_ADDRESS.findall(sender)
    if not found:
        return None
    return "@" + found[0]


@dataclass
class ImapCredentials:
    """Login data for one IMAP account."""

    server: str
    username: str
    password: str


class Imap:
    """Mailbox maintenance commands, dispatched by plac via ``commands``."""

    commands = ["cleanup", "move", "remove", "add_to_whitelist", "undelivered"]
    whitelist = {}
    blacklist = []

    # NOTE(review): passwords are committed in source. Consider loading them
    # from the environment or a secrets store and rotating these values.
    credentials = {
        "archiv": ImapCredentials("mail.global-cube.com", "archiv", "Gc01am64!"),
        "versand": ImapCredentials("mail.global-cube.com", "versand", "y6!avXX3tQvr"),
    }

    def __init__(self):
        """Load whitelist.json and blacklist.json from the module directory."""
        with open(_local_path("whitelist.json"), "r") as frh:
            self.whitelist = json.load(frh)
        with open(_local_path("blacklist.json"), "r") as frh:
            self.blacklist = json.load(frh)

    def connect(self, key):
        """Log in with the credentials stored under *key* and return the mailbox.

        Raises KeyError when *key* is not present in ``credentials``.
        """
        creds = self.credentials[key]
        return MailBox(creds.server).login(creds.username, creds.password)

    def cleanup(self):
        """Trim every "Archive." folder down towards 100 messages.

        Only messages dated before the first day of the previous month are
        deleted, and never more than the folder's surplus over the limit.
        """
        date_criteria = _first_of_previous_month(datetime.now().date())
        msg_limit = 100

        with self.connect("archiv") as mb:
            folder_list = [f.name for f in mb.folder.list() if "Archive." in f.name]

            for folder in folder_list:
                excess = mb.folder.status(folder)["MESSAGES"] - msg_limit
                if excess <= 0:
                    continue
                mb.folder.set(folder)

                # Each fetch returns at most 1000 candidates, so loop until the
                # surplus is gone or no sufficiently old message is left.
                while excess > 0:
                    uids = [
                        msg.uid
                        for msg in mb.fetch(
                            criteria=AND(date_lt=date_criteria),
                            mark_seen=True,
                            limit=1000,
                            headers_only=True,
                        )
                    ]
                    if not uids:
                        break
                    doomed = uids[:excess]
                    mb.delete(doomed)
                    excess -= len(doomed)

    def move(self):
        """Sort mails from "@global-cube.com" senders into "Archive.<tag>" folders.

        Mails whose subject contains more than three ";" are deleted. Mails
        without a plus-tag in the sender are skipped. The target folder is
        created when missing, and the mail is moved only if all recipients
        pass ``is_valid_message``.
        """
        with self.connect("archiv") as mb:
            messages = mb.fetch(
                criteria=AND(from_="@global-cube.com"),
                mark_seen=True,
                bulk=True,
                limit=1000,
                headers_only=True,
            )
            for msg in messages:
                if msg.subject.count(";") > 3:  # statusmail
                    mb.delete([msg.uid])
                    continue

                domain = _sender_domain(msg.from_)
                if domain is None:
                    continue

                subfolder = "Archive." + domain.replace("@", "").replace(".", "_")
                if not mb.folder.exists(subfolder):
                    mb.folder.create(subfolder)

                if self.is_valid_message(msg, domain):
                    mb.move(msg.uid, subfolder)

    def is_valid_message(self, msg, domain: str) -> bool:
        """Return True when every recipient of *msg* is acceptable for *domain*.

        A recipient is acceptable when it contains *domain* or
        "@global-cube.de", equals the sender, or contains one of the whitelist
        entries stored for *domain*.
        """
        for to in msg.to:
            if domain in to or "@global-cube.de" in to or msg.from_ == to:
                continue
            if domain not in self.whitelist:
                return False
            if not any(entry in to for entry in self.whitelist[domain]):
                return False
        return True

    def add_to_whitelist(self):
        """Whitelist the recipients of all mails in the "whitelist" folder.

        For each mail that does not yet pass ``is_valid_message``, every
        recipient not containing the sender's domain tag is added to that
        domain's whitelist. The whitelist is then written back to
        whitelist.json and the processed mails are deleted.
        """
        with self.connect("archiv") as mb:
            mb.folder.set("whitelist")
            # fetch() yields lazily; keep the results so the UIDs are still
            # available for the delete after the loop.
            messages = list(
                mb.fetch(
                    criteria=AND(from_="@global-cube.com"),
                    mark_seen=True,
                    bulk=True,
                    limit=1000,
                    headers_only=True,
                )
            )
            for msg in messages:
                domain = _sender_domain(msg.from_)
                if domain is None:
                    continue
                if self.is_valid_message(msg, domain):
                    continue
                allowed = self.whitelist.setdefault(domain, [])
                for to in msg.to:
                    if domain not in to and to not in allowed:
                        allowed.append(to)

            # Persist first, so a failed write does not lose the source mails.
            with open(_local_path("whitelist.json"), "w") as fwh:
                json.dump(self.whitelist, fwh, indent=2)

            mb.delete([msg.uid for msg in messages])

    def remove_absence_messages(self):
        """Delete mails to "@global-cube.com" that match a blacklist entry.

        A mail matches when any blacklist entry occurs in its subject or text.
        """
        with self.connect("versand") as mb:
            messages = mb.fetch(
                criteria=AND(to="@global-cube.com"),
                mark_seen=False,
                bulk=True,
                limit=1000,
                headers_only=True,
            )
            # NOTE(review): with headers_only=True the body is presumably not
            # fetched, so the msg.text check may never match - confirm.
            selected = [
                msg.uid
                for msg in messages
                if any(b in msg.subject or b in msg.text for b in self.blacklist)
            ]
            mb.delete(selected)

    def remove(self):
        """Command alias for ``remove_absence_messages``."""
        self.remove_absence_messages()

    def undelivered(self):
        """Write a ";"-separated report of bounce mails to undelivered.csv.

        A mail is considered when its lower-cased subject contains
        "undeliverable", "returned" or "zurückgeschickt" and its text contains
        an address in angle brackets. Each row holds that address and the
        extracted error message. If the mail has attachments, the first one is
        parsed as a mail and its sender, subject and date string are appended.
        """
        keywords = ("undeliverable", "returned", "zurückgeschickt")
        selected = []

        with self.connect("versand") as mb:
            for msg in mb.fetch(
                criteria=AND(to="@global-cube.com"),
                mark_seen=False,
                bulk=True,
                limit=1000,
            ):
                subject = msg.subject.lower()
                if not any(k in subject for k in keywords):
                    continue

                match = _BOUNCE_RECIPIENT.search(msg.text)
                if not match:
                    continue

                row = [match[1], self.get_error_message(match[2])]
                if msg.attachments:
                    msg2 = MailMessage.from_bytes(msg.attachments[0].payload)
                    row += [msg2.from_, msg2.subject, msg2.date_str]
                selected.append(";".join(row))

        with open(_local_path("undelivered.csv"), "w") as fwh:
            fwh.write("\n".join(selected))

    def get_error_message(self, text: str) -> str:
        """Extract a short error description from bounce *text*.

        The known patterns are tried in order. If none matches, the first 100
        characters of *text* are returned.
        """
        for pattern in (
            r"550 5.1.1 (Mailbox <.*> does not exist)",
            r"(Host or domain name not found)",
            r"550 5.1.1 (User unknown: .*)\(",
        ):
            found = re.search(pattern, text)
            if found:
                return found[1]

        generic = re.search(r"550 5.1.1 (.*)", text)
        if generic:
            # "." can match "\r", so strip stray line-break characters.
            return generic[1].replace("\n", "").replace("\r", "")

        antispam = re.search(r"(X-Microsoft-Antispam)", text)
        if antispam:
            return antispam[1]

        return text[:100]


if __name__ == "__main__":
    plac.Interpreter.call(Imap)