123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186 |
- import json
- import os
- import re
- from dataclasses import dataclass
- from datetime import datetime, timedelta
- import plac
- from imap_tools import AND, MailBox, MailMessage
@dataclass
class ImapCredentials:
    """Login data for a single IMAP account."""

    # Host name of the IMAP server.
    server: str
    # Account name used for the login.
    username: str
    # Plain-text password used for the login.
    password: str
class Imap:
    """Housekeeping commands for the "archiv" and "versand" IMAP mailboxes.

    ``commands`` names the methods that are offered as sub-commands; the class
    is run through ``plac.Interpreter`` by the script entry point.
    ``whitelist.json`` and ``blacklist.json`` are loaded from the directory
    that contains this module whenever an instance is created.
    """

    commands = ["cleanup", "move", "remove", "add_to_whitelist", "undelivered"]
    # Class-level fallbacks; __init__ replaces both with the JSON file contents.
    whitelist = {}
    blacklist = []
    # NOTE(review): secrets are hard-coded in source. Consider loading them from
    # the environment or an untracked config file and rotating these passwords.
    credentials = {
        "archiv": ImapCredentials("mail.global-cube.com", "archiv", "Gc01am64!"),
        "versand": ImapCredentials("mail.global-cube.com", "versand", "y6!avXX3tQvr"),
    }

    def __init__(self):
        """Load the whitelist (dict) and blacklist (list) from their JSON files."""
        with open(self._data_path("whitelist.json"), "r") as frh:
            self.whitelist = json.load(frh)
        with open(self._data_path("blacklist.json"), "r") as frh:
            self.blacklist = json.load(frh)

    @staticmethod
    def _data_path(filename):
        """Return the absolute path of *filename* located next to this module.

        ``abspath`` guards against ``__file__`` being a bare file name, in which
        case ``dirname`` is empty and a naive ``dirname + "/name"`` would point
        at the filesystem root.
        """
        return os.path.join(os.path.dirname(os.path.abspath(__file__)), filename)

    @staticmethod
    def _plus_domain(address):
        """Return ``"@"`` plus the text between ``+`` and ``@`` in *address*.

        Returns ``None`` when *address* has no ``+...@`` part.
        """
        found = re.search(r"\+(.*)@", address)
        if not found:
            return None
        return "@" + found[1]

    def connect(self, key):
        """Log in to the account stored under *key* in ``credentials``.

        Returns the logged-in ``MailBox``; callers use it as a context manager.
        Raises ``KeyError`` for an unknown *key*.
        """
        creds = self.credentials[key]
        return MailBox(creds.server).login(creds.username, creds.password)

    def cleanup(self):
        """Trim the ``Archive.*`` folders of the "archiv" account.

        Messages dated more than 60 days ago are deleted, but only as many as a
        folder holds in excess of ``msg_limit`` (100) messages. Folders with
        fewer than ``msg_limit`` messages are skipped.
        """
        cutoff = (datetime.now() - timedelta(days=60)).date()
        msg_limit = 100
        fetch_limit = 1000

        with self.connect("archiv") as mb:
            folder_list = [f.name for f in mb.folder.list() if "Archive." in f.name]
            for folder in folder_list:
                msg_count = mb.folder.status(folder)["MESSAGES"]
                if msg_count < msg_limit:
                    continue
                mb.folder.set(folder)
                # Track how many messages may still be removed. The folder size
                # is not re-queried, so the budget has to shrink with every
                # deletion; otherwise a second pass would cut below msg_limit.
                surplus = msg_count - msg_limit
                while surplus > 0:
                    messages = mb.fetch(
                        criteria=AND(date_lt=cutoff), mark_seen=True, limit=fetch_limit, headers_only=True
                    )
                    uids = [msg.uid for msg in messages]
                    doomed = uids[:surplus]
                    if not doomed:
                        # No old messages left in this folder.
                        break
                    mb.delete(doomed)
                    surplus -= len(doomed)
                    if len(uids) < fetch_limit:
                        # The fetch was not truncated, so every old message has
                        # been seen already; another round trip is pointless.
                        break

    def move(self):
        """File messages sent from ``@global-cube.com`` into ``Archive.*`` folders.

        Works on the folder that is selected right after login (no folder is
        set explicitly here). Messages whose subject contains more than three
        ``;`` are treated as status mails and deleted. For the others the
        target folder is derived from the ``+...@`` part of the sender address
        and created when missing; the message is moved only when
        ``is_valid_message`` accepts its recipients.
        """
        with self.connect("archiv") as mb:
            messages = mb.fetch(
                criteria=AND(from_="@global-cube.com"), mark_seen=True, bulk=True, limit=1000, headers_only=True
            )
            for msg in messages:
                if msg.subject.count(";") > 3:
                    # statusmail
                    mb.delete([msg.uid])
                    continue
                domain = self._plus_domain(msg.from_)
                if domain is None:
                    continue
                subfolder = "Archive." + domain.replace("@", "").replace(".", "_")
                if not mb.folder.exists(subfolder):
                    mb.folder.create(subfolder)
                if self.is_valid_message(msg, domain):
                    mb.move(msg.uid, subfolder)

    def is_valid_message(self, msg, domain):
        """Return ``True`` when every recipient of *msg* is acceptable for *domain*.

        A recipient is acceptable when it contains *domain*, contains
        ``@global-cube.de``, equals the sender, or contains one of the
        whitelist entries stored for *domain*.
        """
        for to in msg.to:
            if domain in to or "@global-cube.de" in to or msg.from_ == to:
                continue
            if domain not in self.whitelist:
                return False
            if not any(entry in to for entry in self.whitelist[domain]):
                return False
        return True

    def add_to_whitelist(self):
        """Learn whitelist entries from the "whitelist" folder, then empty it.

        For every message that ``is_valid_message`` rejects, each recipient not
        containing the sender's ``+`` domain is appended to that domain's
        whitelist. The whitelist is written back to ``whitelist.json`` before
        the processed messages are deleted.
        """
        with self.connect("archiv") as mb:
            mb.folder.set("whitelist")
            # fetch() yields lazily and can be consumed only once. Materialise
            # it, because the messages are needed again for the final delete
            # (iterating the spent generator would silently delete nothing).
            messages = list(
                mb.fetch(
                    criteria=AND(from_="@global-cube.com"), mark_seen=True, bulk=True, limit=1000, headers_only=True
                )
            )
            for msg in messages:
                domain = self._plus_domain(msg.from_)
                if domain is None:
                    continue
                if self.is_valid_message(msg, domain):
                    continue
                known = self.whitelist.setdefault(domain, [])
                for to in msg.to:
                    if domain not in to and to not in known:
                        known.append(to)

            # Persist first so that learned entries survive a failing delete.
            with open(self._data_path("whitelist.json"), "w") as fwh:
                json.dump(self.whitelist, fwh, indent=2)

            mb.delete([msg.uid for msg in messages])

    def remove_absence_messages(self):
        """Delete "versand" messages whose subject or text hits the blacklist."""
        with self.connect("versand") as mb:
            # NOTE(review): with headers_only=True imap_tools does not download
            # the body, so msg.text is presumably empty and only the subject
            # check is effective. Confirm whether bodies should be fetched.
            messages = mb.fetch(
                criteria=AND(to="@global-cube.com"), mark_seen=False, bulk=True, limit=1000, headers_only=True
            )
            # any() keeps each uid at most once even when several blacklist
            # entries match the same message.
            selected = [
                msg.uid
                for msg in messages
                if any(b in msg.subject or b in msg.text for b in self.blacklist)
            ]
            mb.delete(selected)

    def remove(self):
        """Command alias for ``remove_absence_messages``."""
        self.remove_absence_messages()

    def undelivered(self):
        """Export bounce information from the "versand" account to ``undelivered.csv``.

        A message is considered when its lower-cased subject contains one of
        the bounce keywords and its text contains an address in angle brackets.
        Each row holds the address and the extracted error text; when the
        message has attachments, sender, subject and date string of the first
        attachment (parsed as a mail) are appended. Fields are joined with
        ``;`` and rows with newlines, without any quoting.
        """
        keywords = ("undeliverable", "returned", "zurückgeschickt")
        rows = []
        with self.connect("versand") as mb:
            for msg in mb.fetch(
                criteria=AND(to="@global-cube.com"),
                mark_seen=False,
                bulk=True,
                limit=1000,
            ):
                subject = msg.subject.lower()
                # One row per message, even if several keywords occur.
                if not any(k in subject for k in keywords):
                    continue
                match = re.search(
                    r"<([\w\-\+\.]+@[\w\-\.]+\.[\w\-\.]+)>(.*)",
                    msg.text,
                    re.DOTALL,
                )
                if not match:
                    continue
                row = [match[1], self.get_error_message(match[2])]
                # Attachment details are only added to the row of the message
                # they belong to.
                if msg.attachments:
                    original = MailMessage.from_bytes(msg.attachments[0].payload)
                    row += [original.from_, original.subject, original.date_str]
                rows.append(";".join(row))

        with open(self._data_path("undelivered.csv"), "w") as fwh:
            fwh.write("\n".join(rows))

    def get_error_message(self, text):
        """Condense the bounce *text* into a short error description.

        The known patterns are tried in order; when none matches, the first
        100 characters of *text* are returned.
        """
        # The dots of the status code are escaped so they match literally.
        for pattern in (
            r"550 5\.1\.1 (Mailbox <.*> does not exist)",
            r"(Host or domain name not found)",
            r"550 5\.1\.1 (User unknown: .*)\(",
        ):
            found = re.search(pattern, text)
            if found:
                return found[1]

        generic = re.search(r"550 5\.1\.1 (.*)", text)
        if generic:
            # "." can still capture a carriage return; strip line breaks.
            return generic[1].replace("\n", "").replace("\r", "")

        if "X-Microsoft-Antispam" in text:
            return "X-Microsoft-Antispam"
        return text[:100]
if __name__ == "__main__":
    # Run the command-line interface: plac dispatches to the Imap commands.
    plac.Interpreter.call(Imap)
    # To debug a single command without the CLI, call it directly, e.g.:
    # Imap().undelivered()
|