123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357 |
- import base64
- import os
- import requests
- from requests_toolbelt.multipart import decoder
- import jinja2
- import json
- import re
- from bs4 import BeautifulSoup
- from .xml_prettify import prettify_xml
- import logging
- class c11_api:
- webservice = ""
- templates_dir = ""
- # templates_dir = "C:/GlobalCube/Tasks/gctools/templates"
- export_dir = "C:/GlobalCube/ReportOutput"
- log_dir = "C:/GlobalCube/Tasks/gctools/logs"
- headers = {}
- caf = ""
- cam = ""
- reports = []
- folders = []
- jobs = []
- def __init__(self, webservice="http://localhost:9300/bi/"):
- self.webservice = webservice
- self.templates_dir = os.path.dirname(__file__) + '/templates'
- self._env = jinja2.Environment(
- loader=jinja2.FileSystemLoader(self.templates_dir),
- autoescape=jinja2.select_autoescape(['html', 'xml'])
- )
- self.templates = {
- 'get_report': self._env.get_template('get_report.xml'),
- 'create_report': self._env.get_template('create_report.xml'),
- 'update_report': self._env.get_template('update_report.xml'),
- 'get_package': self._env.get_template('get_package.xml'),
- }
- @staticmethod
- def generate_token(message_base64):
- version = "V1".encode("utf-8")
- header_len = 4
- msg = base64.b64decode(message_base64)[1:]
- chunks = []
- while len(msg) >= header_len:
- chunk_len = int.from_bytes(msg[:header_len], byteorder="little")
- msg = msg[header_len:]
- chunks.append(msg[:chunk_len])
- msg = msg[chunk_len:]
- return base64.b64encode(version + chunks[-1]).decode("utf-8")
- def login(self, user='Global1', password='Cognos#11'):
- credentials = {
- "parameters": [
- {"name": "CAMNamespace", "value": "CognosEx"},
- {"name": "h_CAM_action", "value": "logonAs"},
- {"name": "CAMUsername", "value": user},
- {"name": "CAMPassword", "value": password}
- ]
- }
- self.session = requests.Session()
- r = self.session.get(self.webservice)
- self.headers = {'Content-Type': "application/json; charset=UTF-8", 'X-XSRF-TOKEN': self.session.cookies.get('XSRF-TOKEN')}
- r = self.session.post(self.webservice + "v1/login", data=json.dumps(credentials), headers=self.headers)
- self.caf = r.json()['cafContextId']
- self.cam = self.generate_token(r.cookies["usersessionid"])
- return self
- def get_folders(self):
- if len(self.folders) == 0:
- self.folders.append({'id': '_dot_public_folders', 'name': 'Team Content'})
- self.load_folder_list()
- return self.folders
- def load_folder_list(self, folder_id='_dot_public_folders', prefix='Team Content'):
- res = self.session.get(f"{self.webservice}v1/objects/{folder_id}/items", headers=self.headers)
- folder_list = sorted(res.json()['data'], key=lambda x: x['defaultName'])
- for f in folder_list:
- if f['type'] == 'folder':
- folder = {
- 'id': f['id'],
- 'name': prefix + '/' + f['defaultName'].replace('/', '-')
- }
- self.folders.append(folder)
- self.load_folder_list(folder['id'], folder['name'])
- elif f['type'] == 'report':
- report = {
- 'id': f['id'],
- 'name': f['defaultName'].replace('/', '-'),
- 'path': prefix
- }
- self.reports.append(report)
- elif f['type'] == 'jobDefinition':
- job = {
- 'id': f['id'],
- 'name': f['defaultName'],
- 'path': prefix
- }
- job['details'] = self.get_job_details(job['id'])
- self.jobs.append(job)
- def get_job_details(self, job_id):
- fields = ",".join([
- 'userInterfaces,disabled',
- 'runInAdvancedViewer,modificationTime,canBurst',
- 'defaultPortalAction',
- 'base.defaultName,tags,target.searchPath,target.disabled',
- 'options,base.options'
- ])
- res = self.session.get(f"{self.webservice}v1/objects/{job_id}?fields={fields}", headers=self.headers)
- job = res.json()['data'][0]
- job.pop('_meta', None)
- job.pop('id', None)
- job.pop('type', None)
- job.pop('defaultName', None)
- fields2 = ",".join([
- r'id,displaySequence,stepObject{defaultName}',
- r'stepObject{id},stepObject{parameters}',
- r'stepObject{canBurst},options,parameters'
- ])
- res = self.session.get(
- f"{self.webservice}v1/objects/{job_id}/items?types=jobStepDefinition&fields={fields2}",
- headers=self.headers
- )
- steps = res.json()['data']
- for s in steps:
- s.pop('_meta', None)
- if s['stepObject'] is not None:
- s['report_id'] = s['stepObject'][0]['id']
- s.pop('stepObject', None)
- job['steps'] = steps
- return job
- def get_report(self, report_id):
- self.get_folders()
- report = [r for r in self.reports if r['id'] == report_id]
- if len(report) == 0:
- return None
- report = report[0]
- if 'meta' not in report:
- report = self.get_report_specs(report)
- return report
- def get_reports_in_folder(self, folder, recursive=False, specs=False):
- self.get_folders()
- if recursive:
- res = [r for r in self.reports if r['path'].startswith(folder)]
- else:
- res = [r for r in self.reports if r['path'] == folder]
- if specs:
- return [self.get_report_specs(r) for r in res]
- return res
- def get_report_specs(self, report):
- headers = {
- 'Content-Type': 'text/xml; charset=UTF-8',
- 'X-XSRF-TOKEN': self.headers['X-XSRF-TOKEN'],
- 'X-RsCMStoreID': report['id'],
- 'X-UseRsConsumerMode': 'true',
- 'SOAPAction': 'http://www.ibm.com/xmlns/prod/cognos/reportService/202004/'
- }
- soap = self.templates['get_report'].render(
- {"caf": self.caf, "cam": self.cam,
- "report": report, "format": 'XHTML',
- "prompt": 'true', "tracking": "", "params": {}}
- )
- r = self.session.post(self.webservice + 'v1/reports', data=soap, headers=headers)
- if r.status_code == 500:
- bs = BeautifulSoup(r.text, 'xml')
- report['error'] = bs.find_all('messageString')[0].string
- logging.error(report['error'])
- return report
- parts = decoder.MultipartDecoder.from_response(r).parts
- meta = {'required': {}, 'optional': {}}
- bs = BeautifulSoup(parts[1].content, 'lxml')
- for sv in bs.find_all('selectvalue'):
- k = sv['parameter']
- req = 'required' if sv['required'] == 'true' else 'optional'
- v = dict([(opt['usevalue'], opt['displayvalue']) for opt in sv.find_all('selectoption')])
- meta[req][k] = v
- for sv in bs.find_all('selectdate'):
- k = sv['parameter']
- req = 'required' if sv['required'] == 'true' else 'optional'
- v = dict([(opt['usevalue'], opt.get('displayvalue', '')) for opt in sv.find_all('selectoption')])
- meta[req][k] = v
- filename = self.log_dir + f"/config/{report['path']}/{report['name']}.json"
- os.makedirs(os.path.dirname(filename), exist_ok=True)
- json.dump(meta, open(filename, 'w'), indent=2)
- report['meta'] = meta
- report['spec'] = parts[2].text
- path = report['path'].replace('Team Content/ReportOutput', '')
- report['filename'] = f"{self.export_dir}/{path}/{report['name']}.pdf"
- report['params'] = list(re.findall(r'\[([^\]]+)\]', report['filename']))
- for i, p in enumerate(report['params']):
- report['filename'] = report['filename'].replace('[' + p + ']', '{' + str(i) + '}')
- return report
- def request_unstubbed(self, report_id):
- report = self.get_report(report_id)
- if 'spec' not in report:
- return ''
- payload = json.dumps({'reportspec_stubbed': report['spec'], 'storeid': report['id']})
- headers = {
- 'Content-Type': 'application/json; charset=UTF-8',
- 'X-XSRF-TOKEN': self.session.cookies.get('XSRF-TOKEN'),
- 'authenticityToken': self.cam,
- 'X-UseRsConsumerMode': 'true',
- 'cafContextId': self.caf
- }
- r = self.session.post(self.webservice + 'v1/reports/unstubreport', data=payload, headers=headers)
- unstubbed = json.loads(r.text)['reportspec_full']
- unstubbed = re.sub(r' iid="[^"]*"', '', unstubbed)
- bs = BeautifulSoup(unstubbed, 'xml')
- for xa in bs.find_all('XMLAttributes'):
- if (
- xa.find_all('XMLAttribute', {'name': 'RS_dataType'}) or
- xa.find_all('XMLAttribute', {'name': 'RS_CreateExtendedDataItems'}) or
- xa.find_all('XMLAttribute', {'name': 'RS_legacyDrillDown'})
- ):
- continue
- if xa.find_all('XMLAttribute', {'name': 'supportsDefaultDataFormatting'}):
- for xa2 in xa.find_all('XMLAttribute'):
- if xa2.attrs['name'] != 'supportsDefaultDataFormatting':
- xa2.decompose()
- continue
- xa.decompose()
- for cti in bs.find_all('crosstabIntersection'):
- if len(list(cti.children)) == 0:
- cti.decompose()
- unstubbed_report = str(bs)
- unstubbed_report = prettify_xml(unstubbed_report).replace("'", '"')
- return unstubbed_report
- def get_report_headers(self, report_id=None):
- res = {
- 'Content-Type': 'text/xml; charset=UTF-8',
- 'X-XSRF-TOKEN': self.headers['X-XSRF-TOKEN'],
- 'X-UseRsConsumerMode': 'true',
- 'SOAPAction': 'http://www.ibm.com/xmlns/prod/cognos/reportService/202004/'
- }
- if report_id is not None:
- res['X-RsCMStoreID'] = report_id
- return res
- def request_file(self, report_id, params, format='PDF'):
- report = self.get_report(report_id)
- headers = self.get_report_headers(report_id)
- soap = self.templates['get_report'].render(
- {"caf": self.caf, "cam": self.cam,
- "report": report, "format": format,
- "prompt": 'false', "tracking": "", "params": params}
- ).encode("utf-8")
- r = self.session.post(self.webservice + 'v1/reports', data=soap, headers=headers)
- bs = BeautifulSoup(r.text, 'xml')
- if r.status_code == 200:
- try:
- parts = decoder.MultipartDecoder.from_response(r).parts
- except decoder.NonMultipartContentTypeException:
- return 500, 'Timeout'
- return 200, parts[1].content
- error = bs.find_all('messageString')[0].string
- logging.debug(error)
- return r.status_code, error
- def create_report(self, folder_id, fullpath):
- # self.session.get(self.webservice + 'v1/reports/templates?path=%2Fcontent%2Ffolder%5B%40name%3D%27Templates%27%5D/*[@objectClass=%27interactiveReport%27%20or%20@objectClass=%27report%27%20or%20@objectClass=%27reportTemplate%27]&maxResults=100&locale=de')
- # self.session.get(self.webservice + 'v1/reports/startupconfig?keys=supportedContentLocales,supportedCurrencies,supportedFonts,metadataInformationURI,glossaryURI&locale=de')
- headers = self.get_report_headers()
- soap = self.templates['get_package'].render(
- {"caf": self.caf, "cam": self.cam}
- ).encode("utf-8")
- r = self.session.post(self.webservice + 'v1/reports', data=soap, headers=headers)
- open('package.xml', 'wb').write(r.content)
- search_path = self.request_search_path(folder_id)
- report_name = os.path.basename(fullpath)
- unstubbed = open(fullpath, 'rb').read()
- headers['SOAPAction'] = 'http://www.ibm.com/xmlns/prod/cognos/reportService/202004/.session'
- headers['Referer'] = 'http://localhost:9300/bi/pat/rsapp.htm'
- # headers['caf'] = self.caf
- soap = self.templates['create_report'].render(
- {"caf": self.caf, "cam": self.cam,
- "search_path": search_path,
- "report_name": report_name, "unstubbed": unstubbed}
- ).encode("utf-8")
- r = self.session.post(self.webservice + 'v1/reports', data=soap, headers=headers)
- open('request_create.xml', 'wb').write(r.request.body)
- print(r.status_code)
- print(r.text)
- def update_report(self, report, fullpath):
- search_path = self.request_search_path(report['id'])
- unstubbed = open(fullpath, 'r').read()
- headers = self.get_report_headers(report['id'])
- # headers['Referer'] = 'http://localhost:9300/bi/pat/rsapp.htm'
- headers['caf'] = self.caf
- soap = self.templates['update_report'].render(
- {"caf": self.caf, "cam": self.cam,
- "search_path": search_path, "unstubbed": unstubbed}
- ) # .encode("utf-8")
- r = self.session.post(self.webservice + 'v1/reports', data=soap, headers=headers)
- # open('request_update.xml', 'wb').write(r.request.body)
- print(r.status_code)
- print(r.text)
- def create_folder(self, parent_id, folder_name):
- data = json.dumps({"defaultName": folder_name, "type": "folder"})
- res = self.session.post(
- f"{self.webservice}v1/objects/{parent_id}/items",
- headers=self.headers,
- data=data
- )
- if res.status_code == 201:
- loc = res.headers.get('Location')
- folder_id = loc.split('/')[-1]
- self.folders.append({'id': folder_id, 'name': folder_name})
- return folder_id
- def request_search_path(self, id):
- res = self.session.get(f"{self.webservice}v1/objects/{id}?fields=searchPath", headers=self.headers)
- return res.json()['data'][0]['searchPath']
- if __name__ == '__main__':
- api = c11_api()
- api.login()
- folders = api.get_folders()
- filename = 'C:/GlobalCube/Tasks/gctools/logs/config/folders.json'
- json.dump(folders, open(filename, 'w'), indent=2)
- filename = 'C:/GlobalCube/Tasks/gctools/logs/config/reports.json'
- json.dump(api.reports, open(filename, 'w'), indent=2)
- filename = 'C:/GlobalCube/Tasks/gctools/logs/config/jobs.json'
- json.dump(api.jobs, open(filename, 'w'), indent=2)
|