123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252 |
- import datetime
- import json
- from pathlib import Path
- import requests
- import typer
- from credentials import Client
- from model import AworkProjekt, Comment, User
- from sqlalchemy import create_engine, text
- from sqlalchemy.orm import Session
- app = typer.Typer()
- DSN = "mysql+pymysql://gaps:Gcbs12ma@192.168.2.41/tasks"
- awork_api_url = "https://api.awork.com/api/v1"
- token_expires_in = 24 * 60 * 60
- header = {"Authorization": ""}
- token_json_file = Path(__file__).parent.joinpath("token.json")
- access_token_file = Path(__file__).parent.joinpath("access_token.txt")
- def main():
- # bearer_token = login()
- bearer_token = access_token_file.read_text()
- header["Authorization"] = f"Bearer {bearer_token}"
- # users = get_users()
- # companies = get_companies()
- # import_projects()
- # create_task(73849)
- def get_companies():
- res = requests.get(awork_api_url + "/companies", headers=header)
- data = res.json()
- json.dump(data, Path(__file__).parent.joinpath("companies.json").open("w"), indent=2)
- return data
- def get_users():
- res = requests.get(awork_api_url + "/users", headers=header)
- data = res.json()
- json.dump(data, Path(__file__).parent.joinpath("users.json").open("w"), indent=2)
- return data
- @app.command()
- def import_projects():
- projects = get_projects()
- p = convert_projects(projects)
- with Session(create_engine(DSN)) as db_session:
- db_session.execute(text("TRUNCATE TABLE awork_projekte"))
- db_session.add_all(p)
- db_session.commit()
- def get_projects():
- res = requests.get(awork_api_url + "/projects?pageSize=1000", headers=header)
- data = res.json()
- json.dump(data, Path(__file__).parent.joinpath("projects.json").open("w"), indent=2)
- return data
- def convert_projects(projects):
- res = []
- for p in projects:
- if "companyId" not in p:
- continue
- p_allgemein = 1 if "Allgemein" in p["name"] else 0
- p_typ = p["projectType"]["name"] if "projectType" in p else "Unbekannt"
- entry = AworkProjekt(
- **{
- "awork_project_id": p["id"],
- "awork_company_id": p["companyId"],
- "projekt_name": p["name"],
- "projekt_status": p["projectStatus"]["name"],
- "projekt_typ": p_typ,
- "projekt_allgemein": p_allgemein,
- "kunde_name": p["company"]["name"],
- "tasks_count": p["tasksCount"],
- "tracked_duration": p["trackedDuration"],
- }
- )
- res.append(entry)
- return res
- @app.command()
- def create_task(comment_id: int):
- with Session(create_engine(DSN)) as db_session:
- comment = db_session.get(Comment, comment_id)
- if comment.awork_task_id:
- return comment.awork_task_id
- datum = comment.datum.strftime("%d.%m.%Y")
- link = f"http://gc-server1/fehlerbericht/#/report/{comment.kunde}/{comment.datum}/{comment.start}"
- res = requests.get(awork_api_url + "/projects/" + comment.awork_project_id + "/taskstatuses", headers=header)
- data = res.json()
- task_status = [s["id"] for s in data if s["type"] == "progress"]
- if len(task_status) == 0:
- return 0
- task_status_id = task_status[0]
- task = {
- "name": f"{comment.kunde} - Fehlerbericht vom {datum} - {comment.fehler} Fehler",
- "description": (
- f'<p><a target="_blank" rel="noopener noreferrer nofollow" href="{link}">Fehlerbericht-Portal</a></p>'
- + f"<p>{comment.benutzer}: {comment.kommentar}</p>"
- ),
- "isPrio": False,
- # "startOn": "2021-03-03T17:00:00Z",
- "dueOn": f"{comment.datum}T16:00:00Z",
- # "laneOrder": 0,
- "plannedDuration": 3600,
- # "remainingDuration": 0,
- "typeOfWorkId": "1854242b-8d9a-44c6-a98e-f17b89b6bed8",
- "taskStatusId": task_status_id,
- # "order": 0,
- # "subtaskOrder": 1,
- "entityId": comment.awork_project_id,
- "baseType": "projecttask",
- # "parentId": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
- # "lists": [{"id": "94166142-b5f9-4bd0-968d-a6a3474ee705", "order": 0}],
- }
- res = requests.post(awork_api_url + "/tasks", json=task, headers=header)
- data = res.json()
- task_id = data["id"]
- res = requests.post(
- awork_api_url + f"/tasks/{task_id}/addtags", json=[{"name": "Fehlerprotokolle"}], headers=header
- )
- user1 = db_session.get(User, comment.benutzer)
- user_comment = {
- "message": comment.kommentar,
- "userId": user1.awork_user_id,
- }
- res = requests.post(awork_api_url + f"/tasks/{task_id}/comments", json=user_comment, headers=header)
- res = requests.post(
- awork_api_url + f"/tasks/{task_id}/setassignees", json=[user1.awork_user_id], headers=header
- )
- user2 = user1
- if comment.benutzer2:
- user2 = db_session.get(User, comment.benutzer2)
- res = requests.post(
- awork_api_url + f"/tasks/{task_id}/setassignees", json=[user2.awork_user_id], headers=header
- )
- comment.awork_task_id = task_id
- db_session.commit()
- def login():
- # token_file = Path(__file__).parent.joinpath("access_token.txt")
- timestamp = datetime.datetime.now().timestamp()
- if not token_json_file.exists():
- return token(authorize())
- token_dict = json.loads(token_json_file.read_text())
- if timestamp < token_dict["expires_at"]:
- return token_dict["access_token"]
- res = refresh(token_dict["refresh_token"])
- if res:
- return res
- token_json_file.unlink()
- return token(authorize())
- def refresh(refresh_token):
- header = {
- "Authorization": "Basic Base64(" + Client.authorization_base64() + ")",
- "Content-Type": "application/x-www-form-urlencoded",
- }
- params = {
- "client_id": Client.CLIENT_ID,
- "client_secret": Client.CLIENT_SECRET,
- "grant_type": "refresh_token",
- "redirect_uri": "http://gc-server1/fehlerbericht/",
- "refresh_token": refresh_token,
- }
- r = requests.post(awork_api_url + "/accounts/token", headers=header, data=params)
- if r.status_code == 400:
- return False
- token_dict = r.json()
- if "refresh_token" not in token_dict:
- token_dict["refresh_token"] = refresh_token
- write_token_file(token_dict)
- return token_dict["access_token"]
- def token(code):
- header = {
- "Authorization": "Basic Base64(" + Client.authorization_base64() + ")",
- "Content-Type": "application/x-www-form-urlencoded",
- }
- params = {
- "client_id": Client.CLIENT_ID,
- "client_secret": Client.CLIENT_SECRET,
- "grant_type": "authorization_code",
- "redirect_uri": "http://gc-server1/fehlerbericht/",
- "code": code,
- }
- r = requests.post(awork_api_url + "/accounts/token", headers=header, data=params)
- if r.status_code == 400:
- return False
- token_dict = r.json()
- write_token_file(token_dict)
- return token_dict["access_token"]
- def write_token_file(token_dict):
- timestamp = datetime.datetime.now().timestamp()
- token_dict["expires_at"] = token_dict["expires_in"] + timestamp
- token_json_file.write_text(json.dumps(token_dict))
- def authorize():
- params = {
- "client_id": Client.CLIENT_ID,
- "response_type": "code",
- "grant_type": "authorization_code",
- "redirect_uri": "http://gc-server1/fehlerbericht/",
- "state": "",
- "scope": "offline_access",
- }
- r = requests.get(awork_api_url + "/accounts/authorize", params=params)
- print(r.url)
- print("code: ", end="")
- code = input()
- return code
- if __name__ == "__main__":
- main()
- app()
- # create_task(74715)
- # import_projects()
|