123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113 |
- from credentials import Client
- import requests
- import json
- import datetime
- from pathlib import Path
- awork_api_url = "https://api.awork.com/api/v1"
- token_expires_in = 24 * 60 * 60
- header = {"Authorization": ""}
- def main():
- bearer_token = login()
- header["Authorization"] = f"Bearer {bearer_token}"
- users()
- companies()
- def companies():
- res = requests.get(awork_api_url + "/companies", headers=header)
- json.dump(res.json(), Path(__file__).parent.joinpath("companies.json").open("w"), indent=2)
- def users():
- res = requests.get(awork_api_url + "/users", headers=header)
- json.dump(res.json(), Path(__file__).parent.joinpath("users.json").open("w"), indent=2)
- def login():
- token_file = Path(__file__).parent.joinpath("access_token.txt")
- token_json_file = Path(__file__).parent.joinpath("token.json")
- timestamp = datetime.datetime.now().timestamp()
- token_dict = json.loads(token_json_file.read_text())
- if timestamp > token_json_file.stat().st_mtime + token_dict["expires_in"]:
- res = refresh(token_dict)
- token_json_file.write_text(json.dumps(res))
- return res["access_token"]
- return token_dict["access_token"]
- if token_file.exists():
- code = token_file.read_text()
- valid = token(code)
- if not valid:
- token_file.unlink()
- else:
- token_json_file.write_text(json.dumps(valid))
- return
- authorize()
- def refresh(token_dict):
- header = {
- "Authorization": "Basic Base64(" + Client.authorization_base64() + ")",
- "Content-Type": "application/x-www-form-urlencoded",
- }
- params = {
- "client_id": Client.CLIENT_ID,
- "client_secret": Client.CLIENT_SECRET,
- "grant_type": "refresh_token",
- "redirect_uri": "http://gc-server1/fehlerbericht/",
- "refresh_token": token_dict["refresh_token"],
- }
- r = requests.post(awork_api_url + "/accounts/token", headers=header, data=params)
- if r.status_code == 400:
- return False
- res = r.json()
- if "refresh_token" not in res:
- res["refresh_token"] = token_dict["refresh_token"]
- return res
- def token(code):
- header = {
- "Authorization": "Basic Base64(" + Client.authorization_base64() + ")",
- "Content-Type": "application/x-www-form-urlencoded",
- }
- params = {
- "client_id": Client.CLIENT_ID,
- "client_secret": Client.CLIENT_SECRET,
- "grant_type": "authorization_code",
- "redirect_uri": "http://gc-server1/fehlerbericht/",
- "code": code,
- }
- r = requests.post(awork_api_url + "/accounts/token", headers=header, data=params)
- if r.status_code == 400:
- return False
- return r.json()
- def authorize():
- params = {
- "client_id": Client.CLIENT_ID,
- "response_type": "code",
- "grant_type": "authorization_code",
- "redirect_uri": "http://gc-server1/fehlerbericht/",
- "state": "",
- "scope": "offline_access",
- }
- r = requests.get(awork_api_url + "/accounts/authorize", params=params)
- print(r.url)
- print("Response-URL: ", end="")
- code = input()
- with open(Path(__file__).parent.joinpath("access_token.txt"), "w") as fwh:
- fwh.write(code)
- if __name__ == "__main__":
- main()
|