|
- import json
- import logging
- import os
- from collections import defaultdict
- from dataclasses import dataclass
- from datetime import datetime
- from pathlib import Path
- import config
- from cognos11.c11_api import c11_api
- @dataclass
- class ReportRequest:
- report_id: str
- params: dict
- filename: str
- report_format: str
- class c11_export:
- api: c11_api
- cfg: config.Config = None
- def __init__(self, cfg=None, api=None):
- self.cfg = cfg
- if cfg is None:
- self.cfg = config.Config()
- self.api = api
- if api is None:
- # if self.cfg.cognos11.
- self.api = c11_api(cfg).login()
- now = datetime.now().strftime("%Y%m%d_%H%M%S")
- prot_file = f"{self.cfg.cognos11.logs_dir}/c11_export_{now}.log"
- os.makedirs(self.cfg.cognos11.logs_dir, exist_ok=True)
- logging.basicConfig(
- filename=prot_file,
- filemode="w",
- encoding="utf-8",
- level=logging.INFO,
- force=True,
- )
- @staticmethod
- def get_folder(folder, format="PDF"):
- if folder == "":
- folder = "Team Content" if format == "XML" else "Team Content/ReportOutput"
- elif not folder.startswith("Team Content"):
- folder = "Team Content/ReportOutput/" + folder
- return folder
- def export_folder(self, folder="", overwrite=False) -> None:
- folder = self.get_folder(folder, "XML")
- reports = self.api.get_reports_in_folder(folder, True)
- for r in reports:
- if self.is_obsolete_or_ignored(r):
- continue
- self.export_unstubbed(r["id"], overwrite)
- @staticmethod
- def is_obsolete_or_ignored(report):
- if report["type"] != "report":
- return True
- path: str = report["path"]
- if "_/" in path or path.endswith("_"):
- return True
- for prefix in ["Team Content/Templates", "Team Content/Beispiele", "Team Content/Kalender"]:
- if path.startswith(prefix):
- return True
- name: str = report["name"]
- if name.endswith("_") or "Kopie" in name:
- return True
- return False
- def export_unstubbed(self, report_id, overwrite=False):
- report = self.api.get_report(report_id)
- if "error" in report:
- return
- params = self.get_params(report, {})
- request = ReportRequest(report["id"], params, report["filename"], report["format"])
- # test if execution of report is possible
- self.request_and_save_file(request, save=False)
- filename = f"{self.cfg.cognos11.specs_dir}/{report['path']}/{report['name']}.xml"
- modified = datetime.fromisoformat(report["modified"]).timestamp()
- if not overwrite and Path(filename).exists() and Path(filename).stat().st_mtime > modified:
- return
- unstubbed_report = self.api.request_unstubbed(report_id)
- if unstubbed_report:
- print(f"{report['path']}/{report['name']}")
- os.makedirs(os.path.dirname(filename), exist_ok=True)
- with open(filename, "w") as f:
- f.write(unstubbed_report)
- def get_folder_pdf_request_plan(self, folder=""):
- folder = self.get_folder(folder, "PDF")
- reports = self.api.get_reports_in_folder(folder, True)
- return [self.export_pdf(r["id"]) for r in reports]
- def export_pdf(self, report_id, folder=None):
- report = self.api.get_report(report_id)
- if report["type"] == "shortcut":
- report_link = report
- report = self.api.get_report(report["target_id"]).copy()
- for k in ("name", "description", "path", "type", "filename"):
- report[k] = report_link[k]
- json.dump(report, open("dump.json", "w"), indent=2)
- if "meta" not in report:
- logging.warning(report["name"] + " is not accessible!")
- return []
- if len(report["params"]) == 0:
- filename = report["filename"]
- params = self.get_params(report, {})
- return [ReportRequest(report["id"], params, filename, report["format"])]
- result = []
- if len(report["params"]) == 1:
- filename = report["filename"].format("_Summe")
- params = self.get_params(report, {})
- result.append(ReportRequest(report["id"], params, filename, report["format"]))
- key1 = report["params"][0]
- for k1, v1 in report["meta"]["optional"][key1].items():
- filename = report["filename"].format(v1)
- params = self.get_params(report, {key1: {k1: v1}})
- result.append(ReportRequest(report["id"], params, filename, report["format"]))
- return result
- if len(report["params"]) == 2:
- filename = report["filename"].format("_Summe", "").replace("_.", ".")
- params = self.get_params(report, {})
- result.append(ReportRequest(report["id"], params, filename, report["format"]))
- key1, key2 = report["params"]
- for k1, v1 in report["meta"]["optional"][key1].items():
- filename = report["filename"].format(v1, "_Summe")
- params = self.get_params(report, {key1: {k1: v1}})
- result.append(ReportRequest(report["id"], params, filename, report["format"]))
- for k2, v2 in report["meta"]["optional"][key2].items():
- filename = report["filename"].format(v1, v2)
- params = self.get_params(report, {key1: {k1: v1}, key2: {k2: v2}})
- result.append(ReportRequest(report["id"], params, filename, report["format"]))
- return result
- def get_params(self, report, optional_params):
- params = report["meta"]["required"].copy()
- params.update(optional_params)
- return params
- def filter_request_plan(self, req_plan: list[list[ReportRequest]], mail_csv: str):
- if not Path(mail_csv).exists() or not Path(mail_csv).is_file():
- return req_plan
- print("Filtern auf Empfaenger: " + mail_csv)
- required_files = []
- required_full_export = []
- with open(Path(mail_csv), "r", encoding="latin-1") as rfh:
- for row in rfh.readlines():
- if ";" not in row:
- continue
- filename, mailto = row.split(";", 2)
- if "@" not in mailto:
- continue
- required_files.append(str(Path(self.cfg.cognos11.reportoutput_dir + "/" + filename).resolve()))
- if "_Schichten." in filename and "erstellte_Schichten" not in filename:
- filename2 = filename.replace("__Summe_und_Schichten", "__Summe").replace(
- "__nur_Schichten", "__Summe"
- )
- required_full_export.append(
- str(Path(self.cfg.cognos11.reportoutput_dir + "/" + filename2).resolve())
- )
- res = []
- for req_group in req_plan:
- if len(req_group) > 0 and str(Path(req_group[0].filename).resolve()) in required_full_export:
- res.append(req_group)
- else:
- res.append([req for req in req_group if str(Path(req.filename).resolve()) in required_files])
- return res
- def execute_request_plan(self, req_plan: list[list[ReportRequest]]):
- for req_group in req_plan:
- for report_req in req_group:
- print(Path(report_req.filename).relative_to(self.cfg.cognos11.reportoutput_dir))
- self.request_and_save_file(report_req)
- def request_and_save_file(self, report_request: ReportRequest, save=True):
- logging.debug(report_request.filename)
- logging.debug(report_request.params)
- status_code, content = self.api.request_file(
- report_request.report_id,
- report_request.params,
- report_request.report_format,
- )
- if status_code != 200:
- return
- if not save:
- return
- os.makedirs(os.path.dirname(report_request.filename), exist_ok=True)
- try:
- with open(report_request.filename, "wb") as f:
- f.write(content)
- except PermissionError:
- print("--> not accessible!")
- def get_merge_group(self, req_plan: list[list[ReportRequest]]):
- res = {}
- for req_group in req_plan:
- files = [r.filename for r in req_group if r.report_format == "PDF"]
- if len(files) <= 1:
- continue
- filename = files[0].replace("_Summe", "_Summe_und_Schichten")
- res[filename] = files
- files2 = [r for r in files if "_Summe" not in r]
- filename2 = filename.replace("_Summe_und_Schichten", "_nur_Schichten")
- res[filename2] = files2
- filename3 = filename.replace("_Summe_und_Schichten", "_nur_erstellte_Schichten")
- res[filename3] = files2
- return res
- def export_errors(self):
- reports = self.api.get_reports_in_folder("Team Content", recursive=True, specs=True)
- errors = [r for r in reports if "error" in r and not self.is_obsolete_or_ignored(r)]
- filename = self.cfg.cognos11.logs_dir + "/c11_report_errors.json"
- json.dump(errors, open(filename, "w"), indent=2)
- def template(self, req_plan: list[list[ReportRequest]], merge_group: dict[str, list[str]]):
- mail_groups = defaultdict(list)
- for report in merge_group.keys():
- filename = str(Path(report).relative_to(self.cfg.cognos11.reportoutput_dir))
- folder = Path(filename).parent.name
- mail_groups[folder].append(filename)
- for req_group in req_plan:
- for report in req_group:
- filename = str(Path(report.filename).relative_to(self.cfg.cognos11.reportoutput_dir))
- folder = Path(filename).parent.name
- mail_groups[folder].append(filename)
- for group, file_list in mail_groups.items():
- with open(
- self.cfg.cognos11.config_dir + f"/{group}.template.csv",
- "w",
- encoding="latin-1",
- ) as fwh:
- fwh.write("Datei;Empfaenger\n")
- for filename in file_list:
- fwh.write(filename + ";\n")
- return True
- def mail_template(self, folder: str):
- current_dir = Path(self.cfg.cognos11.reportoutput_dir + "\\" + folder)
- subfolders = set([f.name for f in current_dir.glob("*") if f.is_dir()]).difference([".", "..", "leer"])
- folder2 = folder + "\\" if folder != "" else ""
- export_files = [f"{folder2}{f.name};" for f in current_dir.glob("*.*") if f.suffix in [".pdf", ".xls"]]
- if len(export_files) > 0:
- folder2 = folder.replace("\\", "_") if folder != "" else "Versand"
- template_file = Path(self.cfg.cognos11.config_dir + "\\" + folder2 + ".template.csv")
- print(template_file)
- with template_file.open("w") as fwh:
- fwh.write("Report;Empfaenger\n")
- fwh.write("\n".join(export_files))
- for f in subfolders:
- subfolder = folder + "\\" + f if folder != "" else f
- self.mail_template(subfolder)
- if __name__ == "__main__":
- api = c11_api()
- api.login()
- pdf = c11_export(None, api)
- # pdf.export_folder('Team Content/Verkauf/1. Gesamtverkauf', 'PDF')
- # pdf.export_folder('Team Content/Aftersales/1. Service')
- pdf.export_errors()
|