123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118 |
- from credentials import Client
- import requests
- import json
- import datetime
- from pathlib import Path
# Base URL that every endpoint path in this module is appended to.
awork_api_url = "https://api.awork.com/api/v1"

# 24 * 60 * 60 — presumably a token lifetime in seconds (one day).
# NOTE(review): not referenced anywhere in the visible code.
token_expires_in = 60 * 60 * 24

# Shared request headers; main() fills in the bearer token before any call.
header = {"Authorization": ""}

# Files kept next to this script: the full token response and the bare
# access token.
token_json_file = Path(__file__).parent / "token.json"
access_token_file = Path(__file__).parent / "access_token.txt"
def main():
    """Load the stored access token and export users and companies.

    Reads the bearer token from ``access_token_file``, installs it in the
    module-level ``header`` dict, then calls ``users()`` and ``companies()``.
    ``login()`` is the alternative token source (interactive OAuth flow); it
    is not used here.
    """
    # Strip surrounding whitespace: a trailing newline in the token file would
    # otherwise end up inside the Authorization header value, which is not a
    # valid HTTP header.
    bearer_token = access_token_file.read_text().strip()
    header["Authorization"] = f"Bearer {bearer_token}"
    users()
    companies()
def companies():
    """Download the ``/companies`` listing and save it as ``companies.json``.

    Sends a GET request with the module-level ``header`` and writes the
    decoded JSON body, indented by two spaces, to a file next to this script.
    """
    res = requests.get(awork_api_url + "/companies", headers=header)
    # Decode before opening the target so that a response that is not valid
    # JSON does not truncate an existing export.
    payload = res.json()
    target = Path(__file__).parent.joinpath("companies.json")
    # The context manager flushes and closes the handle; previously the file
    # object returned by open() was never closed.
    with target.open("w") as fh:
        json.dump(payload, fh, indent=2)
def users():
    """Download the ``/users`` listing and save it as ``users.json``.

    Sends a GET request with the module-level ``header`` and writes the
    decoded JSON body, indented by two spaces, to a file next to this script.
    """
    res = requests.get(awork_api_url + "/users", headers=header)
    # Decode before opening the target so that a response that is not valid
    # JSON does not truncate an existing export.
    payload = res.json()
    target = Path(__file__).parent.joinpath("users.json")
    # The context manager flushes and closes the handle; previously the file
    # object returned by open() was never closed.
    with target.open("w") as fh:
        json.dump(payload, fh, indent=2)
def login():
    """Return an access token, reusing the cached one when possible.

    Order of attempts:
      1. No ``token.json`` yet: run the interactive authorization flow.
      2. Cached token whose ``expires_at`` is still in the future: return it.
      3. Otherwise try ``refresh()`` with the cached refresh token.
      4. If the refresh yields a falsy result, delete the cache file and run
         the interactive authorization flow again.

    Returns:
        Whatever ``token()`` / ``refresh()`` return, or the cached
        ``access_token`` value.
    """
    now = datetime.datetime.now().timestamp()

    if token_json_file.exists():
        cached = json.loads(token_json_file.read_text())
        if now < cached["expires_at"]:
            return cached["access_token"]

        refreshed = refresh(cached["refresh_token"])
        if refreshed:
            return refreshed

        # The refresh was rejected, so the cached data is discarded.
        token_json_file.unlink()

    return token(authorize())
def refresh(refresh_token):
    """Exchange a refresh token for a new access token.

    Posts a ``refresh_token`` grant to ``/accounts/token``. On success the
    response is stored through ``write_token_file`` and the new access token
    is returned.

    Args:
        refresh_token: Refresh token taken from an earlier token response.

    Returns:
        The ``access_token`` value of the response, or ``False`` when the
        server answers with an error status.
    """
    # Named differently from the module-level ``header`` to avoid shadowing it.
    request_headers = {
        # NOTE(review): the literal "Base64(" ... ")" wrapper is sent verbatim.
        # Standard Basic auth (RFC 7617) is "Basic <credentials>" — confirm
        # what Client.authorization_base64() returns and what awork expects.
        "Authorization": "Basic Base64(" + Client.authorization_base64() + ")",
        "Content-Type": "application/x-www-form-urlencoded",
    }
    params = {
        "client_id": Client.CLIENT_ID,
        "client_secret": Client.CLIENT_SECRET,
        "grant_type": "refresh_token",
        "redirect_uri": "http://gc-server1/fehlerbericht/",
        "refresh_token": refresh_token,
    }
    r = requests.post(
        awork_api_url + "/accounts/token", headers=request_headers, data=params
    )
    # Treat every error status as failure, not only 400: a 401/403/5xx body
    # has no token fields and would otherwise fail later with a KeyError.
    if not r.ok:
        return False
    token_dict = r.json()
    # Keep the current refresh token if the response does not include one.
    if "refresh_token" not in token_dict:
        token_dict["refresh_token"] = refresh_token
    write_token_file(token_dict)
    return token_dict["access_token"]
def token(code):
    """Exchange an authorization code for an access token.

    Posts an ``authorization_code`` grant to ``/accounts/token``. On success
    the response is stored through ``write_token_file`` and the access token
    is returned.

    Args:
        code: Authorization code, as returned by ``authorize()``.

    Returns:
        The ``access_token`` value of the response, or ``False`` when the
        server answers with an error status.
    """
    # Named differently from the module-level ``header`` to avoid shadowing it.
    request_headers = {
        # NOTE(review): the literal "Base64(" ... ")" wrapper is sent verbatim.
        # Standard Basic auth (RFC 7617) is "Basic <credentials>" — confirm
        # what Client.authorization_base64() returns and what awork expects.
        "Authorization": "Basic Base64(" + Client.authorization_base64() + ")",
        "Content-Type": "application/x-www-form-urlencoded",
    }
    params = {
        "client_id": Client.CLIENT_ID,
        "client_secret": Client.CLIENT_SECRET,
        "grant_type": "authorization_code",
        "redirect_uri": "http://gc-server1/fehlerbericht/",
        "code": code,
    }
    r = requests.post(
        awork_api_url + "/accounts/token", headers=request_headers, data=params
    )
    # Treat every error status as failure, not only 400: a 401/403/5xx body
    # has no token fields and would otherwise fail later with a KeyError.
    if not r.ok:
        return False
    token_dict = r.json()
    write_token_file(token_dict)
    return token_dict["access_token"]
def write_token_file(token_dict):
    """Stamp a token response with its expiry time and save it as JSON.

    Adds an ``expires_at`` key (current timestamp plus ``expires_in``) to
    ``token_dict`` in place, then overwrites ``token_json_file`` with the
    serialized dict.

    Args:
        token_dict: Token response; must contain ``expires_in``.
    """
    now = datetime.datetime.now().timestamp()
    token_dict["expires_at"] = now + token_dict["expires_in"]
    serialized = json.dumps(token_dict)
    token_json_file.write_text(serialized)
def authorize():
    """Print the authorization URL and read the resulting code from stdin.

    Issues a GET against ``/accounts/authorize`` with the OAuth query
    parameters, prints the URL of the response, then prompts with ``code: ``
    and returns whatever line is entered.

    Returns:
        The string typed at the prompt.
    """
    query = {
        "client_id": Client.CLIENT_ID,
        "response_type": "code",
        "grant_type": "authorization_code",
        "redirect_uri": "http://gc-server1/fehlerbericht/",
        "state": "",
        "scope": "offline_access",
    }
    response = requests.get(awork_api_url + "/accounts/authorize", params=query)
    print(response.url)
    # The prompt is printed separately so it goes through print()/stdout.
    print("code: ", end="")
    return input()
if __name__ == "__main__":
    # Run the export only when executed as a script, not on import.
    main()
|