# auth.py
import csv
import hmac
import json
from dataclasses import dataclass, field, asdict
from pathlib import Path
from typing import Optional

from ldap3 import Server, Connection, ALL
  7. @dataclass
  8. class User:
  9. lastname: str
  10. firstname: str
  11. username: str
  12. password: str = field(repr=False)
  13. title: str
  14. admin: bool
  15. write: bool
  16. department: list
  17. costcenter: list
  18. class Auth:
  19. def __init__(self) -> None:
  20. with open(Path(__file__).parent.parent.joinpath("config", "users.csv"), "r") as frh:
  21. csv_reader = csv.DictReader(frh, delimiter=";")
  22. self.users = dict([(row["username"].lower(), self.parse_users_csv(row)) for row in csv_reader])
  23. def parse_users_csv(self, row: dict) -> User:
  24. row["admin"] = row["admin"] == "True"
  25. row["write"] = row["write"] == "True"
  26. row["department"] = json.loads(row["department"])
  27. row["costcenter"] = json.loads(row["costcenter"])
  28. return User(**row)
  29. def get_user(self, username, password) -> Optional[dict]:
  30. username = username.lower()
  31. if username not in self.users:
  32. return None
  33. if self.users[username].password != password and not self.connect_ldap(username, password):
  34. return None
  35. res = asdict(self.users[username])
  36. del res["password"]
  37. return res
  38. def connect_ldap(self, username, password) -> bool:
  39. server = Server("ahr.local:389", get_info=ALL, use_ssl=False, connect_timeout=5)
  40. user = username.lower() + "@ahr.local"
  41. conn = Connection(server, user=user, password=password)
  42. try:
  43. return bool(conn.bind())
  44. except Exception:
  45. return False
  46. if __name__ == "__main__":
  47. print(Auth().get_user("TKP", "G9zHjA__"))