c11_api.py 20 KB


  1. import base64
  2. import os
  3. import requests
  4. from requests_toolbelt.multipart import decoder
  5. import jinja2
  6. import json
  7. import re
  8. from bs4 import BeautifulSoup
  9. import config
  10. from cognos11.xml_prettify import prettify_xml
  11. import logging
  12. class c11_api:
  13. webservice = ""
  14. templates_dir = ""
  15. # templates_dir = "C:/GlobalCube/Tasks/gctools/templates"
  16. headers = {}
  17. caf = ""
  18. cam = ""
  19. reports = []
  20. folders = []
  21. jobs = []
  22. server_version = "202004"
  23. def __init__(self, cfg: config.Config):
  24. self.cfg = cfg
  25. self.webservice = cfg.cognos11.webservice
  26. self.templates_dir = cfg.tasks_dir + "/scripts/templates"
  27. self._env = jinja2.Environment(
  28. loader=jinja2.FileSystemLoader(self.templates_dir),
  29. autoescape=jinja2.select_autoescape(["html", "xml"]),
  30. )
  31. self.templates = {
  32. "get_report": self._env.get_template("get_report.xml"),
  33. "create_report": self._env.get_template("create_report.xml"),
  34. "update_report": self._env.get_template("update_report.xml"),
  35. "get_package": self._env.get_template("get_package.xml"),
  36. }
  37. @staticmethod
  38. def generate_token(message_base64):
  39. version = "V1".encode("utf-8")
  40. header_len = 4
  41. msg = base64.b64decode(message_base64)[1:]
  42. chunks = []
  43. while len(msg) >= header_len:
  44. chunk_len = int.from_bytes(msg[:header_len], byteorder="little")
  45. msg = msg[header_len:]
  46. chunks.append(msg[:chunk_len])
  47. msg = msg[chunk_len:]
  48. return base64.b64encode(version + chunks[-1]).decode("utf-8")
  49. def login(self):
  50. cred = self.cfg.cognos11.credetials
  51. credentials = {
  52. "parameters": [
  53. {"name": "h_CAM_action", "value": "logonAs"},
  54. {"name": "CAMNamespace", "value": cred.namespace},
  55. {"name": "CAMUsername", "value": cred.username},
  56. {"name": "CAMPassword", "value": cred.password},
  57. ]
  58. }
  59. self.session = requests.Session()
  60. r = self.session.get(self.webservice)
  61. self.headers = {
  62. "Content-Type": "application/json; charset=UTF-8",
  63. "X-XSRF-TOKEN": self.session.cookies.get("XSRF-TOKEN"),
  64. }
  65. r = self.session.post(
  66. self.webservice + "v1/login",
  67. data=json.dumps(credentials),
  68. headers=self.headers,
  69. )
  70. self.caf = r.json()["cafContextId"]
  71. self.cam = self.generate_token(r.cookies["usersessionid"])
  72. self.server_version = self.get_server_version()
  73. return self
  74. def get_server_version(self):
  75. r = self.session.get(self.webservice + "pat/rsstartupblock_3.js")
  76. res = re.search(r"\=\"(\d{6})\";", r.text)
  77. if res:
  78. return res.group(1)
  79. return "202004"
  80. def get_folders(self):
  81. if len(self.folders) == 0:
  82. self.folders.append({"id": "_dot_public_folders", "name": "Team Content"})
  83. self.load_folder_list()
  84. self.save_config()
  85. return self.folders
  86. def save_config(self):
  87. os.makedirs(self.cfg.cognos11.config_dir, exist_ok=True)
  88. with open(self.cfg.cognos11.folders_file, "w") as fwh:
  89. json.dump(self.folders, fwh, indent=2)
  90. with open(self.cfg.cognos11.reports_file, "w") as fwh:
  91. json.dump(self.reports, fwh, indent=2)
  92. with open(self.cfg.cognos11.jobs_file, "w") as fwh:
  93. json.dump(self.jobs, fwh, indent=2)
  94. def load_config_from_files(self):
  95. with open(self.cfg.cognos11.folders_file, "r") as frh:
  96. self.folders = json.load(frh)
  97. with open(self.cfg.cognos11.reports_file, "r") as frh:
  98. self.reports = json.load(frh)
  99. with open(self.cfg.cognos11.jobs_file, "w") as frh:
  100. self.jobs = json.load(frh)
  101. def load_folder_list(self, folder_id="_dot_public_folders", prefix="Team Content"):
  102. fields = ",".join(
  103. [
  104. "id",
  105. "defaultName",
  106. "defaultDescription",
  107. "type",
  108. "modificationTime",
  109. "target.id",
  110. "target.searchPath",
  111. "target.defaultName",
  112. "base.id",
  113. "base.searchPath",
  114. "base.defaultName",
  115. "parameters",
  116. ]
  117. )
  118. res = self.session.get(
  119. f"{self.webservice}v1/objects/{folder_id}/items?fields={fields}",
  120. headers=self.headers,
  121. )
  122. folder_list = sorted(res.json()["data"], key=lambda x: x["defaultName"])
  123. for f in folder_list:
  124. if f["type"] == "folder":
  125. folder = {
  126. "id": f["id"],
  127. "name": prefix + "/" + f["defaultName"].replace("/", "-"),
  128. }
  129. self.folders.append(folder)
  130. self.load_folder_list(folder["id"], folder["name"])
  131. elif f["type"] in ("report", "reportView", "shortcut"):
  132. report = {
  133. "id": f["id"],
  134. "name": f["defaultName"].replace("/", "-"),
  135. "description": f["defaultDescription"],
  136. "modified": f["modificationTime"],
  137. "path": prefix,
  138. "type": f["type"],
  139. }
  140. if f["type"] == "shortcut":
  141. report["target_id"] = f["target"][0]["id"]
  142. if f["type"] == "reportView":
  143. report["target_id"] = "" if f["base"] is None else f["base"][0]["id"]
  144. report["parameters"] = {}
  145. if f["parameters"] is not None:
  146. params = [p for p in f["parameters"] if len(p["value"]) > 0]
  147. for p in params:
  148. report["parameters"][p["name"]] = dict([(v["use"], v["display"]) for v in p["value"]])
  149. self.reports.append(report)
  150. elif f["type"] == "jobDefinition":
  151. job = {"id": f["id"], "name": f["defaultName"], "path": prefix}
  152. job["details"] = self.get_job_details(job["id"])
  153. self.jobs.append(job)
  154. def get_report_details(self, object_id):
  155. fields = ",".join(
  156. [
  157. "defaultDescription",
  158. "options",
  159. "executionPrompt",
  160. "parameters",
  161. "module.defaultName",
  162. "module.ancestors",
  163. "allowNotification",
  164. "base.id",
  165. "base.searchPath",
  166. "base.defaultName",
  167. "base.defaultDescription",
  168. "base.ancestors",
  169. "base.metadataModelPackage",
  170. "base.module",
  171. ]
  172. )
  173. res = self.session.get(
  174. f"{self.webservice}v1/objects/{object_id}?fields={fields}",
  175. headers=self.headers,
  176. )
  177. res = res.json()["data"][0]
  178. res.pop("_meta", None)
  179. res.pop("id", None)
  180. res.pop("type", None)
  181. res.pop("defaultName", None)
  182. return res
  183. def get_job_details(self, object_id):
  184. fields = ",".join(
  185. [
  186. "userInterfaces",
  187. "disabled",
  188. "runInAdvancedViewer",
  189. "modificationTime",
  190. "canBurst",
  191. "defaultPortalAction",
  192. "base.defaultName",
  193. "tags",
  194. "target.searchPath",
  195. "target.disabled",
  196. "options",
  197. "base.options",
  198. ]
  199. )
  200. res = self.session.get(
  201. f"{self.webservice}v1/objects/{object_id}?fields={fields}",
  202. headers=self.headers,
  203. )
  204. job = res.json()["data"][0]
  205. job.pop("_meta", None)
  206. job.pop("id", None)
  207. job.pop("type", None)
  208. job.pop("defaultName", None)
  209. fields2 = ",".join(
  210. [
  211. r"id,displaySequence,stepObject{defaultName}",
  212. r"stepObject{id},stepObject{parameters}",
  213. r"stepObject{canBurst},options,parameters",
  214. ]
  215. )
  216. res = self.session.get(
  217. f"{self.webservice}v1/objects/{object_id}/items?types=jobStepDefinition&fields={fields2}",
  218. headers=self.headers,
  219. )
  220. steps = res.json()["data"]
  221. for s in steps:
  222. s.pop("_meta", None)
  223. if s["stepObject"] is not None:
  224. s["report_id"] = s["stepObject"][0]["id"]
  225. s.pop("stepObject", None)
  226. job["steps"] = steps
  227. return job
  228. def get_report(self, report_id):
  229. self.get_folders()
  230. report = [r for r in self.reports if r["id"] == report_id]
  231. if len(report) == 0:
  232. return None
  233. report = report[0]
  234. if "meta" not in report:
  235. report = self.get_report_specs(report)
  236. return report
  237. def get_reports_in_folder(self, folder, recursive=False, specs=False):
  238. self.get_folders()
  239. if recursive:
  240. res = [r for r in self.reports if r["path"].startswith(folder)]
  241. else:
  242. res = [r for r in self.reports if r["path"] == folder]
  243. if specs:
  244. return [self.get_report_specs(r) for r in res]
  245. return res
  246. def get_report_specs(self, report):
  247. report = self.get_report_filename(report)
  248. headers = {
  249. "Content-Type": "text/xml; charset=UTF-8",
  250. "X-XSRF-TOKEN": self.headers["X-XSRF-TOKEN"],
  251. "X-RsCMStoreID": report["id"],
  252. "X-UseRsConsumerMode": "true",
  253. "SOAPAction": f"http://www.ibm.com/xmlns/prod/cognos/reportService/{self.server_version}/",
  254. }
  255. soap = self.templates["get_report"].render(
  256. {
  257. "caf": self.caf,
  258. "cam": self.cam,
  259. "report": report,
  260. "format": "XHTML",
  261. "prompt": "true",
  262. "tracking": "",
  263. "params": {},
  264. }
  265. )
  266. r = self.session.post(self.webservice + "v1/reports", data=soap, headers=headers)
  267. if r.status_code == 500:
  268. bs = BeautifulSoup(r.text, "xml")
  269. report["error"] = bs.find_all("messageString")[0].string
  270. logging.error(f"{report['path']}/{report['name']}")
  271. logging.error(report["error"])
  272. return report
  273. parts = decoder.MultipartDecoder.from_response(r).parts
  274. meta = {"required": {}, "optional": {}}
  275. bs = BeautifulSoup(parts[0].content, "xml")
  276. result = bs.find("bus:result")
  277. details = result.find("bus:primaryRequest")
  278. params = details.find("bus:parameters")
  279. for item in params.find_all("item"):
  280. if item["xsi:type"] != "bus:parameterValue":
  281. continue
  282. k = item.find("bus:name").text
  283. # v = item.find("bus:value").find_all("item")
  284. v = {}
  285. for opt in item.find("bus:value").find_all("item"):
  286. if opt.find("bus:display") is not None:
  287. v[opt.find("bus:use").text] = opt.find("bus:display").text
  288. else:
  289. v[opt.find("bus:use").text] = opt.find("bus:use").text
  290. if len(v.items()) > 0:
  291. meta["required"][k] = v
  292. bs = BeautifulSoup(parts[1].content, "lxml")
  293. for sv in bs.find_all("selectvalue"):
  294. k = sv["parameter"]
  295. v = dict([(opt["usevalue"], opt.get("displayvalue", "")) for opt in sv.find_all("selectoption")])
  296. meta["optional"][k] = v
  297. for sv in bs.find_all("selectdate"):
  298. k = sv["parameter"]
  299. v = dict([(opt["usevalue"], opt.get("displayvalue", "")) for opt in sv.find_all("selectoption")])
  300. meta["optional"][k] = v
  301. filename = self.cfg.cognos11.config_dir + f"/params/{report['path']}/{report['name']}.json"
  302. os.makedirs(os.path.dirname(filename), exist_ok=True)
  303. json.dump(meta, open(filename, "w"), indent=2)
  304. report["cube"] = self.get_cube_name(meta)
  305. report["meta"] = meta
  306. report["spec"] = parts[2].text
  307. return report
  308. @staticmethod
  309. def get_cube_name(meta):
  310. for param in meta["optional"].values():
  311. for key in param.keys():
  312. if key.startswith("["):
  313. res = re.search(r"^\[([^\]]*)\]", key)
  314. return res.group(1)
  315. def get_report_filename(self, report):
  316. path = report["path"].replace("Team Content/ReportOutput", "")
  317. report["filename"] = f"{self.cfg.cognos11.reportoutput_dir}/{path}/{report['name']}.pdf"
  318. report["format"] = "PDF"
  319. if report["name"][-5:].lower() == ".xlsx":
  320. report["format"] = "spreadsheetML"
  321. report["filename"] = report["filename"][:-4]
  322. report["params"] = list(re.findall(r"\[([^\]]+)\]", report["filename"]))
  323. for i, p in enumerate(report["params"]):
  324. report["filename"] = report["filename"].replace("[" + p + "]", "{" + str(i) + "}")
  325. return report
  326. def request_unstubbed(self, report_id):
  327. report = self.get_report(report_id)
  328. if "spec" not in report:
  329. return ""
  330. payload = json.dumps({"reportspec_stubbed": report["spec"], "storeid": report["id"]})
  331. headers = {
  332. "Content-Type": "application/json; charset=UTF-8",
  333. "X-XSRF-TOKEN": self.session.cookies.get("XSRF-TOKEN"),
  334. "authenticityToken": self.cam,
  335. "X-UseRsConsumerMode": "true",
  336. "cafContextId": self.caf,
  337. }
  338. r = self.session.post(self.webservice + "v1/reports/unstubreport", data=payload, headers=headers)
  339. if r.status_code != 200:
  340. logging.error(f"{report['path']}/{report['name']}")
  341. logging.error(r.text)
  342. return
  343. unstubbed = json.loads(r.text)["reportspec_full"]
  344. unstubbed = re.sub(r' iid="[^"]*"', "", unstubbed)
  345. bs = BeautifulSoup(unstubbed, "xml")
  346. for xa in bs.find_all("XMLAttributes"):
  347. if (
  348. xa.find_all("XMLAttribute", {"name": "RS_dataType"})
  349. or xa.find_all("XMLAttribute", {"name": "RS_CreateExtendedDataItems"})
  350. or xa.find_all("XMLAttribute", {"name": "RS_legacyDrillDown"})
  351. ):
  352. continue
  353. if xa.find_all("XMLAttribute", {"name": "supportsDefaultDataFormatting"}):
  354. for xa2 in xa.find_all("XMLAttribute"):
  355. if xa2.attrs["name"] != "supportsDefaultDataFormatting":
  356. xa2.decompose()
  357. continue
  358. xa.decompose()
  359. for cti in bs.find_all("crosstabIntersection"):
  360. if len(list(cti.children)) == 0:
  361. cti.decompose()
  362. unstubbed_report = str(bs)
  363. unstubbed_report = prettify_xml(unstubbed_report).replace("'", """)
  364. return unstubbed_report
  365. def get_report_headers(self, report_id=None):
  366. res = {
  367. "Content-Type": "text/xml; charset=UTF-8",
  368. "X-XSRF-TOKEN": self.headers["X-XSRF-TOKEN"],
  369. "X-UseRsConsumerMode": "true",
  370. "SOAPAction": f"http://www.ibm.com/xmlns/prod/cognos/reportService/{self.server_version}/",
  371. }
  372. if report_id is not None:
  373. res["X-RsCMStoreID"] = report_id
  374. return res
  375. def request_file(self, report_id, params, format="PDF"):
  376. report = self.get_report(report_id)
  377. headers = self.get_report_headers(report_id)
  378. soap = (
  379. self.templates["get_report"]
  380. .render(
  381. {
  382. "caf": self.caf,
  383. "cam": self.cam,
  384. "report": report,
  385. "format": format,
  386. "prompt": "false",
  387. "tracking": "",
  388. "params": params,
  389. }
  390. )
  391. .encode("utf-8")
  392. )
  393. r = self.session.post(self.webservice + "v1/reports", data=soap, headers=headers)
  394. bs = BeautifulSoup(r.text, "xml")
  395. if r.status_code == 200:
  396. try:
  397. parts = decoder.MultipartDecoder.from_response(r).parts
  398. except decoder.NonMultipartContentTypeException:
  399. return 500, "Timeout"
  400. return 200, parts[1].content
  401. error = bs.find_all("messageString")[0].string
  402. logging.debug(error)
  403. return r.status_code, error
  404. def create_report(self, folder_id, fullpath):
  405. # self.session.get(self.webservice + 'v1/reports/templates?path=%2Fcontent%2Ffolder%5B%40name%3D%27Templates%27%5D/
  406. # *[@objectClass=%27interactiveReport%27%20or%20@objectClass=%27report%27%20or%20@objectClass=%27reportTemplate%27]&maxResults=100&locale=de')
  407. # self.session.get(self.webservice + 'v1/reports/startupconfig?keys=supportedContentLocales,supportedCurrencies,
  408. # supportedFonts,metadataInformationURI,glossaryURI&locale=de')
  409. headers = self.get_report_headers()
  410. soap = self.templates["get_package"].render({"caf": self.caf, "cam": self.cam}).encode("utf-8")
  411. r = self.session.post(self.webservice + "v1/reports", data=soap, headers=headers)
  412. open("package.xml", "wb").write(r.content)
  413. search_path = self.request_search_path(folder_id)
  414. report_name = os.path.basename(fullpath)
  415. unstubbed = open(fullpath, "rb").read()
  416. headers["SOAPAction"] = f"http://www.ibm.com/xmlns/prod/cognos/reportService/{self.server_version}/.session"
  417. headers["Referer"] = "http://localhost:9300/bi/pat/rsapp.htm"
  418. # headers['caf'] = self.caf
  419. soap = (
  420. self.templates["create_report"]
  421. .render(
  422. {
  423. "caf": self.caf,
  424. "cam": self.cam,
  425. "search_path": search_path,
  426. "report_name": report_name,
  427. "unstubbed": unstubbed,
  428. }
  429. )
  430. .encode("utf-8")
  431. )
  432. r = self.session.post(self.webservice + "v1/reports", data=soap, headers=headers)
  433. open("request_create.xml", "wb").write(r.request.body)
  434. print(r.status_code)
  435. print(r.text)
  436. def update_report(self, report, fullpath):
  437. search_path = self.request_search_path(report["id"])
  438. unstubbed = open(fullpath, "r").read()
  439. headers = self.get_report_headers(report["id"])
  440. # headers['Referer'] = 'http://localhost:9300/bi/pat/rsapp.htm'
  441. headers["caf"] = self.caf
  442. soap = self.templates["update_report"].render(
  443. {
  444. "caf": self.caf,
  445. "cam": self.cam,
  446. "search_path": search_path,
  447. "unstubbed": unstubbed,
  448. }
  449. ) # .encode("utf-8")
  450. r = self.session.post(self.webservice + "v1/reports", data=soap, headers=headers)
  451. # open('request_update.xml', 'wb').write(r.request.body)
  452. print(r.status_code)
  453. print(r.text)
  454. def create_folder(self, parent_id, folder_name):
  455. data = json.dumps({"defaultName": folder_name, "type": "folder"})
  456. res = self.session.post(
  457. f"{self.webservice}v1/objects/{parent_id}/items",
  458. headers=self.headers,
  459. data=data,
  460. )
  461. if res.status_code == 201:
  462. loc = res.headers.get("Location")
  463. folder_id = loc.split("/")[-1]
  464. self.folders.append({"id": folder_id, "name": folder_name})
  465. return folder_id
  466. def request_search_path(self, id):
  467. res = self.session.get(f"{self.webservice}v1/objects/{id}?fields=searchPath", headers=self.headers)
  468. return res.json()["data"][0]["searchPath"]
  469. if __name__ == "__main__":
  470. api = c11_api()
  471. api.login()
  472. folders = api.get_folders()
  473. # filename = "C:/GlobalCube/Tasks/gctools/logs/config/folders.json"
  474. # json.dump(folders, open(filename, "w"), indent=2)
  475. # filename = "C:/GlobalCube/Tasks/gctools/logs/config/reports.json"
  476. # json.dump(api.reports, open(filename, "w"), indent=2)
  477. # filename = "C:/GlobalCube/Tasks/gctools/logs/config/jobs.json"
  478. # json.dump(api.jobs, open(filename, "w"), indent=2)