|
@@ -1,3 +1,4 @@
|
|
|
+from collections import defaultdict
|
|
|
from pathlib import Path
|
|
|
from cognos11.c11_api import c11_api
|
|
|
import config
|
|
@@ -48,29 +49,42 @@ class c11_export:
|
|
|
folder = "Team Content/ReportOutput/" + folder
|
|
|
return folder
|
|
|
|
|
|
- def export_folder(self, folder="") -> None:
|
|
|
+ def export_folder(self, folder="", overwrite=False) -> None:
|
|
|
folder = self.get_folder(folder, "XML")
|
|
|
reports = self.api.get_reports_in_folder(folder, True)
|
|
|
for r in reports:
|
|
|
- self.export_unstubbed(r["id"])
|
|
|
+ if self.is_obsolete_or_ignored(r):
|
|
|
+ continue
|
|
|
+ self.export_unstubbed(r["id"], overwrite)
|
|
|
|
|
|
- def export_unstubbed(self, report_id):
|
|
|
+ @staticmethod
|
|
|
+ def is_obsolete_or_ignored(report):
|
|
|
+ if report["type"] != "report":
|
|
|
+ return True
|
|
|
+ path: str = report["path"]
|
|
|
+ if "_/" in path or path.endswith("_"):
|
|
|
+ return True
|
|
|
+ for prefix in ["Team Content/Templates", "Team Content/Beispiele", "Team Content/Kalender"]:
|
|
|
+ if path.startswith(prefix):
|
|
|
+ return True
|
|
|
+ name: str = report["name"]
|
|
|
+ if name.endswith("_") or "Kopie" in name:
|
|
|
+ return True
|
|
|
+ return False
|
|
|
+
|
|
|
+ def export_unstubbed(self, report_id, overwrite=False):
|
|
|
report = self.api.get_report(report_id)
|
|
|
if "error" in report:
|
|
|
return
|
|
|
|
|
|
params = self.get_params(report, {})
|
|
|
- request = ReportRequest(
|
|
|
- report["id"], params, report["filename"], report["format"]
|
|
|
- )
|
|
|
+ request = ReportRequest(report["id"], params, report["filename"], report["format"])
|
|
|
# test if execution of report is possible
|
|
|
self.request_and_save_file(request, save=False)
|
|
|
|
|
|
- filename = (
|
|
|
- f"{self.cfg.cognos11.specs_dir}/{report['path']}/{report['name']}.xml"
|
|
|
- )
|
|
|
+ filename = f"{self.cfg.cognos11.specs_dir}/{report['path']}/{report['name']}.xml"
|
|
|
modified = datetime.fromisoformat(report["modified"]).timestamp()
|
|
|
- if Path(filename).exists() and Path(filename).stat().st_mtime > modified:
|
|
|
+ if not overwrite and Path(filename).exists() and Path(filename).stat().st_mtime > modified:
|
|
|
return
|
|
|
|
|
|
unstubbed_report = self.api.request_unstubbed(report_id)
|
|
@@ -105,39 +119,29 @@ class c11_export:
|
|
|
if len(report["params"]) == 1:
|
|
|
filename = report["filename"].format("_Summe")
|
|
|
params = self.get_params(report, {})
|
|
|
- result.append(
|
|
|
- ReportRequest(report["id"], params, filename, report["format"])
|
|
|
- )
|
|
|
+ result.append(ReportRequest(report["id"], params, filename, report["format"]))
|
|
|
|
|
|
key1 = report["params"][0]
|
|
|
for k1, v1 in report["meta"]["optional"][key1].items():
|
|
|
filename = report["filename"].format(v1)
|
|
|
params = self.get_params(report, {key1: {k1: v1}})
|
|
|
- result.append(
|
|
|
- ReportRequest(report["id"], params, filename, report["format"])
|
|
|
- )
|
|
|
+ result.append(ReportRequest(report["id"], params, filename, report["format"]))
|
|
|
return result
|
|
|
if len(report["params"]) == 2:
|
|
|
filename = report["filename"].format("_Summe", "").replace("_.", ".")
|
|
|
params = self.get_params(report, {})
|
|
|
- result.append(
|
|
|
- ReportRequest(report["id"], params, filename, report["format"])
|
|
|
- )
|
|
|
+ result.append(ReportRequest(report["id"], params, filename, report["format"]))
|
|
|
|
|
|
key1, key2 = report["params"]
|
|
|
for k1, v1 in report["meta"]["optional"][key1].items():
|
|
|
filename = report["filename"].format(v1, "_Summe")
|
|
|
params = self.get_params(report, {key1: {k1: v1}})
|
|
|
- result.append(
|
|
|
- ReportRequest(report["id"], params, filename, report["format"])
|
|
|
- )
|
|
|
+ result.append(ReportRequest(report["id"], params, filename, report["format"]))
|
|
|
|
|
|
for k2, v2 in report["meta"]["optional"][key2].items():
|
|
|
filename = report["filename"].format(v1, v2)
|
|
|
params = self.get_params(report, {key1: {k1: v1}, key2: {k2: v2}})
|
|
|
- result.append(
|
|
|
- ReportRequest(report["id"], params, filename, report["format"])
|
|
|
- )
|
|
|
+ result.append(ReportRequest(report["id"], params, filename, report["format"]))
|
|
|
return result
|
|
|
|
|
|
def get_params(self, report, optional_params):
|
|
@@ -145,9 +149,10 @@ class c11_export:
|
|
|
params.update(optional_params)
|
|
|
return params
|
|
|
|
|
|
- def execute_request_plan(self, req_plan):
|
|
|
+ def execute_request_plan(self, req_plan: list[list[ReportRequest]]):
|
|
|
for req_group in req_plan:
|
|
|
for report_req in req_group:
|
|
|
+ print(Path(report_req.filename).relative_to(self.cfg.cognos11.reportoutput_dir))
|
|
|
self.request_and_save_file(report_req)
|
|
|
|
|
|
def request_and_save_file(self, report_request: ReportRequest, save=True):
|
|
@@ -167,12 +172,12 @@ class c11_export:
|
|
|
with open(report_request.filename, "wb") as f:
|
|
|
f.write(content)
|
|
|
|
|
|
- def get_merge_group(self, req_plan):
|
|
|
+ def get_merge_group(self, req_plan: list[list[ReportRequest]]):
|
|
|
res = {}
|
|
|
for req_group in req_plan:
|
|
|
- if len(req_group) == 1:
|
|
|
+ if len(req_group) <= 1:
|
|
|
continue
|
|
|
- files = [r.filename for r in req_group]
|
|
|
+ files = [r.filename for r in req_group if r.report_format == "PDF"]
|
|
|
filename = files[0].replace("_Summe", "_Summe_und_Schichten")
|
|
|
res[filename] = files
|
|
|
|
|
@@ -182,33 +187,46 @@ class c11_export:
|
|
|
return res
|
|
|
|
|
|
def export_errors(self):
|
|
|
- reports = self.api.get_reports_in_folder(
|
|
|
- "Team Content", recursive=True, specs=True
|
|
|
- )
|
|
|
+ reports = self.api.get_reports_in_folder("Team Content", recursive=True, specs=True)
|
|
|
errors = [r for r in reports if "error" in r]
|
|
|
filename = self.cfg.cognos11.logs_dir + "/c11_report_errors.json"
|
|
|
json.dump(errors, open(filename, "w"), indent=2)
|
|
|
|
|
|
+ def template(self, req_plan: list[list[ReportRequest]], merge_group: dict[str, list[str]]):
|
|
|
+ mail_groups = defaultdict(list)
|
|
|
+ for report in merge_group.keys():
|
|
|
+ filename = str(Path(report).relative_to(self.cfg.cognos11.reportoutput_dir))
|
|
|
+ folder = Path(filename).parent.name
|
|
|
+ mail_groups[folder].append(filename)
|
|
|
+
|
|
|
+ for req_group in req_plan:
|
|
|
+ for report in req_group:
|
|
|
+ filename = str(Path(report.filename).relative_to(self.cfg.cognos11.reportoutput_dir))
|
|
|
+ folder = Path(filename).parent.name
|
|
|
+ mail_groups[folder].append(filename)
|
|
|
+ for group, file_list in mail_groups.items():
|
|
|
+ with open(
|
|
|
+ self.cfg.cognos11.config_dir + f"/{group}.template.csv",
|
|
|
+ "w",
|
|
|
+ encoding="latin-1",
|
|
|
+ ) as fwh:
|
|
|
+ fwh.write("Datei;Empfaenger\n")
|
|
|
+ for filename in file_list:
|
|
|
+ fwh.write(filename + ";\n")
|
|
|
+ return True
|
|
|
+
|
|
|
def mail_template(self, folder: str):
|
|
|
current_dir = Path(self.cfg.cognos11.reportoutput_dir + "\\" + folder)
|
|
|
|
|
|
- subfolders = set(
|
|
|
- [f.name for f in current_dir.glob("*") if f.is_dir()]
|
|
|
- ).difference([".", "..", "leer"])
|
|
|
+ subfolders = set([f.name for f in current_dir.glob("*") if f.is_dir()]).difference([".", "..", "leer"])
|
|
|
|
|
|
folder2 = folder + "\\" if folder != "" else ""
|
|
|
|
|
|
- export_files = [
|
|
|
- f"{folder2}{f.name};"
|
|
|
- for f in current_dir.glob("*.*")
|
|
|
- if f.suffix in [".pdf", ".xls"]
|
|
|
- ]
|
|
|
+ export_files = [f"{folder2}{f.name};" for f in current_dir.glob("*.*") if f.suffix in [".pdf", ".xls"]]
|
|
|
|
|
|
if len(export_files) > 0:
|
|
|
folder2 = folder.replace("\\", "_") if folder != "" else "Versand"
|
|
|
- template_file = Path(
|
|
|
- self.cfg.cognos11.config_dir + "\\" + folder2 + ".template.csv"
|
|
|
- )
|
|
|
+ template_file = Path(self.cfg.cognos11.config_dir + "\\" + folder2 + ".template.csv")
|
|
|
print(template_file)
|
|
|
with template_file.open("w") as fwh:
|
|
|
fwh.write("Report;Empfaenger\n")
|