123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366 |
import dataclasses
import hashlib
import json
import os
from pathlib import Path

import jinja2
import typer
from O365 import Account
from O365.address_book import Contact

from smtp_mail import SmtpMailer

# from typing import Self
app = typer.Typer()

# Azure AD application credentials used for the "credentials" auth flow.
# Each value can be overridden through the environment; the literals are only
# kept as fallbacks so existing deployments keep working unchanged.
# NOTE(review): a client secret committed to source control should be rotated
# and the literal fallback removed once the environment variable is in place.
client_id = os.environ.get("O365_CLIENT_ID", "925f74dc-f96a-4718-9ca7-d6cc3fa43e1e")
client_secret = os.environ.get("O365_CLIENT_SECRET", "SMn8Q~rVUnbYEAtEZZ6jcQElOIU9tDQUgv1VwcRz")
tenant_id = os.environ.get("O365_TENANT_ID", "2ad0dff5-07ce-4cc2-a852-99ce8b91c218")

account = Account((client_id, client_secret), auth_flow_type="credentials", tenant_id=tenant_id)

# Mailboxes whose personal address books are processed by the commands below.
mailboxes = [
    "winter@global-cube.net",
    "bedner@global-cube.net",
    "brandt@global-cube.net",
    # 'd.ankenbrand@global-cube.de',
    # 'gawliczek@global-cube.de',
    "m.geiss@global-cube.net",
    "matarrelli@global-cube.net",
    "winkler@global-cube.net",
    "karaca@global-cube.net",
]

# Directory next to this file that holds the JSON reports and the hash history.
base_dir = Path(__file__).parent / "data"
class EnhancedJSONEncoder(json.JSONEncoder):
    """JSON encoder that serializes dataclasses as plain dictionaries."""

    def default(self, o):
        """Return ``dataclasses.asdict(o)`` for dataclasses, else defer to the base encoder."""
        if not dataclasses.is_dataclass(o):
            # The base implementation raises TypeError for unsupported objects.
            return super().default(o)
        return dataclasses.asdict(o)
@dataclasses.dataclass
class SerialEmail:
    """Plain, JSON-serializable e-mail entry with an address and a name."""

    address: str  # the e-mail address itself
    name: str  # name stored alongside the address
@dataclasses.dataclass
class SerialContact:
    """JSON-serializable snapshot of the contact fields handled by this tool.

    Ordering and equality only look at ``display_name`` (used for sorting the
    reports), while ``hash()`` fingerprints *all* fields so that a change in
    any detail yields a different value. Two contacts may therefore compare
    equal and still hash differently; do not rely on instances as set members
    or dict keys.
    """

    display_name: str
    name: str
    surname: str
    title: str
    job_title: str
    company_name: str
    department: str
    office_location: str
    business_phones: list[str]
    mobile_phone: str
    home_phones: list[str]
    emails: "list[SerialEmail]"
    # NOTE(review): the four fields below are copied verbatim from the O365
    # contact; the declared ``str`` may not match the real payload - confirm.
    business_address: str
    other_address: str
    categories: str
    personal_notes: str

    def __lt__(self, other):
        """Order contacts alphabetically by display name."""
        if not isinstance(other, SerialContact):
            return NotImplemented
        return self.display_name < other.display_name

    def __eq__(self, other):
        """Treat contacts with the same display name as equal."""
        if not isinstance(other, SerialContact):
            return NotImplemented
        return self.display_name == other.display_name

    @staticmethod
    def from_o365_contact(p_from: "Contact") -> "SerialContact":
        """Build a snapshot from an O365 contact.

        A missing display name is stored as an empty string; every other
        field is taken over unchanged.
        """
        return SerialContact(
            display_name=p_from.display_name or "",
            name=p_from.name,
            surname=p_from.surname,
            title=p_from.title,
            job_title=p_from.job_title,
            company_name=p_from.company_name,
            department=p_from.department,
            office_location=p_from.office_location,
            business_phones=p_from.business_phones,
            mobile_phone=p_from.mobile_phone,
            home_phones=p_from.home_phones,
            emails=[SerialEmail(e.address, e.name) for e in p_from.emails],
            business_address=p_from.business_address,
            other_address=p_from.other_address,
            categories=p_from.categories,
            personal_notes=p_from.personal_notes,
        )

    def __hash__(self) -> int:
        """Return a content fingerprint that is stable across interpreter runs.

        The built-in ``hash()`` of a string is salted per process, so it cannot
        be persisted and compared later. A SHA-256 digest of the canonical JSON
        form is used instead, truncated to 56 bits so the value always fits a
        native hash.
        """
        payload = json.dumps(dataclasses.asdict(self), sort_keys=True)
        digest = hashlib.sha256(payload.encode("utf-8")).digest()
        return int.from_bytes(digest[:7], "big")

    def to_o365_contact(self, p_to: "Contact"):
        """Write this snapshot into *p_to* and return *p_to* (without saving it)."""
        p_to.display_name = self.display_name
        p_to.name = self.name
        p_to.surname = self.surname
        p_to.title = self.title
        p_to.job_title = self.job_title
        p_to.company_name = self.company_name
        p_to.department = self.department
        p_to.office_location = self.office_location
        p_to.business_phones = self.business_phones
        p_to.mobile_phone = self.mobile_phone
        p_to.home_phones = self.home_phones
        p_to.emails.clear()
        for rcp in self.emails:
            # Recipients.add() accepts a (name, address) tuple, not a SerialEmail.
            p_to.emails.add((rcp.name, rcp.address))
        p_to.business_address = self.business_address
        # home_address is intentionally left untouched.
        p_to.other_address = self.other_address
        p_to.categories = self.categories
        p_to.personal_notes = self.personal_notes
        return p_to
def copy_contact(p_from: Contact, p_to: Contact) -> SerialContact:
    """Copy the synchronized fields from *p_from* to *p_to*, save it, and return its snapshot.

    The e-mail list of *p_to* is emptied and refilled from *p_from*. The
    returned ``SerialContact`` is built from *p_to* after ``save()``.
    """
    fields_before_emails = (
        "display_name",
        "name",
        "surname",
        "title",
        "job_title",
        "company_name",
        "department",
        "office_location",
        "business_phones",
        "mobile_phone",
        "home_phones",
    )
    # home_address is deliberately not part of the copied fields.
    fields_after_emails = (
        "business_address",
        "other_address",
        "categories",
        "personal_notes",
    )

    for attr in fields_before_emails:
        setattr(p_to, attr, getattr(p_from, attr))

    p_to.emails.clear()
    for recipient in p_from.emails:
        p_to.emails.add(recipient)

    for attr in fields_after_emails:
        setattr(p_to, attr, getattr(p_from, attr))

    p_to.save()
    return SerialContact.from_o365_contact(p_to)
def _write_json(path, payload):
    """Dump *payload* to *path* as indented JSON and close the file afterwards."""
    with path.open("w") as handle:
        json.dump(payload, handle, indent=2, cls=EnhancedJSONEncoder)


@app.command()
def sync_contacts():
    """Copy the shared address book into every personal mailbox.

    For each mailbox in ``mailboxes``:

    * shared contacts missing from the mailbox are created;
    * contacts whose current fingerprint matches a fingerprint recorded for
      the shared contact are overwritten with the shared details;
    * contacts that differ in any other way are left alone and written to
      ``personal_<prefix>_diff.json`` / ``shared_<prefix>_diff.json``;
    * contacts that only exist in the mailbox are written to
      ``personal_<prefix>_extra.json``.

    The recorded fingerprints are read from and written back to
    ``shared_hash.json``; a missing file is treated as an empty history.

    Returns:
        The set of display names that are "extra" in *every* mailbox.
    """
    account.authenticate()

    shared = account.address_book(resource="adressbuch@global-cube.net", address_book="personal")
    shared_contacts = {p.display_name: p for p in shared.get_contacts(limit=None)}
    shared_contacts_serial = sorted(SerialContact.from_o365_contact(p) for p in shared_contacts.values())
    _write_json(base_dir / "shared.json", shared_contacts_serial)

    shared_contacts_hash = {c.display_name: hash(c) for c in shared_contacts_serial}
    try:
        with (base_dir / "shared_hash.json").open("r") as handle:
            shared_contacts_prev = json.load(handle)
    except FileNotFoundError:
        # First run: there is no fingerprint history yet.
        shared_contacts_prev = {}
    shared_contacts_combined = get_contacts_combined(shared_contacts_hash, shared_contacts_prev)

    # None means "no mailbox processed yet". An empty set must stay empty:
    # once one mailbox has no extra contacts nothing can be extra everywhere.
    delete_candidates = None

    for mailbox in mailboxes:
        print(mailbox)
        mailbox_prefix = mailbox.split("@")[0]

        personal = account.address_book(resource=mailbox, address_book="personal")
        personal_contacts = {p.display_name: p for p in personal.get_contacts(limit=None)}
        personal_contacts_serial = sorted(
            SerialContact.from_o365_contact(p) for p in personal_contacts.values()
        )
        personal_contacts_hash = {c.display_name: hash(c) for c in personal_contacts_serial}

        extra_contacts = set(personal_contacts.keys()).difference(shared_contacts.keys())
        extra_contacts_serial = sorted(
            SerialContact.from_o365_contact(personal_contacts[c]) for c in extra_contacts
        )
        _write_json(base_dir / f"personal_{mailbox_prefix}_extra.json", extra_contacts_serial)
        print(extra_contacts)

        if delete_candidates is None:
            delete_candidates = set(extra_contacts)
        else:
            delete_candidates &= extra_contacts

        different_contacts = []
        for p_shared in shared_contacts.values():
            name = p_shared.display_name

            if name not in personal_contacts:
                # Contact is new for this mailbox: create it and remember its fingerprint.
                c = copy_contact(p_shared, personal.new_contact())
                if hash(c) not in shared_contacts_combined[name]:
                    shared_contacts_combined[name].append(hash(c))
                continue

            if personal_contacts_hash[name] == shared_contacts_hash[name]:
                # contact details are identical, nothing to do
                continue

            if personal_contacts_hash[name] in shared_contacts_combined[name]:
                # Personal copy matches a recorded fingerprint: simply update contact details.
                c = copy_contact(p_shared, personal_contacts[name])
                if hash(c) not in shared_contacts_combined[name]:
                    shared_contacts_combined[name].append(hash(c))
                continue

            # Personal copy matches no recorded fingerprint: report it, do not overwrite.
            different_contacts.append(SerialContact.from_o365_contact(personal_contacts[name]))

        different_contacts.sort()
        _write_json(base_dir / f"personal_{mailbox_prefix}_diff.json", different_contacts)

        different_contacts_shared = [
            SerialContact.from_o365_contact(shared_contacts[c.display_name]) for c in different_contacts
        ]
        _write_json(base_dir / f"shared_{mailbox_prefix}_diff.json", different_contacts_shared)

    _write_json(base_dir / "shared_hash.json", shared_contacts_combined)
    return delete_candidates if delete_candidates is not None else set()
def get_contacts_combined(shared_contacts_hash: dict[str, int], shared_contacts_prev: dict):
    """Merge the current fingerprints into the recorded fingerprint history.

    Args:
        shared_contacts_hash: Current fingerprint per display name.
        shared_contacts_prev: Previously recorded fingerprints per display
            name; each value is either a single int or a list of ints.

    Returns:
        A new dict mapping every name in *shared_contacts_hash* to the list of
        its recorded fingerprints with the current one appended if it was not
        already present. Names that only occur in *shared_contacts_prev* are
        dropped. *shared_contacts_prev* itself is not modified.
    """
    combined = {}
    for name, current in shared_contacts_hash.items():
        if name not in shared_contacts_prev:
            combined[name] = [current]
            continue
        previous = shared_contacts_prev[name]
        # Copy so the caller's history lists are never mutated.
        known = [previous] if isinstance(previous, int) else list(previous)
        if current not in known:
            known.append(current)
        combined[name] = known
    return combined
def delete_contacts(delete_candidates):
    """Delete every contact named in *delete_candidates* from each personal mailbox.

    Contacts are matched by display name; names not present in a mailbox are
    skipped for that mailbox.
    """
    account.authenticate()
    for mailbox in mailboxes:
        print(mailbox)
        book = account.address_book(resource=mailbox, address_book="personal")

        by_name = {}
        for contact in book.get_contacts(limit=None):
            by_name[contact.display_name] = contact

        to_remove = set(delete_candidates).intersection(by_name)
        print(to_remove)
        for display_name in to_remove:
            by_name[display_name].delete()
def normalize_phone_number(phone_number: "str | None") -> str:
    """Return *phone_number* in a uniform ``+<country> ...`` notation.

    * ``None`` becomes an empty string.
    * A leading ``00`` (international dialling prefix) becomes ``+``.
    * Any other leading ``0`` is replaced by the German prefix ``+49 ``.
    * The characters ``( ) / ?`` are removed.
    * Runs of whitespace collapse to a single space and spaces around
      hyphens are dropped.
    """
    if phone_number is None:
        return ""

    if phone_number.startswith("00"):
        # "0049 ..." is already international; only swap the dialling prefix.
        phone_number = "+" + phone_number[2:]
    elif phone_number.startswith("0"):
        phone_number = "+49 " + phone_number[1:]

    phone_number = phone_number.translate(str.maketrans("", "", "()/?"))
    # Removing separators such as "/" leaves double spaces behind; collapse them.
    phone_number = " ".join(phone_number.split())
    return phone_number.replace(" -", "-").replace("- ", "-")
def cleanup_contacts():
    """Normalize the phone numbers of all contacts in every personal mailbox.

    Business, home and mobile numbers are run through
    ``normalize_phone_number``; a contact is saved only when the contact
    object reports tracked changes.
    """
    account.authenticate()
    for mailbox in mailboxes:
        print(mailbox)
        book = account.address_book(resource=mailbox, address_book="personal")
        for contact in book.get_contacts(limit=None):
            # List-valued phone fields: rewrite only when normalization changes something.
            for attr in ("business_phones", "home_phones"):
                cleaned = [normalize_phone_number(number) for number in getattr(contact, attr)]
                if getattr(contact, attr) != cleaned:
                    setattr(contact, attr, cleaned)

            cleaned_mobile = normalize_phone_number(contact.mobile_phone)
            if contact.mobile_phone != cleaned_mobile:
                contact.mobile_phone = cleaned_mobile

            if not contact._track_changes:
                continue
            print(contact.display_name)
            contact.save()
def cleanup_contacts2():
    """Replace falsy mobile phone, job title, department and office location values by ``""``.

    Runs over every contact of every personal mailbox and saves each contact
    afterwards, whether or not a field was changed.
    """
    account.authenticate()
    blankable_fields = ("mobile_phone", "job_title", "department", "office_location")
    for mailbox in mailboxes:
        print(mailbox)
        book = account.address_book(resource=mailbox, address_book="personal")
        for contact in book.get_contacts(limit=None):
            for attr in blankable_fields:
                if not getattr(contact, attr):
                    setattr(contact, attr, "")
            contact.save()
def cleanup_contacts3():
    """Print name and surname of every personal contact that has no display name."""
    account.authenticate()
    for mailbox in mailboxes:
        print(mailbox)
        book = account.address_book(resource=mailbox, address_book="personal")
        for contact in book.get_contacts(limit=None):
            label = contact.display_name
            if label is not None and label != "":
                continue
            print(contact.name, contact.surname)
def create_private_folder():
    """Create a ``privat`` contact folder in every personal mailbox that lacks one."""
    account.authenticate()
    for mailbox in mailboxes:
        print(mailbox)
        book = account.address_book(resource=mailbox, address_book="personal")
        if book.get_folder(folder_name="privat") is None:
            book.create_child_folder("privat")
@app.command()
def send_mail():
    """Mail each mailbox owner the report files written for that mailbox.

    The message body is rendered from ``templates/info.html.jinja`` with the
    mailbox's "extra" and "different" contact lists; the personal and shared
    diff files are attached.
    """
    account.authenticate()
    # mailboxes2 = ["bedner@global-cube.net"]

    # NOTE(review): the template is rendered without autoescaping, so contact
    # fields end up in the HTML unescaped - confirm the template escapes them.
    with open(base_dir / "../templates/info.html.jinja") as f:
        template = jinja2.Template(f.read())

    for mailbox in mailboxes:
        mailbox_prefix = mailbox.split("@")[0]
        extra_path = base_dir / f"personal_{mailbox_prefix}_extra.json"
        personal_diff_path = base_dir / f"personal_{mailbox_prefix}_diff.json"
        shared_diff_path = base_dir / f"shared_{mailbox_prefix}_diff.json"

        # Context managers make sure the report files are closed again.
        with extra_path.open("r") as handle:
            extra_contacts = json.load(handle)
        with personal_diff_path.open("r") as handle:
            different_contacts = json.load(handle)

        with SmtpMailer() as mailer:
            mailer.send(
                mailto=mailbox,
                subject="Wochenbericht adressbuch@global-cube.net",
                html=template.render(extra_contacts=extra_contacts, different_contacts=different_contacts),
                attachment=[
                    ("privat.json", personal_diff_path),
                    ("geteilt.json", shared_diff_path),
                ],
            )
@app.command()
def sync_and_delete():
    """Run ``sync_contacts`` and pass its result to ``delete_contacts`` when it is non-empty."""
    candidates = sync_contacts()
    if candidates:
        delete_contacts(candidates)
if __name__ == "__main__":
    # Script entry point: hand control to the typer CLI.
    # The calls below are disabled alternatives for running one function directly:
    # create_private_folder()
    # send_mail()
    # cleanup_contacts3()
    # print(sync_contacts())
    app()
|