"""Synchronise a shared Office 365 address book into personal mailboxes.

The shared address book is copied into every mailbox listed in ``mailboxes``.
JSON reports (extra / differing contacts) are written to ``data/`` next to
this file and can be mailed to the mailbox owners with ``send_mail``.
"""

import dataclasses
import hashlib
import json
from pathlib import Path

import jinja2
import typer
from O365 import Account
from O365.address_book import Contact

from smtp_mail import SmtpMailer

# from typing import Self

app = typer.Typer()

# NOTE(review): application credentials are committed in source. They should
# be rotated and loaded from the environment or a secret store instead.
client_id = "925f74dc-f96a-4718-9ca7-d6cc3fa43e1e"
client_secret = "SMn8Q~rVUnbYEAtEZZ6jcQElOIU9tDQUgv1VwcRz"

account = Account(
    (client_id, client_secret), auth_flow_type="credentials", tenant_id="2ad0dff5-07ce-4cc2-a852-99ce8b91c218"
)
mailboxes = [
    "winter@global-cube.net",
    "bedner@global-cube.net",
    "brandt@global-cube.net",
    # 'd.ankenbrand@global-cube.de',
    # 'gawliczek@global-cube.de',
    "m.geiss@global-cube.net",
    "matarrelli@global-cube.net",
    "winkler@global-cube.net",
    "karaca@global-cube.net",
]
base_dir = Path(__file__).parent / "data"


class EnhancedJSONEncoder(json.JSONEncoder):
    """JSON encoder that serialises dataclass instances as plain dicts."""

    def default(self, o):
        if dataclasses.is_dataclass(o):
            return dataclasses.asdict(o)
        return super().default(o)


def _dump_json(data, path: Path) -> None:
    """Write *data* to *path* as indented JSON and close the file afterwards."""
    path.parent.mkdir(parents=True, exist_ok=True)
    # json.dump escapes non-ASCII by default, so the encoding only pins the
    # platform default and does not change the bytes written.
    with path.open("w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, cls=EnhancedJSONEncoder)


def _load_json(path: Path):
    """Read and return the JSON document stored at *path*.

    Raises:
        FileNotFoundError: if *path* does not exist.
    """
    with path.open("r", encoding="utf-8") as f:
        return json.load(f)


def _contact_key(contact: Contact) -> str:
    """Return the dictionary key for *contact*: its display name, or ``""``.

    ``SerialContact.from_o365_contact`` applies the same ``or ""`` fallback,
    so contact dictionaries and hash dictionaries share identical keys.
    """
    return contact.display_name or ""


def _record_hash(known_hashes: dict, key: str, value: int) -> None:
    """Append *value* to the hash list stored under *key* unless already present."""
    hashes = known_hashes.setdefault(key, [])
    if value not in hashes:
        hashes.append(value)


@dataclasses.dataclass
class SerialEmail:
    """Serializable e-mail address entry of a contact."""

    address: str
    name: str


@dataclasses.dataclass
class SerialContact:
    """Serializable snapshot of the contact fields that are synchronised.

    Ordering and equality look at ``display_name`` only, while ``hash()``
    fingerprints *all* fields; the hash is what is used to detect changes.
    """

    display_name: str
    name: str
    surname: str
    title: str
    job_title: str
    company_name: str
    department: str
    office_location: str
    business_phones: str
    mobile_phone: str
    home_phones: str
    emails: list[SerialEmail]
    business_address: str
    other_address: str
    categories: str
    personal_notes: str

    def __lt__(self, other):
        return self.display_name < other.display_name

    def __eq__(self, other):
        return self.display_name == other.display_name

    @staticmethod
    def from_o365_contact(p_from: Contact):
        """Build a ``SerialContact`` from the fields of an O365 contact."""
        return SerialContact(
            display_name=p_from.display_name or "",
            name=p_from.name,
            surname=p_from.surname,
            title=p_from.title,
            job_title=p_from.job_title,
            company_name=p_from.company_name,
            department=p_from.department,
            office_location=p_from.office_location,
            business_phones=p_from.business_phones,
            mobile_phone=p_from.mobile_phone,
            home_phones=p_from.home_phones,
            emails=[SerialEmail(e.address, e.name) for e in p_from.emails],
            business_address=p_from.business_address,
            other_address=p_from.other_address,
            categories=p_from.categories,
            personal_notes=p_from.personal_notes,
        )

    def __hash__(self) -> int:
        """Return a content fingerprint that is stable across interpreter runs.

        The value is persisted in ``shared_hash.json`` and compared on later
        runs, so it must not depend on Python's per-process string hash salt.
        """
        payload = json.dumps(dataclasses.asdict(self), sort_keys=True)
        digest = hashlib.sha256(payload.encode("utf-8")).digest()
        # 56 bits: non-negative and small enough for hash() to pass it through
        # unchanged on 64-bit builds.
        return int.from_bytes(digest[:7], "big")

    def to_o365_contact(self, p_to: Contact):
        """Copy all fields of this snapshot onto *p_to* and return it (no save)."""
        p_to.display_name = self.display_name
        p_to.name = self.name
        p_to.surname = self.surname
        p_to.title = self.title
        p_to.job_title = self.job_title
        p_to.company_name = self.company_name
        p_to.department = self.department
        p_to.office_location = self.office_location
        p_to.business_phones = self.business_phones
        p_to.mobile_phone = self.mobile_phone
        p_to.home_phones = self.home_phones
        p_to.emails.clear()
        for rcp in self.emails:
            # The O365 recipient list does not accept SerialEmail objects;
            # pass the (name, address) tuple form instead.
            p_to.emails.add((rcp.name, rcp.address))
        p_to.business_address = self.business_address
        # p_to.home_address = self.home_address
        p_to.other_address = self.other_address
        p_to.categories = self.categories
        p_to.personal_notes = self.personal_notes
        return p_to


def copy_contact(p_from: Contact, p_to: Contact) -> SerialContact:
    """Copy the synchronised fields from *p_from* to *p_to* and save *p_to*.

    Returns:
        A ``SerialContact`` snapshot of *p_to* after the copy.
    """
    p_to.display_name = p_from.display_name
    p_to.name = p_from.name
    p_to.surname = p_from.surname
    p_to.title = p_from.title
    p_to.job_title = p_from.job_title
    p_to.company_name = p_from.company_name
    p_to.department = p_from.department
    p_to.office_location = p_from.office_location
    p_to.business_phones = p_from.business_phones
    p_to.mobile_phone = p_from.mobile_phone
    p_to.home_phones = p_from.home_phones
    p_to.emails.clear()
    for rcp in p_from.emails:
        p_to.emails.add(rcp)
    p_to.business_address = p_from.business_address
    # p_to.home_address = p_from.home_address
    p_to.other_address = p_from.other_address
    p_to.categories = p_from.categories
    p_to.personal_notes = p_from.personal_notes
    p_to.save()
    return SerialContact.from_o365_contact(p_to)


@app.command()
def sync_contacts():
    """Copy the shared address book into every mailbox and write JSON reports.

    Per mailbox, contacts missing from the personal book are created, contacts
    whose personal version matches a known shared version are updated, and
    contacts that differ in any other way are only reported.

    Returns:
        The set of display names that exist in every personal address book
        but not in the shared one.
    """
    account.authenticate()
    shared = account.address_book(resource="adressbuch@global-cube.net", address_book="personal")
    shared_contacts = {_contact_key(p): p for p in shared.get_contacts(limit=None)}
    shared_contacts_serial = sorted(SerialContact.from_o365_contact(p) for p in shared_contacts.values())
    _dump_json(shared_contacts_serial, base_dir / "shared.json")
    shared_contacts_hash = {c.display_name: hash(c) for c in shared_contacts_serial}

    try:
        shared_contacts_prev = _load_json(base_dir / "shared_hash.json")
    except FileNotFoundError:
        # First run: there is no hash history yet.
        shared_contacts_prev = {}
    shared_contacts_combined = get_contacts_combined(shared_contacts_hash, shared_contacts_prev)

    # None means "no mailbox processed yet". An empty set is a valid running
    # intersection and must not be replaced by a later mailbox's extras.
    delete_candidates = None
    for mailbox in mailboxes:
        print(mailbox)
        mailbox_prefix = mailbox.split("@")[0]
        personal = account.address_book(resource=mailbox, address_book="personal")
        personal_contacts = {_contact_key(p): p for p in personal.get_contacts(limit=None)}
        personal_contacts_hash = {
            name: hash(SerialContact.from_o365_contact(p)) for name, p in personal_contacts.items()
        }

        extra_contacts = set(personal_contacts).difference(shared_contacts)
        extra_contacts_serial = sorted(SerialContact.from_o365_contact(personal_contacts[c]) for c in extra_contacts)
        _dump_json(extra_contacts_serial, base_dir / f"personal_{mailbox_prefix}_extra.json")
        print(extra_contacts)

        if delete_candidates is None:
            delete_candidates = set(extra_contacts)
        else:
            delete_candidates &= extra_contacts

        different_contacts = []
        for name, p_shared in shared_contacts.items():
            if name not in personal_contacts:
                # Contact is missing in the personal address book: create it.
                c = copy_contact(p_shared, personal.new_contact())
                _record_hash(shared_contacts_combined, name, hash(c))
                continue

            personal_hash = personal_contacts_hash[name]
            if personal_hash == shared_contacts_hash[name]:
                # contact details are identical, nothing to do
                continue
            if personal_hash in shared_contacts_combined[name]:
                # Personal copy equals a known shared version: simply update
                # contact details.
                c = copy_contact(p_shared, personal_contacts[name])
                _record_hash(shared_contacts_combined, name, hash(c))
                continue
            # Personal copy matches no known shared version: report only.
            different_contacts.append(SerialContact.from_o365_contact(personal_contacts[name]))

        different_contacts.sort()
        _dump_json(different_contacts, base_dir / f"personal_{mailbox_prefix}_diff.json")
        different_contacts_shared = [
            SerialContact.from_o365_contact(shared_contacts[c.display_name]) for c in different_contacts
        ]
        _dump_json(different_contacts_shared, base_dir / f"shared_{mailbox_prefix}_diff.json")

    _dump_json(shared_contacts_combined, base_dir / "shared_hash.json")
    return delete_candidates if delete_candidates is not None else set()


def get_contacts_combined(shared_contacts_hash: dict[str, int], shared_contacts_prev: dict):
    """Merge the current shared-contact hashes into the stored hash history.

    Args:
        shared_contacts_hash: current hash per display name.
        shared_contacts_prev: previously stored history; each value is either
            a list of hashes or a single int.

    Returns:
        A dict mapping every display name in *shared_contacts_hash* to its
        list of known hashes, the current hash included. Names that occur
        only in *shared_contacts_prev* are dropped.
    """
    res = {}
    for name, current in shared_contacts_hash.items():
        if name not in shared_contacts_prev:
            res[name] = [current]
            continue
        history = shared_contacts_prev[name]
        if isinstance(history, int):
            history = [history]
        if current not in history:
            history.append(current)
        res[name] = history
    return res


def delete_contacts(delete_candidates):
    """Delete the contacts named in *delete_candidates* from every mailbox."""
    account.authenticate()
    wanted = set(delete_candidates)
    for mailbox in mailboxes:
        print(mailbox)
        personal = account.address_book(resource=mailbox, address_book="personal")
        personal_contacts = {_contact_key(p): p for p in personal.get_contacts(limit=None)}
        selected_contacts = wanted.intersection(personal_contacts)
        print(selected_contacts)
        for name in selected_contacts:
            personal_contacts[name].delete()


def normalize_phone_number(phone_number: str):
    """Return *phone_number* in a normalised notation.

    ``None`` becomes ``""``. A leading ``0`` is replaced by ``+49 ``, the
    characters ``()/?`` are removed, spaces around ``-`` are dropped and
    runs of spaces are collapsed to a single space.
    """
    if phone_number is None:
        return ""
    if phone_number.startswith("0"):
        phone_number = "+49 " + phone_number[1:]
    for c in "()/?":
        phone_number = phone_number.replace(c, "")
    phone_number = phone_number.replace(" -", "-").replace("- ", "-")
    # Removing characters such as "/" can leave several spaces in a row.
    while "  " in phone_number:
        phone_number = phone_number.replace("  ", " ")
    return phone_number


def cleanup_contacts():
    """Normalise all phone numbers in every mailbox and save changed contacts."""
    account.authenticate()
    for mailbox in mailboxes:
        print(mailbox)
        personal = account.address_book(resource=mailbox, address_book="personal")
        for p in personal.get_contacts(limit=None):
            business = [normalize_phone_number(no) for no in p.business_phones]
            if p.business_phones != business:
                p.business_phones = business
            home = [normalize_phone_number(no) for no in p.home_phones]
            if p.home_phones != home:
                p.home_phones = home
            mobile = normalize_phone_number(p.mobile_phone)
            if p.mobile_phone != mobile:
                p.mobile_phone = mobile

            # Only contacts with pending changes are printed and saved.
            if p._track_changes:
                print(p.display_name)
                p.save()


def cleanup_contacts2():
    """Replace falsy text fields by empty strings in every mailbox contact."""
    account.authenticate()
    for mailbox in mailboxes:
        print(mailbox)
        personal = account.address_book(resource=mailbox, address_book="personal")
        for p in personal.get_contacts(limit=None):
            if not p.mobile_phone:
                p.mobile_phone = ""
            if not p.job_title:
                p.job_title = ""
            if not p.department:
                p.department = ""
            if not p.office_location:
                p.office_location = ""
            p.save()


def cleanup_contacts3():
    """Print name and surname of every contact without a display name."""
    account.authenticate()
    for mailbox in mailboxes:
        print(mailbox)
        personal = account.address_book(resource=mailbox, address_book="personal")
        for p in personal.get_contacts(limit=None):
            if p.display_name is None or p.display_name == "":
                print(p.name, p.surname)


def create_private_folder():
    """Create the contact folder ``privat`` in every mailbox that lacks it."""
    account.authenticate()
    for mailbox in mailboxes:
        print(mailbox)
        personal = account.address_book(resource=mailbox, address_book="personal")
        private = personal.get_folder(folder_name="privat")
        if private is None:
            personal.create_child_folder("privat")


@app.command()
def send_mail():
    """Mail each mailbox owner the report files written by ``sync_contacts``."""
    account.authenticate()
    # mailboxes2 = ["bedner@global-cube.net"]
    with open(base_dir / "../templates/info.html.jinja") as f:
        template = jinja2.Template(f.read())

    for mailbox in mailboxes:
        mailbox_prefix = mailbox.split("@")[0]
        personal_diff_file = base_dir / f"personal_{mailbox_prefix}_diff.json"
        shared_diff_file = base_dir / f"shared_{mailbox_prefix}_diff.json"
        extra_contacts = _load_json(base_dir / f"personal_{mailbox_prefix}_extra.json")
        different_contacts = _load_json(personal_diff_file)

        with SmtpMailer() as mailer:
            mailer.send(
                mailto=mailbox,
                subject="Wochenbericht adressbuch@global-cube.net",
                html=template.render(extra_contacts=extra_contacts, different_contacts=different_contacts),
                attachment=[
                    ("privat.json", personal_diff_file),
                    ("geteilt.json", shared_diff_file),
                ],
            )


@app.command()
def sync_and_delete():
    """Run ``sync_contacts`` and delete the contacts it returns as candidates."""
    contacts = sync_contacts()
    if contacts:
        delete_contacts(contacts)


if __name__ == "__main__":
    # create_private_folder()
    # send_mail()
    # cleanup_contacts3()
    app()
    # print(sync_contacts())