o365_copy.py 12 KB


  1. import dataclasses
  2. import json
  3. from pathlib import Path
  4. from typing import Self
  5. import jinja2
  6. import typer
  7. from O365 import Account
  8. from O365.address_book import Contact
  9. from smtp_mail import SmtpMailer
  10. app = typer.Typer()
  11. client_id = "925f74dc-f96a-4718-9ca7-d6cc3fa43e1e"
  12. client_secret = "SMn8Q~rVUnbYEAtEZZ6jcQElOIU9tDQUgv1VwcRz"
  13. account = Account(
  14. (client_id, client_secret), auth_flow_type="credentials", tenant_id="2ad0dff5-07ce-4cc2-a852-99ce8b91c218"
  15. )
  16. mailboxes = [
  17. "winter@global-cube.net",
  18. "bedner@global-cube.net",
  19. "brandt@global-cube.net",
  20. # 'd.ankenbrand@global-cube.de',
  21. # 'gawliczek@global-cube.de',
  22. "m.geiss@global-cube.net",
  23. "matarrelli@global-cube.net",
  24. "winkler@global-cube.net",
  25. "karaca@global-cube.net",
  26. ]
  27. base_dir = Path(__file__).parent / "data"
  28. class EnhancedJSONEncoder(json.JSONEncoder):
  29. def default(self, o):
  30. if dataclasses.is_dataclass(o):
  31. return dataclasses.asdict(o)
  32. return super().default(o)
  33. @dataclasses.dataclass
  34. class SerialEmail:
  35. address: str
  36. name: str
  37. @dataclasses.dataclass
  38. class SerialContact:
  39. display_name: str
  40. name: str
  41. surname: str
  42. title: str
  43. job_title: str
  44. company_name: str
  45. department: str
  46. office_location: str
  47. business_phones: str
  48. mobile_phone: str
  49. home_phones: str
  50. emails: list[SerialEmail]
  51. business_address: str
  52. other_address: str
  53. categories: str
  54. personal_notes: str
  55. def __lt__(self, other):
  56. return self.display_name < other.display_name
  57. def __eq__(self, other):
  58. return self.display_name == other.display_name
  59. @staticmethod
  60. def from_o365_contact(p_from: Contact) -> Self:
  61. return SerialContact(
  62. display_name=p_from.display_name,
  63. name=p_from.name,
  64. surname=p_from.surname,
  65. title=p_from.title,
  66. job_title=p_from.job_title,
  67. company_name=p_from.company_name,
  68. department=p_from.department,
  69. office_location=p_from.office_location,
  70. business_phones=p_from.business_phones,
  71. mobile_phone=p_from.mobile_phone,
  72. home_phones=p_from.home_phones,
  73. emails=[SerialEmail(e.address, e.name) for e in p_from.emails],
  74. business_address=p_from.business_address,
  75. other_address=p_from.other_address,
  76. categories=p_from.categories,
  77. personal_notes=p_from.personal_notes,
  78. )
  79. def __hash__(self) -> int:
  80. return hash(json.dumps(dataclasses.asdict(self)))
  81. def to_o365_contact(self, p_to: Contact):
  82. p_to.display_name = self.display_name
  83. p_to.name = self.name
  84. p_to.surname = self.surname
  85. p_to.title = self.title
  86. p_to.job_title = self.job_title
  87. p_to.company_name = self.company_name
  88. p_to.department = self.department
  89. p_to.office_location = self.office_location
  90. p_to.business_phones = self.business_phones
  91. p_to.mobile_phone = self.mobile_phone
  92. p_to.home_phones = self.home_phones
  93. p_to.emails.clear()
  94. for rcp in self.emails:
  95. p_to.emails.add(rcp)
  96. p_to.business_address = self.business_address
  97. # p_to.home_address = self.home_address
  98. p_to.other_address = self.other_address
  99. p_to.categories = self.categories
  100. p_to.personal_notes = self.personal_notes
  101. return p_to
  102. def copy_contact(p_from: Contact, p_to: Contact):
  103. p_to.display_name = p_from.display_name
  104. p_to.name = p_from.name
  105. p_to.surname = p_from.surname
  106. p_to.title = p_from.title
  107. p_to.job_title = p_from.job_title
  108. p_to.company_name = p_from.company_name
  109. p_to.department = p_from.department
  110. p_to.office_location = p_from.office_location
  111. p_to.business_phones = p_from.business_phones
  112. p_to.mobile_phone = p_from.mobile_phone
  113. p_to.home_phones = p_from.home_phones
  114. p_to.emails.clear()
  115. for rcp in p_from.emails:
  116. p_to.emails.add(rcp)
  117. p_to.business_address = p_from.business_address
  118. # p_to.home_address = p_from.home_address
  119. p_to.other_address = p_from.other_address
  120. p_to.categories = p_from.categories
  121. p_to.personal_notes = p_from.personal_notes
  122. p_to.save()
  123. @app.command()
  124. def sync_contacts():
  125. account.authenticate()
  126. shared = account.address_book(resource="adressbuch@global-cube.net", address_book="personal")
  127. shared_contacts = {p.display_name: p for p in shared.get_contacts(limit=None)}
  128. shared_contacts_serial = [SerialContact.from_o365_contact(p) for p in shared_contacts.values()]
  129. shared_contacts_serial.sort()
  130. json.dump(shared_contacts_serial, (base_dir / "shared.json").open("w"), indent=2, cls=EnhancedJSONEncoder)
  131. shared_contacts_hash = {c.display_name: hash(c) for c in shared_contacts_serial}
  132. shared_contacts_prev = json.load((base_dir / "shared_hash.json").open("r"))
  133. delete_candidates = set()
  134. for mailbox in mailboxes:
  135. print(mailbox)
  136. mailbox_prefix = mailbox.split("@")[0]
  137. personal = account.address_book(resource=mailbox, address_book="personal")
  138. personal_contacts = {p.display_name: p for p in personal.get_contacts(limit=None)}
  139. personal_contacts_serial = [SerialContact.from_o365_contact(p) for p in personal_contacts.values()]
  140. personal_contacts_serial.sort()
  141. personal_contacts_hash = {c.display_name: hash(c) for c in personal_contacts_serial}
  142. extra_contacts = set(personal_contacts.keys()).difference(shared_contacts.keys())
  143. extra_contacts_serial = [SerialContact.from_o365_contact(personal_contacts[c]) for c in extra_contacts]
  144. extra_contacts_serial.sort()
  145. json.dump(
  146. extra_contacts_serial,
  147. (base_dir / f"personal_{mailbox_prefix}_extra.json").open("w"),
  148. indent=2,
  149. cls=EnhancedJSONEncoder,
  150. )
  151. print(extra_contacts)
  152. different_contacts = []
  153. if len(delete_candidates) == 0:
  154. delete_candidates = extra_contacts
  155. else:
  156. delete_candidates = delete_candidates.intersection(extra_contacts)
  157. for p_shared in shared_contacts.values():
  158. if p_shared.display_name not in personal_contacts:
  159. p_new = personal.new_contact()
  160. copy_contact(p_shared, p_new)
  161. continue
  162. if personal_contacts_hash[p_shared.display_name] == shared_contacts_hash[p_shared.display_name]:
  163. # contact details are identical, nothing to do
  164. continue
  165. if personal_contacts_hash[p_shared.display_name] == shared_contacts_prev[p_shared.display_name]:
  166. # simply update contact details
  167. copy_contact(p_shared, personal_contacts[p_shared.display_name])
  168. continue
  169. different_contacts.append(SerialContact.from_o365_contact(personal_contacts[p_shared.display_name]))
  170. different_contacts.sort()
  171. json.dump(
  172. different_contacts,
  173. (base_dir / f"personal_{mailbox_prefix}_diff.json").open("w"),
  174. indent=2,
  175. cls=EnhancedJSONEncoder,
  176. )
  177. different_contacts_shared = [
  178. SerialContact.from_o365_contact(shared_contacts[c.display_name]) for c in different_contacts
  179. ]
  180. json.dump(
  181. different_contacts_shared,
  182. (base_dir / f"shared_{mailbox_prefix}_diff.json").open("w"),
  183. indent=2,
  184. cls=EnhancedJSONEncoder,
  185. )
  186. json.dump(shared_contacts_hash, (base_dir / "shared_hash.json").open("w"), indent=2, cls=EnhancedJSONEncoder)
  187. return delete_candidates
  188. def delete_contacts(delete_candidates):
  189. account.authenticate()
  190. for mailbox in mailboxes:
  191. print(mailbox)
  192. personal = account.address_book(resource=mailbox, address_book="personal")
  193. personal_contacts = {p.display_name: p for p in personal.get_contacts(limit=None)}
  194. selected_contacts = set(delete_candidates).intersection(personal_contacts.keys())
  195. print(selected_contacts)
  196. for c in selected_contacts:
  197. personal_contacts[c].delete()
  198. def normalize_phone_number(phone_number: str):
  199. if phone_number is None:
  200. return ""
  201. if phone_number.startswith("0"):
  202. phone_number = "+49 " + phone_number[1:]
  203. for c in "()/?":
  204. phone_number = phone_number.replace(c, "")
  205. return phone_number.replace(" -", "-").replace("- ", "-").replace(" ", " ")
  206. def cleanup_contacts():
  207. account.authenticate()
  208. for mailbox in mailboxes:
  209. print(mailbox)
  210. shared = account.address_book(resource=mailbox, address_book="personal")
  211. for p in shared.get_contacts(limit=None):
  212. business = [normalize_phone_number(no) for no in p.business_phones]
  213. if p.business_phones != business:
  214. p.business_phones = business
  215. home = [normalize_phone_number(no) for no in p.home_phones]
  216. if p.home_phones != home:
  217. p.home_phones = home
  218. mobile = normalize_phone_number(p.mobile_phone)
  219. if p.mobile_phone != mobile:
  220. p.mobile_phone = mobile
  221. if p._track_changes:
  222. print(p.display_name)
  223. p.save()
  224. def cleanup_contacts2():
  225. account.authenticate()
  226. for mailbox in mailboxes:
  227. print(mailbox)
  228. personal = account.address_book(resource=mailbox, address_book="personal")
  229. for p in personal.get_contacts(limit=None):
  230. if not p.mobile_phone:
  231. p.mobile_phone = ""
  232. if not p.job_title:
  233. p.job_title = ""
  234. if not p.department:
  235. p.department = ""
  236. if not p.office_location:
  237. p.office_location = ""
  238. p.save()
  239. def cleanup_contacts3():
  240. account.authenticate()
  241. for mailbox in mailboxes:
  242. print(mailbox)
  243. personal = account.address_book(resource=mailbox, address_book="personal")
  244. for p in personal.get_contacts(limit=None):
  245. if p.display_name is None or p.display_name == "":
  246. print(p.name, p.surname)
  247. def create_private_folder():
  248. account.authenticate()
  249. for mailbox in mailboxes:
  250. print(mailbox)
  251. personal = account.address_book(resource=mailbox, address_book="personal")
  252. private = personal.get_folder(folder_name="privat")
  253. if private is None:
  254. personal.create_child_folder("privat")
  255. @app.command()
  256. def send_mail():
  257. account.authenticate()
  258. # mailboxes2 = ["bedner@global-cube.net"]
  259. with open(base_dir / "../templates/info.html.jinja") as f:
  260. template = jinja2.Template(f.read())
  261. for mailbox in mailboxes:
  262. mailbox_prefix = mailbox.split("@")[0]
  263. extra_contacts = json.load((base_dir / f"personal_{mailbox_prefix}_extra.json").open("r"))
  264. different_contacts = json.load((base_dir / f"personal_{mailbox_prefix}_diff.json").open("r"))
  265. with SmtpMailer() as mailer:
  266. mailer.send(
  267. mailto=mailbox,
  268. subject="Wochenbericht adressbuch@global-cube.net",
  269. html=template.render(extra_contacts=extra_contacts, different_contacts=different_contacts),
  270. attachment=[
  271. ("privat.json", (base_dir / f"personal_{mailbox_prefix}_diff.json")),
  272. ("geteilt.json", (base_dir / f"shared_{mailbox_prefix}_diff.json")),
  273. ],
  274. )
  275. @app.command()
  276. def sync_and_delete():
  277. contacts = sync_contacts()
  278. if len(contacts) > 0:
  279. delete_contacts(contacts)
  280. if __name__ == "__main__":
  281. # create_private_folder()
  282. # send_mail()
  283. # cleanup_contacts3()
  284. app()