|
- import base64
- import csv
- import json
- import logging
- import os
- import re
- import jinja2
- import requests
- from bs4 import BeautifulSoup
- from requests_toolbelt.multipart import decoder
- import config
- from cognos11.xml_prettify import prettify_xml
class c11_api:
    """Client for the Cognos 11 web service configured in ``cfg.cognos11``.

    The attributes below are class-level defaults; ``__init__`` and
    ``login`` replace several of them on the instance.
    """

    # Base URL of the web service; set from cfg.cognos11.webservice in __init__.
    webservice = ""
    # Directory holding the Jinja2 request templates; set in __init__.
    templates_dir = ""
    # templates_dir = "C:/GlobalCube/Tasks/gctools/templates"
    # Headers used for the JSON requests; replaced by login().
    headers = {}
    # "cafContextId" value taken from the login response.
    caf = ""
    # Token derived by generate_token() from the "usersessionid" cookie.
    cam = ""
    # NOTE: the three lists below are class-level objects. Methods append to
    # them through ``self``, so instances share them unless an instance
    # rebinds the attribute (as load_config_from_files does).
    reports = []
    folders = []
    jobs = []
    # Fallback server version; login() overwrites it with get_server_version().
    server_version = "202004"
def __init__(self, cfg: config.Config):
    """Store the configuration and load the request templates.

    Args:
        cfg: Project configuration. ``cfg.cognos11.webservice`` is used as
            the base URL and ``cfg.tasks_dir + "/scripts/templates"`` as the
            Jinja2 template directory.

    Raises:
        jinja2.TemplateNotFound: If one of the four request templates is
            missing from the template directory.
    """
    self.cfg = cfg
    self.webservice = cfg.cognos11.webservice
    self.templates_dir = cfg.tasks_dir + "/scripts/templates"

    # The class declares these containers as class attributes. Methods such
    # as load_folder_list() append to them via ``self``, which would leak
    # cached folders/reports/jobs from one instance into every other one.
    # Give each instance its own containers instead.
    self.headers = {}
    self.reports = []
    self.folders = []
    self.jobs = []

    self._env = jinja2.Environment(
        loader=jinja2.FileSystemLoader(self.templates_dir),
        autoescape=jinja2.select_autoescape(["html", "xml"]),
    )
    # Template files are named after the key they are registered under.
    self.templates = {
        name: self._env.get_template(f"{name}.xml")
        for name in ("get_report", "create_report", "update_report", "get_package")
    }
@staticmethod
def generate_token(message_base64):
    """Build the ``V1`` token from a base64-encoded message.

    The decoded message is read as one leading byte (skipped) followed by
    records, each made of a 4-byte little-endian length and that many
    payload bytes. The payload of the last record is prefixed with ``V1``
    and returned base64-encoded as text.

    Raises:
        IndexError: If the message contains no complete length field.
    """
    length_size = 4
    raw = base64.b64decode(message_base64)
    payloads = []
    pos = 1  # the first byte is not part of any record
    while len(raw) - pos >= length_size:
        size = int.from_bytes(raw[pos : pos + length_size], byteorder="little")
        pos += length_size
        payloads.append(raw[pos : pos + size])
        pos += size
    token = "V1".encode("utf-8") + payloads[-1]
    return base64.b64encode(token).decode("utf-8")
def login(self):
    """Open a session and log on with the configured credentials.

    On success stores ``session``, ``headers``, ``caf``, ``cam`` and
    ``server_version`` on the instance.

    Returns:
        The instance itself, so calls can be chained.

    Raises:
        SystemExit: With exit code 1 when the login request is rejected.
    """
    cred = self.cfg.cognos11.credentials
    credentials = {
        "parameters": [
            {"name": "h_CAM_action", "value": "logonAs"},
            {"name": "CAMNamespace", "value": cred.namespace},
            {"name": "CAMUsername", "value": cred.username},
            {"name": "CAMPassword", "value": cred.password},
        ]
    }
    self.session = requests.Session()
    # This request is made only for its cookies: the XSRF-TOKEN cookie is
    # read from the session right below. The response itself is not used.
    self.session.get(self.webservice)
    self.headers = {
        "Content-Type": "application/json; charset=UTF-8",
        "X-XSRF-TOKEN": self.session.cookies.get("XSRF-TOKEN"),
    }
    r = self.session.post(
        self.webservice + "v1/login",
        data=json.dumps(credentials),
        headers=self.headers,
    )
    if not r.ok:
        print("!! Error: Not connected to cognos server !!")
        print(f"Response: {r.status_code} {r.reason}")
        # Same effect as the ``exit(1)`` builtin, without depending on the
        # ``site`` module having installed it.
        raise SystemExit(1)

    self.caf = r.json()["cafContextId"]
    self.cam = self.generate_token(r.cookies["usersessionid"])
    self.server_version = self.get_server_version()
    return self
def get_server_version(self):
    """Read the six-digit server version from ``pat/rsstartupblock_3.js``.

    Returns:
        The first six-digit value found in the form ``="NNNNNN";``, or
        ``"202004"`` when the script contains no such value.
    """
    response = self.session.get(self.webservice + "pat/rsstartupblock_3.js")
    match = re.search(r"\=\"(\d{6})\";", response.text)
    return match.group(1) if match else "202004"
def get_folders(self):
    """Return the folder list, loading it from the server on first use.

    When ``self.folders`` is empty, the root entry "Team Content" is added,
    the folder tree is loaded with ``load_folder_list`` and the result is
    written out with ``save_config``.
    """
    if not self.folders:
        root = {"id": "_dot_public_folders", "name": "Team Content"}
        self.folders.append(root)
        self.load_folder_list()
        self.save_config()
    return self.folders
def save_config(self):
    """Write the folder, report and job lists as indented JSON files.

    The configuration directory is created if needed; target paths come
    from ``cfg.cognos11.folders_file``, ``reports_file`` and ``jobs_file``.
    """
    c11 = self.cfg.cognos11
    os.makedirs(c11.config_dir, exist_ok=True)
    targets = (
        (c11.folders_file, self.folders),
        (c11.reports_file, self.reports),
        (c11.jobs_file, self.jobs),
    )
    for path, payload in targets:
        with open(path, "w") as fwh:
            json.dump(payload, fwh, indent=2)
def load_config_from_files(self):
    """Load the folder, report and job lists from their JSON files.

    Reads ``cfg.cognos11.folders_file``, ``reports_file`` and ``jobs_file``
    and rebinds ``self.folders``, ``self.reports`` and ``self.jobs``.

    Raises:
        OSError: If one of the files cannot be opened.
        json.JSONDecodeError: If a file does not contain valid JSON.
    """
    c11 = self.cfg.cognos11
    with open(c11.folders_file, "r") as frh:
        self.folders = json.load(frh)
    with open(c11.reports_file, "r") as frh:
        self.reports = json.load(frh)
    # Must be opened for reading: mode "w" would truncate the jobs file and
    # make json.load() fail on a write-only handle.
    with open(c11.jobs_file, "r") as frh:
        self.jobs = json.load(frh)
def load_folder_list(self, folder_id="_dot_public_folders", prefix="Team Content"):
    """Recursively collect folders, reports and jobs below *folder_id*.

    The items of the folder are fetched, sorted by ``defaultName`` and
    sorted into ``self.folders``, ``self.reports`` and ``self.jobs``.
    Sub-folders are descended into immediately after they are recorded.
    Slashes in folder and report names are replaced by ``-``.

    Args:
        folder_id: Object id of the folder to list.
        prefix: Display path of that folder, used as ``path`` of its items.
    """
    fields = ",".join(
        (
            "id", "defaultName", "defaultDescription", "type", "modificationTime",
            "target.id", "target.searchPath", "target.defaultName",
            "base.id", "base.searchPath", "base.defaultName",
            "parameters",
        )
    )
    response = self.session.get(
        f"{self.webservice}v1/objects/{folder_id}/items?fields={fields}",
        headers=self.headers,
    )
    entries = sorted(response.json()["data"], key=lambda entry: entry["defaultName"])

    for entry in entries:
        kind = entry["type"]

        if kind == "folder":
            child = {
                "id": entry["id"],
                "name": prefix + "/" + entry["defaultName"].replace("/", "-"),
            }
            self.folders.append(child)
            self.load_folder_list(child["id"], child["name"])
            continue

        if kind == "jobDefinition":
            job = {"id": entry["id"], "name": entry["defaultName"], "path": prefix}
            job["details"] = self.get_job_details(job["id"])
            self.jobs.append(job)
            continue

        if kind not in ("report", "reportView", "shortcut"):
            continue

        report = {
            "id": entry["id"],
            "name": entry["defaultName"].replace("/", "-"),
            "description": entry["defaultDescription"],
            "modified": entry["modificationTime"],
            "path": prefix,
            "type": kind,
        }
        if kind == "shortcut":
            report["target_id"] = entry["target"][0]["id"]
        elif kind == "reportView":
            base = entry["base"]
            report["target_id"] = base[0]["id"] if base is not None else ""

        # Only parameters that carry at least one value are recorded, each as
        # a mapping of use value -> display value.
        report["parameters"] = {}
        if entry["parameters"] is not None:
            for param in entry["parameters"]:
                if len(param["value"]) > 0:
                    report["parameters"][param["name"]] = {
                        value["use"]: value["display"] for value in param["value"]
                    }
        self.reports.append(report)
def get_report_details(self, object_id):
    """Fetch the detail fields of one object and strip its identity keys.

    Returns:
        The first entry of the response's ``data`` list without the keys
        ``_meta``, ``id``, ``type`` and ``defaultName``.
    """
    fields = ",".join(
        (
            "defaultDescription", "options", "executionPrompt", "parameters",
            "module.defaultName", "module.ancestors", "allowNotification",
            "base.id", "base.searchPath", "base.defaultName",
            "base.defaultDescription", "base.ancestors",
            "base.metadataModelPackage", "base.module",
        )
    )
    url = f"{self.webservice}v1/objects/{object_id}?fields={fields}"
    details = self.session.get(url, headers=self.headers).json()["data"][0]
    for key in ("_meta", "id", "type", "defaultName"):
        details.pop(key, None)
    return details
def get_job_details(self, object_id):
    """Fetch a job definition together with its steps.

    Two requests are made: one for the job object itself and one for its
    ``jobStepDefinition`` items. Identity keys are removed from the job,
    and each step's ``stepObject`` is reduced to a ``report_id`` entry
    (omitted when the step has no step object).

    Returns:
        The job dictionary with the cleaned step list under ``"steps"``.
    """
    job_fields = ",".join(
        (
            "userInterfaces", "disabled", "runInAdvancedViewer", "modificationTime",
            "canBurst", "defaultPortalAction", "base.defaultName", "tags",
            "target.searchPath", "target.disabled", "options", "base.options",
        )
    )
    job_url = f"{self.webservice}v1/objects/{object_id}?fields={job_fields}"
    job = self.session.get(job_url, headers=self.headers).json()["data"][0]
    for key in ("_meta", "id", "type", "defaultName"):
        job.pop(key, None)

    step_fields = ",".join(
        (
            r"id,displaySequence,stepObject{defaultName}",
            r"stepObject{id},stepObject{parameters}",
            r"stepObject{canBurst},options,parameters",
        )
    )
    steps_url = f"{self.webservice}v1/objects/{object_id}/items?types=jobStepDefinition&fields={step_fields}"
    steps = self.session.get(steps_url, headers=self.headers).json()["data"]
    for step in steps:
        step.pop("_meta", None)
        step_object = step.pop("stepObject")
        if step_object is not None:
            step["report_id"] = step_object[0]["id"]

    job["steps"] = steps
    return job
def get_report(self, report_id):
    """Return the cached report with the given id, or ``None``.

    The folder cache is populated first. A report that has no ``"meta"``
    entry yet is passed through ``get_report_specs`` before being returned.
    """
    self.get_folders()
    found = next((r for r in self.reports if r["id"] == report_id), None)
    if found is None:
        return None
    if "meta" in found:
        return found
    return self.get_report_specs(found)
def get_reports_in_folder(self, folder, recursive=False, specs=False):
    """Return the cached reports located in *folder*.

    Args:
        folder: Display path to match against each report's ``path``.
        recursive: When true, match every path that starts with *folder*
            instead of requiring an exact match.
        specs: When true, pass each hit through ``get_report_specs``.
    """
    self.get_folders()

    if recursive:
        def belongs(path):
            return path.startswith(folder)
    else:
        def belongs(path):
            return path == folder

    selected = [r for r in self.reports if belongs(r["path"])]
    if not specs:
        return selected
    return [self.get_report_specs(r) for r in selected]
def get_report_specs(self, report):
    """Request prompt metadata and the report specification for *report*.

    The report is posted to ``v1/reports`` with prompting enabled. From the
    multipart answer, part 0 (XML) yields the parameters stored under
    ``meta["required"]``, part 1 (parsed with lxml) yields the
    ``selectvalue``/``selectdate`` options stored under
    ``meta["optional"]``, and the text of part 2 becomes ``report["spec"]``.
    The metadata is also written to
    ``<config_dir>/params/<path>/<name>.json``.

    Args:
        report: Report dictionary; it is updated in place.

    Returns:
        The same dictionary. On an HTTP 500 answer only ``"error"`` is
        added (and logged) and the other keys are left unset.
    """
    report = self.get_report_filename(report)
    # Same header set as used by request_file().
    headers = self.get_report_headers(report["id"])
    soap = self.templates["get_report"].render(
        {
            "caf": self.caf,
            "cam": self.cam,
            "report": report,
            "format": "XHTML",
            "prompt": "true",
            "tracking": "",
            "params": {},
        }
    )
    r = self.session.post(self.webservice + "v1/reports", data=soap, headers=headers)
    if r.status_code == 500:
        fault = BeautifulSoup(r.text, "xml").find("messageString")
        # Fall back to the raw body when the answer carries no messageString
        # instead of failing with an IndexError.
        report["error"] = fault.string if fault is not None else r.text
        logging.error(f"{report['path']}/{report['name']}")
        logging.error(report["error"])
        return report

    parts = decoder.MultipartDecoder.from_response(r).parts
    meta = {"required": {}, "optional": {}}

    bs = BeautifulSoup(parts[0].content, "xml")
    params = bs.find("bus:result").find("bus:primaryRequest").find("bus:parameters")
    for item in params.find_all("item"):
        if item["xsi:type"] != "bus:parameterValue":
            continue
        name = item.find("bus:name").text
        values = {}
        for opt in item.find("bus:value").find_all("item"):
            use = opt.find("bus:use").text
            display = opt.find("bus:display")
            # Without a display value the use value doubles as the label.
            values[use] = display.text if display is not None else use
        if values:
            meta["required"][name] = values

    bs = BeautifulSoup(parts[1].content, "lxml")
    # All selectvalue prompts are processed before the selectdate prompts.
    for tag in ("selectvalue", "selectdate"):
        for sv in bs.find_all(tag):
            meta["optional"][sv["parameter"]] = {
                opt["usevalue"]: opt.get("displayvalue", "") for opt in sv.find_all("selectoption")
            }

    filename = self.cfg.cognos11.config_dir + f"/params/{report['path']}/{report['name']}.json"
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    # Context manager so the file is flushed and closed deterministically.
    with open(filename, "w") as fwh:
        json.dump(meta, fwh, indent=2)

    report["cube"] = self.get_cube_name(meta)
    report["meta"] = meta
    report["spec"] = parts[2].text
    return report
@staticmethod
def get_cube_name(meta):
    """Return the first bracketed name found among the optional parameter keys.

    The option keys of every entry in ``meta["optional"]`` are scanned in
    order; the first key of the form ``[name]...`` determines the result.

    Args:
        meta: Metadata dictionary with an ``"optional"`` mapping of
            parameter name -> {use value: display value}.

    Returns:
        The text between the leading ``[`` and the first ``]``, or ``None``
        when no key has that form.
    """
    for options in meta["optional"].values():
        for key in options:
            # A key that opens a bracket but never closes it is skipped
            # rather than raising AttributeError on a failed match.
            match = re.match(r"\[([^\]]*)\]", key)
            if match:
                return match.group(1)
    return None
def get_report_filename(self, report):
    """Add output ``filename``, ``format`` and ``params`` to *report*.

    The target file lives below ``cfg.cognos11.reportoutput_dir``, using the
    report path with ``Team Content/ReportOutput`` removed. Reports whose
    name ends in ``.xlsx`` (any case) get format ``spreadsheetML`` and keep
    their own extension; everything else is ``PDF`` with ``.pdf`` appended.
    Every ``[name]`` in the file name is replaced by a positional
    placeholder ``{i}`` and the names are listed in ``report["params"]``.

    Returns:
        The same dictionary, updated in place.
    """
    rel_path = report["path"].replace("Team Content/ReportOutput", "")
    filename = f"{self.cfg.cognos11.reportoutput_dir}/{rel_path}/{report['name']}.pdf"
    output_format = "PDF"
    if report["name"][-5:].lower() == ".xlsx":
        output_format = "spreadsheetML"
        filename = filename[:-4]  # drop the ".pdf" appended above

    placeholders = list(re.findall(r"\[([^\]]+)\]", filename))
    for index, name in enumerate(placeholders):
        filename = filename.replace("[" + name + "]", "{" + str(index) + "}")

    report["filename"] = filename
    report["format"] = output_format
    report["params"] = placeholders
    return report
def request_unstubbed(self, report_id):
    """Return the cleaned-up, pretty-printed full specification of a report.

    The stubbed specification cached for the report is sent to
    ``v1/reports/unstubreport``. In the answer, ``iid`` attributes are
    removed, ``XMLAttributes`` blocks are pruned and empty
    ``crosstabIntersection`` elements are dropped before the document is
    passed through ``prettify_xml``.

    Returns:
        The specification as a string; ``""`` when the report is unknown or
        has no ``"spec"``; ``None`` when the server does not answer with
        HTTP 200 (the response text is logged).
    """
    report = self.get_report(report_id)
    # get_report() returns None for an unknown id; treat it like a report
    # without a specification instead of failing on the membership test.
    if report is None or "spec" not in report:
        return ""

    payload = json.dumps({"reportspec_stubbed": report["spec"], "storeid": report["id"]})
    headers = {
        "Content-Type": "application/json; charset=UTF-8",
        "X-XSRF-TOKEN": self.session.cookies.get("XSRF-TOKEN"),
        "authenticityToken": self.cam,
        "X-UseRsConsumerMode": "true",
        "cafContextId": self.caf,
    }
    r = self.session.post(self.webservice + "v1/reports/unstubreport", data=payload, headers=headers)
    if r.status_code != 200:
        logging.error(f"{report['path']}/{report['name']}")
        logging.error(r.text)
        return None

    unstubbed = json.loads(r.text)["reportspec_full"]
    unstubbed = re.sub(r' iid="[^"]*"', "", unstubbed)
    bs = BeautifulSoup(unstubbed, "xml")

    keep_whole = ("RS_dataType", "RS_CreateExtendedDataItems", "RS_legacyDrillDown")
    for xa in bs.find_all("XMLAttributes"):
        # Blocks carrying one of these attributes are left untouched.
        if any(xa.find_all("XMLAttribute", {"name": name}) for name in keep_whole):
            continue
        # Blocks with supportsDefaultDataFormatting keep only that attribute.
        if xa.find_all("XMLAttribute", {"name": "supportsDefaultDataFormatting"}):
            for xa2 in xa.find_all("XMLAttribute"):
                if xa2.attrs["name"] != "supportsDefaultDataFormatting":
                    xa2.decompose()
            continue
        # Every other block is removed entirely.
        xa.decompose()

    for cti in bs.find_all("crosstabIntersection"):
        if len(list(cti.children)) == 0:
            cti.decompose()

    unstubbed_report = str(bs)
    # NOTE(review): the original line read `.replace("'", """)`, which is not
    # valid Python; the string literals look entity-decoded. The second
    # argument can only have been "&quot;". The first is taken literally as
    # "'" but may have been "&apos;" -- confirm against version control.
    unstubbed_report = prettify_xml(unstubbed_report).replace("'", "&quot;")
    return unstubbed_report
def get_report_headers(self, report_id=None):
    """Build the header set for SOAP requests to the report service.

    Args:
        report_id: When given, it is added as ``X-RsCMStoreID``.

    Returns:
        A new header dictionary; the XSRF token is copied from
        ``self.headers`` and the SOAPAction contains ``self.server_version``.
    """
    soap_action = f"http://www.ibm.com/xmlns/prod/cognos/reportService/{self.server_version}/"
    headers = {
        "Content-Type": "text/xml; charset=UTF-8",
        "X-XSRF-TOKEN": self.headers["X-XSRF-TOKEN"],
        "X-UseRsConsumerMode": "true",
        "SOAPAction": soap_action,
    }
    if report_id is None:
        return headers
    headers["X-RsCMStoreID"] = report_id
    return headers
def request_file(self, report_id, params, format="PDF"):
    """Run a report without prompting and return its rendered output.

    Args:
        report_id: Store id of the report.
        params: Parameter values handed to the ``get_report`` template.
        format: Output format name handed to the template.

    Returns:
        A ``(status, payload)`` tuple: ``(200, <content of multipart part
        1>)`` on success, ``(500, "Timeout")`` when an HTTP 200 answer is
        not multipart, otherwise ``(<status code>, <error message>)``.
    """
    report = self.get_report(report_id)
    headers = self.get_report_headers(report_id)
    soap = (
        self.templates["get_report"]
        .render(
            {
                "caf": self.caf,
                "cam": self.cam,
                "report": report,
                "format": format,
                "prompt": "false",
                "tracking": "",
                "params": params,
            }
        )
        .encode("utf-8")
    )
    r = self.session.post(self.webservice + "v1/reports", data=soap, headers=headers)
    if r.status_code == 200:
        try:
            parts = decoder.MultipartDecoder.from_response(r).parts
        except decoder.NonMultipartContentTypeException:
            return 500, "Timeout"
        return 200, parts[1].content

    # Only error answers are parsed as XML; a successful answer carries the
    # rendered document and is never inspected as text.
    fault = BeautifulSoup(r.text, "xml").find("messageString")
    # Fall back to the raw body when there is no messageString element.
    error = fault.string if fault is not None else r.text
    logging.debug(error)
    return r.status_code, error
def get_users(self, export_dir):
    """Export licenses and the complete namespace tree to *export_dir*.

    ``get_licenses`` is called first; then every namespace gets an
    ``"items"`` entry holding its expanded children. The tree is written to
    ``<export_dir>/namespaces.json`` and returned.
    """
    self.get_licenses(export_dir)
    namespaces = self.get_namespaces()
    for namespace in namespaces:
        namespace["items"] = self.get_sub_items(self.get_namespace_items(namespace["id"]))
    with open(export_dir + "/namespaces.json", "w") as fwh:
        json.dump(namespaces, fwh, indent=2)
    return namespaces
def get_sub_items(self, items):
    """Expand namespace folders, roles and groups in *items* in place.

    Namespace folders receive their recursively expanded children under
    ``"items"``; roles and groups receive their member listing under
    ``"members"``. Other item types are left unchanged.

    Returns:
        The same list that was passed in.
    """
    for entry in items:
        kind = entry["type"]
        if kind == "namespaceFolder":
            children = self.get_namespace_folder_items(entry["id"])
            entry["items"] = self.get_sub_items(children)
        elif kind == "role":
            entry["members"] = self.get_role_members(entry["id"])
        elif kind == "group":
            entry["members"] = self.get_group_members(entry["id"])
    return items
def show_users(self, ns, indent=0):
    """Print the namespace tree *ns* as an indented outline.

    Each item is printed with a marker for its type (``*`` for unknown
    types); accounts also show their user name (``xx`` when absent).
    Members of roles and groups follow as ``->`` lines, and nested
    ``"items"`` are printed two columns deeper.
    """
    pad = " " * indent
    markers = {
        "account": "@",
        "namespace": "#",
        "role": "~",
        "group": "+",
        "namespaceFolder": "*",
    }
    for item in ns:
        if item["type"] == "account":
            line = f"@ {item['defaultName']} (uid={item.get('userName', 'xx')})"
        else:
            line = f"{markers.get(item['type'], '*')} {item['defaultName']} ({item['type']})"
        print(pad + line)

        if "members" in item:
            members = item["members"]
            for user in members.get("users", []):
                print(pad + f" -> @ {user['defaultName']} (uid={user['userName']})")
            for key, sign in (("groups", "+"), ("roles", "~")):
                for member in members.get(key, []):
                    print(pad + f" -> {sign} {member['defaultName']} ({member['type']})")

        if "items" in item:
            self.show_users(item["items"], indent + 2)
def export_users(self, ns, export_dir: str):
    """Write the flattened namespace tree to ``cognos_users.csv``.

    The rows come from ``export_users_recursive``; the column order follows
    the keys of the first row. The file is written with ``;`` as delimiter
    and latin-1 encoding. Nothing is written when there are no rows.

    Args:
        ns: Namespace tree, e.g. as returned by ``get_users``.
        export_dir: Directory that receives the CSV file.
    """
    data = self.export_users_recursive(ns, [])
    if not data:
        return
    # os.path.join instead of a hard-coded backslash, so the file lands
    # inside export_dir on every platform.
    csv_path = os.path.join(export_dir, "cognos_users.csv")
    with open(csv_path, "w", encoding="latin-1", newline="") as fwh:
        csv_fwh = csv.DictWriter(fwh, fieldnames=data[0].keys(), delimiter=";")
        csv_fwh.writeheader()
        csv_fwh.writerows(data)
def export_users_recursive(self, tree, prev_folder_list):
    """Flatten a namespace tree into a list of CSV rows.

    Nodes with ``"items"`` are descended into; nodes whose ``"members"`` is
    a dictionary contribute their users, groups and roles; every other node
    becomes one row. ``Pfad`` is the ``/``-joined list of ancestor names.

    Args:
        tree: List of node dictionaries.
        prev_folder_list: Names of the ancestors of *tree*.

    Returns:
        A list of row dictionaries with the keys ``Name``, ``Typ``,
        ``Pfad``, ``ID``, ``Suchpfad``, ``UID``, ``Email``, ``Erstellung``.
    """
    rows = []
    for node in tree:
        trail = prev_folder_list + [node["defaultName"]]

        if "items" in node:
            rows += self.export_users_recursive(node["items"], trail)
            continue

        if "members" in node and isinstance(node["members"], dict):
            for kind in ("users", "groups", "roles"):
                rows += self.export_users_recursive(node["members"].get(kind, []), trail)
            continue

        rows.append(
            {
                "Name": node["defaultName"],
                "Typ": node["type"],
                "Pfad": "/".join(prev_folder_list),
                "ID": node["id"],
                "Suchpfad": node["searchPath"].replace('"', "'"),
                "UID": node.get("userName", ""),
                "Email": node.get("email", ""),
                "Erstellung": node.get("creationTime", ""),
            }
        )
    return rows
def get_licenses(self, export_dir):
    """Download the license details and store them as ``licenses.csv``.

    Every comma in the response text is replaced by a semicolon before the
    text is written (latin-1) to ``<export_dir>/licenses.csv``.

    Returns:
        The converted text.
    """
    response = self.session.get(f"{self.webservice}v1/licenses/details", headers=self.headers)
    csv_text = response.text.replace(",", ";")
    target = export_dir + "/licenses.csv"
    with open(target, "w", encoding="latin-1", newline="") as fwh:
        fwh.write(csv_text)
    return csv_text
def get_namespaces(self):
    """Return the ``data`` list of the ``v1/namespaces`` endpoint."""
    fields = ",".join(
        (
            "id", "defaultName", "searchPath", "objectClass",
            "creationTime", "modificationTime", "type",
        )
    )
    url = f"{self.webservice}v1/namespaces?fields={fields}"
    response = self.session.get(url, headers=self.headers)
    return response.json()["data"]
def get_namespace_items(self, id):
    """Return the ``data`` list with the items of namespace *id*.

    Besides the common object fields, the account fields ``email``,
    ``givenName``, ``surname`` and ``userName`` are requested.
    """
    fields = ",".join(
        (
            "id", "defaultName", "searchPath", "objectClass",
            "creationTime", "modificationTime", "type",
            "email", "givenName", "surname", "userName",
        )
    )
    url = f"{self.webservice}v1/namespaces/{id}/items?fields={fields}"
    response = self.session.get(url, headers=self.headers)
    return response.json()["data"]
def get_namespace_folder_items(self, id):
    """Return the ``data`` list with the items of namespace folder *id*."""
    fields = ",".join(
        (
            "id", "defaultName", "searchPath", "objectClass",
            "creationTime", "modificationTime", "type",
        )
    )
    url = f"{self.webservice}v1/folders/{id}/items?fields={fields}"
    response = self.session.get(url, headers=self.headers)
    return response.json()["data"]
def get_role_members(self, id):
    """Return the decoded JSON answer listing the members of role *id*."""
    url = f"{self.webservice}v1/roles/{id}/members"
    return self.session.get(url, headers=self.headers).json()
def get_group_members(self, id):
    """Return the decoded JSON answer listing the members of group *id*."""
    url = f"{self.webservice}v1/groups/{id}/members"
    return self.session.get(url, headers=self.headers).json()
def create_report(self, folder_id, fullpath):
    """Create a report in *folder_id* from the specification file *fullpath*.

    First the ``get_package`` request is sent and its answer is saved to
    ``package.xml`` in the working directory. Then the ``create_report``
    request is sent, named after the base name of *fullpath*; the request
    body is saved to ``request_create.xml`` and the status code and answer
    text are printed.

    Args:
        folder_id: Object id of the target folder.
        fullpath: Path of the file holding the report specification.
    """
    # self.session.get(self.webservice + 'v1/reports/templates?path=%2Fcontent%2Ffolder%5B%40name%3D%27Templates%27%5D/
    # *[@objectClass=%27interactiveReport%27%20or%20@objectClass=%27report%27%20or%20@objectClass=%27reportTemplate%27]&maxResults=100&locale=de')
    # self.session.get(self.webservice + 'v1/reports/startupconfig?keys=supportedContentLocales,supportedCurrencies,
    # supportedFonts,metadataInformationURI,glossaryURI&locale=de')
    headers = self.get_report_headers()
    soap = self.templates["get_package"].render({"caf": self.caf, "cam": self.cam}).encode("utf-8")
    r = self.session.post(self.webservice + "v1/reports", data=soap, headers=headers)
    # All files are handled through context managers so they are closed
    # (and written out) before the next step runs.
    with open("package.xml", "wb") as fwh:
        fwh.write(r.content)

    search_path = self.request_search_path(folder_id)
    report_name = os.path.basename(fullpath)
    # NOTE(review): the file is read as bytes here, while update_report()
    # reads text; a bytes value is rendered by the template through its
    # repr unless the template handles it -- confirm this is intended.
    with open(fullpath, "rb") as frh:
        unstubbed = frh.read()

    headers["SOAPAction"] = f"http://www.ibm.com/xmlns/prod/cognos/reportService/{self.server_version}/.session"
    headers["Referer"] = "http://localhost:9300/bi/pat/rsapp.htm"
    # headers['caf'] = self.caf
    soap = (
        self.templates["create_report"]
        .render(
            {
                "caf": self.caf,
                "cam": self.cam,
                "search_path": search_path,
                "report_name": report_name,
                "unstubbed": unstubbed,
            }
        )
        .encode("utf-8")
    )
    r = self.session.post(self.webservice + "v1/reports", data=soap, headers=headers)
    with open("request_create.xml", "wb") as fwh:
        fwh.write(r.request.body)
    print(r.status_code)
    print(r.text)
def update_report(self, report, fullpath):
    """Replace the specification of *report* with the content of *fullpath*.

    The ``update_report`` request is posted to ``v1/reports``; the status
    code and answer text are printed.

    Args:
        report: Report dictionary; only ``report["id"]`` is used.
        fullpath: Path of the text file holding the new specification.
    """
    search_path = self.request_search_path(report["id"])
    # Context manager so the handle is closed right after reading.
    with open(fullpath, "r") as frh:
        unstubbed = frh.read()
    headers = self.get_report_headers(report["id"])
    # headers['Referer'] = 'http://localhost:9300/bi/pat/rsapp.htm'
    headers["caf"] = self.caf
    soap = self.templates["update_report"].render(
        {
            "caf": self.caf,
            "cam": self.cam,
            "search_path": search_path,
            "unstubbed": unstubbed,
        }
    )  # .encode("utf-8")
    r = self.session.post(self.webservice + "v1/reports", data=soap, headers=headers)
    # open('request_update.xml', 'wb').write(r.request.body)
    print(r.status_code)
    print(r.text)
def create_folder(self, parent_id, folder_name):
    """Create a folder below *parent_id* and record it in ``self.folders``.

    Args:
        parent_id: Object id of the parent folder.
        folder_name: Name of the new folder.

    Returns:
        The id of the new folder (last segment of the ``Location`` header),
        or ``None`` when the server does not answer with HTTP 201 or sends
        no ``Location`` header.
    """
    data = json.dumps({"defaultName": folder_name, "type": "folder"})
    res = self.session.post(
        f"{self.webservice}v1/objects/{parent_id}/items",
        headers=self.headers,
        data=data,
    )
    if res.status_code != 201:
        return None
    loc = res.headers.get("Location")
    if not loc:
        # Without a Location header the new id is unknown; report failure
        # instead of raising AttributeError on None.
        return None
    folder_id = loc.split("/")[-1]
    self.folders.append({"id": folder_id, "name": folder_name})
    return folder_id
def request_search_path(self, id):
    """Return the ``searchPath`` of the object with the given id."""
    url = f"{self.webservice}v1/objects/{id}?fields=searchPath"
    first = self.session.get(url, headers=self.headers).json()["data"][0]
    return first["searchPath"]
|