# o365_copy.py
import dataclasses
import hashlib
import json
import os
from pathlib import Path

import jinja2
import typer
from O365 import Account
from O365.address_book import Contact

from smtp_mail import SmtpMailer

# from typing import Self
  10. app = typer.Typer()
  11. client_id = "925f74dc-f96a-4718-9ca7-d6cc3fa43e1e"
  12. client_secret = "SMn8Q~rVUnbYEAtEZZ6jcQElOIU9tDQUgv1VwcRz"
  13. account = Account(
  14. (client_id, client_secret), auth_flow_type="credentials", tenant_id="2ad0dff5-07ce-4cc2-a852-99ce8b91c218"
  15. )
  16. mailboxes = [
  17. "winter@global-cube.net",
  18. "bedner@global-cube.net",
  19. "brandt@global-cube.net",
  20. # 'd.ankenbrand@global-cube.de',
  21. # 'gawliczek@global-cube.de',
  22. "m.geiss@global-cube.net",
  23. "matarrelli@global-cube.net",
  24. "winkler@global-cube.net",
  25. "karaca@global-cube.net",
  26. ]
  27. base_dir = Path(__file__).parent / "data"
  28. class EnhancedJSONEncoder(json.JSONEncoder):
  29. def default(self, o):
  30. if dataclasses.is_dataclass(o):
  31. return dataclasses.asdict(o)
  32. return super().default(o)
@dataclasses.dataclass
class SerialEmail:
    """Plain, JSON-serialisable e-mail entry of a contact."""

    address: str  # the e-mail address
    name: str  # the name stored alongside the address
  37. @dataclasses.dataclass
  38. class SerialContact:
  39. display_name: str
  40. name: str
  41. surname: str
  42. title: str
  43. job_title: str
  44. company_name: str
  45. department: str
  46. office_location: str
  47. business_phones: str
  48. mobile_phone: str
  49. home_phones: str
  50. emails: list[SerialEmail]
  51. business_address: str
  52. other_address: str
  53. categories: str
  54. personal_notes: str
  55. def __lt__(self, other):
  56. return self.display_name < other.display_name
  57. def __eq__(self, other):
  58. return self.display_name == other.display_name
  59. @staticmethod
  60. def from_o365_contact(p_from: Contact):
  61. return SerialContact(
  62. display_name=p_from.display_name or "",
  63. name=p_from.name,
  64. surname=p_from.surname,
  65. title=p_from.title,
  66. job_title=p_from.job_title,
  67. company_name=p_from.company_name,
  68. department=p_from.department,
  69. office_location=p_from.office_location,
  70. business_phones=p_from.business_phones,
  71. mobile_phone=p_from.mobile_phone,
  72. home_phones=p_from.home_phones,
  73. emails=[SerialEmail(e.address, e.name) for e in p_from.emails],
  74. business_address=p_from.business_address,
  75. other_address=p_from.other_address,
  76. categories=p_from.categories,
  77. personal_notes=p_from.personal_notes,
  78. )
  79. def __hash__(self) -> int:
  80. return hash(json.dumps(dataclasses.asdict(self)))
  81. def to_o365_contact(self, p_to: Contact):
  82. p_to.display_name = self.display_name
  83. p_to.name = self.name
  84. p_to.surname = self.surname
  85. p_to.title = self.title
  86. p_to.job_title = self.job_title
  87. p_to.company_name = self.company_name
  88. p_to.department = self.department
  89. p_to.office_location = self.office_location
  90. p_to.business_phones = self.business_phones
  91. p_to.mobile_phone = self.mobile_phone
  92. p_to.home_phones = self.home_phones
  93. p_to.emails.clear()
  94. for rcp in self.emails:
  95. p_to.emails.add(rcp)
  96. p_to.business_address = self.business_address
  97. # p_to.home_address = self.home_address
  98. p_to.other_address = self.other_address
  99. p_to.categories = self.categories
  100. p_to.personal_notes = self.personal_notes
  101. return p_to
  102. def copy_contact(p_from: Contact, p_to: Contact) -> SerialContact:
  103. p_to.display_name = p_from.display_name
  104. p_to.name = p_from.name
  105. p_to.surname = p_from.surname
  106. p_to.title = p_from.title
  107. p_to.job_title = p_from.job_title
  108. p_to.company_name = p_from.company_name
  109. p_to.department = p_from.department
  110. p_to.office_location = p_from.office_location
  111. p_to.business_phones = p_from.business_phones
  112. p_to.mobile_phone = p_from.mobile_phone
  113. p_to.home_phones = p_from.home_phones
  114. p_to.emails.clear()
  115. for rcp in p_from.emails:
  116. p_to.emails.add(rcp)
  117. p_to.business_address = p_from.business_address
  118. # p_to.home_address = p_from.home_address
  119. p_to.other_address = p_from.other_address
  120. p_to.categories = p_from.categories
  121. p_to.personal_notes = p_from.personal_notes
  122. p_to.save()
  123. return SerialContact.from_o365_contact(p_to)
  124. @app.command()
  125. def sync_contacts():
  126. account.authenticate()
  127. shared = account.address_book(resource="adressbuch@global-cube.net", address_book="personal")
  128. shared_contacts = {p.display_name: p for p in shared.get_contacts(limit=None)}
  129. shared_contacts_serial = [SerialContact.from_o365_contact(p) for p in shared_contacts.values()]
  130. shared_contacts_serial.sort()
  131. json.dump(shared_contacts_serial, (base_dir / "shared.json").open("w"), indent=2, cls=EnhancedJSONEncoder)
  132. shared_contacts_hash = {c.display_name: hash(c) for c in shared_contacts_serial}
  133. shared_contacts_prev = json.load((base_dir / "shared_hash.json").open("r"))
  134. shared_contacts_combined = get_contacts_combined(shared_contacts_hash, shared_contacts_prev)
  135. delete_candidates = set()
  136. for mailbox in mailboxes:
  137. print(mailbox)
  138. mailbox_prefix = mailbox.split("@")[0]
  139. personal = account.address_book(resource=mailbox, address_book="personal")
  140. personal_contacts = {p.display_name: p for p in personal.get_contacts(limit=None)}
  141. personal_contacts_serial = [SerialContact.from_o365_contact(p) for p in personal_contacts.values()]
  142. personal_contacts_serial.sort()
  143. personal_contacts_hash = {c.display_name: hash(c) for c in personal_contacts_serial}
  144. extra_contacts = set(personal_contacts.keys()).difference(shared_contacts.keys())
  145. extra_contacts_serial = [SerialContact.from_o365_contact(personal_contacts[c]) for c in extra_contacts]
  146. extra_contacts_serial.sort()
  147. json.dump(
  148. extra_contacts_serial,
  149. (base_dir / f"personal_{mailbox_prefix}_extra.json").open("w"),
  150. indent=2,
  151. cls=EnhancedJSONEncoder,
  152. )
  153. print(extra_contacts)
  154. different_contacts = []
  155. if len(delete_candidates) == 0:
  156. delete_candidates = extra_contacts
  157. else:
  158. delete_candidates = delete_candidates.intersection(extra_contacts)
  159. for p_shared in shared_contacts.values():
  160. if p_shared.display_name not in personal_contacts:
  161. p_new = personal.new_contact()
  162. c = copy_contact(p_shared, p_new)
  163. if hash(c) not in shared_contacts_combined[p_shared.display_name]:
  164. shared_contacts_combined[p_shared.display_name].append(hash(c))
  165. continue
  166. if personal_contacts_hash[p_shared.display_name] == shared_contacts_hash[p_shared.display_name]:
  167. # contact details are identical, nothing to do
  168. continue
  169. if personal_contacts_hash[p_shared.display_name] in shared_contacts_combined[p_shared.display_name]:
  170. # simply update contact details
  171. c = copy_contact(p_shared, personal_contacts[p_shared.display_name])
  172. if hash(c) not in shared_contacts_combined[p_shared.display_name]:
  173. shared_contacts_combined[p_shared.display_name].append(hash(c))
  174. continue
  175. different_contacts.append(SerialContact.from_o365_contact(personal_contacts[p_shared.display_name]))
  176. different_contacts.sort()
  177. json.dump(
  178. different_contacts,
  179. (base_dir / f"personal_{mailbox_prefix}_diff.json").open("w"),
  180. indent=2,
  181. cls=EnhancedJSONEncoder,
  182. )
  183. different_contacts_shared = [
  184. SerialContact.from_o365_contact(shared_contacts[c.display_name]) for c in different_contacts
  185. ]
  186. json.dump(
  187. different_contacts_shared,
  188. (base_dir / f"shared_{mailbox_prefix}_diff.json").open("w"),
  189. indent=2,
  190. cls=EnhancedJSONEncoder,
  191. )
  192. json.dump(shared_contacts_combined, (base_dir / "shared_hash.json").open("w"), indent=2, cls=EnhancedJSONEncoder)
  193. return delete_candidates
  194. def get_contacts_combined(shared_contacts_hash: dict[str, int], shared_contacts_prev: dict):
  195. res = {}
  196. for k, v in shared_contacts_hash.items():
  197. if k in shared_contacts_prev:
  198. v2 = shared_contacts_prev[k]
  199. if isinstance(v2, int):
  200. v2 = [v2]
  201. if v not in v2:
  202. v2.append(v)
  203. res[k] = v2
  204. else:
  205. res[k] = [v]
  206. return res
  207. def delete_contacts(delete_candidates):
  208. account.authenticate()
  209. for mailbox in mailboxes:
  210. print(mailbox)
  211. personal = account.address_book(resource=mailbox, address_book="personal")
  212. personal_contacts = {p.display_name: p for p in personal.get_contacts(limit=None)}
  213. selected_contacts = set(delete_candidates).intersection(personal_contacts.keys())
  214. print(selected_contacts)
  215. for c in selected_contacts:
  216. personal_contacts[c].delete()
  217. def normalize_phone_number(phone_number: str):
  218. if phone_number is None:
  219. return ""
  220. if phone_number.startswith("0"):
  221. phone_number = "+49 " + phone_number[1:]
  222. for c in "()/?":
  223. phone_number = phone_number.replace(c, "")
  224. return phone_number.replace(" -", "-").replace("- ", "-").replace(" ", " ")
  225. def cleanup_contacts():
  226. account.authenticate()
  227. for mailbox in mailboxes:
  228. print(mailbox)
  229. shared = account.address_book(resource=mailbox, address_book="personal")
  230. for p in shared.get_contacts(limit=None):
  231. business = [normalize_phone_number(no) for no in p.business_phones]
  232. if p.business_phones != business:
  233. p.business_phones = business
  234. home = [normalize_phone_number(no) for no in p.home_phones]
  235. if p.home_phones != home:
  236. p.home_phones = home
  237. mobile = normalize_phone_number(p.mobile_phone)
  238. if p.mobile_phone != mobile:
  239. p.mobile_phone = mobile
  240. if p._track_changes:
  241. print(p.display_name)
  242. p.save()
  243. def cleanup_contacts2():
  244. account.authenticate()
  245. for mailbox in mailboxes:
  246. print(mailbox)
  247. personal = account.address_book(resource=mailbox, address_book="personal")
  248. for p in personal.get_contacts(limit=None):
  249. if not p.mobile_phone:
  250. p.mobile_phone = ""
  251. if not p.job_title:
  252. p.job_title = ""
  253. if not p.department:
  254. p.department = ""
  255. if not p.office_location:
  256. p.office_location = ""
  257. p.save()
  258. def cleanup_contacts3():
  259. account.authenticate()
  260. for mailbox in mailboxes:
  261. print(mailbox)
  262. personal = account.address_book(resource=mailbox, address_book="personal")
  263. for p in personal.get_contacts(limit=None):
  264. if p.display_name is None or p.display_name == "":
  265. print(p.name, p.surname)
  266. def create_private_folder():
  267. account.authenticate()
  268. for mailbox in mailboxes:
  269. print(mailbox)
  270. personal = account.address_book(resource=mailbox, address_book="personal")
  271. private = personal.get_folder(folder_name="privat")
  272. if private is None:
  273. personal.create_child_folder("privat")
  274. @app.command()
  275. def send_mail():
  276. account.authenticate()
  277. # mailboxes2 = ["bedner@global-cube.net"]
  278. with open(base_dir / "../templates/info.html.jinja") as f:
  279. template = jinja2.Template(f.read())
  280. for mailbox in mailboxes:
  281. mailbox_prefix = mailbox.split("@")[0]
  282. extra_contacts = json.load((base_dir / f"personal_{mailbox_prefix}_extra.json").open("r"))
  283. different_contacts = json.load((base_dir / f"personal_{mailbox_prefix}_diff.json").open("r"))
  284. with SmtpMailer() as mailer:
  285. mailer.send(
  286. mailto=mailbox,
  287. subject="Wochenbericht adressbuch@global-cube.net",
  288. html=template.render(extra_contacts=extra_contacts, different_contacts=different_contacts),
  289. attachment=[
  290. ("privat.json", (base_dir / f"personal_{mailbox_prefix}_diff.json")),
  291. ("geteilt.json", (base_dir / f"shared_{mailbox_prefix}_diff.json")),
  292. ],
  293. )
  294. @app.command()
  295. def sync_and_delete():
  296. contacts = sync_contacts()
  297. if len(contacts) > 0:
  298. delete_contacts(contacts)
if __name__ == "__main__":
    # Hand control to the Typer CLI, which exposes the functions decorated
    # with @app.command(). The remaining helpers in this file (for example
    # create_private_folder or cleanup_contacts3) are not registered as
    # commands and have to be called from here by hand when needed.
    app()