123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263 |
- import base64
- import os
- import time
- import requests
- from requests_toolbelt.multipart import decoder
- import jinja2
- import json
- import re
- from bs4 import BeautifulSoup
- from xml_prettify import prettify_xml
class ca_webscraper:
    """Scripted client for a Cognos Analytics web service.

    The client logs in, lists the reports known to the content store and
    exports them either as PDF files or as cleaned-up report specifications
    (XML).  Typical call order: ``login()``, ``report_list()``,
    ``export_folder(...)``.

    All settings are class attributes so they can be overridden on the class
    or on a single instance before ``login()`` is called.
    """

    # Base URL of the web service; every endpoint below is appended to it.
    webservice = "http://localhost:9300/bi/"
    # Directory searched by jinja2 for the SOAP request template.
    templates_dir = "tools/cognos11/templates"
    # templates_dir = "C:/GlobalCube/Tasks/gctools/templates"
    # Root directory for exported PDF files.
    export_dir = "C:/GlobalCube/ReportOutput"
    # Root directory for logs and the JSON/XML files written under "config/".
    log_dir = "C:/GlobalCube/Tasks/gctools/logs"
    # JSON body posted to "v1/login".
    # NOTE(review): user name and password are hard-coded in source; consider
    # loading them from configuration instead.
    credentials = '{"parameters":[{"name":"CAMNamespace","value":"CognosEx"},{"name":"h_CAM_action","value":"logonAs"},{"name":"CAMUsername","value":"Global1"},{"name":"CAMPassword","value":"Cognos#11"}]}'
    # Default request headers; replaced by a fresh dict in login().
    headers = {}
    # 'cafContextId' value from the login response.
    caf = ""
    # Token derived from the 'usersessionid' cookie (see generate_token).
    cam = ""
    # Authenticity token extracted from the last export() response.
    auth_token = ""

    def __init__(self):
        """Load the 'get_report.xml' SOAP template from ``templates_dir``."""
        self._env = jinja2.Environment(
            loader=jinja2.FileSystemLoader(self.templates_dir),
            autoescape=jinja2.select_autoescape(['html', 'xml'])
        )
        self.template = self._env.get_template('get_report.xml')

    @staticmethod
    def generate_token(message_base64):
        """Derive a token from a base64-encoded, length-prefixed message.

        The decoded message is read as: one leading byte (skipped), followed
        by chunks that are each prefixed with a 4-byte little-endian length.
        The result is the base64 encoding of ``b"V1"`` plus the last chunk.

        Args:
            message_base64: base64 text (here: the 'usersessionid' cookie).

        Returns:
            The token as a ``str``.

        Raises:
            IndexError: if the message does not contain a single chunk.
        """
        version = "V1".encode("utf-8")
        header_len = 4
        msg = base64.b64decode(message_base64)[1:]

        # Walk the buffer with an offset instead of re-slicing it each round;
        # only the last chunk is needed for the token.
        pos = 0
        last_chunk = None
        while len(msg) - pos >= header_len:
            chunk_len = int.from_bytes(msg[pos:pos + header_len], byteorder="little")
            pos += header_len
            last_chunk = msg[pos:pos + chunk_len]
            pos += chunk_len

        if last_chunk is None:
            raise IndexError("message contains no length-prefixed chunk")
        return base64.b64encode(version + last_chunk).decode("utf-8")

    @staticmethod
    def _dump_json(filename, data):
        """Write *data* as indented JSON to *filename*, creating its directory."""
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        with open(filename, 'w') as f:
            json.dump(data, f, indent=2)

    def _soap_headers(self, report):
        """Return the HTTP headers used for SOAP requests about *report*."""
        return {
            'Content-Type': 'text/xml; charset=UTF-8',
            'X-XSRF-TOKEN': self.headers['X-XSRF-TOKEN'],
            'X-RsCMStoreID': report['id'],
            'X-UseRsConsumerMode': 'true',
            'SOAPAction': 'http://www.ibm.com/xmlns/prod/cognos/reportService/202004/'
        }

    def _write_log(self, filename, payload):
        """Write the bytes *payload* to '<log_dir>/<basename of filename>.log'."""
        log_name = self.log_dir + '/' + os.path.basename(filename) + '.log'
        os.makedirs(os.path.dirname(log_name), exist_ok=True)
        with open(log_name, "wb") as f:
            f.write(payload)

    def login(self):
        """Open a session and log in with ``credentials``.

        Stores the session in ``self.session``, the default headers in
        ``self.headers`` and the two values needed for later SOAP requests in
        ``self.caf`` and ``self.cam``.

        Returns:
            The HTTP status code of the login request.
        """
        self.session = requests.Session()
        # Initial request; presumably this is what puts the XSRF-TOKEN cookie
        # into the session's cookie jar, which is read on the next line.
        self.session.get(self.webservice)
        self.headers = {'Content-Type': "application/json; charset=UTF-8", 'X-XSRF-TOKEN': self.session.cookies.get('XSRF-TOKEN')}
        r = self.session.post(self.webservice + "v1/login", data=self.credentials, headers=self.headers)
        self.caf = r.json()['cafContextId']
        self.cam = self.generate_token(r.cookies["usersessionid"])
        return r.status_code

    def report_list(self):
        """Query the report-like objects and cache them in ``self.reports``.

        The raw response is always written to '<log_dir>/reports_error.log'.
        Each result keeps its remaining fields and gets two derived ones:
        'path' (ancestor names joined by '/', with '/' inside a name replaced
        by '_') and 'name'.  The list is also saved to
        '<log_dir>/config/reports.json'.

        Returns:
            The list of report dicts.
        """
        type_filter = "type|analysis|interactiveReport|powerPlayReport|powerPlay8Report|powerPlay8ReportView|query|report|reportTemplate"
        res = self.session.get(
            self.webservice + "v1/search/cm?fields=defaultName|id|ancestors&results=1000&query=.&hide_internal=all&filter=" + type_filter,
            headers=self.headers
        )

        raw_log = self.log_dir + '/reports_error.log'
        os.makedirs(os.path.dirname(raw_log), exist_ok=True)
        with open(raw_log, "wb") as f:
            f.write(res.content)

        self.reports = res.json()['results']
        for r in self.reports:
            r['path'] = "/".join(a['defaultName'].replace('/', '_') for a in r['ancestors'])
            r['name'] = r.pop('defaultName')
            del r['ancestors']

        self._dump_json(self.log_dir + '/config/reports.json', self.reports)
        return self.reports

    def export_folder(self, folder, format='PDF'):
        """Export every cached report whose path starts with *folder*.

        For each report the target file name (relative to the folder) and the
        list of bracketed parameter names found in it are stored on the
        report dict; every '[name]' in the file name is turned into a
        positional '{i}' placeholder.  ``report_list()`` must have been
        called before.

        Args:
            folder: path prefix to select reports by.
            format: passed through to ``export``.
        """
        selected = [r for r in self.reports if r['path'].startswith(folder)]
        for r in selected:
            print(r['name'])
            # Strip only the leading prefix; str.replace would also remove
            # later occurrences of the folder text inside the path.
            path = r['path'][len(folder):]
            r['filename'] = f"{path}/{r['name']}.pdf"
            r['params'] = re.findall(r'\[([^\]]+)\]', r['filename'])
            for i, p in enumerate(r['params']):
                r['filename'] = r['filename'].replace('[' + p + ']', '{' + str(i) + '}')
            self.export(r, format)

    def export(self, report, format='XML'):
        """Request the prompt page of *report* and hand over to the exporter.

        The multipart response is used three ways: part 0 supplies the
        authenticity token, part 1 the prompt controls (their selectable
        values are saved as JSON under '<log_dir>/config/<path>/<name>.json')
        and part 2 the stubbed report specification.

        Args:
            report: report dict as produced by ``report_list``.
            format: 'PDF' exports PDF files; anything else exports the
                unstubbed specification.

        Returns:
            The result of ``export_pdf`` or ``export_unstubbed``.
        """
        headers = self._soap_headers(report)
        soap = self.template.render({"caf": self.caf, "cam": self.cam,
                                     "report": report, "format": 'XHTML',
                                     "prompt": 'true', "tracking": "", "params": {}})

        # Repeat the request for as long as the server answers 500, pausing
        # two seconds after every attempt.
        # NOTE(review): there is no upper bound on the number of attempts.
        r = None
        while r is None or r.status_code == 500:
            r = self.session.post(self.webservice + 'v1/reports', data=soap, headers=headers)
            time.sleep(2)

        parts = decoder.MultipartDecoder.from_response(r).parts
        self.auth_token = re.findall(
            r'<bus:authenticityToken xsi:type="xs:base64Binary">(.*)</bus:authenticityToken>',
            parts[0].text)[0]

        # Collect the offered values of every prompt control, grouped by
        # whether the control is marked as required.
        meta = {'required': {}, 'optional': {}}
        bs = BeautifulSoup(parts[1].content, 'lxml')
        for tag_name in ('selectvalue', 'selectdate'):
            for sv in bs.find_all(tag_name):
                group = 'required' if sv['required'] == 'true' else 'optional'
                meta[group][sv['parameter']] = {
                    opt['usevalue']: opt['displayvalue']
                    for opt in sv.find_all('selectoption')
                }

        self._dump_json(self.log_dir + f"/config/{report['path']}/{report['name']}.json", meta)

        spec = parts[2].text
        if format == 'PDF':
            return self.export_pdf(report, meta)
        return self.export_unstubbed(report, spec)

    def export_unstubbed(self, report, spec):
        """Fetch the full specification for a stubbed one, clean it, save it.

        Cleaning removes all ' iid="..."' attributes, drops 'XMLAttributes'
        elements unless they carry one of a few named attributes, and drops
        'crosstabIntersection' elements without children.  The result is
        written to '<log_dir>/config/<path>/<name>.xml'.

        Args:
            report: report dict with 'id', 'path' and 'name'.
            spec: stubbed report specification text.

        Returns:
            The cleaned, prettified specification as a string.
        """
        payload = json.dumps({'reportspec_stubbed': spec, 'storeid': report['id']})
        headers = {
            'Content-Type': 'application/json; charset=UTF-8',
            'X-XSRF-TOKEN': self.session.cookies.get('XSRF-TOKEN'),
            'authenticityToken': self.auth_token,
            'X-UseRsConsumerMode': 'true',
            'cafContextId': self.caf
        }
        r = self.session.post(self.webservice + 'v1/reports/unstubreport', data=payload, headers=headers)
        unstubbed = json.loads(r.text)['reportspec_full']
        unstubbed = re.sub(r' iid="[^"]*"', '', unstubbed)

        bs = BeautifulSoup(unstubbed, 'xml')
        keep_whole = ('RS_dataType', 'RS_CreateExtendedDataItems', 'RS_legacyDrillDown')
        for xa in bs.find_all('XMLAttributes'):
            # Containers holding one of these attributes stay untouched.
            if any(xa.find_all('XMLAttribute', {'name': name}) for name in keep_whole):
                continue
            # Here only the one named attribute survives inside the container.
            if xa.find_all('XMLAttribute', {'name': 'supportsDefaultDataFormatting'}):
                for xa2 in xa.find_all('XMLAttribute'):
                    if xa2.attrs['name'] != 'supportsDefaultDataFormatting':
                        xa2.decompose()
                continue
            xa.decompose()

        for cti in bs.find_all('crosstabIntersection'):
            if not list(cti.children):
                cti.decompose()

        # NOTE(review): the replacement text was reconstructed as the XML
        # entity for an apostrophe; the literal was damaged in the source this
        # was recovered from - confirm against the original file.
        unstubbed_report = str(bs).replace("'", '&apos;')
        unstubbed_report = prettify_xml(unstubbed_report)

        filename = self.log_dir + f"/config/{report['path']}/{report['name']}.xml"
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        with open(filename, "w") as f:
            f.write(unstubbed_report)
        return unstubbed_report

    def export_pdf(self, report, meta):
        """Request one PDF per parameter combination of *report*.

        * no parameters: a single request, file name used as is;
        * one parameter: one request without parameters (placeholder filled
          with '1'), then one request per offered value;
        * two parameters: one request per pair of offered values.

        Offered values are looked up in ``meta['optional']``; the display
        value fills the file-name placeholder.

        Args:
            report: report dict with 'id', 'params' and 'filename'.
            meta: prompt metadata as built by ``export``.

        Returns:
            True when requests were issued, False when the report has more
            than two parameters (nothing is requested in that case).
        """
        headers = self._soap_headers(report)
        names = report['params']
        template = report['filename']

        if not names:
            self.request_file(report, headers, {}, template)
            return True

        if len(names) == 1:
            self.request_file(report, headers, {}, template.format('1'))
            key1 = names[0]
            for k1, v1 in meta['optional'][key1].items():
                self.request_file(report, headers, {key1: {k1: v1}}, template.format(v1))
            return True

        if len(names) == 2:
            key1, key2 = names
            for k1, v1 in meta['optional'][key1].items():
                for k2, v2 in meta['optional'][key2].items():
                    params = {key1: {k1: v1}, key2: {k2: v2}}
                    self.request_file(report, headers, params, template.format(v1, v2))
            return True

        return False

    def request_file(self, report, headers, params, filename):
        """Request *report* as PDF and store the answer.

        On status 200 the second multipart part is written to
        ``export_dir + filename``; on any other status the raw response body
        is written to '<log_dir>/<basename>.log'.

        Args:
            report: report dict passed to the SOAP template.
            headers: HTTP headers for the request.
            params: parameter values passed to the SOAP template.
            filename: target path relative to ``export_dir``.

        Returns:
            False if sending failed with ``UnicodeEncodeError`` (the request
            body is logged instead), True otherwise - including the case
            where the server answered with an error status.
        """
        soap = self.template.render({"caf": self.caf, "cam": self.cam,
                                     "report": report, "format": 'PDF',
                                     "prompt": 'false', "tracking": "", "params": params}).encode("utf-8")
        try:
            r = self.session.post(self.webservice + 'v1/reports', data=soap, headers=headers)
        except UnicodeEncodeError:
            # soap is bytes at this point, so the log is written in binary mode.
            self._write_log(filename, soap)
            return False

        if r.status_code == 200:
            parts = decoder.MultipartDecoder.from_response(r).parts
            target = self.export_dir + filename
            os.makedirs(os.path.dirname(target), exist_ok=True)
            with open(target, "wb") as f:
                f.write(parts[1].content)
        else:
            self._write_log(filename, r.content)
        return True

    def admin(self):
        """Send one hard-coded 'v1/disp' request and print the cookie names.

        The query string targets the data source connection 'CARLO_F_Belege'
        and carries 'ifrmcmd=save'; it appears to be a captured request from
        the administration pages, including a fixed 'ui.cafcontextid' value.
        """
        r = self.session.get(self.webservice + "v1/disp?m_p_owner=&changed_m_p_owner=0&changed_genprop=0&so.select=&so.return.m=portal%2Fproperties_general.xts&so.defaultLocation=&so.defaultObject=&ro_name=false&origDefLang=de&m_email=&m_defaultName=&m_t_default_name_de=CARLO_F_Belege&m_t_default_description_de=&m_t_default_screenTip_de=&ifrmcmd=save&m_p_disabled=false&m_p_hidden=false&icon_radio=standard&m_transloc=de&pty_activeLang=de&pty_deactivLang=&pty_del=&pty_add=&pty_name=CARLO_F_Belege&pty_scrt=&pty_desc=&m_new_class=&b_action=xts.run&sharedPagesChanged=&from_tool=true&backURL=%2Fbi%2Fv1%2Fdisp%3Fb_action%3Dxts.run%26m%3Dportal%2Flegacy_tools%2Ftools_directory.xts%26m_pathID%3Di339AF66BADEC411E943590402582B75B%26m_path%3DCAMID%28%2522%253a%2522%29%252fdataSource%255b%2540name%253d%2527CARLO_F_Belege%2527%255d%26tool_tab%3Dd&m_selectedPage=&m_classSubtype=&m_obj=CAMID%28%22%3A%22%29%2FdataSource%5B%40name%3D%27CARLO_F_Belege%27%5D%2FdataSourceConnection%5B%40name%3D%27CARLO_F_Belege%27%5D&b_report_type=&encoding=UTF-8&m=portal%2Fproperties_connection.xts&m_class=dataSourceConnection&m_name=CARLO_F_Belege&ui.cafcontextid=CAFW000000a0Q0FGQTYwMDAwMDAwMDlBaFFBQUFERWpZV1g4bEExbmlJd29ualF1cEgwWVVTeGtnY0FBQUJUU0VFdE1qVTJJQUFBQUxha3gqeHQ5TXN3Ukw2dGhjMTJVRzN1NVhaMWVzNU5FLXRvWXI1VzlwYTE0NDI0NzN8cHM_&m_path=CAMID%28%22%3A%22%29%2FdataSource%5B%40name%3D%27CARLO_F_Belege%27%5D&cmd=&m_location=&reportLocation=&ps_nav_op=maintain&ps_nav_stack=&ps_nav_source=portal%2Fproperties_general.xts")
        print(r.cookies.keys())
        # Cookie names seen in a browser session against the same server
        # (captured values intentionally not kept in source):
        # CRN, cea-ssa, userCapabilities, userCapabilitiesEx, caf, cc_session,
        # up, MRUStorage, usersessionid, cam_passport
def main():
    """Log in, refresh the report list and export one fixed folder as XML."""
    scraper = ca_webscraper()
    scraper.login()
    scraper.report_list()
    scraper.export_folder('Team Content/Verkauf/1. Gesamtverkauf', 'XML')


# Run the export only when the file is executed as a script.
if __name__ == '__main__':
    main()
|