apache_ldap.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. import base64
  2. import hashlib
  3. import os
  4. import ldap3
  # Host and port of the directory server; passed unchanged to ldap3.Server().
  LDAP_SERVER = "localhost:10389"
  # Subtree this module works on: used as the search base for backups and as
  # the suffix appended to bare user names when building a bind DN.
  LDAP_BASE_DN = "ou=cognos,dc=ibm,dc=com"
  def ldap_connect(username: str, password: str) -> "ldap3.Connection | None":
      """Open a connection to LDAP_SERVER and bind with the given credentials.

      Args:
          username: Either a full bind DN starting with ``uid=`` or a bare user
              id, which is expanded to ``uid=<username>,<LDAP_BASE_DN>``.
          password: Password for the simple bind.

      Returns:
          The bound connection, or ``None`` when the bind is rejected. In that
          case the server's result is printed and the connection is closed.
      """
      if not username.startswith("uid="):
          username = f"uid={username},{LDAP_BASE_DN}"

      server = ldap3.Server(LDAP_SERVER, get_info=ldap3.ALL, use_ssl=False)
      conn = ldap3.Connection(server, user=username, password=password)
      if conn.bind():
          return conn

      # Report why the bind failed before tearing the connection down.
      print(conn.result)
      # A rejected bind still leaves the socket open; close it so that failed
      # logins do not leak connections.
      conn.unbind()
      return None
  def ldap_backup(username: str, password: str, backup_file: str):
      """Export every person entry below LDAP_BASE_DN to an LDIF file.

      After writing the file, a one-line summary per entry (uid, given name,
      creation timestamp, mail, description) is printed to stdout.

      Args:
          username: Bind user, passed to ldap_connect().
          password: Password of the bind user.
          backup_file: Path of the LDIF file to create or overwrite.

      Returns:
          None. Nothing is written when the bind fails or when the search
          yields no response that can be converted to LDIF.
      """
      conn = ldap_connect(username, password)
      if not conn:
          return

      def attr_text(entry, name):
          # Not every person entry carries every attribute; attribute access
          # on a missing one would raise, so fall back to an empty string.
          return str(entry[name]) if name in entry else ""

      try:
          conn.search(
              LDAP_BASE_DN,
              "(objectclass=person)",
              ldap3.SUBTREE,
              attributes=[ldap3.ALL_ATTRIBUTES, ldap3.ALL_OPERATIONAL_ATTRIBUTES],
          )

          # Build the LDIF text before opening the file so that an existing
          # backup is not truncated when there is nothing to write.
          ldif_text = conn.response_to_ldif()
          if ldif_text is None:
              print(conn.result)
              return

          # newline="" keeps the line separators produced by ldap3 untouched.
          with open(backup_file, "w", encoding="latin-1", newline="") as fwh:
              fwh.write(ldif_text)

          format_string = "{:15} {:25} {:19} {:25} {}"
          print(format_string.format("UID", "Name", "erstellt", "E-Mail", "Details"))
          for e in conn.entries:
              print(
                  format_string.format(
                      attr_text(e, "uid"),
                      attr_text(e, "givenName"),
                      # Keep only the first 19 characters of the timestamp text.
                      attr_text(e, "createTimestamp")[:19],
                      attr_text(e, "mail"),
                      attr_text(e, "description"),
                  )
              )
      finally:
          # Always release the connection, even if writing or printing fails.
          conn.unbind()
  def _parse_ldif(ldif_text: str):
      """Parse LDIF content records into a list of ``(dn, attributes)`` tuples.

      ``attributes`` maps each attribute name to the list of its values in
      file order. Plain values are returned as ``str``; base64 values
      (``name:: ...``) are returned as decoded ``bytes``, except for the DN,
      which is decoded as UTF-8. Comment lines and anything before the first
      ``dn`` line of a record (such as ``version: 1``) are ignored. Change
      records and URL values (``name:< url``) are not supported.
      """
      # Undo RFC 2849 line folding: a line starting with a single space
      # continues the previous line.
      logical_lines = []
      for raw_line in ldif_text.splitlines():
          if raw_line.startswith(" ") and logical_lines and logical_lines[-1]:
              logical_lines[-1] += raw_line[1:]
          else:
              logical_lines.append(raw_line)

      records = []
      dn = None
      attributes = {}
      # The extra empty line flushes a final record that has no trailing blank line.
      for line in logical_lines + [""]:
          if not line.strip():
              if dn is not None:
                  records.append((dn, attributes))
              dn, attributes = None, {}
              continue
          if line.startswith("#"):
              continue
          name, separator, rest = line.partition(":")
          if not separator:
              continue
          if rest.startswith(":"):
              value = base64.b64decode(rest[1:].strip())
          else:
              value = rest.lstrip(" ")
          if name.lower() == "dn":
              dn = value.decode("utf-8") if isinstance(value, bytes) else value
          elif dn is not None:
              attributes.setdefault(name, []).append(value)
      return records


  def ldap_restore(username: str, password: str, backup_file: str):
      """Re-create entries from an LDIF backup that are missing on the server.

      Each record in ``backup_file`` is looked up by DN. Existing entries are
      reported and left untouched; missing ones are added with the object
      classes and attributes from the file. When the server rejects an add,
      its result is printed and processing continues with the next record.

      Args:
          username: Bind user, passed to ldap_connect().
          password: Password of the bind user.
          backup_file: Path of the LDIF file to read.

      Returns:
          None. Nothing happens when the bind fails.
      """
      conn = ldap_connect(username, password)
      if not conn:
          return

      try:
          with open(backup_file, "r", encoding="latin-1") as ldif_file:
              ldif_data = ldif_file.read()

          # ldap3 can write LDIF but has no reader (ldap3.LDIF is only a
          # strategy name), so the file is parsed here.
          for dn, attributes in _parse_ldif(ldif_data):
              # Attribute names are case-insensitive; collect the object
              # classes so they can be passed to add() separately.
              object_class = []
              for key in list(attributes):
                  if key.lower() == "objectclass":
                      object_class.extend(attributes.pop(key))

              # NOTE(review): ldap_backup() also exports operational
              # attributes; the server may refuse entries that carry them.
              # Confirm whether they need to be filtered out here.

              # BASE scope is sufficient to test whether the entry exists.
              conn.search(dn, "(objectClass=*)", search_scope=ldap3.BASE)
              if conn.entries:
                  print(f"Eintrag {dn} existiert bereits.")
              elif not conn.add(dn, object_class or None, attributes or None):
                  print(conn.result)
      finally:
          # Always release the connection, even if reading or adding fails.
          conn.unbind()
  def ldap_change_admin_password(old_password: str, new_password: str):
      """Verify the admin password and optionally replace it.

      Binds as ``uid=admin,ou=system`` with ``old_password``. When
      ``new_password`` is an empty string the call only verifies the old
      password. Otherwise ``userPassword`` is replaced with a salted SHA-1
      ({SSHA}) hash of ``new_password``.

      Args:
          old_password: Current admin password, used for the bind.
          new_password: New password, or ``""`` to verify only.

      Returns:
          ``True`` when the bind succeeded and, if requested, the password was
          changed. ``False`` when the bind failed or the server rejected the
          modification.
      """
      admin_user = "uid=admin,ou=system"
      conn = ldap_connect(admin_user, old_password)
      if not conn:
          print("Admin-Passwort falsch!")
          return False

      try:
          if new_password == "":
              return True

          ssha_password = create_ssha_password(new_password)
          changed = conn.modify(
              admin_user,
              {"userPassword": [(ldap3.MODIFY_REPLACE, [ssha_password])]},
          )
          if not changed:
              # Do not report success when the server refused the change.
              print(conn.result)
              return False
      finally:
          # Runs on every path above, including the verify-only early return.
          conn.unbind()

      print("Passwort-Aenderung erfolgreich!")
      return True
  def create_ssha_password(password: str, salt: "bytes | None" = None) -> str:
      """Return ``password`` as a salted SHA-1 hash in ``{SSHA}`` notation.

      The result is ``"{SSHA}"`` followed by the base64 encoding of the SHA-1
      digest of the UTF-8 password plus salt, with the salt appended to the
      digest.

      Args:
          password: Clear-text password.
          salt: Salt bytes to use. When omitted, four random bytes are drawn
              from ``os.urandom``; pass a value to get a reproducible hash.

      Returns:
          The ``{SSHA}``-prefixed hash string.
      """
      if salt is None:
          salt = os.urandom(4)
      digest = hashlib.sha1(password.encode("utf-8") + salt).digest()
      return "{SSHA}" + base64.b64encode(digest + salt).decode("ascii")
  if __name__ == "__main__":
      # Manual entry point: enable exactly one of the calls below.
      # NOTE(review): the passwords are hard-coded in source; consider reading
      # them from a prompt or the environment instead.
      # ldap_backup("uid=admin,ou=system", "gc01gapsC$", "test.ldif")
      ldap_change_admin_password("test12test", "gc01gapsC$")
      # ldap_restore("uid=admin,ou=system", "gc01gapsC$", "test.ldif")