123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
- from pathlib import Path
- from cognos11.c11_api import c11_api
- import config
- import json
- import logging
- import os
- from datetime import datetime
- class c11_export:
- api: c11_api
- cfg: config.Config = None
- def __init__(self, cfg=None, api=None):
- self.cfg = cfg
- if cfg is None:
- self.cfg = config.Config()
- self.api = api
- if api is None:
- self.api = c11_api(cfg).login()
- now = datetime.now().strftime("%Y%m%d_%H%M%S")
- prot_file = f"{self.cfg.cognos11.logs_dir}/error_{now}.log"
- os.makedirs(self.cfg.cognos11.logs_dir, exist_ok=True)
- logging.basicConfig(
- filename=prot_file,
- filemode="w",
- encoding="utf-8",
- level=logging.DEBUG,
- force=True,
- )
- def get_folder(self, folder, format="PDF"):
- if folder == "":
- folder = "Team Content" if format == "XML" else "Team Content/ReportOutput"
- elif not folder.startswith("Team Content"):
- folder = "Team Content/ReportOutput/" + folder
- return folder
- def export_folder(self, folder="", format="PDF") -> None:
- folder = self.get_folder(folder, format)
- reports = self.api.get_reports_in_folder(folder, True)
- for r in reports:
- print(r["name"])
- if format == "PDF":
- self.export_pdf(r["id"], folder)
- if format == "XML":
- self.export_unstubbed(r["id"])
- def export_unstubbed(self, report_id):
- report = self.api.get_report(report_id)
- unstubbed_report = self.api.request_unstubbed(report_id)
- if unstubbed_report:
- filename = (
- f"{self.cfg.cognos11.specs_dir}/{report['path']}/{report['name']}.xml"
- )
- os.makedirs(os.path.dirname(filename), exist_ok=True)
- with open(filename, "w") as f:
- f.write(unstubbed_report)
- def export_pdf(self, report_id, folder=None):
- report = self.api.get_report(report_id)
- if "meta" not in report:
- logging.warning(report["name"] + " is not accessible!")
- return False
- params = {}
- if len(report["meta"]["required"]) > 0:
- if set(report["meta"]["required"].keys()) != {"p_Von", "p_Bis"}:
- return False
- params["p_Von"] = {"2022-10-12": "12.10.2022"}
- params["p_Bis"] = {"2022-10-12": "12.10.2022"}
- params["p_Zeitraum"] = {"Einzelne Monate": "Einzelne Monate"}
- for k, v in report["meta"]["optional"].items():
- if k in ["p_Zeit", "p_Auswahl_Monate", "p_12_Monate"]:
- for k1, v1 in reversed(v.items()):
- if v1 != "Invalid Dates":
- params[k] = {k1: v1}
- continue # use last element only
- if len(report["params"]) == 0:
- filename = report["filename"]
- self.request_and_save_file(report["id"], params, filename, report["format"])
- return True
- if len(report["params"]) == 1:
- filename = report["filename"].format("_1")
- self.request_and_save_file(report["id"], params, filename, report["format"])
- key1 = report["params"][0]
- for k1, v1 in report["meta"]["optional"][key1].items():
- filename = report["filename"].format(v1)
- params[key1] = {k1: v1}
- self.request_and_save_file(
- report["id"], params, filename, report["format"]
- )
- return True
- if len(report["params"]) == 2:
- key1, key2 = report["params"]
- for k1, v1 in report["meta"]["optional"][key1].items():
- for k2, v2 in report["meta"]["optional"][key2].items():
- filename = report["filename"].format(v1, v2)
- params[key1] = {k1: v1}
- params[key2] = {k2: v2}
- self.request_and_save_file(
- report["id"], params, filename, report["format"]
- )
- return True
- def request_and_save_file(self, report_id, params, filename, format="PDF"):
- logging.debug(filename)
- logging.debug(params)
- status_code, content = self.api.request_file(report_id, params, format)
- if status_code == 200:
- os.makedirs(os.path.dirname(filename), exist_ok=True)
- with open(filename, "wb") as f:
- f.write(content)
- else:
- logging.warning(content)
- def export_errors(self):
- reports = self.api.get_reports_in_folder(
- "Team Content", recursive=True, specs=True
- )
- errors = [r for r in reports if "error" in r]
- filename = self.cfg.cognos11.logs_dir + "/c11_report_errors.json"
- json.dump(errors, open(filename, "w"), indent=2)
- def mail_template(self, folder: str):
- current_dir = Path(self.cfg.cognos11.reportoutput_dir + "\\" + folder)
- subfolders = set(
- [f.name for f in current_dir.glob("*") if f.is_dir()]
- ).difference([".", "..", "leer"])
- folder2 = folder + "\\" if folder != "" else ""
- export_files = [
- f"{folder2}{f.name};"
- for f in current_dir.glob("*.*")
- if f.suffix in [".pdf", ".xls"]
- ]
- if len(export_files) > 0:
- folder2 = folder.replace("\\", "_") if folder != "" else "Versand"
- template_file = Path(
- self.cfg.cognos11.config_dir + "\\" + folder2 + ".template.csv"
- )
- print(template_file)
- with template_file.open("w") as fwh:
- fwh.write("Report;Empfaenger\n")
- fwh.write("\n".join(export_files))
- for f in subfolders:
- subfolder = folder + "\\" + f if folder != "" else f
- self.mail_template(subfolder)
- if __name__ == "__main__":
- api = c11_api()
- api.login()
- pdf = c11_export(None, api)
- # pdf.export_folder('Team Content/Verkauf/1. Gesamtverkauf', 'PDF')
- # pdf.export_folder('Team Content/Aftersales/1. Service')
- pdf.export_errors()
|