123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225 |
import base64
import itertools
import json
import os
import re
import time

import jinja2
import pandas as pd
import requests
from bs4 import BeautifulSoup
from requests_toolbelt.multipart import decoder
class ca_webscraper:
    """Client that logs in to a Cognos web service and exports reports.

    The class talks to the REST/SOAP endpoints below ``webservice``: it logs
    in, lists report-like objects, fetches each report's prompt metadata and
    either renders the report as PDF (once per prompt-value combination) or
    saves the "unstubbed" report specification as XML.

    Typical call order is ``login()`` -> ``report_list()`` ->
    ``export_folder(...)``; ``login`` creates ``self.session`` and
    ``report_list`` creates ``self.reports``, which the later calls rely on.
    """

    # Base URL of the web service; all endpoint paths are appended to it.
    webservice = "http://localhost:9300/bi/"
    # Directory searched by jinja2 for the SOAP request template.
    templates_dir = "C:/Projekte/Python/cognos11/templates"
    # Root directory for exported PDFs and unstubbed report specs.
    export_dir = "C:/Projekte/Python/export"
    # Root directory for prompt metadata ("config") and error logs.
    log_dir = "C:/Projekte/Python/logs"
    # SECURITY NOTE(review): the login payload, including a plaintext
    # password, is hard-coded here. Consider loading it from the environment
    # or a secrets store instead of keeping it in source control.
    credentials = '{"parameters":[{"name":"CAMNamespace","value":"CognosEx"},{"name":"h_CAM_action","value":"logonAs"},{"name":"CAMUsername","value":"Global1"},{"name":"CAMPassword","value":"Cognos#11"}]}'
    # Class-level placeholders; login() replaces them with per-instance values.
    headers = {}
    caf = ""
    cam = ""

    def __init__(self):
        """Set up the jinja2 environment and load the SOAP request template."""
        self._env = jinja2.Environment(
            loader=jinja2.FileSystemLoader(self.templates_dir),
            autoescape=jinja2.select_autoescape(['html', 'xml'])
        )
        self.template = self._env.get_template('get_report.xml')

    def generate_token(self, message_base64):
        """Derive the token used in SOAP requests from a base64 message.

        The decoded message is read as: one leading byte (skipped), followed
        by a sequence of chunks, each prefixed with its length as a 4-byte
        little-endian integer. The last chunk is prefixed with ``b"V1"`` and
        the result is base64-encoded.

        Args:
            message_base64: Base64 text (``login`` passes the
                ``usersessionid`` cookie value).

        Returns:
            The base64-encoded token as ``str``.

        Raises:
            IndexError: If the decoded message contains no complete length
                header, i.e. no chunk could be read.
        """
        version = "V1".encode("utf-8")
        header_len = 4
        # Drop the first byte; the chunk sequence starts right after it.
        msg = base64.b64decode(message_base64)[1:]
        chunks = []
        while len(msg) >= header_len:
            chunk_len = int.from_bytes(msg[:header_len], byteorder="little")
            msg = msg[header_len:]
            chunks.append(msg[:chunk_len])
            msg = msg[chunk_len:]
        return base64.b64encode(version + chunks[-1]).decode("utf-8")

    def login(self):
        """Open a session, log in and store the tokens later calls need.

        Sets ``self.session``, ``self.headers``, ``self.caf`` (the
        ``cafContextId`` from the login response) and ``self.cam`` (derived
        from the ``usersessionid`` response cookie).

        Returns:
            The HTTP status code of the login request.
        """
        self.session = requests.Session()
        # Initial GET so the session holds the XSRF-TOKEN cookie read below.
        self.session.get(self.webservice)
        self.headers = {
            'Content-Type': "application/json; charset=UTF-8",
            'X-XSRF-TOKEN': self.session.cookies.get('XSRF-TOKEN'),
        }
        r = self.session.post(self.webservice + "v1/login", data=self.credentials, headers=self.headers)
        self.caf = r.json()['cafContextId']
        self.cam = self.generate_token(r.cookies["usersessionid"])
        return r.status_code

    def report_list(self):
        """Fetch the list of report-like objects and cache it on the instance.

        Each result is reshaped in place: ``ancestors`` becomes a
        ``/``-joined ``path`` (slashes inside ancestor names are replaced by
        ``_``) and ``defaultName`` is renamed to ``name``. The list is also
        dumped to ``<log_dir>/config/reports.json``.

        Returns:
            The list of report dicts (also stored as ``self.reports``).
        """
        search_filter = "type|analysis|interactiveReport|powerPlayReport|powerPlay8Report|powerPlay8ReportView|query|report|reportTemplate"
        # NOTE: the query asks for at most 1000 results.
        res = self.session.get(self.webservice + "v1/search/cm?fields=defaultName|id|ancestors&results=1000&query=.&hide_internal=all&filter=" + search_filter, headers=self.headers)
        self.reports = res.json()['results']
        for r in self.reports:
            r['path'] = "/".join(a['defaultName'].replace('/', '_') for a in r['ancestors'])
            r['name'] = r['defaultName']
            del r['ancestors']
            del r['defaultName']
        filename = self.log_dir + '/config/reports.json'
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        # Context manager so the file is flushed and closed deterministically.
        with open(filename, 'w') as f:
            json.dump(self.reports, f, indent=2)
        return self.reports

    def export_folder(self, folder):
        """Export every cached report located in ``folder`` (or below) as PDF.

        For each matching report this sets ``report['filename']`` (path
        relative to ``export_dir``, starting with ``/``) and
        ``report['params']`` (the names found in ``[...]`` inside the
        filename). Each ``[name]`` is replaced by a positional ``{i}``
        placeholder that ``export_pdf`` later fills with prompt values.

        Args:
            folder: Folder path as it appears in ``report['path']``. A
                trailing ``/`` is ignored.
        """
        prefix = folder.rstrip('/')
        for r in self.reports:
            path = r['path']
            # Match the folder itself or a real sub-folder; a plain
            # startswith() would also pick up siblings such as "Test2".
            if prefix and path != prefix and not path.startswith(prefix + '/'):
                continue
            print(r['name'])

            # Strip only the leading folder prefix; str.replace() would also
            # remove later occurrences of the same text inside the path.
            rel = path[len(prefix):]
            if rel and not rel.startswith('/'):
                rel = '/' + rel
            r['filename'] = f"{rel}/{r['name']}.pdf"
            r['params'] = re.findall(r'\[([^\]]+)\]', r['filename'])
            for i, p in enumerate(r['params']):
                r['filename'] = r['filename'].replace('[' + p + ']', '{' + str(i) + '}')
            self.export(r, 'PDF')

    def export(self, report, format='XML'):
        """Fetch a report's prompt metadata, then export it as PDF or XML.

        The report is first requested as XHTML with prompting enabled. All
        ``selectvalue`` elements in the second response part are collected
        into ``meta`` (parameter name -> {use value: display value}, split
        into ``required`` and ``optional``) and written to
        ``<log_dir>/config/<report name>.json``.

        Args:
            report: Report dict with at least ``id`` and ``name``; for PDF
                export also ``filename`` and ``params`` (see
                ``export_folder``).
            format: ``'PDF'`` delegates to ``export_pdf``. Any other value
                saves the unstubbed report specification to
                ``<export_dir>/unstubbed/<report name>.xml``.

        Returns:
            The result of ``export_pdf`` for ``'PDF'``; otherwise the
            rendered SOAP request text of the initial prompt request.
        """
        headers = {
            'Content-Type': 'text/xml; charset=UTF-8',
            'X-XSRF-TOKEN': self.headers['X-XSRF-TOKEN'],
            'X-RsCMStoreID': report['id'],
            'X-UseRsConsumerMode': 'true',
            'SOAPAction': 'http://www.ibm.com/xmlns/prod/cognos/reportService/202004/'
        }
        soap = self.template.render({"caf": self.caf, "cam": self.cam,
                                     "report": report, "format": 'XHTML',
                                     "prompt": 'true', "tracking": "", "params": {}})
        # Encode explicitly so the body matches the declared UTF-8 charset
        # instead of depending on how the HTTP stack encodes a str body.
        r = self.session.post(self.webservice + 'v1/reports', data=soap.encode('utf-8'), headers=headers)
        parts = decoder.MultipartDecoder.from_response(r).parts

        meta = {'required': {}, 'optional': {}}
        bs = BeautifulSoup(parts[1].content, 'lxml')

        for sv in bs.find_all('selectvalue'):
            k = sv['parameter']
            req = 'required' if sv['required'] == 'true' else 'optional'
            meta[req][k] = {opt['usevalue']: opt['displayvalue'] for opt in sv.find_all('selectoption')}

        filename = self.log_dir + f"/config/{report['name']}.json"
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        with open(filename, 'w') as f:
            json.dump(meta, f, indent=2)

        if format == 'PDF':
            return self.export_pdf(report, meta)

        payload = json.dumps({'reportspec_stubbed': parts[2].text, 'storeid': report['id']})
        token = re.findall(r'<bus:authenticityToken xsi:type="xs:base64Binary">(.*)</bus:authenticityToken>', parts[0].text)[0]
        headers = {
            'Content-Type': 'application/json; charset=UTF-8',
            'X-XSRF-TOKEN': self.session.cookies.get('XSRF-TOKEN'),
            'authenticityToken': token,
            'X-UseRsConsumerMode': 'true',
            'cafContextId': self.caf
        }
        r = self.session.post(self.webservice + 'v1/reports/unstubreport', data=payload, headers=headers)
        unstubbed = json.loads(r.content.decode('latin-1'))['reportspec_full']
        # Drop the iid attributes and two specific control characters.
        unstubbed = re.sub(r' iid="[^"]*"', '', unstubbed).replace('\x9f', '').replace('\x96', '')
        bs = BeautifulSoup(unstubbed, 'xml')
        for xa in bs.find_all('XMLAttributes'):
            xa.decompose()
        # The separator before "unstubbed" was missing, which produced
        # ".../exportunstubbed/..." next to the export directory.
        filename = self.export_dir + f"/unstubbed/{report['name']}.xml"
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        # Write as UTF-8 rather than the platform default encoding, which
        # cannot represent every character on all systems.
        with open(filename, "w", encoding="utf-8") as f:
            f.write(bs.prettify())
        return soap

    def export_pdf(self, report, meta):
        """Request one PDF per combination of the report's filename parameters.

        ``report['params']`` names the prompt parameters embedded in the
        filename. For every combination of their available values a PDF is
        requested via ``request_file``; the display values fill the
        ``{0}``, ``{1}``, ... placeholders of ``report['filename']``. Any
        number of parameters is supported; the first parameter varies
        slowest.

        Args:
            report: Report dict with ``id``, ``filename`` and ``params``.
            meta: Prompt metadata as built by ``export``
                (``{'required': {...}, 'optional': {...}}``).

        Returns:
            ``True`` once all requests have been issued.

        Raises:
            KeyError: If a parameter is in neither ``meta['optional']`` nor
                ``meta['required']``.
        """
        headers = {
            'Content-Type': 'text/xml; charset=UTF-8',
            'X-XSRF-TOKEN': self.headers['X-XSRF-TOKEN'],
            'X-RsCMStoreID': report['id'],
            'X-UseRsConsumerMode': 'true',
            'SOAPAction': 'http://www.ibm.com/xmlns/prod/cognos/reportService/202004/'
        }

        keys = report['params']
        if not keys:
            # No placeholders: use the filename verbatim (no format() call).
            self.request_file(report, headers, {}, report['filename'])
            return True

        # Value table per parameter; optional prompts take precedence and
        # required prompts are the fallback.
        choices = []
        for key in keys:
            if key in meta['optional']:
                choices.append(meta['optional'][key])
            else:
                choices.append(meta['required'][key])

        for combo in itertools.product(*(c.items() for c in choices)):
            filename = report['filename'].format(*(v for _, v in combo))
            params = {key: {k: v} for key, (k, v) in zip(keys, combo)}
            self.request_file(report, headers, params, filename)
        return True

    def request_file(self, report, headers, params, filename):
        """Render one report as PDF and store it below ``export_dir``.

        Args:
            report: Report dict passed through to the SOAP template.
            headers: HTTP headers for the request.
            params: Prompt values passed to the SOAP template.
            filename: Target path relative to ``export_dir`` (expected to
                start with ``/``).

        Returns:
            ``True`` if the PDF was written. ``False`` if the request body
            could not be encoded or the server did not answer with status
            200; in both cases a ``.log`` file is written to ``log_dir``.
        """
        soap = self.template.render({"caf": self.caf, "cam": self.cam,
                                     "report": report, "format": 'PDF',
                                     "prompt": 'false', "tracking": "", "params": params})
        try:
            # Send UTF-8 bytes so the body agrees with the declared charset.
            r = self.session.post(self.webservice + 'v1/reports', data=soap.encode('utf-8'), headers=headers)
        except UnicodeEncodeError:
            filename = self.log_dir + '/' + os.path.basename(filename) + '.log'
            os.makedirs(os.path.dirname(filename), exist_ok=True)
            # backslashreplace guarantees the log itself can always be written.
            with open(filename, "w", encoding="utf-8", errors="backslashreplace") as f:
                f.write(soap)
            return False
        if r.status_code != 200:
            # Keep the raw response for diagnosis and report the failure.
            filename = self.log_dir + '/' + os.path.basename(filename) + '.log'
            os.makedirs(os.path.dirname(filename), exist_ok=True)
            with open(filename, "wb") as f:
                f.write(r.content)
            return False
        parts = decoder.MultipartDecoder.from_response(r).parts
        filename = self.export_dir + filename
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        with open(filename, "wb") as f:
            f.write(parts[1].content)
        return True

    def admin(self):
        """Issue a fixed legacy-dispatcher request and print the cookie names.

        NOTE(review): the URL is hard-coded, including a specific data
        source name and a ``ui.cafcontextid`` value that looks like it was
        captured from a single session - confirm before relying on it.
        """
        r = self.session.get(self.webservice + "v1/disp?m_p_owner=&changed_m_p_owner=0&changed_genprop=0&so.select=&so.return.m=portal%2Fproperties_general.xts&so.defaultLocation=&so.defaultObject=&ro_name=false&origDefLang=de&m_email=&m_defaultName=&m_t_default_name_de=CARLO_F_Belege&m_t_default_description_de=&m_t_default_screenTip_de=&ifrmcmd=save&m_p_disabled=false&m_p_hidden=false&icon_radio=standard&m_transloc=de&pty_activeLang=de&pty_deactivLang=&pty_del=&pty_add=&pty_name=CARLO_F_Belege&pty_scrt=&pty_desc=&m_new_class=&b_action=xts.run&sharedPagesChanged=&from_tool=true&backURL=%2Fbi%2Fv1%2Fdisp%3Fb_action%3Dxts.run%26m%3Dportal%2Flegacy_tools%2Ftools_directory.xts%26m_pathID%3Di339AF66BADEC411E943590402582B75B%26m_path%3DCAMID%28%2522%253a%2522%29%252fdataSource%255b%2540name%253d%2527CARLO_F_Belege%2527%255d%26tool_tab%3Dd&m_selectedPage=&m_classSubtype=&m_obj=CAMID%28%22%3A%22%29%2FdataSource%5B%40name%3D%27CARLO_F_Belege%27%5D%2FdataSourceConnection%5B%40name%3D%27CARLO_F_Belege%27%5D&b_report_type=&encoding=UTF-8&m=portal%2Fproperties_connection.xts&m_class=dataSourceConnection&m_name=CARLO_F_Belege&ui.cafcontextid=CAFW000000a0Q0FGQTYwMDAwMDAwMDlBaFFBQUFERWpZV1g4bEExbmlJd29ualF1cEgwWVVTeGtnY0FBQUJUU0VFdE1qVTJJQUFBQUxha3gqeHQ5TXN3Ukw2dGhjMTJVRzN1NVhaMWVzNU5FLXRvWXI1VzlwYTE0NDI0NzN8cHM_&m_path=CAMID%28%22%3A%22%29%2FdataSource%5B%40name%3D%27CARLO_F_Belege%27%5D&cmd=&m_location=&reportLocation=&ps_nav_op=maintain&ps_nav_stack=&ps_nav_source=portal%2Fproperties_general.xts")
        print(r.cookies.keys())
if __name__ == '__main__':
    # Script entry point: log in, refresh the cached report list, then export
    # every report found under the test folder as PDF.
    scraper = ca_webscraper()
    scraper.login()
    scraper.report_list()
    scraper.export_folder('Team Content/Export/Test')
|