|
@@ -1,42 +1,99 @@
|
|
|
-from ldap3 import ALL, ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES, Connection, Server
|
|
|
-
|
|
|
-# from ldap3.core.exceptions import LDAPCursorError
|
|
|
-
|
|
|
-
|
|
|
-def connect_ldap3():
|
|
|
- server = Server("localhost:10389", get_info=ALL, use_ssl=False)
|
|
|
- # conn = Connection(server, 'uid=admin,ou=system', 'gc01gapsC$', auto_bind='TLS_AFTER_BIND') # 'uid=admin,ou=system'
|
|
|
- # status = conn.search('ou=cognos,dc=ibm,dc=com', '(objectclass=person)', 'SUBTREE')
|
|
|
- # print(conn.entries)
|
|
|
- conn = Connection(server, user="uid=Global1,ou=cognos,dc=ibm,dc=com", password="Cognos#11")
|
|
|
- if conn.bind():
|
|
|
- conn.search(
|
|
|
- "ou=cognos,dc=ibm,dc=com",
|
|
|
- "(objectclass=person)",
|
|
|
- "SUBTREE",
|
|
|
- attributes=[ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES],
|
|
|
- )
|
|
|
- format_string = "{:15} {:25} {:19} {:25} {}"
|
|
|
- for e in conn.entries:
|
|
|
- desc = ""
|
|
|
- email = ""
|
|
|
- if "description" in e:
|
|
|
- desc = e.description
|
|
|
- if "mail" in e:
|
|
|
- email = e.mail
|
|
|
- # print(e.entry_to_json())
|
|
|
- print(format_string.format(str(e.uid), str(e.givenName), str(e.createTimestamp)[:19], str(email), desc))
|
|
|
- else:
|
|
|
+import base64
|
|
|
+import hashlib
|
|
|
+import os
|
|
|
+
|
|
|
+import ldap3
|
|
|
+
|
|
|
+LDAP_SERVER = "localhost:10389"
|
|
|
+LDAP_BASE_DN = "ou=cognos,dc=ibm,dc=com"
|
|
|
+
|
|
|
+
|
|
|
+def ldap_connect(username: str, password: str) -> ldap3.Connection:
|
|
|
+ if not username.startswith("uid="):
|
|
|
+ username = f"uid={username},{LDAP_BASE_DN}"
|
|
|
+ server = ldap3.Server(LDAP_SERVER, get_info=ldap3.ALL, use_ssl=False)
|
|
|
+ conn = ldap3.Connection(server, user=username, password=password)
|
|
|
+ if not conn.bind():
|
|
|
print(conn.result)
|
|
|
+ return None
|
|
|
+ return conn
|
|
|
+
|
|
|
+
|
|
|
+def ldap_backup(username: str, password: str, backup_file: str):
|
|
|
+ conn = ldap_connect(username, password)
|
|
|
+ if not conn:
|
|
|
+ return
|
|
|
+
|
|
|
+ conn.search(
|
|
|
+ LDAP_BASE_DN,
|
|
|
+ "(objectclass=person)",
|
|
|
+ "SUBTREE",
|
|
|
+ attributes=[ldap3.ALL_ATTRIBUTES, ldap3.ALL_OPERATIONAL_ATTRIBUTES],
|
|
|
+ )
|
|
|
+ with open(backup_file, "w", encoding="latin-1", newline="") as fwh:
|
|
|
+ fwh.write(conn.response_to_ldif())
|
|
|
+
|
|
|
+ format_string = "{:15} {:25} {:19} {:25} {}"
|
|
|
+ print(format_string.format("UID", "Name", "erstellt", "E-Mail", "Details"))
|
|
|
+ for e in conn.entries:
|
|
|
+ # print(e.entry_to_ldif())
|
|
|
+ # print(e.entry_to_json())
|
|
|
+ desc = ""
|
|
|
+ email = ""
|
|
|
+ if "description" in e:
|
|
|
+ desc = e.description
|
|
|
+ if "mail" in e:
|
|
|
+ email = e.mail
|
|
|
+ print(format_string.format(str(e.uid), str(e.givenName), str(e.createTimestamp)[:19], str(email), desc))
|
|
|
+ conn.unbind()
|
|
|
+
|
|
|
+
|
|
|
+def ldap_restore(username: str, password: str, backup_file: str):
|
|
|
+ conn = ldap_connect(username, password)
|
|
|
+ if not conn:
|
|
|
+ return
|
|
|
+
|
|
|
+ with open(backup_file, "r", encoding="latin-1") as ldif_file:
|
|
|
+ ldif_data = ldif_file.read()
|
|
|
+
|
|
|
+ ldif = ldap3.LDIF(ldif_data)
|
|
|
+ for entry in ldif.entries:
|
|
|
+ dn = entry["dn"]
|
|
|
+ object_class = entry["objectClass"]
|
|
|
+ attributes = entry["attributes"]
|
|
|
+
|
|
|
+ conn.search(dn, "(objectClass=*)", search_scope=ldap3.SUBTREE)
|
|
|
+ if conn.entries:
|
|
|
+ print(f"Eintrag {dn} existiert bereits.")
|
|
|
+ else:
|
|
|
+ conn.add(dn, object_class, attributes)
|
|
|
+
|
|
|
+ conn.unbind()
|
|
|
+
|
|
|
+
|
|
|
+def ldap_change_admin_password(old_password: str, new_password: str):
|
|
|
+ admin_user = "uid=admin,ou=system"
|
|
|
+ conn = ldap_connect(admin_user, old_password)
|
|
|
+ if not conn:
|
|
|
+ print("Admin-Passwort falsch!")
|
|
|
+ return False
|
|
|
+ if new_password == "":
|
|
|
+ return True
|
|
|
+ ssha_password = create_ssha_password(new_password)
|
|
|
+ conn.modify(admin_user, {"userPassword": [(ldap3.MODIFY_REPLACE, [ssha_password])]})
|
|
|
+ conn.unbind()
|
|
|
+ print("Passwort-Aenderung erfolgreich!")
|
|
|
+ return True
|
|
|
+
|
|
|
+
|
|
|
+def create_ssha_password(password: str) -> str:
|
|
|
+ salt = os.urandom(4)
|
|
|
+ sha = hashlib.sha1(password.encode("utf-8"))
|
|
|
+ sha.update(salt)
|
|
|
+ return "{SSHA}" + base64.b64encode(sha.digest() + salt).decode("utf-8")
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
- # connect_pyldap()
|
|
|
- connect_ldap3()
|
|
|
-
|
|
|
-# from ldap3 import Server, Connection, AUTH_SIMPLE, STRATEGY_SYNC, ALL
|
|
|
-# s = Server(HOST, port=389, get_info=ALL)
|
|
|
-# c = Connection(s, authentication=AUTH_SIMPLE, user=user_dn, password=PASSWORD, check_names=True,
|
|
|
-# lazy=False, client_strategy=STRATEGY_SYNC, raise_exceptions=True)
|
|
|
-# c.open()
|
|
|
-# c.bind()
|
|
|
+ # ldap_backup()
|
|
|
+ ldap_change_admin_password("test12test", "gc01gapsC$")
|
|
|
+ # ldap_recovery("uid=admin,ou=system", "gc01gapsC$", "test.ldif")
|