awork_tasks.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. from credentials import Client
  2. import requests
  3. import json
  4. import datetime
  5. from pathlib import Path
  6. awork_api_url = "https://api.awork.com/api/v1"
  7. token_expires_in = 24 * 60 * 60
  8. header = {"Authorization": ""}
  9. def main():
  10. bearer_token = login()
  11. header["Authorization"] = f"Bearer {bearer_token}"
  12. users()
  13. companies()
  14. def companies():
  15. res = requests.get(awork_api_url + "/companies", headers=header)
  16. json.dump(res.json(), Path(__file__).parent.joinpath("companies.json").open("w"), indent=2)
  17. def users():
  18. res = requests.get(awork_api_url + "/users", headers=header)
  19. json.dump(res.json(), Path(__file__).parent.joinpath("users.json").open("w"), indent=2)
  20. def login():
  21. token_file = Path(__file__).parent.joinpath("access_token.txt")
  22. token_json_file = Path(__file__).parent.joinpath("token.json")
  23. timestamp = datetime.datetime.now().timestamp()
  24. token_dict = json.loads(token_json_file.read_text())
  25. if timestamp > token_json_file.stat().st_mtime + token_dict["expires_in"]:
  26. res = refresh(token_dict)
  27. token_json_file.write_text(json.dumps(res))
  28. return res["access_token"]
  29. return token_dict["access_token"]
  30. if token_file.exists():
  31. code = token_file.read_text()
  32. valid = token(code)
  33. if not valid:
  34. token_file.unlink()
  35. else:
  36. token_json_file.write_text(json.dumps(valid))
  37. return
  38. authorize()
  39. def refresh(token_dict):
  40. header = {
  41. "Authorization": "Basic Base64(" + Client.authorization_base64() + ")",
  42. "Content-Type": "application/x-www-form-urlencoded",
  43. }
  44. params = {
  45. "client_id": Client.CLIENT_ID,
  46. "client_secret": Client.CLIENT_SECRET,
  47. "grant_type": "refresh_token",
  48. "redirect_uri": "http://gc-server1/fehlerbericht/",
  49. "refresh_token": token_dict["refresh_token"],
  50. }
  51. r = requests.post(awork_api_url + "/accounts/token", headers=header, data=params)
  52. if r.status_code == 400:
  53. return False
  54. res = r.json()
  55. if "refresh_token" not in res:
  56. res["refresh_token"] = token_dict["refresh_token"]
  57. return res
  58. def token(code):
  59. header = {
  60. "Authorization": "Basic Base64(" + Client.authorization_base64() + ")",
  61. "Content-Type": "application/x-www-form-urlencoded",
  62. }
  63. params = {
  64. "client_id": Client.CLIENT_ID,
  65. "client_secret": Client.CLIENT_SECRET,
  66. "grant_type": "authorization_code",
  67. "redirect_uri": "http://gc-server1/fehlerbericht/",
  68. "code": code,
  69. }
  70. r = requests.post(awork_api_url + "/accounts/token", headers=header, data=params)
  71. if r.status_code == 400:
  72. return False
  73. return r.json()
  74. def authorize():
  75. params = {
  76. "client_id": Client.CLIENT_ID,
  77. "response_type": "code",
  78. "grant_type": "authorization_code",
  79. "redirect_uri": "http://gc-server1/fehlerbericht/",
  80. "state": "",
  81. "scope": "offline_access",
  82. }
  83. r = requests.get(awork_api_url + "/accounts/authorize", params=params)
  84. print(r.url)
  85. print("Response-URL: ", end="")
  86. code = input()
  87. with open(Path(__file__).parent.joinpath("access_token.txt"), "w") as fwh:
  88. fwh.write(code)
  89. if __name__ == "__main__":
  90. main()