apache_ldap.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. import base64
  2. import hashlib
  3. import os
  4. import ldap3
# Address ("host:port") of the LDAP server that ldap_connect() binds to.
LDAP_SERVER = "localhost:10389"
# Base DN: bare user ids are expanded to "uid=<name>,<LDAP_BASE_DN>", and it
# is also the search base used by the backup.
LDAP_BASE_DN = "ou=cognos,dc=ibm,dc=com"
  7. def ldap_connect(username: str, password: str) -> ldap3.Connection:
  8. if not username.startswith("uid="):
  9. username = f"uid={username},{LDAP_BASE_DN}"
  10. server = ldap3.Server(LDAP_SERVER, get_info=ldap3.ALL, use_ssl=False)
  11. conn = ldap3.Connection(server, user=username, password=password)
  12. if not conn.bind():
  13. print(conn.result)
  14. return None
  15. return conn
  16. def ldap_backup(username: str, password: str, backup_file: str):
  17. conn = ldap_connect(username, password)
  18. if not conn:
  19. return
  20. conn.search(
  21. LDAP_BASE_DN,
  22. "(objectclass=person)",
  23. "SUBTREE",
  24. attributes=[ldap3.ALL_ATTRIBUTES, ldap3.ALL_OPERATIONAL_ATTRIBUTES],
  25. )
  26. with open(backup_file, "w", encoding="latin-1", newline="") as fwh:
  27. fwh.write(conn.response_to_ldif())
  28. format_string = "{:15} {:25} {:19} {:25} {}"
  29. print(format_string.format("UID", "Name", "erstellt", "E-Mail", "Details"))
  30. for e in conn.entries:
  31. # print(e.entry_to_ldif())
  32. # print(e.entry_to_json())
  33. desc = ""
  34. email = ""
  35. if "description" in e:
  36. desc = e.description
  37. if "mail" in e:
  38. email = e.mail
  39. print(format_string.format(str(e.uid), str(e.givenName), str(e.createTimestamp)[:19], str(email), desc))
  40. conn.unbind()
  41. def ldap_restore(username: str, password: str, backup_file: str):
  42. conn = ldap_connect(username, password)
  43. if not conn:
  44. return
  45. with open(backup_file, "r", encoding="latin-1") as ldif_file:
  46. ldif_data = ldif_file.read()
  47. ldif = ldap3.LDIF(ldif_data)
  48. for entry in ldif.entries:
  49. dn = entry["dn"]
  50. object_class = entry["objectClass"]
  51. attributes = entry["attributes"]
  52. conn.search(dn, "(objectClass=*)", search_scope=ldap3.SUBTREE)
  53. if conn.entries:
  54. print(f"Eintrag {dn} existiert bereits.")
  55. else:
  56. conn.add(dn, object_class, attributes)
  57. conn.unbind()
  58. def ldap_create_user(admin_username: str, admin_password: str, new_username: str, new_password: str, email: str):
  59. conn = ldap_connect(admin_username, admin_password)
  60. if not conn:
  61. return False
  62. user_dn = f"uid={new_username},{LDAP_BASE_DN}"
  63. ssha_password = create_ssha_password(new_password)
  64. attributes = {
  65. "objectClass": ["top", "person", "organizationalPerson", "inetOrgPerson"],
  66. "sn": new_username,
  67. "cn": new_username,
  68. "uid": new_username,
  69. "userPassword": ssha_password,
  70. "mail": email,
  71. }
  72. if conn.search(user_dn, "(objectClass=*)", search_scope=ldap3.BASE):
  73. print(f"Benutzer {new_username} existiert bereits.")
  74. conn.unbind()
  75. return False
  76. if not conn.add(user_dn, attributes=attributes):
  77. print(f"Fehler beim Erstellen des Benutzers {new_username}: {conn.result}")
  78. conn.unbind()
  79. return False
  80. print(f"Benutzer {new_username} erfolgreich erstellt.")
  81. conn.unbind()
  82. return True
  83. def ldap_change_admin_password(old_password: str, new_password: str):
  84. admin_user = "uid=admin,ou=system"
  85. conn = ldap_connect(admin_user, old_password)
  86. if not conn:
  87. print("Admin-Passwort falsch!")
  88. return False
  89. if new_password == "":
  90. return True
  91. ssha_password = create_ssha_password(new_password)
  92. conn.modify(admin_user, {"userPassword": [(ldap3.MODIFY_REPLACE, [ssha_password])]})
  93. conn.unbind()
  94. print("Passwort-Aenderung erfolgreich!")
  95. return True
  96. def create_ssha_password(password: str) -> str:
  97. salt = os.urandom(4)
  98. sha = hashlib.sha1(password.encode("utf-8"))
  99. sha.update(salt)
  100. return "{SSHA}" + base64.b64encode(sha.digest() + salt).decode("utf-8")
  101. if __name__ == "__main__":
  102. # ldap_backup()
  103. ldap_change_admin_password("test12test", "gc01gapsC$")
  104. # ldap_recovery("uid=admin,ou=system", "gc01gapsC$", "test.ldif")