import datetime import json from pathlib import Path import requests import typer from credentials import Client from model import AworkProjekt, Comment, User from sqlalchemy import create_engine, text from sqlalchemy.orm import Session app = typer.Typer() DSN = "mysql+pymysql://gaps:Gcbs12ma@192.168.2.41/tasks" awork_api_url = "https://api.awork.com/api/v1" token_expires_in = 24 * 60 * 60 header = {"Authorization": ""} token_json_file = Path(__file__).parent.joinpath("token.json") access_token_file = Path(__file__).parent.joinpath("access_token.txt") def main(): # bearer_token = login() bearer_token = access_token_file.read_text() header["Authorization"] = f"Bearer {bearer_token}" # users = get_users() # companies = get_companies() # import_projects() # create_task(73849) def get_companies(): res = requests.get(awork_api_url + "/companies", headers=header) data = res.json() json.dump(data, Path(__file__).parent.joinpath("companies.json").open("w"), indent=2) return data def get_users(): res = requests.get(awork_api_url + "/users", headers=header) data = res.json() json.dump(data, Path(__file__).parent.joinpath("users.json").open("w"), indent=2) return data @app.command() def import_projects(): projects = get_projects() p = convert_projects(projects) with Session(create_engine(DSN)) as db_session: db_session.execute(text("TRUNCATE TABLE awork_projekte")) db_session.add_all(p) db_session.commit() def get_projects(): res = requests.get(awork_api_url + "/projects?pageSize=1000", headers=header) data = res.json() json.dump(data, Path(__file__).parent.joinpath("projects.json").open("w"), indent=2) return data def convert_projects(projects): res = [] for p in projects: if "companyId" not in p: continue p_allgemein = 1 if "Allgemein" in p["name"] else 0 p_typ = p["projectType"]["name"] if "projectType" in p else "Unbekannt" entry = AworkProjekt( **{ "awork_project_id": p["id"], "awork_company_id": p["companyId"], "projekt_name": p["name"], "projekt_status": p["projectStatus"]["name"], "projekt_typ": p_typ, "projekt_allgemein": p_allgemein, "kunde_name": p["company"]["name"], "tasks_count": p["tasksCount"], "tracked_duration": p["trackedDuration"], } ) res.append(entry) return res @app.command() def create_task(comment_id: int): with Session(create_engine(DSN)) as db_session: comment = db_session.get(Comment, comment_id) if comment.awork_task_id: return comment.awork_task_id datum = comment.datum.strftime("%d.%m.%Y") link = f"http://gc-server1/fehlerbericht/#/report/{comment.kunde}/{comment.datum}/{comment.start}" res = requests.get(awork_api_url + "/projects/" + comment.awork_project_id + "/taskstatuses", headers=header) data = res.json() task_status = [s["id"] for s in data if s["type"] == "progress"] if len(task_status) == 0: return 0 task_status_id = task_status[0] task = { "name": f"{comment.kunde} - Fehlerbericht vom {datum} - {comment.fehler} Fehler", "description": ( f'

Fehlerbericht-Portal

' + f"

{comment.benutzer}: {comment.kommentar}

" ), "isPrio": False, # "startOn": "2021-03-03T17:00:00Z", "dueOn": f"{comment.datum}T16:00:00Z", # "laneOrder": 0, "plannedDuration": 3600, # "remainingDuration": 0, "typeOfWorkId": "1854242b-8d9a-44c6-a98e-f17b89b6bed8", "taskStatusId": task_status_id, # "order": 0, # "subtaskOrder": 1, "entityId": comment.awork_project_id, "baseType": "projecttask", # "parentId": "3fa85f64-5717-4562-b3fc-2c963f66afa6", # "lists": [{"id": "94166142-b5f9-4bd0-968d-a6a3474ee705", "order": 0}], } res = requests.post(awork_api_url + "/tasks", json=task, headers=header) data = res.json() task_id = data["id"] res = requests.post( awork_api_url + f"/tasks/{task_id}/addtags", json=[{"name": "Fehlerprotokolle"}], headers=header ) user1 = db_session.get(User, comment.benutzer) user_comment = { "message": comment.kommentar, "userId": user1.awork_user_id, } res = requests.post(awork_api_url + f"/tasks/{task_id}/comments", json=user_comment, headers=header) res = requests.post( awork_api_url + f"/tasks/{task_id}/setassignees", json=[user1.awork_user_id], headers=header ) user2 = user1 if comment.benutzer2: user2 = db_session.get(User, comment.benutzer2) res = requests.post( awork_api_url + f"/tasks/{task_id}/setassignees", json=[user2.awork_user_id], headers=header ) comment.awork_task_id = task_id db_session.commit() def login(): # token_file = Path(__file__).parent.joinpath("access_token.txt") timestamp = datetime.datetime.now().timestamp() if not token_json_file.exists(): return token(authorize()) token_dict = json.loads(token_json_file.read_text()) if timestamp < token_dict["expires_at"]: return token_dict["access_token"] res = refresh(token_dict["refresh_token"]) if res: return res token_json_file.unlink() return token(authorize()) def refresh(refresh_token): header = { "Authorization": "Basic Base64(" + Client.authorization_base64() + ")", "Content-Type": "application/x-www-form-urlencoded", } params = { "client_id": Client.CLIENT_ID, "client_secret": Client.CLIENT_SECRET, "grant_type": "refresh_token", "redirect_uri": "http://gc-server1/fehlerbericht/", "refresh_token": refresh_token, } r = requests.post(awork_api_url + "/accounts/token", headers=header, data=params) if r.status_code == 400: return False token_dict = r.json() if "refresh_token" not in token_dict: token_dict["refresh_token"] = refresh_token write_token_file(token_dict) return token_dict["access_token"] def token(code): header = { "Authorization": "Basic Base64(" + Client.authorization_base64() + ")", "Content-Type": "application/x-www-form-urlencoded", } params = { "client_id": Client.CLIENT_ID, "client_secret": Client.CLIENT_SECRET, "grant_type": "authorization_code", "redirect_uri": "http://gc-server1/fehlerbericht/", "code": code, } r = requests.post(awork_api_url + "/accounts/token", headers=header, data=params) if r.status_code == 400: return False token_dict = r.json() write_token_file(token_dict) return token_dict["access_token"] def write_token_file(token_dict): timestamp = datetime.datetime.now().timestamp() token_dict["expires_at"] = token_dict["expires_in"] + timestamp token_json_file.write_text(json.dumps(token_dict)) def authorize(): params = { "client_id": Client.CLIENT_ID, "response_type": "code", "grant_type": "authorization_code", "redirect_uri": "http://gc-server1/fehlerbericht/", "state": "", "scope": "offline_access", } r = requests.get(awork_api_url + "/accounts/authorize", params=params) print(r.url) print("code: ", end="") code = input() return code if __name__ == "__main__": main() app() # create_task(74715) # import_projects()