c11_api.py 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. import base64
  2. import os
  3. import time
  4. import requests
  5. from requests_toolbelt.multipart import decoder
  6. import jinja2
  7. import json
  8. import re
  9. from bs4 import BeautifulSoup
  10. from xml_prettify import prettify_xml
  11. class c11_api:
  12. webservice = ""
  13. templates_dir = "tools/cognos11/templates"
  14. # templates_dir = "C:/GlobalCube/Tasks/gctools/templates"
  15. export_dir = "C:/GlobalCube/ReportOutput"
  16. log_dir = "C:/GlobalCube/Tasks/gctools/logs"
  17. headers = {}
  18. caf = ""
  19. cam = ""
  20. reports = None
  21. def __init__(self, webservice="http://localhost:9300/bi/"):
  22. self.webservice = webservice
  23. self._env = jinja2.Environment(
  24. loader=jinja2.FileSystemLoader(self.templates_dir),
  25. autoescape=jinja2.select_autoescape(['html', 'xml'])
  26. )
  27. self.template = self._env.get_template('get_report.xml')
  28. @staticmethod
  29. def generate_token(message_base64):
  30. version = "V1".encode("utf-8")
  31. header_len = 4
  32. msg = base64.b64decode(message_base64)[1:]
  33. chunks = []
  34. while len(msg) >= header_len:
  35. chunk_len = int.from_bytes(msg[:header_len], byteorder="little")
  36. msg = msg[header_len:]
  37. chunks.append(msg[:chunk_len])
  38. msg = msg[chunk_len:]
  39. return base64.b64encode(version + chunks[-1]).decode("utf-8")
  40. def login(self, user='Global1', password='Cognos#11'):
  41. credentials = {
  42. "parameters": [
  43. {"name": "CAMNamespace", "value": "CognosEx"},
  44. {"name": "h_CAM_action", "value": "logonAs"},
  45. {"name": "CAMUsername", "value": user},
  46. {"name": "CAMPassword", "value": password}
  47. ]
  48. }
  49. self.session = requests.Session()
  50. r = self.session.get(self.webservice)
  51. self.headers = {'Content-Type': "application/json; charset=UTF-8", 'X-XSRF-TOKEN': self.session.cookies.get('XSRF-TOKEN')}
  52. r = self.session.post(self.webservice + "v1/login", data=json.dumps(credentials), headers=self.headers)
  53. self.caf = r.json()['cafContextId']
  54. self.cam = self.generate_token(r.cookies["usersessionid"])
  55. return r.status_code
  56. def report_list(self):
  57. # "v1/objects/_dot_public_folders/items"
  58. fields = "defaultName|id|ancestors"
  59. filter = "type|analysis|interactiveReport|powerPlayReport|powerPlay8Report|powerPlay8ReportView|query|report|reportTemplate"
  60. res = self.session.get(
  61. f"{self.webservice}v1/search/cm?fields={fields}&results=1000&query=.&hide_internal=all&filter={filter}",
  62. headers=self.headers
  63. )
  64. # filename = self.log_dir + '/reports_error.log'
  65. # os.makedirs(os.path.dirname(filename), exist_ok=True)
  66. # with open(filename, "wb") as f:
  67. # f.write(res.content)
  68. self.reports = res.json()['results']
  69. for r in self.reports:
  70. r['path'] = "/".join([a['defaultName'].replace('/', '_') for a in r['ancestors']])
  71. r['name'] = r['defaultName']
  72. del(r['ancestors'])
  73. del(r['defaultName'])
  74. return self.reports
  75. def get_report(self, report_id):
  76. if self.reports is None:
  77. self.reports = self.report_list()
  78. report = [r for r in self.reports if r['id'] == report_id]
  79. if len(report) == 0:
  80. return None
  81. report = report[0]
  82. if 'meta' not in report:
  83. report = self.get_report_specs(report)
  84. return report
  85. def get_reports_in_folder(self, folder, recursive=False):
  86. if self.reports is None:
  87. self.reports = self.report_list()
  88. if recursive:
  89. return [r for r in self.reports if r['path'].startswith(folder)]
  90. return [r for r in self.reports if r['path'] == folder]
  91. def get_report_specs(self, report):
  92. headers = {
  93. 'Content-Type': 'text/xml; charset=UTF-8',
  94. 'X-XSRF-TOKEN': self.headers['X-XSRF-TOKEN'],
  95. 'X-RsCMStoreID': report['id'],
  96. 'X-UseRsConsumerMode': 'true',
  97. 'SOAPAction': 'http://www.ibm.com/xmlns/prod/cognos/reportService/202004/'
  98. }
  99. soap = self.template.render({"caf": self.caf, "cam": self.cam,
  100. "report": report, "format": 'XHTML',
  101. "prompt": 'true', "tracking": "", "params": {}})
  102. r = self.session.post(self.webservice + 'v1/reports', data=soap, headers=headers)
  103. if r.status_code == 500:
  104. bs = BeautifulSoup(r.text, 'xml')
  105. report['error'] = bs.find_all('messageString')[0].string
  106. # time.sleep(2)
  107. return report
  108. parts = decoder.MultipartDecoder.from_response(r).parts
  109. # for i, p in enumerate(parts):
  110. # with open(f"export/{report['report']}_{i}.xml", "w") as f:
  111. # f.write(p.text.replace('\x81', ''))
  112. meta = {'required': {}, 'optional': {}}
  113. bs = BeautifulSoup(parts[1].content, 'lxml')
  114. # print(bs.prettify())
  115. for sv in bs.find_all('selectvalue'):
  116. k = sv['parameter']
  117. req = 'required' if sv['required'] == 'true' else 'optional'
  118. v = dict([(opt['usevalue'], opt['displayvalue']) for opt in sv.find_all('selectoption')])
  119. meta[req][k] = v
  120. for sv in bs.find_all('selectdate'):
  121. k = sv['parameter']
  122. req = 'required' if sv['required'] == 'true' else 'optional'
  123. v = dict([(opt['usevalue'], opt.get('displayvalue', '')) for opt in sv.find_all('selectoption')])
  124. meta[req][k] = v
  125. filename = self.log_dir + f"/config/{report['path']}/{report['name']}.json"
  126. os.makedirs(os.path.dirname(filename), exist_ok=True)
  127. json.dump(meta, open(filename, 'w'), indent=2)
  128. report['meta'] = meta
  129. report['spec'] = parts[2].text
  130. return report
  131. def export_unstubbed(self, report_id):
  132. report = self.get_report(report_id)
  133. if 'spec' not in report:
  134. return False
  135. payload = json.dumps({'reportspec_stubbed': report['spec'], 'storeid': report['id']})
  136. headers = {
  137. 'Content-Type': 'application/json; charset=UTF-8',
  138. 'X-XSRF-TOKEN': self.session.cookies.get('XSRF-TOKEN'),
  139. 'authenticityToken': self.cam,
  140. 'X-UseRsConsumerMode': 'true',
  141. 'cafContextId': self.caf
  142. }
  143. r = self.session.post(self.webservice + 'v1/reports/unstubreport', data=payload, headers=headers)
  144. unstubbed = json.loads(r.text)['reportspec_full']
  145. unstubbed = re.sub(r' iid="[^"]*"', '', unstubbed)
  146. bs = BeautifulSoup(unstubbed, 'xml')
  147. for xa in bs.find_all('XMLAttributes'):
  148. if (
  149. xa.find_all('XMLAttribute', {'name': 'RS_dataType'}) or
  150. xa.find_all('XMLAttribute', {'name': 'RS_CreateExtendedDataItems'}) or
  151. xa.find_all('XMLAttribute', {'name': 'RS_legacyDrillDown'})
  152. ):
  153. continue
  154. if xa.find_all('XMLAttribute', {'name': 'supportsDefaultDataFormatting'}):
  155. for xa2 in xa.find_all('XMLAttribute'):
  156. if xa2.attrs['name'] != 'supportsDefaultDataFormatting':
  157. xa2.decompose()
  158. continue
  159. xa.decompose()
  160. for cti in bs.find_all('crosstabIntersection'):
  161. if len(list(cti.children)) == 0:
  162. cti.decompose()
  163. unstubbed_report = str(bs).replace("'", ''')
  164. unstubbed_report = prettify_xml(unstubbed_report)
  165. filename = self.log_dir + f"/config/{report['path']}/{report['name']}.xml"
  166. os.makedirs(os.path.dirname(filename), exist_ok=True)
  167. with open(filename, "w") as f:
  168. f.write(unstubbed_report)
  169. return unstubbed_report
  170. def get_report_headers(self, report_id):
  171. return {
  172. 'Content-Type': 'text/xml; charset=UTF-8',
  173. 'X-XSRF-TOKEN': self.headers['X-XSRF-TOKEN'],
  174. 'X-RsCMStoreID': report_id,
  175. 'X-UseRsConsumerMode': 'true',
  176. 'SOAPAction': 'http://www.ibm.com/xmlns/prod/cognos/reportService/202004/'
  177. }
  178. def request_file(self, report_id, params, filename, format='PDF'):
  179. report = self.get_report(report_id)
  180. headers = self.get_report_headers(report_id)
  181. soap = self.template.render({"caf": self.caf, "cam": self.cam,
  182. "report": report, "format": format,
  183. "prompt": 'false', "tracking": "", "params": params}).encode("utf-8")
  184. try:
  185. r = self.session.post(self.webservice + 'v1/reports', data=soap, headers=headers)
  186. except UnicodeEncodeError:
  187. filename = self.log_dir + '/' + os.path.basename(filename) + '.log'
  188. os.makedirs(os.path.dirname(filename), exist_ok=True)
  189. with open(filename, "w") as f:
  190. f.write(soap)
  191. return False
  192. if r.status_code == 200:
  193. parts = decoder.MultipartDecoder.from_response(r).parts
  194. filename = self.export_dir + filename
  195. os.makedirs(os.path.dirname(filename), exist_ok=True)
  196. with open(filename, "wb") as f:
  197. f.write(parts[1].content)
  198. else:
  199. filename = self.log_dir + '/' + os.path.basename(filename) + '.log'
  200. os.makedirs(os.path.dirname(filename), exist_ok=True)
  201. with open(filename, "wb") as f:
  202. f.write(r.content)
  203. return True
  204. if __name__ == '__main__':
  205. api = c11_api()
  206. api.login()