import base64
import json
import logging
import os
import re

import jinja2
import requests
from bs4 import BeautifulSoup
from requests_toolbelt.multipart import decoder

from .xml_prettify import prettify_xml


class c11_api:
    """Client for a Cognos 11 web service (REST object API plus SOAP report service).

    Typical use: create an instance, call :meth:`login`, then query folders,
    reports and jobs or request rendered report files.

    The folder/report/job caches are filled lazily by :meth:`get_folders` and
    are kept per instance.
    """

    webservice = ""
    templates_dir = ""
    # templates_dir = "C:/GlobalCube/Tasks/gctools/templates"
    export_dir = "C:/GlobalCube/ReportOutput"
    log_dir = "C:/GlobalCube/Tasks/gctools/logs"
    caf = ""
    cam = ""

    # Per-instance state, created in __init__ (declared here for reference only).
    headers: dict
    reports: list
    folders: list
    jobs: list

    def __init__(self, webservice="http://localhost:9300/bi/"):
        """Prepare the SOAP templates; no network traffic happens here.

        Args:
            webservice: Base URL of the service. The request URLs are built by
                plain concatenation, so it must end with a slash.
        """
        self.webservice = webservice
        self.templates_dir = os.path.dirname(__file__) + '/templates'

        # Mutable state must live on the instance; as class attributes the
        # lists would be shared (and appended to) by every instance.
        self.headers = {}
        self.caf = ""
        self.cam = ""
        self.reports = []
        self.folders = []
        self.jobs = []

        self._env = jinja2.Environment(
            loader=jinja2.FileSystemLoader(self.templates_dir),
            autoescape=jinja2.select_autoescape(['html', 'xml'])
        )
        self.templates = {
            name: self._env.get_template(name + '.xml')
            for name in ('get_report', 'create_report', 'update_report', 'get_package')
        }

    @staticmethod
    def generate_token(message_base64):
        """Derive the token string from a base64-encoded session message.

        The decoded message is read as: one leading byte (skipped), followed by
        chunks that each consist of a 4-byte little-endian length and that many
        payload bytes. The result is ``base64("V1" + last chunk)``.

        Raises:
            IndexError: if the message does not contain a single chunk.
        """
        version = b"V1"
        header_len = 4
        msg = base64.b64decode(message_base64)[1:]
        chunks = []
        while len(msg) >= header_len:
            chunk_len = int.from_bytes(msg[:header_len], byteorder="little")
            msg = msg[header_len:]
            chunks.append(msg[:chunk_len])
            msg = msg[chunk_len:]
        return base64.b64encode(version + chunks[-1]).decode("utf-8")

    @staticmethod
    def _error_message(response):
        """Return the text of the first ``messageString`` element of an error response.

        Falls back to the raw response text when no such element exists.
        """
        node = BeautifulSoup(response.text, 'xml').find('messageString')
        if node is None:
            return response.text
        return node.string

    def login(self, user='Global1', password='Cognos#11'):
        """Open a new session and log on to the ``CognosEx`` namespace.

        Stores the session, the default JSON headers, ``caf`` (from the
        ``cafContextId`` of the login response) and ``cam`` (derived from the
        ``usersessionid`` cookie).

        NOTE(review): the default credentials are hard-coded in the signature;
        callers should pass their own.

        Returns:
            The instance itself, so calls can be chained.
        """
        credentials = {
            "parameters": [
                {"name": "CAMNamespace", "value": "CognosEx"},
                {"name": "h_CAM_action", "value": "logonAs"},
                {"name": "CAMUsername", "value": user},
                {"name": "CAMPassword", "value": password}
            ]
        }
        self.session = requests.Session()
        # The response itself is not used; presumably this first request is what
        # puts the XSRF-TOKEN cookie into the session that is read just below.
        self.session.get(self.webservice)
        self.headers = {
            'Content-Type': "application/json; charset=UTF-8",
            'X-XSRF-TOKEN': self.session.cookies.get('XSRF-TOKEN')
        }
        r = self.session.post(self.webservice + "v1/login", data=json.dumps(credentials), headers=self.headers)
        self.caf = r.json()['cafContextId']
        self.cam = self.generate_token(r.cookies["usersessionid"])
        return self

    def get_folders(self):
        """Return the folder list, loading the whole content tree on first use.

        Loading also fills ``self.reports`` and ``self.jobs``.
        """
        if not self.folders:
            self.folders.append({'id': '_dot_public_folders', 'name': 'Team Content'})
            self.load_folder_list()
        return self.folders

    def load_folder_list(self, folder_id='_dot_public_folders', prefix='Team Content'):
        """Recursively collect folders, reports and job definitions below a folder.

        Args:
            folder_id: Object id of the folder whose items are listed.
            prefix: Display path of that folder; used to build child paths.
        """
        res = self.session.get(f"{self.webservice}v1/objects/{folder_id}/items", headers=self.headers)
        items = sorted(res.json()['data'], key=lambda x: x['defaultName'])
        for item in items:
            item_type = item['type']
            if item_type == 'folder':
                # '/' is the path separator of the display path, so it is
                # replaced inside names.
                folder = {
                    'id': item['id'],
                    'name': prefix + '/' + item['defaultName'].replace('/', '-')
                }
                self.folders.append(folder)
                self.load_folder_list(folder['id'], folder['name'])
            elif item_type == 'report':
                self.reports.append({
                    'id': item['id'],
                    'name': item['defaultName'].replace('/', '-'),
                    'path': prefix
                })
            elif item_type == 'jobDefinition':
                job = {
                    'id': item['id'],
                    'name': item['defaultName'],
                    'path': prefix
                }
                job['details'] = self.get_job_details(job['id'])
                self.jobs.append(job)

    def get_job_details(self, job_id):
        """Fetch a job definition together with its steps.

        Returns:
            The job object as delivered by the service, without the
            ``_meta``/``id``/``type``/``defaultName`` keys, plus a ``steps``
            list. Each step carries ``report_id`` (id of its first step object)
            when a step object is present.
        """
        fields = ",".join([
            'userInterfaces,disabled',
            'runInAdvancedViewer,modificationTime,canBurst',
            'defaultPortalAction',
            'base.defaultName,tags,target.searchPath,target.disabled',
            'options,base.options'
        ])
        res = self.session.get(f"{self.webservice}v1/objects/{job_id}?fields={fields}", headers=self.headers)
        job = res.json()['data'][0]
        for key in ('_meta', 'id', 'type', 'defaultName'):
            job.pop(key, None)

        fields2 = ",".join([
            r'id,displaySequence,stepObject{defaultName}',
            r'stepObject{id},stepObject{parameters}',
            r'stepObject{canBurst},options,parameters'
        ])
        res = self.session.get(
            f"{self.webservice}v1/objects/{job_id}/items?types=jobStepDefinition&fields={fields2}",
            headers=self.headers
        )
        steps = res.json()['data']
        for step in steps:
            step.pop('_meta', None)
            # Skip steps whose step object is missing, None or an empty list
            # instead of failing on them.
            step_object = step.get('stepObject')
            if step_object:
                step['report_id'] = step_object[0]['id']
            step.pop('stepObject', None)
        job['steps'] = steps
        return job

    def get_report(self, report_id):
        """Return the cached report entry for ``report_id`` including its specs.

        Specs are requested on demand when the entry has no ``meta`` key yet.

        Returns:
            The report dict, or ``None`` when the id is unknown.
        """
        self.get_folders()
        report = next((r for r in self.reports if r['id'] == report_id), None)
        if report is None:
            return None
        if 'meta' not in report:
            report = self.get_report_specs(report)
        return report

    def get_reports_in_folder(self, folder, recursive=False, specs=False):
        """List the reports stored in a folder.

        Args:
            folder: Display path of the folder, e.g. ``'Team Content/Sales'``.
            recursive: Also include reports of all sub-folders.
            specs: Load the report specs for every hit (one request each).
        """
        self.get_folders()
        if recursive:
            # Match on folder boundaries only; a bare startswith() would also
            # pick up sibling folders such as 'Sales_old' for 'Sales'.
            if not folder or folder.endswith('/'):
                prefix = folder
            else:
                prefix = folder + '/'
            res = [
                r for r in self.reports
                if r['path'] == folder or r['path'].startswith(prefix)
            ]
        else:
            res = [r for r in self.reports if r['path'] == folder]
        if specs:
            return [self.get_report_specs(r) for r in res]
        return res

    def get_report_specs(self, report):
        """Request prompt metadata and the stubbed spec of a report.

        On success the report dict is extended in place with ``meta`` (required
        and optional prompt values), ``spec``, ``filename`` (export file name
        with ``{0}``, ``{1}``, ... in place of the ``[...]`` placeholders) and
        ``params`` (the placeholder names). The metadata is also written as JSON
        below ``log_dir``.

        On HTTP status 500 only ``report['error']`` is set and logged.

        Returns:
            The same ``report`` dict.
        """
        headers = self.get_report_headers(report['id'])
        # Encoded explicitly because the Content-Type header declares UTF-8.
        soap = self.templates['get_report'].render(
            {"caf": self.caf, "cam": self.cam, "report": report,
             "format": 'XHTML', "prompt": 'true', "tracking": "", "params": {}}
        ).encode("utf-8")
        r = self.session.post(self.webservice + 'v1/reports', data=soap, headers=headers)
        if r.status_code == 500:
            report['error'] = self._error_message(r)
            logging.error(report['error'])
            return report

        parts = decoder.MultipartDecoder.from_response(r).parts
        meta = {'required': {}, 'optional': {}}
        bs = BeautifulSoup(parts[1].content, 'lxml')

        for sv in bs.find_all('selectvalue'):
            req = 'required' if sv['required'] == 'true' else 'optional'
            meta[req][sv['parameter']] = {
                opt['usevalue']: opt['displayvalue']
                for opt in sv.find_all('selectoption')
            }

        # Date prompt options may come without a display value.
        for sv in bs.find_all('selectdate'):
            req = 'required' if sv['required'] == 'true' else 'optional'
            meta[req][sv['parameter']] = {
                opt['usevalue']: opt.get('displayvalue', '')
                for opt in sv.find_all('selectoption')
            }

        filename = self.log_dir + f"/config/{report['path']}/{report['name']}.json"
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(meta, f, indent=2)

        report['meta'] = meta
        report['spec'] = parts[2].text

        path = report['path'].replace('Team Content/ReportOutput', '')
        report['filename'] = f"{self.export_dir}/{path}/{report['name']}.pdf"
        # '[name]' placeholders in the file name become positional format
        # fields; their names are kept in report['params'] in the same order.
        report['params'] = re.findall(r'\[([^\]]+)\]', report['filename'])
        for i, p in enumerate(report['params']):
            report['filename'] = report['filename'].replace('[' + p + ']', '{' + str(i) + '}')
        return report

    def request_unstubbed(self, report_id):
        """Return the full report specification as prettified XML text.

        The stubbed spec is sent to the ``unstubreport`` endpoint; from the
        answer all ``iid`` attributes, most ``XMLAttributes`` elements and empty
        ``crosstabIntersection`` elements are removed.

        Returns:
            The cleaned XML, or ``''`` when the report is unknown or has no
            spec (e.g. because loading the specs failed).
        """
        report = self.get_report(report_id)
        if report is None or 'spec' not in report:
            return ''

        payload = json.dumps({'reportspec_stubbed': report['spec'], 'storeid': report['id']})
        headers = {
            'Content-Type': 'application/json; charset=UTF-8',
            'X-XSRF-TOKEN': self.session.cookies.get('XSRF-TOKEN'),
            'authenticityToken': self.cam,
            'X-UseRsConsumerMode': 'true',
            'cafContextId': self.caf
        }
        r = self.session.post(self.webservice + 'v1/reports/unstubreport', data=payload, headers=headers)

        unstubbed = json.loads(r.text)['reportspec_full']
        unstubbed = re.sub(r' iid="[^"]*"', '', unstubbed)

        bs = BeautifulSoup(unstubbed, 'xml')
        for xa in bs.find_all('XMLAttributes'):
            # Attribute groups containing one of these entries stay untouched.
            if (
                xa.find_all('XMLAttribute', {'name': 'RS_dataType'})
                or xa.find_all('XMLAttribute', {'name': 'RS_CreateExtendedDataItems'})
                or xa.find_all('XMLAttribute', {'name': 'RS_legacyDrillDown'})
            ):
                continue
            # Groups with 'supportsDefaultDataFormatting' keep only that entry.
            if xa.find_all('XMLAttribute', {'name': 'supportsDefaultDataFormatting'}):
                for xa2 in xa.find_all('XMLAttribute'):
                    if xa2.attrs['name'] != 'supportsDefaultDataFormatting':
                        xa2.decompose()
                continue
            # Every other group is dropped completely.
            xa.decompose()

        for cti in bs.find_all('crosstabIntersection'):
            if not list(cti.children):
                cti.decompose()

        unstubbed_report = str(bs)
        unstubbed_report = prettify_xml(unstubbed_report).replace("'", '"')
        return unstubbed_report

    def get_report_headers(self, report_id=None):
        """Build the HTTP headers for a SOAP call to the report service.

        Args:
            report_id: When given, it is sent as ``X-RsCMStoreID``.
        """
        res = {
            'Content-Type': 'text/xml; charset=UTF-8',
            'X-XSRF-TOKEN': self.headers['X-XSRF-TOKEN'],
            'X-UseRsConsumerMode': 'true',
            'SOAPAction': 'http://www.ibm.com/xmlns/prod/cognos/reportService/202004/'
        }
        if report_id is not None:
            res['X-RsCMStoreID'] = report_id
        return res

    def request_file(self, report_id, params, format='PDF'):
        """Run a report without prompting and return the rendered output.

        Args:
            report_id: Id of the report to run.
            params: Parameter values handed to the ``get_report`` template.
            format: Output format name handed to the template.

        Returns:
            ``(200, content)`` with the raw bytes of the second multipart
            section on success, ``(500, 'Timeout')`` when a 200 response is not
            multipart, otherwise ``(status_code, error message)``.
        """
        report = self.get_report(report_id)
        headers = self.get_report_headers(report_id)
        soap = self.templates['get_report'].render(
            {"caf": self.caf, "cam": self.cam, "report": report,
             "format": format, "prompt": 'false', "tracking": "", "params": params}
        ).encode("utf-8")
        r = self.session.post(self.webservice + 'v1/reports', data=soap, headers=headers)

        if r.status_code == 200:
            try:
                parts = decoder.MultipartDecoder.from_response(r).parts
            except decoder.NonMultipartContentTypeException:
                return 500, 'Timeout'
            return 200, parts[1].content

        # The body is only parsed as XML on the error path; a successful
        # response carries the rendered file.
        error = self._error_message(r)
        logging.debug(error)
        return r.status_code, error

    def create_report(self, folder_id, fullpath):
        """Create a report in a folder from a specification file.

        The report is named after the file name of ``fullpath``. The package
        response and the create request are written to ``package.xml`` and
        ``request_create.xml`` in the current working directory; status code
        and response text are printed.
        """
        # Disabled calls, kept for reference:
        # self.session.get(self.webservice + 'v1/reports/templates?path=%2Fcontent%2Ffolder%5B%40name%3D%27Templates%27%5D/*[@objectClass=%27interactiveReport%27%20or%20@objectClass=%27report%27%20or%20@objectClass=%27reportTemplate%27]&maxResults=100&locale=de')
        # self.session.get(self.webservice + 'v1/reports/startupconfig?keys=supportedContentLocales,supportedCurrencies,supportedFonts,metadataInformationURI,glossaryURI&locale=de')

        headers = self.get_report_headers()
        soap = self.templates['get_package'].render(
            {"caf": self.caf, "cam": self.cam}
        ).encode("utf-8")
        r = self.session.post(self.webservice + 'v1/reports', data=soap, headers=headers)
        with open('package.xml', 'wb') as f:
            f.write(r.content)

        search_path = self.request_search_path(folder_id)
        report_name = os.path.basename(fullpath)
        # NOTE(review): the file is read as bytes here but as text in
        # update_report(); unless the template decodes it, the rendered value
        # is the bytes repr. Confirm against create_report.xml.
        with open(fullpath, 'rb') as f:
            unstubbed = f.read()

        headers['SOAPAction'] = 'http://www.ibm.com/xmlns/prod/cognos/reportService/202004/.session'
        # Derived from the configured base URL; identical to the previously
        # hard-coded value for the default web service.
        headers['Referer'] = self.webservice + 'pat/rsapp.htm'
        # headers['caf'] = self.caf
        soap = self.templates['create_report'].render(
            {"caf": self.caf, "cam": self.cam, "search_path": search_path,
             "report_name": report_name, "unstubbed": unstubbed}
        ).encode("utf-8")
        r = self.session.post(self.webservice + 'v1/reports', data=soap, headers=headers)
        with open('request_create.xml', 'wb') as f:
            f.write(r.request.body)
        print(r.status_code)
        print(r.text)

    def update_report(self, report, fullpath):
        """Replace the specification of an existing report with a file's content.

        Args:
            report: Report dict; only ``report['id']`` is used.
            fullpath: Path of the specification file.

        Status code and response text are printed.
        """
        search_path = self.request_search_path(report['id'])
        # NOTE(review): read with the platform default encoding, as before.
        with open(fullpath, 'r') as f:
            unstubbed = f.read()

        headers = self.get_report_headers(report['id'])
        # headers['Referer'] = 'http://localhost:9300/bi/pat/rsapp.htm'
        headers['caf'] = self.caf
        soap = self.templates['update_report'].render(
            {"caf": self.caf, "cam": self.cam, "search_path": search_path, "unstubbed": unstubbed}
        )  # .encode("utf-8")
        r = self.session.post(self.webservice + 'v1/reports', data=soap, headers=headers)
        # open('request_update.xml', 'wb').write(r.request.body)
        print(r.status_code)
        print(r.text)

    def create_folder(self, parent_id, folder_name):
        """Create a sub-folder and register it in the folder cache.

        Returns:
            The id of the new folder (last segment of the ``Location`` header),
            or ``None`` when the service did not answer with status 201 or sent
            no ``Location`` header.
        """
        data = json.dumps({"defaultName": folder_name, "type": "folder"})
        res = self.session.post(
            f"{self.webservice}v1/objects/{parent_id}/items",
            headers=self.headers,
            data=data
        )
        if res.status_code != 201:
            return None
        location = res.headers.get('Location')
        if not location:
            return None
        folder_id = location.split('/')[-1]
        # NOTE(review): cached with the bare name, while load_folder_list()
        # stores the full display path.
        self.folders.append({'id': folder_id, 'name': folder_name})
        return folder_id

    def request_search_path(self, id):
        """Return the ``searchPath`` of the object with the given id."""
        res = self.session.get(f"{self.webservice}v1/objects/{id}?fields=searchPath", headers=self.headers)
        return res.json()['data'][0]['searchPath']


if __name__ == '__main__':
    # Dump the folder, report and job lists of the default server as JSON.
    api = c11_api()
    api.login()
    folders = api.get_folders()

    config_dir = 'C:/GlobalCube/Tasks/gctools/logs/config'
    dumps = (
        ('folders', folders),
        ('reports', api.reports),
        ('jobs', api.jobs),
    )
    for name, data in dumps:
        with open(f"{config_dir}/{name}.json", 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2)