import base64
import csv
import json
import logging
import os
import re

import jinja2
import requests
from bs4 import BeautifulSoup
from requests_toolbelt.multipart import decoder

import config
from cognos11.xml_prettify import prettify_xml


class c11_api:
    """Client for the Cognos 11 web service (JSON REST calls plus SOAP report calls).

    Typical use: construct with a ``config.Config``, call :meth:`login`, then
    use the ``get_*`` / ``request_*`` methods. ``self.session`` only exists
    after :meth:`login` has been called.
    """

    # Class-level defaults are kept for backward compatibility; the mutable
    # containers are re-created per instance in ``__init__`` so that two
    # clients never share one folder/report/job cache.
    webservice = ""
    templates_dir = ""
    headers = {}
    caf = ""
    cam = ""
    reports = []
    folders = []
    jobs = []
    server_version = "202004"

    def __init__(self, cfg: config.Config):
        """Store the configuration and load the SOAP request templates.

        Args:
            cfg: Project configuration; ``cfg.cognos11`` and ``cfg.tasks_dir``
                are read here.
        """
        self.cfg = cfg
        self.webservice = cfg.cognos11.webservice
        self.templates_dir = cfg.tasks_dir + "/scripts/templates"

        # Per-instance state (see note on the class attributes above).
        self.headers = {}
        self.reports = []
        self.folders = []
        self.jobs = []

        self._env = jinja2.Environment(
            loader=jinja2.FileSystemLoader(self.templates_dir),
            autoescape=jinja2.select_autoescape(["html", "xml"]),
        )
        self.templates = {
            name: self._env.get_template(name + ".xml")
            for name in ("get_report", "create_report", "update_report", "get_package")
        }

    @staticmethod
    def generate_token(message_base64):
        """Derive the ``cam`` token from the base64 ``usersessionid`` cookie value.

        The decoded message is read as: one leading byte (skipped), followed by
        chunks that each start with a 4-byte little-endian length. The token is
        the base64 encoding of ``b"V1"`` plus the last chunk.

        Raises:
            IndexError: If the decoded message contains no complete chunk header.
        """
        version = "V1".encode("utf-8")
        header_len = 4
        msg = base64.b64decode(message_base64)[1:]
        chunks = []
        while len(msg) >= header_len:
            chunk_len = int.from_bytes(msg[:header_len], byteorder="little")
            msg = msg[header_len:]
            chunks.append(msg[:chunk_len])
            msg = msg[chunk_len:]
        return base64.b64encode(version + chunks[-1]).decode("utf-8")

    def login(self):
        """Open a session, log on with the configured credentials and return ``self``.

        On success ``self.caf``, ``self.cam`` and ``self.server_version`` are
        set. On failure an error is printed and the process exits with code 1.
        """
        cred = self.cfg.cognos11.credentials
        credentials = {
            "parameters": [
                {"name": "h_CAM_action", "value": "logonAs"},
                {"name": "CAMNamespace", "value": cred.namespace},
                {"name": "CAMUsername", "value": cred.username},
                {"name": "CAMPassword", "value": cred.password},
            ]
        }
        self.session = requests.Session()
        # Initial GET populates the session cookies (the XSRF token is read below).
        self.session.get(self.webservice)
        self.headers = {
            "Content-Type": "application/json; charset=UTF-8",
            "X-XSRF-TOKEN": self.session.cookies.get("XSRF-TOKEN"),
        }
        r = self.session.post(
            self.webservice + "v1/login",
            data=json.dumps(credentials),
            headers=self.headers,
        )
        if not r.ok:
            print("!! Error: Not connected to cognos server !!")
            print(f"Response: {r.status_code} {r.reason}")
            # Same effect as exit(1), without depending on the ``site`` builtin.
            raise SystemExit(1)

        self.caf = r.json()["cafContextId"]
        self.cam = self.generate_token(r.cookies["usersessionid"])
        self.server_version = self.get_server_version()
        return self

    def get_server_version(self):
        """Return the six-digit version found in ``pat/rsstartupblock_3.js``.

        Falls back to ``"202004"`` when the pattern is not found.
        """
        r = self.session.get(self.webservice + "pat/rsstartupblock_3.js")
        res = re.search(r"\=\"(\d{6})\";", r.text)
        if res:
            return res.group(1)
        return "202004"

    def get_folders(self):
        """Return the folder list, loading it from the server on first use.

        The first load also fills ``self.reports`` / ``self.jobs`` and writes
        all three lists to the configured JSON files.
        """
        if not self.folders:
            self.folders.append({"id": "_dot_public_folders", "name": "Team Content"})
            self.load_folder_list()
            self.save_config()
        return self.folders

    def save_config(self):
        """Write the folder, report and job lists to their configured JSON files."""
        os.makedirs(self.cfg.cognos11.config_dir, exist_ok=True)
        targets = (
            (self.cfg.cognos11.folders_file, self.folders),
            (self.cfg.cognos11.reports_file, self.reports),
            (self.cfg.cognos11.jobs_file, self.jobs),
        )
        for filename, content in targets:
            with open(filename, "w") as fwh:
                json.dump(content, fwh, indent=2)

    def load_config_from_files(self):
        """Read the folder, report and job lists back from their JSON files."""
        with open(self.cfg.cognos11.folders_file, "r") as frh:
            self.folders = json.load(frh)
        with open(self.cfg.cognos11.reports_file, "r") as frh:
            self.reports = json.load(frh)
        # Was opened with "w", which truncated the file and made json.load fail.
        with open(self.cfg.cognos11.jobs_file, "r") as frh:
            self.jobs = json.load(frh)

    def load_folder_list(self, folder_id="_dot_public_folders", prefix="Team Content"):
        """Recursively collect folders, reports and jobs below ``folder_id``.

        Args:
            folder_id: Object id of the folder whose items are listed.
            prefix: Display path of that folder; used as ``path`` of its
                children and as prefix of sub-folder names.
        """
        fields = ",".join(
            [
                "id",
                "defaultName",
                "defaultDescription",
                "type",
                "modificationTime",
                "target.id",
                "target.searchPath",
                "target.defaultName",
                "base.id",
                "base.searchPath",
                "base.defaultName",
                "parameters",
            ]
        )
        res = self.session.get(
            f"{self.webservice}v1/objects/{folder_id}/items?fields={fields}",
            headers=self.headers,
        )
        folder_list = sorted(res.json()["data"], key=lambda x: x["defaultName"])

        for f in folder_list:
            if f["type"] == "folder":
                # "/" is the path separator of the generated names, so it is
                # replaced inside individual object names.
                folder = {
                    "id": f["id"],
                    "name": prefix + "/" + f["defaultName"].replace("/", "-"),
                }
                self.folders.append(folder)
                self.load_folder_list(folder["id"], folder["name"])
            elif f["type"] in ("report", "reportView", "shortcut"):
                report = {
                    "id": f["id"],
                    "name": f["defaultName"].replace("/", "-"),
                    "description": f["defaultDescription"],
                    "modified": f["modificationTime"],
                    "path": prefix,
                    "type": f["type"],
                }
                if f["type"] == "shortcut":
                    report["target_id"] = f["target"][0]["id"]
                if f["type"] == "reportView":
                    report["target_id"] = "" if f["base"] is None else f["base"][0]["id"]
                    report["parameters"] = {}
                    if f["parameters"] is not None:
                        for p in f["parameters"]:
                            if len(p["value"]) > 0:
                                report["parameters"][p["name"]] = {v["use"]: v["display"] for v in p["value"]}
                self.reports.append(report)
            elif f["type"] == "jobDefinition":
                job = {"id": f["id"], "name": f["defaultName"], "path": prefix}
                job["details"] = self.get_job_details(job["id"])
                self.jobs.append(job)

    def get_report_details(self, object_id):
        """Fetch detail fields of one object and strip the identifying keys.

        Returns:
            The first ``data`` entry of the response without ``_meta``, ``id``,
            ``type`` and ``defaultName``.
        """
        fields = ",".join(
            [
                "defaultDescription",
                "options",
                "executionPrompt",
                "parameters",
                "module.defaultName",
                "module.ancestors",
                "allowNotification",
                "base.id",
                "base.searchPath",
                "base.defaultName",
                "base.defaultDescription",
                "base.ancestors",
                "base.metadataModelPackage",
                "base.module",
            ]
        )
        res = self.session.get(
            f"{self.webservice}v1/objects/{object_id}?fields={fields}",
            headers=self.headers,
        )
        details = res.json()["data"][0]
        for key in ("_meta", "id", "type", "defaultName"):
            details.pop(key, None)
        return details

    def get_job_details(self, object_id):
        """Fetch a job definition together with its steps.

        Returns:
            The job's detail dict (identifying keys removed) with an added
            ``steps`` list. Each step has ``stepObject`` replaced by a
            ``report_id`` entry when a step object is present.
        """
        fields = ",".join(
            [
                "userInterfaces",
                "disabled",
                "runInAdvancedViewer",
                "modificationTime",
                "canBurst",
                "defaultPortalAction",
                "base.defaultName",
                "tags",
                "target.searchPath",
                "target.disabled",
                "options",
                "base.options",
            ]
        )
        res = self.session.get(
            f"{self.webservice}v1/objects/{object_id}?fields={fields}",
            headers=self.headers,
        )
        job = res.json()["data"][0]
        for key in ("_meta", "id", "type", "defaultName"):
            job.pop(key, None)

        fields2 = ",".join(
            [
                r"id,displaySequence,stepObject{defaultName}",
                r"stepObject{id},stepObject{parameters}",
                r"stepObject{canBurst},options,parameters",
            ]
        )
        res = self.session.get(
            f"{self.webservice}v1/objects/{object_id}/items?types=jobStepDefinition&fields={fields2}",
            headers=self.headers,
        )
        steps = res.json()["data"]
        for s in steps:
            s.pop("_meta", None)
            if s["stepObject"] is not None:
                s["report_id"] = s["stepObject"][0]["id"]
            s.pop("stepObject", None)
        job["steps"] = steps
        return job

    def get_report(self, report_id):
        """Return the cached report entry for ``report_id`` or ``None``.

        The entry is enriched via :meth:`get_report_specs` when it has no
        ``meta`` key yet.
        """
        self.get_folders()
        report = next((r for r in self.reports if r["id"] == report_id), None)
        if report is None:
            return None
        if "meta" not in report:
            report = self.get_report_specs(report)
        return report

    def get_reports_in_folder(self, folder, recursive=False, specs=False):
        """List cached reports whose ``path`` equals (or starts with) ``folder``.

        Args:
            folder: Display path, e.g. as produced by :meth:`load_folder_list`.
            recursive: Match by path prefix instead of exact path.
            specs: Enrich every hit via :meth:`get_report_specs`.
        """
        self.get_folders()
        if recursive:
            res = [r for r in self.reports if r["path"].startswith(folder)]
        else:
            res = [r for r in self.reports if r["path"] == folder]
        if specs:
            return [self.get_report_specs(r) for r in res]
        return res

    @staticmethod
    def _soap_error_message(response):
        """Return the first ``messageString`` of an error response, else its raw text."""
        messages = BeautifulSoup(response.text, "xml").find_all("messageString")
        return messages[0].string if messages else response.text

    def get_report_specs(self, report):
        """Request a report with prompting enabled and record its parameters.

        Adds ``cube``, ``meta`` and ``spec`` to ``report`` and writes ``meta``
        to ``<config_dir>/params/<path>/<name>.json``. On HTTP 500 only
        ``report["error"]`` is set and the error is logged.

        Returns:
            The (mutated) ``report`` dict.
        """
        report = self.get_report_filename(report)
        headers = {
            "Content-Type": "text/xml; charset=UTF-8",
            "X-XSRF-TOKEN": self.headers["X-XSRF-TOKEN"],
            "X-RsCMStoreID": report["id"],
            "X-UseRsConsumerMode": "true",
            "SOAPAction": f"http://www.ibm.com/xmlns/prod/cognos/reportService/{self.server_version}/",
        }
        soap = self.templates["get_report"].render(
            {
                "caf": self.caf,
                "cam": self.cam,
                "report": report,
                "format": "XHTML",
                "prompt": "true",
                "tracking": "",
                "params": {},
            }
        )
        r = self.session.post(self.webservice + "v1/reports", data=soap, headers=headers)
        if r.status_code == 500:
            report["error"] = self._soap_error_message(r)
            logging.error("%s/%s", report["path"], report["name"])
            logging.error(report["error"])
            return report

        parts = decoder.MultipartDecoder.from_response(r).parts
        meta = {"required": {}, "optional": {}}

        # Part 0: SOAP envelope; parameter values with their use/display pairs.
        bs = BeautifulSoup(parts[0].content, "xml")
        result = bs.find("bus:result")
        details = result.find("bus:primaryRequest")
        params = details.find("bus:parameters")
        for item in params.find_all("item"):
            if item["xsi:type"] != "bus:parameterValue":
                continue
            k = item.find("bus:name").text
            v = {}
            for opt in item.find("bus:value").find_all("item"):
                use = opt.find("bus:use").text
                display = opt.find("bus:display")
                # Fall back to the use value when no display value is given.
                v[use] = display.text if display is not None else use
            if v:
                meta["required"][k] = v

        # Part 1: prompt page; select controls and their options. The two tag
        # names are processed one after the other to keep the key order.
        bs = BeautifulSoup(parts[1].content, "lxml")
        for tag_name in ("selectvalue", "selectdate"):
            for sv in bs.find_all(tag_name):
                meta["optional"][sv["parameter"]] = {
                    opt["usevalue"]: opt.get("displayvalue", "") for opt in sv.find_all("selectoption")
                }

        filename = self.cfg.cognos11.config_dir + f"/params/{report['path']}/{report['name']}.json"
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        with open(filename, "w") as fwh:
            json.dump(meta, fwh, indent=2)

        report["cube"] = self.get_cube_name(meta)
        report["meta"] = meta
        report["spec"] = parts[2].text
        return report

    @staticmethod
    def get_cube_name(meta):
        """Return the text inside the leading ``[...]`` of the first matching option key.

        Scans the keys of all ``meta["optional"]`` parameters in order and
        returns ``None`` when no key starts with a bracketed name.
        """
        for param in meta["optional"].values():
            for key in param:
                match = re.search(r"^\[([^\]]*)\]", key)
                if match:
                    return match.group(1)
        return None

    def get_report_filename(self, report):
        """Add ``filename``, ``format`` and ``params`` to ``report`` and return it.

        The file name is built below ``reportoutput_dir``; names ending in
        ``.xlsx`` switch the format to ``spreadsheetML`` and drop the ``.pdf``
        suffix. Every ``[name]`` placeholder in the file name is collected in
        ``params`` and replaced by its positional ``{index}``.
        """
        path = report["path"].replace("Team Content/ReportOutput", "")
        report["filename"] = f"{self.cfg.cognos11.reportoutput_dir}/{path}/{report['name']}.pdf"
        report["format"] = "PDF"
        if report["name"].lower().endswith(".xlsx"):
            report["format"] = "spreadsheetML"
            report["filename"] = report["filename"][:-4]  # strip ".pdf"
        report["params"] = re.findall(r"\[([^\]]+)\]", report["filename"])
        for i, p in enumerate(report["params"]):
            report["filename"] = report["filename"].replace("[" + p + "]", "{" + str(i) + "}")
        return report

    def request_unstubbed(self, report_id):
        """Return the full, cleaned and pretty-printed specification of a report.

        Returns:
            The specification as string; ``""`` when the report is unknown or
            has no ``spec``; ``None`` when the unstub request fails (the
            response text is logged).
        """
        report = self.get_report(report_id)
        if report is None or "spec" not in report:
            return ""
        payload = json.dumps({"reportspec_stubbed": report["spec"], "storeid": report["id"]})
        headers = {
            "Content-Type": "application/json; charset=UTF-8",
            "X-XSRF-TOKEN": self.session.cookies.get("XSRF-TOKEN"),
            "authenticityToken": self.cam,
            "X-UseRsConsumerMode": "true",
            "cafContextId": self.caf,
        }
        r = self.session.post(self.webservice + "v1/reports/unstubreport", data=payload, headers=headers)
        if r.status_code != 200:
            logging.error("%s/%s", report["path"], report["name"])
            logging.error(r.text)
            return

        unstubbed = json.loads(r.text)["reportspec_full"]
        unstubbed = re.sub(r' iid="[^"]*"', "", unstubbed)
        bs = BeautifulSoup(unstubbed, "xml")

        keep_whole = ("RS_dataType", "RS_CreateExtendedDataItems", "RS_legacyDrillDown")
        for xa in bs.find_all("XMLAttributes"):
            # Containers holding one of these attributes stay untouched.
            if any(xa.find_all("XMLAttribute", {"name": name}) for name in keep_whole):
                continue
            # Keep only the formatting attribute, drop its siblings.
            if xa.find_all("XMLAttribute", {"name": "supportsDefaultDataFormatting"}):
                for xa2 in xa.find_all("XMLAttribute"):
                    if xa2.attrs["name"] != "supportsDefaultDataFormatting":
                        xa2.decompose()
                continue
            xa.decompose()

        for cti in bs.find_all("crosstabIntersection"):
            if not list(cti.children):
                cti.decompose()

        # NOTE(review): the replacement literal was garbled in the source
        # (it read as three bare double quotes, which is not valid Python).
        # It is reconstructed here as the entity "&quot;" - confirm against
        # the repository history before relying on it.
        return prettify_xml(str(bs)).replace("'", "&quot;")

    def get_report_headers(self, report_id=None):
        """Build the HTTP headers for a SOAP call to ``v1/reports``.

        Args:
            report_id: When given, sent as ``X-RsCMStoreID``.
        """
        res = {
            "Content-Type": "text/xml; charset=UTF-8",
            "X-XSRF-TOKEN": self.headers["X-XSRF-TOKEN"],
            "X-UseRsConsumerMode": "true",
            "SOAPAction": f"http://www.ibm.com/xmlns/prod/cognos/reportService/{self.server_version}/",
        }
        if report_id is not None:
            res["X-RsCMStoreID"] = report_id
        return res

    def request_file(self, report_id, params, format="PDF"):
        """Run a report without prompting and return its rendered output.

        Returns:
            ``(200, content)`` with the bytes of the second multipart part,
            ``(500, "Timeout")`` when a 200 response is not multipart, or
            ``(status_code, error_message)`` for any other status.
        """
        report = self.get_report(report_id)
        headers = self.get_report_headers(report_id)
        soap = (
            self.templates["get_report"]
            .render(
                {
                    "caf": self.caf,
                    "cam": self.cam,
                    "report": report,
                    "format": format,
                    "prompt": "false",
                    "tracking": "",
                    "params": params,
                }
            )
            .encode("utf-8")
        )
        r = self.session.post(self.webservice + "v1/reports", data=soap, headers=headers)
        if r.status_code == 200:
            try:
                parts = decoder.MultipartDecoder.from_response(r).parts
            except decoder.NonMultipartContentTypeException:
                return 500, "Timeout"
            return 200, parts[1].content

        # Only error responses are parsed as XML; a successful response is
        # multipart data and is handled above.
        error = self._soap_error_message(r)
        logging.debug(error)
        return r.status_code, error

    def get_users(self, export_dir):
        """Export licenses and the namespace tree to ``export_dir``.

        Writes ``licenses.csv`` (via :meth:`get_licenses`) and
        ``namespaces.json`` and returns the namespace tree.
        """
        self.get_licenses(export_dir)
        ns = self.get_namespaces()
        for item in ns:
            temp_items = self.get_namespace_items(item["id"])
            item["items"] = self.get_sub_items(temp_items)
        with open(export_dir + "/namespaces.json", "w") as fwh:
            json.dump(ns, fwh, indent=2)
        return ns

    def get_sub_items(self, items):
        """Recursively attach ``items`` to folders and ``members`` to roles/groups.

        Mutates and returns ``items``.
        """
        for sub_item in items:
            item_type = sub_item["type"]
            if item_type == "namespaceFolder":
                temp_items = self.get_namespace_folder_items(sub_item["id"])
                sub_item["items"] = self.get_sub_items(temp_items)
            elif item_type == "role":
                sub_item["members"] = self.get_role_members(sub_item["id"])
            elif item_type == "group":
                sub_item["members"] = self.get_group_members(sub_item["id"])
        return items

    def show_users(self, ns, indent=0):
        """Print the namespace tree, one entry per line, indented by depth."""
        indent_str = " " * indent
        style = {
            "account": "@",
            "namespace": "#",
            "role": "~",
            "group": "+",
            "namespaceFolder": "*",
        }
        for item in ns:
            if item["type"] == "account":
                print(indent_str + f"@ {item['defaultName']} (uid={item.get('userName', 'xx')})")
            else:
                print(indent_str + f"{style.get(item['type'], '*')} {item['defaultName']} ({item['type']})")
            if "members" in item:
                members = item["members"]
                for user in members.get("users", []):
                    print(indent_str + f" -> @ {user['defaultName']} (uid={user['userName']})")
                for group in members.get("groups", []):
                    print(indent_str + f" -> + {group['defaultName']} ({group['type']})")
                for role in members.get("roles", []):
                    print(indent_str + f" -> ~ {role['defaultName']} ({role['type']})")
            if "items" in item:
                self.show_users(item["items"], indent + 2)

    def export_users(self, ns, export_dir: str):
        """Write the flattened namespace tree to ``cognos_users.csv`` in ``export_dir``.

        Nothing is written when the tree yields no rows.
        """
        data = self.export_users_recursive(ns, [])
        if not data:
            return
        # os.path.join instead of a hard-coded backslash keeps the path valid
        # on non-Windows systems as well.
        filename = os.path.join(export_dir, "cognos_users.csv")
        with open(filename, "w", encoding="latin-1", newline="") as fwh:
            csv_fwh = csv.DictWriter(fwh, fieldnames=data[0].keys(), delimiter=";")
            csv_fwh.writeheader()
            csv_fwh.writerows(data)

    def export_users_recursive(self, tree, prev_folder_list):
        """Flatten a namespace tree into a list of row dicts.

        Entries with ``items`` or a ``members`` dict are descended into; every
        other entry becomes one row whose ``Pfad`` is the joined names of its
        ancestors.

        Args:
            tree: List of entries as returned by the namespace requests.
            prev_folder_list: Names of the ancestors of ``tree``.
        """
        res = []
        for item in tree:
            folder_list = prev_folder_list + [item["defaultName"]]
            if "items" in item:
                res.extend(self.export_users_recursive(item["items"], folder_list))
            elif "members" in item and isinstance(item["members"], dict):
                for e in ["users", "groups", "roles"]:
                    res.extend(self.export_users_recursive(item["members"].get(e, []), folder_list))
            else:
                res.append(
                    {
                        "Name": item["defaultName"],
                        "Typ": item["type"],
                        "Pfad": "/".join(prev_folder_list),
                        "ID": item["id"],
                        "Suchpfad": item["searchPath"].replace('"', "'"),
                        "UID": item.get("userName", ""),
                        "Email": item.get("email", ""),
                        "Erstellung": item.get("creationTime", ""),
                    }
                )
        return res

    def get_licenses(self, export_dir):
        """Download the license details, write them to ``licenses.csv`` and return them.

        Commas in the response text are replaced by semicolons.
        """
        res = self.session.get(
            f"{self.webservice}v1/licenses/details",
            headers=self.headers,
        )
        data = res.text.replace(",", ";")
        with open(export_dir + "/licenses.csv", "w", encoding="latin-1", newline="") as fwh:
            fwh.write(data)
        return data

    def get_namespaces(self):
        """Return the ``data`` list of the namespaces request."""
        fields = ",".join(
            [
                "id",
                "defaultName",
                "searchPath",
                "objectClass",
                "creationTime",
                "modificationTime",
                "type",
            ]
        )
        res = self.session.get(
            f"{self.webservice}v1/namespaces?fields={fields}",
            headers=self.headers,
        )
        return res.json()["data"]

    def get_namespace_items(self, id):
        """Return the ``data`` list of the items in namespace ``id``."""
        fields = ",".join(
            [
                "id",
                "defaultName",
                "searchPath",
                "objectClass",
                "creationTime",
                "modificationTime",
                "type",
                "email",
                "givenName",
                "surname",
                "userName",
            ]
        )
        res = self.session.get(
            f"{self.webservice}v1/namespaces/{id}/items?fields={fields}",
            headers=self.headers,
        )
        return res.json()["data"]

    def get_namespace_folder_items(self, id):
        """Return the ``data`` list of the items in namespace folder ``id``."""
        fields = ",".join(
            [
                "id",
                "defaultName",
                "searchPath",
                "objectClass",
                "creationTime",
                "modificationTime",
                "type",
            ]
        )
        res = self.session.get(
            f"{self.webservice}v1/folders/{id}/items?fields={fields}",
            headers=self.headers,
        )
        return res.json()["data"]

    def get_role_members(self, id):
        """Return the decoded JSON response listing the members of role ``id``."""
        res = self.session.get(
            f"{self.webservice}v1/roles/{id}/members",
            headers=self.headers,
        )
        return res.json()

    def get_group_members(self, id):
        """Return the decoded JSON response listing the members of group ``id``."""
        res = self.session.get(
            f"{self.webservice}v1/groups/{id}/members",
            headers=self.headers,
        )
        return res.json()

    def create_report(self, folder_id, fullpath):
        """Send a create-report SOAP request for the spec file ``fullpath``.

        Writes the package response to ``package.xml`` and the sent request
        body to ``request_create.xml`` in the working directory, then prints
        the status code and response text.

        Args:
            folder_id: Object id of the target folder.
            fullpath: Path of the report specification file; its base name is
                used as report name.
        """
        # self.session.get(self.webservice + 'v1/reports/templates?path=%2Fcontent%2Ffolder%5B%40name%3D%27Templates%27%5D/
        # *[@objectClass=%27interactiveReport%27%20or%20@objectClass=%27report%27%20or%20@objectClass=%27reportTemplate%27]&maxResults=100&locale=de')
        # self.session.get(self.webservice + 'v1/reports/startupconfig?keys=supportedContentLocales,supportedCurrencies,
        # supportedFonts,metadataInformationURI,glossaryURI&locale=de')
        headers = self.get_report_headers()
        soap = self.templates["get_package"].render({"caf": self.caf, "cam": self.cam}).encode("utf-8")
        r = self.session.post(self.webservice + "v1/reports", data=soap, headers=headers)
        with open("package.xml", "wb") as fwh:
            fwh.write(r.content)

        search_path = self.request_search_path(folder_id)
        report_name = os.path.basename(fullpath)
        # NOTE(review): the spec is read as bytes here but as text in
        # update_report; kept as-is because the template is not visible.
        with open(fullpath, "rb") as frh:
            unstubbed = frh.read()

        headers["SOAPAction"] = f"http://www.ibm.com/xmlns/prod/cognos/reportService/{self.server_version}/.session"
        headers["Referer"] = "http://localhost:9300/bi/pat/rsapp.htm"
        # headers['caf'] = self.caf
        soap = (
            self.templates["create_report"]
            .render(
                {
                    "caf": self.caf,
                    "cam": self.cam,
                    "search_path": search_path,
                    "report_name": report_name,
                    "unstubbed": unstubbed,
                }
            )
            .encode("utf-8")
        )
        r = self.session.post(self.webservice + "v1/reports", data=soap, headers=headers)
        with open("request_create.xml", "wb") as fwh:
            fwh.write(r.request.body)
        print(r.status_code)
        print(r.text)

    def update_report(self, report, fullpath):
        """Send an update-report SOAP request with the spec file ``fullpath``.

        Prints the status code and response text.

        Args:
            report: Report dict; only ``report["id"]`` is read.
            fullpath: Path of the report specification file.
        """
        search_path = self.request_search_path(report["id"])
        with open(fullpath, "r") as frh:
            unstubbed = frh.read()
        headers = self.get_report_headers(report["id"])
        # headers['Referer'] = 'http://localhost:9300/bi/pat/rsapp.htm'
        headers["caf"] = self.caf
        soap = self.templates["update_report"].render(
            {
                "caf": self.caf,
                "cam": self.cam,
                "search_path": search_path,
                "unstubbed": unstubbed,
            }
        )
        r = self.session.post(self.webservice + "v1/reports", data=soap, headers=headers)
        print(r.status_code)
        print(r.text)

    def create_folder(self, parent_id, folder_name):
        """Create a folder below ``parent_id`` and return its id.

        The id is the last path segment of the ``Location`` response header.
        The new folder is also appended to ``self.folders``.

        Returns:
            The new folder id, or ``None`` when the response status is not 201
            or carries no ``Location`` header.
        """
        data = json.dumps({"defaultName": folder_name, "type": "folder"})
        res = self.session.post(
            f"{self.webservice}v1/objects/{parent_id}/items",
            headers=self.headers,
            data=data,
        )
        if res.status_code != 201:
            return None
        loc = res.headers.get("Location")
        if not loc:
            return None
        folder_id = loc.split("/")[-1]
        self.folders.append({"id": folder_id, "name": folder_name})
        return folder_id

    def request_search_path(self, id):
        """Return the ``searchPath`` of the object with the given id."""
        res = self.session.get(f"{self.webservice}v1/objects/{id}?fields=searchPath", headers=self.headers)
        return res.json()["data"][0]["searchPath"]