c11_api.py 20 KB


  1. import base64
  2. import os
  3. import requests
  4. from requests_toolbelt.multipart import decoder
  5. import jinja2
  6. import json
  7. import re
  8. from bs4 import BeautifulSoup
  9. import config
  10. from cognos11.xml_prettify import prettify_xml
  11. import logging
  12. class c11_api:
  13. webservice = ""
  14. templates_dir = ""
  15. # templates_dir = "C:/GlobalCube/Tasks/gctools/templates"
  16. headers = {}
  17. caf = ""
  18. cam = ""
  19. reports = []
  20. folders = []
  21. jobs = []
  22. def __init__(self, cfg: config.Config):
  23. self.cfg = cfg
  24. self.webservice = cfg.cognos11.webservice
  25. self.templates_dir = cfg.tasks_dir + "/scripts/templates"
  26. self._env = jinja2.Environment(
  27. loader=jinja2.FileSystemLoader(self.templates_dir),
  28. autoescape=jinja2.select_autoescape(["html", "xml"]),
  29. )
  30. self.templates = {
  31. "get_report": self._env.get_template("get_report.xml"),
  32. "create_report": self._env.get_template("create_report.xml"),
  33. "update_report": self._env.get_template("update_report.xml"),
  34. "get_package": self._env.get_template("get_package.xml"),
  35. }
  36. @staticmethod
  37. def generate_token(message_base64):
  38. version = "V1".encode("utf-8")
  39. header_len = 4
  40. msg = base64.b64decode(message_base64)[1:]
  41. chunks = []
  42. while len(msg) >= header_len:
  43. chunk_len = int.from_bytes(msg[:header_len], byteorder="little")
  44. msg = msg[header_len:]
  45. chunks.append(msg[:chunk_len])
  46. msg = msg[chunk_len:]
  47. return base64.b64encode(version + chunks[-1]).decode("utf-8")
  48. def login(self):
  49. cred = self.cfg.cognos11.credetials
  50. credentials = {
  51. "parameters": [
  52. {"name": "h_CAM_action", "value": "logonAs"},
  53. {"name": "CAMNamespace", "value": cred.namespace},
  54. {"name": "CAMUsername", "value": cred.username},
  55. {"name": "CAMPassword", "value": cred.password},
  56. ]
  57. }
  58. self.session = requests.Session()
  59. r = self.session.get(self.webservice)
  60. self.headers = {
  61. "Content-Type": "application/json; charset=UTF-8",
  62. "X-XSRF-TOKEN": self.session.cookies.get("XSRF-TOKEN"),
  63. }
  64. r = self.session.post(
  65. self.webservice + "v1/login",
  66. data=json.dumps(credentials),
  67. headers=self.headers,
  68. )
  69. self.caf = r.json()["cafContextId"]
  70. self.cam = self.generate_token(r.cookies["usersessionid"])
  71. return self
  72. def get_folders(self):
  73. if len(self.folders) == 0:
  74. self.folders.append({"id": "_dot_public_folders", "name": "Team Content"})
  75. self.load_folder_list()
  76. self.save_config()
  77. return self.folders
  78. def save_config(self):
  79. os.makedirs(self.cfg.cognos11.config_dir, exist_ok=True)
  80. with open(self.cfg.cognos11.folders_file, "w") as fwh:
  81. json.dump(self.folders, fwh, indent=2)
  82. with open(self.cfg.cognos11.reports_file, "w") as fwh:
  83. json.dump(self.reports, fwh, indent=2)
  84. with open(self.cfg.cognos11.jobs_file, "w") as fwh:
  85. json.dump(self.jobs, fwh, indent=2)
  86. def load_config_from_files(self):
  87. with open(self.cfg.cognos11.folders_file, "r") as frh:
  88. self.folders = json.load(frh)
  89. with open(self.cfg.cognos11.reports_file, "r") as frh:
  90. self.reports = json.load(frh)
  91. with open(self.cfg.cognos11.jobs_file, "w") as frh:
  92. self.jobs = json.load(frh)
  93. def load_folder_list(self, folder_id="_dot_public_folders", prefix="Team Content"):
  94. fields = ",".join(
  95. [
  96. "id",
  97. "defaultName",
  98. "defaultDescription",
  99. "type",
  100. "modificationTime",
  101. "target.id",
  102. "target.searchPath",
  103. "target.defaultName",
  104. "base.id",
  105. "base.searchPath",
  106. "base.defaultName",
  107. "parameters",
  108. ]
  109. )
  110. res = self.session.get(
  111. f"{self.webservice}v1/objects/{folder_id}/items?fields={fields}",
  112. headers=self.headers,
  113. )
  114. folder_list = sorted(res.json()["data"], key=lambda x: x["defaultName"])
  115. for f in folder_list:
  116. if f["type"] == "folder":
  117. folder = {
  118. "id": f["id"],
  119. "name": prefix + "/" + f["defaultName"].replace("/", "-"),
  120. }
  121. self.folders.append(folder)
  122. self.load_folder_list(folder["id"], folder["name"])
  123. elif f["type"] in ("report", "reportView", "shortcut"):
  124. report = {
  125. "id": f["id"],
  126. "name": f["defaultName"].replace("/", "-"),
  127. "description": f["defaultDescription"],
  128. "modified": f["modificationTime"],
  129. "path": prefix,
  130. "type": f["type"],
  131. }
  132. if f["type"] == "shortcut":
  133. report["target_id"] = f["target"][0]["id"]
  134. if f["type"] == "reportView":
  135. report["target_id"] = f["base"][0]["id"]
  136. report["parameters"] = {}
  137. params = [p for p in f["parameters"] if len(p["value"]) > 0]
  138. for p in params:
  139. report["parameters"][p["name"]] = dict(
  140. [(v["use"], v["display"]) for v in p["value"]]
  141. )
  142. # report["details"] = self.get_report_details(report["id"])
  143. # print(json.dumps(f, indent=2))
  144. self.reports.append(report)
  145. elif f["type"] == "jobDefinition":
  146. job = {"id": f["id"], "name": f["defaultName"], "path": prefix}
  147. job["details"] = self.get_job_details(job["id"])
  148. self.jobs.append(job)
  149. def get_report_details(self, object_id):
  150. fields = ",".join(
  151. [
  152. "defaultDescription",
  153. "options",
  154. "executionPrompt",
  155. "parameters",
  156. "module.defaultName",
  157. "module.ancestors",
  158. "allowNotification",
  159. "base.id",
  160. "base.searchPath",
  161. "base.defaultName",
  162. "base.defaultDescription",
  163. "base.ancestors",
  164. "base.metadataModelPackage",
  165. "base.module",
  166. ]
  167. )
  168. res = self.session.get(
  169. f"{self.webservice}v1/objects/{object_id}?fields={fields}",
  170. headers=self.headers,
  171. )
  172. res = res.json()["data"][0]
  173. res.pop("_meta", None)
  174. res.pop("id", None)
  175. res.pop("type", None)
  176. res.pop("defaultName", None)
  177. return res
  178. def get_job_details(self, object_id):
  179. fields = ",".join(
  180. [
  181. "userInterfaces",
  182. "disabled",
  183. "runInAdvancedViewer",
  184. "modificationTime",
  185. "canBurst",
  186. "defaultPortalAction",
  187. "base.defaultName",
  188. "tags",
  189. "target.searchPath",
  190. "target.disabled",
  191. "options",
  192. "base.options",
  193. ]
  194. )
  195. res = self.session.get(
  196. f"{self.webservice}v1/objects/{object_id}?fields={fields}",
  197. headers=self.headers,
  198. )
  199. job = res.json()["data"][0]
  200. job.pop("_meta", None)
  201. job.pop("id", None)
  202. job.pop("type", None)
  203. job.pop("defaultName", None)
  204. fields2 = ",".join(
  205. [
  206. r"id,displaySequence,stepObject{defaultName}",
  207. r"stepObject{id},stepObject{parameters}",
  208. r"stepObject{canBurst},options,parameters",
  209. ]
  210. )
  211. res = self.session.get(
  212. f"{self.webservice}v1/objects/{object_id}/items?types=jobStepDefinition&fields={fields2}",
  213. headers=self.headers,
  214. )
  215. steps = res.json()["data"]
  216. for s in steps:
  217. s.pop("_meta", None)
  218. if s["stepObject"] is not None:
  219. s["report_id"] = s["stepObject"][0]["id"]
  220. s.pop("stepObject", None)
  221. job["steps"] = steps
  222. return job
  223. def get_report(self, report_id):
  224. self.get_folders()
  225. report = [r for r in self.reports if r["id"] == report_id]
  226. if len(report) == 0:
  227. return None
  228. report = report[0]
  229. if "meta" not in report:
  230. report = self.get_report_specs(report)
  231. return report
  232. def get_reports_in_folder(self, folder, recursive=False, specs=False):
  233. self.get_folders()
  234. if recursive:
  235. res = [r for r in self.reports if r["path"].startswith(folder)]
  236. else:
  237. res = [r for r in self.reports if r["path"] == folder]
  238. if specs:
  239. return [self.get_report_specs(r) for r in res]
  240. return res
  241. def get_report_specs(self, report):
  242. report = self.get_report_filename(report)
  243. headers = {
  244. "Content-Type": "text/xml; charset=UTF-8",
  245. "X-XSRF-TOKEN": self.headers["X-XSRF-TOKEN"],
  246. "X-RsCMStoreID": report["id"],
  247. "X-UseRsConsumerMode": "true",
  248. "SOAPAction": "http://www.ibm.com/xmlns/prod/cognos/reportService/202004/",
  249. }
  250. soap = self.templates["get_report"].render(
  251. {
  252. "caf": self.caf,
  253. "cam": self.cam,
  254. "report": report,
  255. "format": "XHTML",
  256. "prompt": "true",
  257. "tracking": "",
  258. "params": {},
  259. }
  260. )
  261. r = self.session.post(
  262. self.webservice + "v1/reports", data=soap, headers=headers
  263. )
  264. if r.status_code == 500:
  265. bs = BeautifulSoup(r.text, "xml")
  266. report["error"] = bs.find_all("messageString")[0].string
  267. logging.error(f"{report['path']}/{report['name']}")
  268. logging.error(report["error"])
  269. return report
  270. parts = decoder.MultipartDecoder.from_response(r).parts
  271. meta = {"required": {}, "optional": {}}
  272. bs = BeautifulSoup(parts[0].content, "xml")
  273. result = bs.find("bus:result")
  274. details = result.find("bus:primaryRequest")
  275. params = details.find("bus:parameters")
  276. for item in params.find_all("item"):
  277. if item["xsi:type"] != "bus:parameterValue":
  278. continue
  279. k = item.find("bus:name").text
  280. # v = item.find("bus:value").find_all("item")
  281. v = {}
  282. for opt in item.find("bus:value").find_all("item"):
  283. if opt.find("bus:display") is not None:
  284. v[opt.find("bus:use").text] = opt.find("bus:display").text
  285. else:
  286. v[opt.find("bus:use").text] = opt.find("bus:use").text
  287. if len(v.items()) > 0:
  288. meta["required"][k] = v
  289. bs = BeautifulSoup(parts[1].content, "lxml")
  290. for sv in bs.find_all("selectvalue"):
  291. k = sv["parameter"]
  292. v = dict(
  293. [
  294. (opt["usevalue"], opt.get("displayvalue", ""))
  295. for opt in sv.find_all("selectoption")
  296. ]
  297. )
  298. meta["optional"][k] = v
  299. for sv in bs.find_all("selectdate"):
  300. k = sv["parameter"]
  301. v = dict(
  302. [
  303. (opt["usevalue"], opt.get("displayvalue", ""))
  304. for opt in sv.find_all("selectoption")
  305. ]
  306. )
  307. meta["optional"][k] = v
  308. filename = (
  309. self.cfg.cognos11.config_dir
  310. + f"/params/{report['path']}/{report['name']}.json"
  311. )
  312. os.makedirs(os.path.dirname(filename), exist_ok=True)
  313. json.dump(meta, open(filename, "w"), indent=2)
  314. report["meta"] = meta
  315. report["spec"] = parts[2].text
  316. return report
  317. def get_report_filename(self, report):
  318. path = report["path"].replace("Team Content/ReportOutput", "")
  319. report[
  320. "filename"
  321. ] = f"{self.cfg.cognos11.reportoutput_dir}/{path}/{report['name']}.pdf"
  322. report["format"] = "PDF"
  323. if report["name"][-5:].lower() == ".xlsx":
  324. report["format"] = "spreadsheetML"
  325. report["filename"] = report["filename"][:-4]
  326. report["params"] = list(re.findall(r"\[([^\]]+)\]", report["filename"]))
  327. for i, p in enumerate(report["params"]):
  328. report["filename"] = report["filename"].replace(
  329. "[" + p + "]", "{" + str(i) + "}"
  330. )
  331. return report
  332. def request_unstubbed(self, report_id):
  333. report = self.get_report(report_id)
  334. if "spec" not in report:
  335. return ""
  336. payload = json.dumps(
  337. {"reportspec_stubbed": report["spec"], "storeid": report["id"]}
  338. )
  339. headers = {
  340. "Content-Type": "application/json; charset=UTF-8",
  341. "X-XSRF-TOKEN": self.session.cookies.get("XSRF-TOKEN"),
  342. "authenticityToken": self.cam,
  343. "X-UseRsConsumerMode": "true",
  344. "cafContextId": self.caf,
  345. }
  346. r = self.session.post(
  347. self.webservice + "v1/reports/unstubreport", data=payload, headers=headers
  348. )
  349. if r.status_code != 200:
  350. logging.error(f"{report['path']}/{report['name']}")
  351. logging.error(r.text)
  352. return
  353. unstubbed = json.loads(r.text)["reportspec_full"]
  354. unstubbed = re.sub(r' iid="[^"]*"', "", unstubbed)
  355. bs = BeautifulSoup(unstubbed, "xml")
  356. for xa in bs.find_all("XMLAttributes"):
  357. if (
  358. xa.find_all("XMLAttribute", {"name": "RS_dataType"})
  359. or xa.find_all("XMLAttribute", {"name": "RS_CreateExtendedDataItems"})
  360. or xa.find_all("XMLAttribute", {"name": "RS_legacyDrillDown"})
  361. ):
  362. continue
  363. if xa.find_all("XMLAttribute", {"name": "supportsDefaultDataFormatting"}):
  364. for xa2 in xa.find_all("XMLAttribute"):
  365. if xa2.attrs["name"] != "supportsDefaultDataFormatting":
  366. xa2.decompose()
  367. continue
  368. xa.decompose()
  369. for cti in bs.find_all("crosstabIntersection"):
  370. if len(list(cti.children)) == 0:
  371. cti.decompose()
  372. unstubbed_report = str(bs)
  373. unstubbed_report = prettify_xml(unstubbed_report).replace("'", """)
  374. return unstubbed_report
  375. def get_report_headers(self, report_id=None):
  376. res = {
  377. "Content-Type": "text/xml; charset=UTF-8",
  378. "X-XSRF-TOKEN": self.headers["X-XSRF-TOKEN"],
  379. "X-UseRsConsumerMode": "true",
  380. "SOAPAction": "http://www.ibm.com/xmlns/prod/cognos/reportService/202004/",
  381. }
  382. if report_id is not None:
  383. res["X-RsCMStoreID"] = report_id
  384. return res
  385. def request_file(self, report_id, params, format="PDF"):
  386. report = self.get_report(report_id)
  387. headers = self.get_report_headers(report_id)
  388. soap = (
  389. self.templates["get_report"]
  390. .render(
  391. {
  392. "caf": self.caf,
  393. "cam": self.cam,
  394. "report": report,
  395. "format": format,
  396. "prompt": "false",
  397. "tracking": "",
  398. "params": params,
  399. }
  400. )
  401. .encode("utf-8")
  402. )
  403. r = self.session.post(
  404. self.webservice + "v1/reports", data=soap, headers=headers
  405. )
  406. bs = BeautifulSoup(r.text, "xml")
  407. if r.status_code == 200:
  408. try:
  409. parts = decoder.MultipartDecoder.from_response(r).parts
  410. except decoder.NonMultipartContentTypeException:
  411. return 500, "Timeout"
  412. return 200, parts[1].content
  413. error = bs.find_all("messageString")[0].string
  414. logging.debug(error)
  415. return r.status_code, error
  416. def create_report(self, folder_id, fullpath):
  417. # self.session.get(self.webservice + 'v1/reports/templates?path=%2Fcontent%2Ffolder%5B%40name%3D%27Templates%27%5D/
  418. # *[@objectClass=%27interactiveReport%27%20or%20@objectClass=%27report%27%20or%20@objectClass=%27reportTemplate%27]&maxResults=100&locale=de')
  419. # self.session.get(self.webservice + 'v1/reports/startupconfig?keys=supportedContentLocales,supportedCurrencies,
  420. # supportedFonts,metadataInformationURI,glossaryURI&locale=de')
  421. headers = self.get_report_headers()
  422. soap = (
  423. self.templates["get_package"]
  424. .render({"caf": self.caf, "cam": self.cam})
  425. .encode("utf-8")
  426. )
  427. r = self.session.post(
  428. self.webservice + "v1/reports", data=soap, headers=headers
  429. )
  430. open("package.xml", "wb").write(r.content)
  431. search_path = self.request_search_path(folder_id)
  432. report_name = os.path.basename(fullpath)
  433. unstubbed = open(fullpath, "rb").read()
  434. headers[
  435. "SOAPAction"
  436. ] = "http://www.ibm.com/xmlns/prod/cognos/reportService/202004/.session"
  437. headers["Referer"] = "http://localhost:9300/bi/pat/rsapp.htm"
  438. # headers['caf'] = self.caf
  439. soap = (
  440. self.templates["create_report"]
  441. .render(
  442. {
  443. "caf": self.caf,
  444. "cam": self.cam,
  445. "search_path": search_path,
  446. "report_name": report_name,
  447. "unstubbed": unstubbed,
  448. }
  449. )
  450. .encode("utf-8")
  451. )
  452. r = self.session.post(
  453. self.webservice + "v1/reports", data=soap, headers=headers
  454. )
  455. open("request_create.xml", "wb").write(r.request.body)
  456. print(r.status_code)
  457. print(r.text)
  458. def update_report(self, report, fullpath):
  459. search_path = self.request_search_path(report["id"])
  460. unstubbed = open(fullpath, "r").read()
  461. headers = self.get_report_headers(report["id"])
  462. # headers['Referer'] = 'http://localhost:9300/bi/pat/rsapp.htm'
  463. headers["caf"] = self.caf
  464. soap = self.templates["update_report"].render(
  465. {
  466. "caf": self.caf,
  467. "cam": self.cam,
  468. "search_path": search_path,
  469. "unstubbed": unstubbed,
  470. }
  471. ) # .encode("utf-8")
  472. r = self.session.post(
  473. self.webservice + "v1/reports", data=soap, headers=headers
  474. )
  475. # open('request_update.xml', 'wb').write(r.request.body)
  476. print(r.status_code)
  477. print(r.text)
  478. def create_folder(self, parent_id, folder_name):
  479. data = json.dumps({"defaultName": folder_name, "type": "folder"})
  480. res = self.session.post(
  481. f"{self.webservice}v1/objects/{parent_id}/items",
  482. headers=self.headers,
  483. data=data,
  484. )
  485. if res.status_code == 201:
  486. loc = res.headers.get("Location")
  487. folder_id = loc.split("/")[-1]
  488. self.folders.append({"id": folder_id, "name": folder_name})
  489. return folder_id
  490. def request_search_path(self, id):
  491. res = self.session.get(
  492. f"{self.webservice}v1/objects/{id}?fields=searchPath", headers=self.headers
  493. )
  494. return res.json()["data"][0]["searchPath"]
  495. if __name__ == "__main__":
  496. api = c11_api()
  497. api.login()
  498. folders = api.get_folders()
  499. # filename = "C:/GlobalCube/Tasks/gctools/logs/config/folders.json"
  500. # json.dump(folders, open(filename, "w"), indent=2)
  501. # filename = "C:/GlobalCube/Tasks/gctools/logs/config/reports.json"
  502. # json.dump(api.reports, open(filename, "w"), indent=2)
  503. # filename = "C:/GlobalCube/Tasks/gctools/logs/config/jobs.json"
  504. # json.dump(api.jobs, open(filename, "w"), indent=2)