123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236 |
- import base64
- import os
- import time
- import requests
- from requests_toolbelt.multipart import decoder
- import jinja2
- import json
- import re
- from bs4 import BeautifulSoup
- from xml_prettify import prettify_xml
class c11_api:
    """Client for a Cognos 11 web service (login, report search, export).

    The instance keeps a ``requests.Session`` (created by :meth:`login`),
    the XSRF header, the CAF context id and the CAM token that the report
    endpoints require, plus a cached list of reports.

    Typical use: create the client, call :meth:`login`, then use
    :meth:`report_list`, :meth:`get_report`, :meth:`export_unstubbed` or
    :meth:`request_file`.
    """

    # Base URL of the web service; overwritten per instance in __init__.
    webservice = ""
    # Directory holding the Jinja2 SOAP request templates.
    templates_dir = "tools/cognos11/templates"
    # Alternative location: "C:/GlobalCube/Tasks/gctools/templates"
    # Root directory for exported report files.
    export_dir = "C:/GlobalCube/ReportOutput"
    # Root directory for error logs and the per-report "config" dumps.
    log_dir = "C:/GlobalCube/Tasks/gctools/logs"
    # Default request headers; replaced (not mutated) by login().
    headers = {}
    # CAF context id returned by the login endpoint.
    caf = ""
    # Token derived from the "usersessionid" cookie, see generate_token().
    cam = ""
    # Cached result of report_list(); None until first loaded.
    reports = None

    def __init__(self, webservice="http://localhost:9300/bi/"):
        """Store the base URL and load the ``get_report.xml`` SOAP template.

        Args:
            webservice: Base URL of the service. Endpoint paths are appended
                directly, so it must end with a slash.
        """
        self.webservice = webservice
        self._env = jinja2.Environment(
            loader=jinja2.FileSystemLoader(self.templates_dir),
            autoescape=jinja2.select_autoescape(['html', 'xml'])
        )
        self.template = self._env.get_template('get_report.xml')

    @staticmethod
    def generate_token(message_base64):
        """Derive the token string from a base64-encoded session message.

        The decoded message is read as: one leading byte (skipped), followed
        by length-prefixed chunks, each introduced by a 4-byte little-endian
        length. The token is ``b"V1"`` plus the last chunk, base64-encoded.

        Args:
            message_base64: Base64 text (here: the ``usersessionid`` cookie).

        Returns:
            The token as an ASCII ``str``.

        Raises:
            IndexError: If the message contains no complete chunk header.
        """
        version = "V1".encode("utf-8")
        header_len = 4
        # The first byte is not part of the chunk stream.
        msg = base64.b64decode(message_base64)[1:]
        chunks = []
        while len(msg) >= header_len:
            chunk_len = int.from_bytes(msg[:header_len], byteorder="little")
            msg = msg[header_len:]
            chunks.append(msg[:chunk_len])
            msg = msg[chunk_len:]
        return base64.b64encode(version + chunks[-1]).decode("utf-8")

    def login(self, user='Global1', password='Cognos#11'):
        """Open a session and log on to the ``CognosEx`` namespace.

        Sets ``self.session``, ``self.headers``, ``self.caf`` and
        ``self.cam`` as a side effect.

        NOTE(review): the default credentials are hard-coded in the
        signature; they are kept only for backward compatibility.

        Args:
            user: Value sent as ``CAMUsername``.
            password: Value sent as ``CAMPassword``.

        Returns:
            The HTTP status code of the login request.
        """
        credentials = {
            "parameters": [
                {"name": "CAMNamespace", "value": "CognosEx"},
                {"name": "h_CAM_action", "value": "logonAs"},
                {"name": "CAMUsername", "value": user},
                {"name": "CAMPassword", "value": password}
            ]
        }
        self.session = requests.Session()
        # Initial GET only serves to obtain the XSRF-TOKEN cookie.
        self.session.get(self.webservice)
        self.headers = {
            'Content-Type': "application/json; charset=UTF-8",
            'X-XSRF-TOKEN': self.session.cookies.get('XSRF-TOKEN')
        }
        r = self.session.post(self.webservice + "v1/login",
                              data=json.dumps(credentials), headers=self.headers)
        self.caf = r.json()['cafContextId']
        self.cam = self.generate_token(r.cookies["usersessionid"])
        return r.status_code

    def report_list(self):
        """Fetch up to 1000 report-like objects and cache them.

        Each result is normalised in place: ``ancestors`` is collapsed into
        a ``path`` string (ancestor names joined by ``/``, with any ``/``
        inside a name replaced by ``_``) and ``defaultName`` is renamed to
        ``name``.

        Returns:
            The list of report dicts (also stored in ``self.reports``).
        """
        # Related endpoint: "v1/objects/_dot_public_folders/items"
        fields = "defaultName|id|ancestors"
        type_filter = "type|analysis|interactiveReport|powerPlayReport|powerPlay8Report|powerPlay8ReportView|query|report|reportTemplate"
        res = self.session.get(
            f"{self.webservice}v1/search/cm?fields={fields}&results=1000&query=.&hide_internal=all&filter={type_filter}",
            headers=self.headers
        )
        self.reports = res.json()['results']
        for r in self.reports:
            r['path'] = "/".join(a['defaultName'].replace('/', '_') for a in r.pop('ancestors'))
            r['name'] = r.pop('defaultName')
        return self.reports

    def get_report(self, report_id):
        """Return the cached report with the given id, loading specs lazily.

        Args:
            report_id: Store id of the report.

        Returns:
            The report dict, enriched by :meth:`get_report_specs` on first
            access, or ``None`` if the id is not in the report list.
        """
        if self.reports is None:
            self.reports = self.report_list()
        report = next((r for r in self.reports if r['id'] == report_id), None)
        if report is None:
            return None
        if 'meta' not in report:
            report = self.get_report_specs(report)
        return report

    def get_reports_in_folder(self, folder, recursive=False):
        """Return the reports located in ``folder``.

        Args:
            folder: Path as produced by :meth:`report_list`.
            recursive: If true, match every report whose path starts with
                ``folder`` (plain string-prefix match); otherwise the path
                must equal ``folder`` exactly.

        Returns:
            A list of report dicts.
        """
        if self.reports is None:
            self.reports = self.report_list()
        if recursive:
            return [r for r in self.reports if r['path'].startswith(folder)]
        return [r for r in self.reports if r['path'] == folder]

    def get_report_specs(self, report):
        """Request the prompt page of a report and extract its parameters.

        On success the report dict gains ``meta`` (required/optional prompt
        parameters with their selectable values) and ``spec`` (text of the
        third multipart part); ``meta`` is also written as JSON below
        ``<log_dir>/config``. On HTTP 500 only ``error`` is set.

        Args:
            report: Report dict with at least ``id``, ``path`` and ``name``.

        Returns:
            The same dict, updated in place.
        """
        headers = self.get_report_headers(report['id'])
        soap = self.template.render({"caf": self.caf, "cam": self.cam,
                                     "report": report, "format": 'XHTML',
                                     "prompt": 'true', "tracking": "", "params": {}})
        r = self.session.post(self.webservice + 'v1/reports', data=soap, headers=headers)
        if r.status_code == 500:
            bs = BeautifulSoup(r.text, 'xml')
            messages = bs.find_all('messageString')
            # Fall back to the raw body when the fault carries no messageString.
            report['error'] = messages[0].string if messages else r.text
            return report

        parts = decoder.MultipartDecoder.from_response(r).parts
        meta = {'required': {}, 'optional': {}}
        bs = BeautifulSoup(parts[1].content, 'lxml')
        for sv in bs.find_all('selectvalue'):
            req = 'required' if sv['required'] == 'true' else 'optional'
            meta[req][sv['parameter']] = {
                opt['usevalue']: opt['displayvalue'] for opt in sv.find_all('selectoption')
            }
        for sv in bs.find_all('selectdate'):
            req = 'required' if sv['required'] == 'true' else 'optional'
            # Date options may come without a display value.
            meta[req][sv['parameter']] = {
                opt['usevalue']: opt.get('displayvalue', '') for opt in sv.find_all('selectoption')
            }

        filename = self.log_dir + f"/config/{report['path']}/{report['name']}.json"
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        with open(filename, 'w') as f:
            json.dump(meta, f, indent=2)
        report['meta'] = meta
        report['spec'] = parts[2].text
        return report

    def export_unstubbed(self, report_id):
        """Fetch the full report specification, clean it and save it as XML.

        The spec is sent to ``v1/reports/unstubreport``; from the answer all
        ``iid`` attributes, most ``XMLAttributes`` elements and empty
        ``crosstabIntersection`` elements are removed before the result is
        pretty-printed and written below ``<log_dir>/config``.

        Args:
            report_id: Store id of the report.

        Returns:
            The cleaned XML text, or ``False`` if the report is unknown or
            has no ``spec`` (e.g. because fetching the specs failed).
        """
        report = self.get_report(report_id)
        if report is None or 'spec' not in report:
            return False
        payload = json.dumps({'reportspec_stubbed': report['spec'], 'storeid': report['id']})
        headers = {
            'Content-Type': 'application/json; charset=UTF-8',
            'X-XSRF-TOKEN': self.session.cookies.get('XSRF-TOKEN'),
            'authenticityToken': self.cam,
            'X-UseRsConsumerMode': 'true',
            'cafContextId': self.caf
        }
        r = self.session.post(self.webservice + 'v1/reports/unstubreport',
                              data=payload, headers=headers)
        unstubbed = json.loads(r.text)['reportspec_full']
        unstubbed = re.sub(r' iid="[^"]*"', '', unstubbed)

        bs = BeautifulSoup(unstubbed, 'xml')
        # XMLAttributes groups containing any of these entries are kept whole.
        keep_whole = ('RS_dataType', 'RS_CreateExtendedDataItems', 'RS_legacyDrillDown')
        for xa in bs.find_all('XMLAttributes'):
            if any(xa.find_all('XMLAttribute', {'name': name}) for name in keep_whole):
                continue
            if xa.find_all('XMLAttribute', {'name': 'supportsDefaultDataFormatting'}):
                # Keep the group, but strip every other entry from it.
                for xa2 in xa.find_all('XMLAttribute'):
                    if xa2.attrs['name'] != 'supportsDefaultDataFormatting':
                        xa2.decompose()
                continue
            xa.decompose()
        for cti in bs.find_all('crosstabIntersection'):
            if len(list(cti.children)) == 0:
                cti.decompose()

        # Re-escape apostrophes as XML entities after serialisation.
        unstubbed_report = str(bs).replace("'", '&apos;')
        unstubbed_report = prettify_xml(unstubbed_report)
        filename = self.log_dir + f"/config/{report['path']}/{report['name']}.xml"
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        with open(filename, "w") as f:
            f.write(unstubbed_report)
        return unstubbed_report

    def get_report_headers(self, report_id):
        """Build the HTTP headers for a SOAP call to ``v1/reports``.

        Args:
            report_id: Store id sent as ``X-RsCMStoreID``.

        Returns:
            A new header dict using the XSRF token obtained at login.
        """
        return {
            'Content-Type': 'text/xml; charset=UTF-8',
            'X-XSRF-TOKEN': self.headers['X-XSRF-TOKEN'],
            'X-RsCMStoreID': report_id,
            'X-UseRsConsumerMode': 'true',
            'SOAPAction': 'http://www.ibm.com/xmlns/prod/cognos/reportService/202004/'
        }

    def request_file(self, report_id, params, filename, format='PDF'):
        """Run a report without prompting and save the output to disk.

        On HTTP 200 the second multipart part is written to
        ``export_dir + filename``. Any other status writes the raw response
        to ``<log_dir>/<basename>.log`` instead.

        Args:
            report_id: Store id of the report.
            params: Prompt parameters passed to the SOAP template.
            filename: Target path relative to ``export_dir``; it is appended
                without a separator, so it should start with one.
            format: Output format passed to the template (default ``PDF``).

        Returns:
            ``False`` if sending the request raised ``UnicodeEncodeError``
            (the SOAP body is then logged); ``True`` otherwise, including
            when the server answered with a non-200 status.
        """
        report = self.get_report(report_id)
        headers = self.get_report_headers(report_id)
        soap = self.template.render({"caf": self.caf, "cam": self.cam,
                                     "report": report, "format": format,
                                     "prompt": 'false', "tracking": "",
                                     "params": params}).encode("utf-8")
        try:
            r = self.session.post(self.webservice + 'v1/reports', data=soap, headers=headers)
        except UnicodeEncodeError:
            filename = self.log_dir + '/' + os.path.basename(filename) + '.log'
            os.makedirs(os.path.dirname(filename), exist_ok=True)
            # soap is already UTF-8 bytes, so the log must be opened in binary mode.
            with open(filename, "wb") as f:
                f.write(soap)
            return False

        if r.status_code == 200:
            parts = decoder.MultipartDecoder.from_response(r).parts
            filename = self.export_dir + filename
            os.makedirs(os.path.dirname(filename), exist_ok=True)
            with open(filename, "wb") as f:
                f.write(parts[1].content)
        else:
            filename = self.log_dir + '/' + os.path.basename(filename) + '.log'
            os.makedirs(os.path.dirname(filename), exist_ok=True)
            with open(filename, "wb") as f:
                f.write(r.content)
        return True
if __name__ == "__main__":
    # Manual smoke test: connect to the default local service and log in
    # with the default credentials.
    api = c11_api()
    api.login()
|