# awork_tasks.py
  1. from credentials import Client
  2. import requests
  3. import json
  4. import datetime
  5. from pathlib import Path
  6. awork_api_url = "https://api.awork.com/api/v1"
  7. token_expires_in = 24 * 60 * 60
  8. header = {"Authorization": ""}
  9. token_json_file = Path(__file__).parent.joinpath("token.json")
  10. access_token_file = Path(__file__).parent.joinpath("access_token.txt")
  11. def main():
  12. # bearer_token = login()
  13. bearer_token = access_token_file.read_text()
  14. header["Authorization"] = f"Bearer {bearer_token}"
  15. users()
  16. companies()
  17. def companies():
  18. res = requests.get(awork_api_url + "/companies", headers=header)
  19. json.dump(res.json(), Path(__file__).parent.joinpath("companies.json").open("w"), indent=2)
  20. def users():
  21. res = requests.get(awork_api_url + "/users", headers=header)
  22. json.dump(res.json(), Path(__file__).parent.joinpath("users.json").open("w"), indent=2)
  23. def login():
  24. # token_file = Path(__file__).parent.joinpath("access_token.txt")
  25. timestamp = datetime.datetime.now().timestamp()
  26. if not token_json_file.exists():
  27. return token(authorize())
  28. token_dict = json.loads(token_json_file.read_text())
  29. if timestamp < token_dict["expires_at"]:
  30. return token_dict["access_token"]
  31. res = refresh(token_dict["refresh_token"])
  32. if res:
  33. return res
  34. token_json_file.unlink()
  35. return token(authorize())
  36. def refresh(refresh_token):
  37. header = {
  38. "Authorization": "Basic Base64(" + Client.authorization_base64() + ")",
  39. "Content-Type": "application/x-www-form-urlencoded",
  40. }
  41. params = {
  42. "client_id": Client.CLIENT_ID,
  43. "client_secret": Client.CLIENT_SECRET,
  44. "grant_type": "refresh_token",
  45. "redirect_uri": "http://gc-server1/fehlerbericht/",
  46. "refresh_token": refresh_token,
  47. }
  48. r = requests.post(awork_api_url + "/accounts/token", headers=header, data=params)
  49. if r.status_code == 400:
  50. return False
  51. token_dict = r.json()
  52. if "refresh_token" not in token_dict:
  53. token_dict["refresh_token"] = refresh_token
  54. write_token_file(token_dict)
  55. return token_dict["access_token"]
  56. def token(code):
  57. header = {
  58. "Authorization": "Basic Base64(" + Client.authorization_base64() + ")",
  59. "Content-Type": "application/x-www-form-urlencoded",
  60. }
  61. params = {
  62. "client_id": Client.CLIENT_ID,
  63. "client_secret": Client.CLIENT_SECRET,
  64. "grant_type": "authorization_code",
  65. "redirect_uri": "http://gc-server1/fehlerbericht/",
  66. "code": code,
  67. }
  68. r = requests.post(awork_api_url + "/accounts/token", headers=header, data=params)
  69. if r.status_code == 400:
  70. return False
  71. token_dict = r.json()
  72. write_token_file(token_dict)
  73. return token_dict["access_token"]
  74. def write_token_file(token_dict):
  75. timestamp = datetime.datetime.now().timestamp()
  76. token_dict["expires_at"] = token_dict["expires_in"] + timestamp
  77. token_json_file.write_text(json.dumps(token_dict))
  78. def authorize():
  79. params = {
  80. "client_id": Client.CLIENT_ID,
  81. "response_type": "code",
  82. "grant_type": "authorization_code",
  83. "redirect_uri": "http://gc-server1/fehlerbericht/",
  84. "state": "",
  85. "scope": "offline_access",
  86. }
  87. r = requests.get(awork_api_url + "/accounts/authorize", params=params)
  88. print(r.url)
  89. print("code: ", end="")
  90. code = input()
  91. return code
# Run the export only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()