12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758 |
- from dataclasses import dataclass, field, asdict
- from ldap3 import Server, Connection, ALL
- import json
- from pathlib import Path
- import csv
- from typing import Optional
- @dataclass
- class User:
- lastname: str
- firstname: str
- username: str
- password: str = field(repr=False)
- title: str
- admin: bool
- write: bool
- department: list
- costcenter: list
- class Auth:
- def __init__(self) -> None:
- with open(Path(__file__).parent.parent.joinpath("config", "users.csv"), "r") as frh:
- csv_reader = csv.DictReader(frh, delimiter=";")
- self.users = dict([(row["username"].lower(), self.parse_users_csv(row)) for row in csv_reader])
- def parse_users_csv(self, row: dict) -> User:
- row["admin"] = row["admin"] == "True"
- row["write"] = row["write"] == "True"
- row["department"] = json.loads(row["department"])
- row["costcenter"] = json.loads(row["costcenter"])
- return User(**row)
- def get_user(self, username, password) -> Optional[dict]:
- username = username.lower()
- if username not in self.users:
- return None
- if self.users[username].password != password and not self.connect_ldap(username, password):
- return None
- res = asdict(self.users[username])
- del res["password"]
- return res
- def connect_ldap(self, username, password) -> bool:
- server = Server("ahr.local:389", get_info=ALL, use_ssl=False, connect_timeout=5)
- user = username.lower() + "@ahr.local"
- conn = Connection(server, user=user, password=password)
- try:
- return bool(conn.bind())
- except Exception:
- return False
- if __name__ == "__main__":
- print(Auth().get_user("TKP", "G9zHjA__"))
|