from imap_tools import MailBox, AND import re import json import os import plac from datetime import datetime, date class Imap: commands = ["cleanup", "move", "remove", "add_to_whitelist"] whitelist = {} blacklist = [] def __init__(self): with open(os.path.dirname(__file__) + "/whitelist.json", "r") as frh: self.whitelist = json.load(frh) with open(os.path.dirname(__file__) + "/blacklist.json", "r") as frh: self.blacklist = json.load(frh) def cleanup(self): date_now = datetime.now() date_criteria = date(date_now.year, date_now.month - 1, 1) msg_limit = 100 with MailBox("mail.global-cube.com").login("archiv", "Gc01am64!") as mb: folder_list = [f.name for f in mb.folder.list() if "Archive." in f.name] for folder in folder_list: msg_count = mb.folder.status(folder)["MESSAGES"] if msg_count < msg_limit: continue mb.folder.set(folder) messages = mb.fetch( criteria=AND(date_lt=date_criteria), mark_seen=True, headers_only=True, ) uids = [msg.uid for msg in messages] mb.delete(uids[0 : (msg_count - msg_limit)]) def move(self): with MailBox("mail.global-cube.com").login("archiv", "Gc01am64!") as mb: messages = mb.fetch( criteria=AND(from_="@global-cube.com"), mark_seen=True, bulk=True, limit=1000, headers_only=True, ) for msg in messages: match = re.findall(r"\+(.*)@", msg.from_) if not match: # print(msg.from_, msg.to, msg.subject) continue domain = "@" + match[0] subfolder = "Archive." + domain.replace("@", "").replace(".", "_") if not mb.folder.exists(subfolder): mb.folder.create(subfolder) if self.is_valid_message(msg, domain): mb.move(msg.uid, subfolder) # else: # print(domain, ', '.join(msg.to), msg.subject) # print(msg.text) # print([att.filename for att in msg.attachments if att.filename.endswith('.pdf')]) def is_valid_message(self, msg, domain): for to in msg.to: if domain in to or "@global-cube.de" in to or msg.from_ == to: continue if domain not in self.whitelist: return False valid_domain = [entry in to for entry in self.whitelist[domain]] if not any(valid_domain): return False return True def add_to_whitelist(self): with MailBox("mail.global-cube.com").login("archiv", "Gc01am64!") as mb: mb.folder.set("whitelist") messages = mb.fetch( criteria=AND(from_="@global-cube.com"), mark_seen=True, bulk=True, limit=1000, headers_only=True, ) for msg in messages: match = re.findall(r"\+(.*)@", msg.from_) if not match: # print(msg.from_, msg.to, msg.subject) continue domain = "@" + match[0] if not self.is_valid_message(msg, domain): if domain not in self.whitelist: self.whitelist[domain] = [] for to in msg.to: if domain not in to and to not in self.whitelist[domain]: self.whitelist[domain].append(to) with open(os.path.dirname(__file__) + "/whitelist.json", "w") as fwh: json.dump(self.whitelist, fwh, indent=2) uids = [msg.uid for msg in messages] mb.delete(uids) def remove_absence_messages(self): with MailBox("mail.global-cube.com").login("versand", "y6!avXX3tQvr") as mb: messages = mb.fetch( criteria=AND(to="@global-cube.com"), mark_seen=False, bulk=True, limit=1000, headers_only=True, ) selected = [] for msg in messages: for b in self.blacklist: if b in msg.subject or b in msg.text: selected.append(msg.uid) mb.delete(selected) def remove(self): self.remove_absence_messages() if __name__ == "__main__": plac.Interpreter.call(Imap) # Imap().remove_absence_messages()