from collections import defaultdict from pathlib import Path from cognos11.c11_api import c11_api import config import json import logging import os from datetime import datetime from dataclasses import dataclass @dataclass class ReportRequest: report_id: str params: dict filename: str report_format: str class c11_export: api: c11_api cfg: config.Config = None def __init__(self, cfg=None, api=None): self.cfg = cfg if cfg is None: self.cfg = config.Config() self.api = api if api is None: # if self.cfg.cognos11. self.api = c11_api(cfg).login() now = datetime.now().strftime("%Y%m%d_%H%M%S") prot_file = f"{self.cfg.cognos11.logs_dir}/c11_export_{now}.log" os.makedirs(self.cfg.cognos11.logs_dir, exist_ok=True) logging.basicConfig( filename=prot_file, filemode="w", encoding="utf-8", level=logging.INFO, force=True, ) @staticmethod def get_folder(folder, format="PDF"): if folder == "": folder = "Team Content" if format == "XML" else "Team Content/ReportOutput" elif not folder.startswith("Team Content"): folder = "Team Content/ReportOutput/" + folder return folder def export_folder(self, folder="", overwrite=False) -> None: folder = self.get_folder(folder, "XML") reports = self.api.get_reports_in_folder(folder, True) for r in reports: if self.is_obsolete_or_ignored(r): continue self.export_unstubbed(r["id"], overwrite) @staticmethod def is_obsolete_or_ignored(report): if report["type"] != "report": return True path: str = report["path"] if "_/" in path or path.endswith("_"): return True for prefix in ["Team Content/Templates", "Team Content/Beispiele", "Team Content/Kalender"]: if path.startswith(prefix): return True name: str = report["name"] if name.endswith("_") or "Kopie" in name: return True return False def export_unstubbed(self, report_id, overwrite=False): report = self.api.get_report(report_id) if "error" in report: return params = self.get_params(report, {}) request = ReportRequest(report["id"], params, report["filename"], report["format"]) # test if execution of report is possible self.request_and_save_file(request, save=False) filename = f"{self.cfg.cognos11.specs_dir}/{report['path']}/{report['name']}.xml" modified = datetime.fromisoformat(report["modified"]).timestamp() if not overwrite and Path(filename).exists() and Path(filename).stat().st_mtime > modified: return unstubbed_report = self.api.request_unstubbed(report_id) if unstubbed_report: print(f"{report['path']}/{report['name']}") os.makedirs(os.path.dirname(filename), exist_ok=True) with open(filename, "w") as f: f.write(unstubbed_report) def get_folder_pdf_request_plan(self, folder=""): folder = self.get_folder(folder, "PDF") reports = self.api.get_reports_in_folder(folder, True) return [self.export_pdf(r["id"]) for r in reports] def export_pdf(self, report_id, folder=None): report = self.api.get_report(report_id) if report["type"] == "shortcut": report_link = report report = self.api.get_report(report["target_id"]).copy() for k in ("name", "description", "path", "type", "filename"): report[k] = report_link[k] json.dump(report, open("dump.json", "w"), indent=2) if "meta" not in report: logging.warning(report["name"] + " is not accessible!") return [] if len(report["params"]) == 0: filename = report["filename"] params = self.get_params(report, {}) return [ReportRequest(report["id"], params, filename, report["format"])] result = [] if len(report["params"]) == 1: filename = report["filename"].format("_Summe") params = self.get_params(report, {}) result.append(ReportRequest(report["id"], params, filename, report["format"])) key1 = report["params"][0] for k1, v1 in report["meta"]["optional"][key1].items(): filename = report["filename"].format(v1) params = self.get_params(report, {key1: {k1: v1}}) result.append(ReportRequest(report["id"], params, filename, report["format"])) return result if len(report["params"]) == 2: filename = report["filename"].format("_Summe", "").replace("_.", ".") params = self.get_params(report, {}) result.append(ReportRequest(report["id"], params, filename, report["format"])) key1, key2 = report["params"] for k1, v1 in report["meta"]["optional"][key1].items(): filename = report["filename"].format(v1, "_Summe") params = self.get_params(report, {key1: {k1: v1}}) result.append(ReportRequest(report["id"], params, filename, report["format"])) for k2, v2 in report["meta"]["optional"][key2].items(): filename = report["filename"].format(v1, v2) params = self.get_params(report, {key1: {k1: v1}, key2: {k2: v2}}) result.append(ReportRequest(report["id"], params, filename, report["format"])) return result def get_params(self, report, optional_params): params = report["meta"]["required"].copy() params.update(optional_params) return params def filter_request_plan(self, req_plan: list[list[ReportRequest]], mail_csv: str): if not Path(mail_csv).exists() or not Path(mail_csv).is_file(): return req_plan required_files = [] required_full_export = [] with open(Path(mail_csv), "r", encoding="latin-1") as rfh: for row in rfh.readlines(): filename, mailto = row.split(";") if "@" in mailto: required_files.append(str(Path(self.cfg.cognos11.reportoutput_dir + '/' + filename).resolve())) if "_Schichten." in filename: filename2 = filename.replace("__Summe_und_Schichten", "__Summe").replace("__nur_Schichten", "__Summe") required_full_export.append(str(Path(self.cfg.cognos11.reportoutput_dir + '/' + filename2).resolve())) res = [] for req_group in req_plan: if len(req_group) > 0 and str(Path(req_group[0].filename).resolve()) in required_full_export: res.append(req_group) else: res.append([req for req in req_group if str(Path(req.filename).resolve()) in required_files]) return res def execute_request_plan(self, req_plan: list[list[ReportRequest]]): for req_group in req_plan: for report_req in req_group: print(Path(report_req.filename).relative_to(self.cfg.cognos11.reportoutput_dir)) self.request_and_save_file(report_req) def request_and_save_file(self, report_request: ReportRequest, save=True): logging.debug(report_request.filename) logging.debug(report_request.params) status_code, content = self.api.request_file( report_request.report_id, report_request.params, report_request.report_format, ) if status_code != 200: return if not save: return os.makedirs(os.path.dirname(report_request.filename), exist_ok=True) try: with open(report_request.filename, "wb") as f: f.write(content) except PermissionError: print("--> not accessible!") def get_merge_group(self, req_plan: list[list[ReportRequest]]): res = {} for req_group in req_plan: if len(req_group) <= 1: continue files = [r.filename for r in req_group if r.report_format == "PDF"] filename = files[0].replace("_Summe", "_Summe_und_Schichten") res[filename] = files files2 = [r for r in files if "_Summe" not in r] filename2 = filename.replace("_Summe_und_Schichten", "_nur_Schichten") res[filename2] = files2 return res def export_errors(self): reports = self.api.get_reports_in_folder("Team Content", recursive=True, specs=True) errors = [r for r in reports if "error" in r and not self.is_obsolete_or_ignored(r)] filename = self.cfg.cognos11.logs_dir + "/c11_report_errors.json" json.dump(errors, open(filename, "w"), indent=2) def template(self, req_plan: list[list[ReportRequest]], merge_group: dict[str, list[str]]): mail_groups = defaultdict(list) for report in merge_group.keys(): filename = str(Path(report).relative_to(self.cfg.cognos11.reportoutput_dir)) folder = Path(filename).parent.name mail_groups[folder].append(filename) for req_group in req_plan: for report in req_group: filename = str(Path(report.filename).relative_to(self.cfg.cognos11.reportoutput_dir)) folder = Path(filename).parent.name mail_groups[folder].append(filename) for group, file_list in mail_groups.items(): with open( self.cfg.cognos11.config_dir + f"/{group}.template.csv", "w", encoding="latin-1", ) as fwh: fwh.write("Datei;Empfaenger\n") for filename in file_list: fwh.write(filename + ";\n") return True def mail_template(self, folder: str): current_dir = Path(self.cfg.cognos11.reportoutput_dir + "\\" + folder) subfolders = set([f.name for f in current_dir.glob("*") if f.is_dir()]).difference([".", "..", "leer"]) folder2 = folder + "\\" if folder != "" else "" export_files = [f"{folder2}{f.name};" for f in current_dir.glob("*.*") if f.suffix in [".pdf", ".xls"]] if len(export_files) > 0: folder2 = folder.replace("\\", "_") if folder != "" else "Versand" template_file = Path(self.cfg.cognos11.config_dir + "\\" + folder2 + ".template.csv") print(template_file) with template_file.open("w") as fwh: fwh.write("Report;Empfaenger\n") fwh.write("\n".join(export_files)) for f in subfolders: subfolder = folder + "\\" + f if folder != "" else f self.mail_template(subfolder) if __name__ == "__main__": api = c11_api() api.login() pdf = c11_export(None, api) # pdf.export_folder('Team Content/Verkauf/1. Gesamtverkauf', 'PDF') # pdf.export_folder('Team Content/Aftersales/1. Service') pdf.export_errors()