convert_ldap_to_db.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. import json
  2. from pathlib import Path
# Directory next to this script that holds the JSON files read and written by
# the functions in this module.
base_dir = Path(__file__).parent / "Becker-Tiemann"
  4. def read_ldap_json(json_file: str) -> dict:
  5. with open(base_dir / json_file, "r", encoding="latin-1") as frh:
  6. groups_temp = json.load(frh)
  7. res = {}
  8. for entry in groups_temp["entries"]:
  9. res[entry["dn"]] = entry["attributes"]
  10. return res
  11. def read_or_create_selected_groups(json_file: str, groups: dict) -> dict:
  12. sel_groups = []
  13. if Path(base_dir / json_file).exists():
  14. with open(base_dir / json_file, "r", encoding="latin-1") as frh:
  15. sel_groups = json.load(frh)
  16. else:
  17. sel_groups = [dn for dn, attrs in groups.items() if len(attrs.get("member", [])) > 0]
  18. with open(base_dir / json_file, "w", encoding="latin-1") as fwh:
  19. json.dump(sel_groups, fwh, indent=2)
  20. return sel_groups
  21. def print_group_membership(groups: dict, sel_groups: list):
  22. for dn, attrs in groups.items():
  23. group_members = [member_dn for member_dn in attrs.get("member", []) if member_dn in groups]
  24. if group_members:
  25. print("\n" + dn, len(group_members))
  26. for member_dn in group_members:
  27. print(" * " + member_dn)
  28. def get_subgroups(groups: dict, sel_groups: list):
  29. res = []
  30. for group_dn in sel_groups:
  31. attrs = groups.get(group_dn, None)
  32. if not attrs:
  33. continue
  34. res.append(group_dn)
  35. res.extend(get_subgroups(groups, attrs.get("member", [])))
  36. return res
  37. def set_group_member_of(groups: dict, sel_groups: list, parents: list = None):
  38. if parents is None:
  39. parents = []
  40. for group_dn in sel_groups:
  41. attrs = groups.get(group_dn, None)
  42. if not attrs:
  43. continue
  44. attrs["memberOf"] = parents
  45. set_group_member_of(groups, attrs.get("member", []), parents + [group_dn])
  46. def get_cn_or_ou_from_dn(dn: str) -> str:
  47. return dn.split(",")[0][3:]
  48. def main():
  49. groups = read_ldap_json("groups.json")
  50. sel_roles = read_or_create_selected_groups("selected_roles.json", groups)
  51. sel_groups = read_or_create_selected_groups("selected_groups.json", groups)
  52. users = read_ldap_json("users.json")
  53. for user_dn, attrs in users.items():
  54. member_of_role = [groups[dn]["cn"] for dn in sel_roles if is_transitive_member_of(groups, attrs, dn)]
  55. if len(member_of_role) > 0:
  56. # "Benutzer_ID;Name;E-Mail;Rolle;Mitarbeiter;Benutzer_DN"
  57. # role = "Serviceberater" if "_SERV" in member_of[0] else "Buchhaltung"
  58. # print(
  59. # ";".join(
  60. # [
  61. # attrs.get("sAMAccountName", ""),
  62. # attrs.get("cn", ""),
  63. # attrs.get("mail", ""),
  64. # role,
  65. # attrs.get("cn", ""),
  66. # user_dn,
  67. # ]
  68. # )
  69. # )
  70. # print(attrs.get("cn", user_dn))
  71. for g in member_of_role:
  72. print(attrs.get("sAMAccountName", user_dn) + ";" + g)
  73. member_of_group = [
  74. get_cn_or_ou_from_dn(dn) for dn in sel_groups if is_transitive_member_of(groups, attrs, dn)
  75. ]
  76. if len(member_of_group) > 0:
  77. for g in member_of_group:
  78. print(attrs.get("sAMAccountName", user_dn) + ";" + g)
  79. # for dn in sel_groups:
  80. # group = groups.get(dn, {})
  81. # print("\n" + dn, len(group.get("member", [])))
  82. # for member_dn in group.get("member", []):
  83. # print(" - " + member_dn)
  84. def get_ou_subgroups(group_dn: str) -> list[str]:
  85. res = []
  86. group_dn_split = group_dn.split(",")
  87. for i, entry in enumerate(group_dn_split):
  88. if entry[:2] not in ("OU", "ou"):
  89. continue
  90. res.append(",".join(group_dn_split[i:]))
  91. return res
  92. def is_transitive_member_of(groups: dict, dn_attrs: dict, group_dn: str) -> bool:
  93. if group_dn in get_ou_subgroups(dn_attrs["distinguishedName"]):
  94. return True
  95. for dn in dn_attrs.get("memberOf", []):
  96. if dn == group_dn:
  97. return True
  98. if dn in groups and is_transitive_member_of(groups, groups[dn], group_dn):
  99. return True
  100. return False
# Run main() only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()