o365_copy.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. import dataclasses
  2. import json
  3. from pathlib import Path
  4. from typing import Self
  5. import jinja2
  6. import typer
  7. from O365 import Account
  8. from O365.address_book import Contact
  9. from smtp_mail import SmtpMailer
  10. app = typer.Typer()
  11. client_id = "925f74dc-f96a-4718-9ca7-d6cc3fa43e1e"
  12. client_secret = "SMn8Q~rVUnbYEAtEZZ6jcQElOIU9tDQUgv1VwcRz"
  13. account = Account(
  14. (client_id, client_secret), auth_flow_type="credentials", tenant_id="2ad0dff5-07ce-4cc2-a852-99ce8b91c218"
  15. )
  16. mailboxes = [
  17. "winter@global-cube.net",
  18. "bedner@global-cube.net",
  19. "brandt@global-cube.net",
  20. # 'd.ankenbrand@global-cube.de',
  21. # 'gawliczek@global-cube.de',
  22. "m.geiss@global-cube.net",
  23. "matarrelli@global-cube.net",
  24. "winkler@global-cube.net",
  25. "karaca@global-cube.net",
  26. ]
  27. base_dir = Path(__file__).parent / "data"
  28. class EnhancedJSONEncoder(json.JSONEncoder):
  29. def default(self, o):
  30. if dataclasses.is_dataclass(o):
  31. return dataclasses.asdict(o)
  32. return super().default(o)
  33. @dataclasses.dataclass
  34. class SerialEmail:
  35. address: str
  36. name: str
  37. @dataclasses.dataclass
  38. class SerialContact:
  39. display_name: str
  40. name: str
  41. surname: str
  42. title: str
  43. job_title: str
  44. company_name: str
  45. department: str
  46. office_location: str
  47. business_phones: str
  48. mobile_phone: str
  49. home_phones: str
  50. emails: list[SerialEmail]
  51. business_address: str
  52. other_address: str
  53. categories: str
  54. personal_notes: str
  55. def __lt__(self, other):
  56. return self.display_name < other.display_name
  57. def __eq__(self, other):
  58. return self.display_name == other.display_name
  59. @staticmethod
  60. def from_o365_contact(p_from: Contact) -> Self:
  61. return SerialContact(
  62. display_name=p_from.display_name,
  63. name=p_from.name,
  64. surname=p_from.surname,
  65. title=p_from.title,
  66. job_title=p_from.job_title,
  67. company_name=p_from.company_name,
  68. department=p_from.department,
  69. office_location=p_from.office_location,
  70. business_phones=p_from.business_phones,
  71. mobile_phone=p_from.mobile_phone,
  72. home_phones=p_from.home_phones,
  73. emails=[SerialEmail(e.address, e.name) for e in p_from.emails],
  74. business_address=p_from.business_address,
  75. other_address=p_from.other_address,
  76. categories=p_from.categories,
  77. personal_notes=p_from.personal_notes,
  78. )
  79. def __hash__(self) -> int:
  80. return hash(json.dumps(dataclasses.asdict(self)))
  81. def to_o365_contact(self, p_to: Contact):
  82. p_to.display_name = self.display_name
  83. p_to.name = self.name
  84. p_to.surname = self.surname
  85. p_to.title = self.title
  86. p_to.job_title = self.job_title
  87. p_to.company_name = self.company_name
  88. p_to.department = self.department
  89. p_to.office_location = self.office_location
  90. p_to.business_phones = self.business_phones
  91. p_to.mobile_phone = self.mobile_phone
  92. p_to.home_phones = self.home_phones
  93. p_to.emails.clear()
  94. for rcp in self.emails:
  95. p_to.emails.add(rcp)
  96. p_to.business_address = self.business_address
  97. # p_to.home_address = self.home_address
  98. p_to.other_address = self.other_address
  99. p_to.categories = self.categories
  100. p_to.personal_notes = self.personal_notes
  101. return p_to
  102. def copy_contact(p_from: Contact, p_to: Contact):
  103. p_to.display_name = p_from.display_name
  104. p_to.name = p_from.name
  105. p_to.surname = p_from.surname
  106. p_to.title = p_from.title
  107. p_to.job_title = p_from.job_title
  108. p_to.company_name = p_from.company_name
  109. p_to.department = p_from.department
  110. p_to.office_location = p_from.office_location
  111. p_to.business_phones = p_from.business_phones
  112. p_to.mobile_phone = p_from.mobile_phone
  113. p_to.home_phones = p_from.home_phones
  114. p_to.emails.clear()
  115. for rcp in p_from.emails:
  116. p_to.emails.add(rcp)
  117. p_to.business_address = p_from.business_address
  118. # p_to.home_address = p_from.home_address
  119. p_to.other_address = p_from.other_address
  120. p_to.categories = p_from.categories
  121. p_to.personal_notes = p_from.personal_notes
  122. p_to.save()
  123. @app.command()
  124. def sync_contacts():
  125. account.authenticate()
  126. shared = account.address_book(resource="adressbuch@global-cube.net", address_book="personal")
  127. shared_contacts = {p.display_name: p for p in shared.get_contacts(limit=None)}
  128. shared_contacts_serial = [SerialContact.from_o365_contact(p) for p in shared_contacts.values()]
  129. shared_contacts_serial.sort()
  130. json.dump(shared_contacts_serial, (base_dir / "shared.json").open("w"), indent=2, cls=EnhancedJSONEncoder)
  131. shared_contacts_hash = {c.display_name: hash(c) for c in shared_contacts_serial}
  132. shared_contacts_prev = json.load((base_dir / "shared_hash.json").open("r"))
  133. delete_candidates = set()
  134. for mailbox in mailboxes:
  135. print(mailbox)
  136. mailbox_prefix = mailbox.split("@")[0]
  137. personal = account.address_book(resource=mailbox, address_book="personal")
  138. personal_contacts = {p.display_name: p for p in personal.get_contacts(limit=None)}
  139. personal_contacts_serial = [SerialContact.from_o365_contact(p) for p in personal_contacts.values()]
  140. personal_contacts_serial.sort()
  141. personal_contacts_hash = {c.display_name: hash(c) for c in personal_contacts_serial}
  142. extra_contacts = set(personal_contacts.keys()).difference(shared_contacts.keys())
  143. extra_contacts_serial = [SerialContact.from_o365_contact(personal_contacts[c]) for c in extra_contacts]
  144. extra_contacts_serial.sort()
  145. json.dump(
  146. extra_contacts_serial,
  147. (base_dir / f"personal_{mailbox_prefix}_extra.json").open("w"),
  148. indent=2,
  149. cls=EnhancedJSONEncoder,
  150. )
  151. print(extra_contacts)
  152. different_contacts = []
  153. if len(delete_candidates) == 0:
  154. delete_candidates = extra_contacts
  155. else:
  156. delete_candidates = delete_candidates.intersection(extra_contacts)
  157. for p_shared in shared_contacts.values():
  158. if p_shared.display_name not in personal_contacts:
  159. p_new = personal.new_contact()
  160. copy_contact(p_shared, p_new)
  161. continue
  162. if personal_contacts_hash[p_shared.display_name] == shared_contacts_hash[p_shared.display_name]:
  163. # contact details are identical, nothing to do
  164. continue
  165. if personal_contacts_hash[p_shared.display_name] == shared_contacts_prev[p_shared.display_name]:
  166. # simply update contact details
  167. copy_contact(p_shared, personal_contacts[p_shared.display_name])
  168. continue
  169. different_contacts.append(SerialContact.from_o365_contact(personal_contacts[p_shared.display_name]))
  170. different_contacts.sort()
  171. json.dump(
  172. different_contacts,
  173. (base_dir / f"personal_{mailbox_prefix}_diff.json").open("w"),
  174. indent=2,
  175. cls=EnhancedJSONEncoder,
  176. )
  177. different_contacts_shared = [
  178. SerialContact.from_o365_contact(shared_contacts[c.display_name]) for c in different_contacts
  179. ]
  180. json.dump(
  181. different_contacts_shared,
  182. (base_dir / f"shared_{mailbox_prefix}_diff.json").open("w"),
  183. indent=2,
  184. cls=EnhancedJSONEncoder,
  185. )
  186. json.dump(shared_contacts_hash, (base_dir / "shared_hash.json").open("w"), indent=2, cls=EnhancedJSONEncoder)
  187. return delete_candidates
  188. def delete_contacts(delete_candidates):
  189. account.authenticate()
  190. for mailbox in mailboxes:
  191. print(mailbox)
  192. personal = account.address_book(resource=mailbox, address_book="personal")
  193. personal_contacts = {p.display_name: p for p in personal.get_contacts(limit=None)}
  194. selected_contacts = set(delete_candidates).intersection(personal_contacts.keys())
  195. print(selected_contacts)
  196. for c in selected_contacts:
  197. personal_contacts[c].delete()
  198. def normalize_phone_number(phone_number: str):
  199. if phone_number is None:
  200. return ""
  201. if phone_number.startswith("0"):
  202. phone_number = "+49 " + phone_number[1:]
  203. for c in "()/?":
  204. phone_number = phone_number.replace(c, "")
  205. return phone_number.replace(" -", "-").replace("- ", "-").replace(" ", " ")
  206. def cleanup_contacts():
  207. account.authenticate()
  208. for mailbox in mailboxes:
  209. print(mailbox)
  210. shared = account.address_book(resource=mailbox, address_book="personal")
  211. for p in shared.get_contacts(limit=None):
  212. business = [normalize_phone_number(no) for no in p.business_phones]
  213. if p.business_phones != business:
  214. p.business_phones = business
  215. home = [normalize_phone_number(no) for no in p.home_phones]
  216. if p.home_phones != home:
  217. p.home_phones = home
  218. mobile = normalize_phone_number(p.mobile_phone)
  219. if p.mobile_phone != mobile:
  220. p.mobile_phone = mobile
  221. if p._track_changes:
  222. print(p.display_name)
  223. p.save()
  224. def cleanup_contacts2():
  225. account.authenticate()
  226. for mailbox in mailboxes:
  227. print(mailbox)
  228. personal = account.address_book(resource=mailbox, address_book="personal")
  229. for p in personal.get_contacts(limit=None):
  230. if not p.mobile_phone:
  231. p.mobile_phone = ""
  232. if not p.job_title:
  233. p.job_title = ""
  234. if not p.department:
  235. p.department = ""
  236. if not p.office_location:
  237. p.office_location = ""
  238. p.save()
  239. def cleanup_contacts3():
  240. account.authenticate()
  241. for mailbox in mailboxes:
  242. print(mailbox)
  243. personal = account.address_book(resource=mailbox, address_book="personal")
  244. for p in personal.get_contacts(limit=None):
  245. if p.display_name is None or p.display_name == "":
  246. print(p.name, p.surname)
  247. def create_private_folder():
  248. account.authenticate()
  249. for mailbox in mailboxes:
  250. print(mailbox)
  251. personal = account.address_book(resource=mailbox, address_book="personal")
  252. private = personal.get_folder(folder_name="privat")
  253. if private is None:
  254. personal.create_child_folder("privat")
  255. @app.command()
  256. def send_mail():
  257. account.authenticate()
  258. # mailboxes2 = ["bedner@global-cube.net"]
  259. with open(base_dir / "../templates/info.html.jinja") as f:
  260. template = jinja2.Template(f.read())
  261. for mailbox in mailboxes:
  262. mailbox_prefix = mailbox.split("@")[0]
  263. extra_contacts = json.load((base_dir / f"personal_{mailbox_prefix}_extra.json").open("r"))
  264. different_contacts = json.load((base_dir / f"personal_{mailbox_prefix}_diff.json").open("r"))
  265. with SmtpMailer() as mailer:
  266. mailer.send(
  267. mailto=mailbox,
  268. subject="Wochenbericht adressbuch@global-cube.net",
  269. html=template.render(extra_contacts=extra_contacts, different_contacts=different_contacts),
  270. attachment=[
  271. ("privat.json", (base_dir / f"personal_{mailbox_prefix}_diff.json")),
  272. ("geteilt.json", (base_dir / f"shared_{mailbox_prefix}_diff.json")),
  273. ],
  274. )
  275. @app.command()
  276. def sync_and_delete():
  277. contacts = sync_contacts()
  278. if len(contacts) > 0:
  279. delete_contacts(contacts)
  280. if __name__ == "__main__":
  281. # create_private_folder()
  282. # send_mail()
  283. # cleanup_contacts3()
  284. app()