c11_export.py 9.7 KB


  1. from collections import defaultdict
  2. from pathlib import Path
  3. from cognos11.c11_api import c11_api
  4. import config
  5. import json
  6. import logging
  7. import os
  8. from datetime import datetime
  9. from dataclasses import dataclass
  @dataclass
  class ReportRequest:
      """One request for a single rendered report file.

      Instances are created positionally as
      ``ReportRequest(report_id, params, filename, report_format)``, so the
      field order below is part of the interface.
      """

      # Identifier of the report that is handed to the API client.
      report_id: str
      # Parameter mapping that is sent along with the request.
      params: dict
      # Path of the file the response content is written to.
      filename: str
      # Output format passed to the API client; "PDF" is the value that
      # the merge grouping checks for.
      report_format: str
  class c11_export:
      """Export Cognos 11 report specifications and rendered report files.

      The class combines a ``c11_api`` client with a ``config.Config`` object.
      It can write unstubbed report specifications as XML files, build and
      execute request plans for rendered report files, group files for
      merging and write ``*.template.csv`` files.
      """

      # Client used for every request made by this class.
      api: "c11_api"
      # Configuration object; only its ``cognos11`` section is read here.
      cfg: "config.Config | None" = None

      def __init__(self, cfg=None, api=None):
          """Store configuration and client, then route logging to a new file.

          Args:
              cfg: Configuration object; ``config.Config()`` is created when
                  omitted.
              api: Client to reuse; when omitted a new ``c11_api`` is created
                  and logged in.
          """
          self.cfg = config.Config() if cfg is None else cfg
          self.api = api
          if api is None:
              # Build the client from the resolved configuration, not from the
              # raw ``cfg`` argument, which may still be None at this point.
              # NOTE(review): relies on login() returning the client - confirm.
              self.api = c11_api(self.cfg).login()

          logs_dir = self.cfg.cognos11.logs_dir
          now = datetime.now().strftime("%Y%m%d_%H%M%S")
          prot_file = f"{logs_dir}/c11_export_{now}.log"
          os.makedirs(logs_dir, exist_ok=True)
          # force=True replaces handlers configured earlier, so every instance
          # starts its own log file.
          logging.basicConfig(
              filename=prot_file,
              filemode="w",
              encoding="utf-8",
              level=logging.INFO,
              force=True,
          )

      @staticmethod
      def get_folder(folder, format="PDF"):
          """Expand a folder argument to a path below ``Team Content``.

          Args:
              folder: Folder name; may be empty or already start with
                  ``Team Content``.
              format: ``"XML"`` selects ``Team Content`` as the default root,
                  any other value selects ``Team Content/ReportOutput``. Only
                  used when ``folder`` is empty.

          Returns:
              The expanded folder path.
          """
          if folder == "":
              return "Team Content" if format == "XML" else "Team Content/ReportOutput"
          if folder.startswith("Team Content"):
              return folder
          return "Team Content/ReportOutput/" + folder

      def export_folder(self, folder="", overwrite=False) -> None:
          """Export the specification of every relevant report in a folder.

          Args:
              folder: Folder argument, expanded with ``get_folder(folder, "XML")``.
              overwrite: Passed through to ``export_unstubbed``.
          """
          folder = self.get_folder(folder, "XML")
          for entry in self.api.get_reports_in_folder(folder, True):
              if not self.is_obsolete_or_ignored(entry):
                  self.export_unstubbed(entry["id"], overwrite)

      @staticmethod
      def is_obsolete_or_ignored(report):
          """Tell whether a folder entry must be skipped by ``export_folder``.

          An entry is skipped when it is not of type ``report``, when its path
          contains a folder ending in ``_`` or starts with one of the excluded
          top-level folders, or when its name ends in ``_`` or contains
          ``Kopie``.

          Args:
              report: Mapping with the keys ``type``, ``path`` and ``name``.

          Returns:
              True when the entry must be skipped, False otherwise.
          """
          if report["type"] != "report":
              return True
          path: str = report["path"]
          if "_/" in path or path.endswith("_"):
              return True
          excluded = ("Team Content/Templates", "Team Content/Beispiele", "Team Content/Kalender")
          if path.startswith(excluded):
              return True
          name: str = report["name"]
          return name.endswith("_") or "Kopie" in name

      def export_unstubbed(self, report_id, overwrite=False):
          """Write the unstubbed specification of one report to an XML file.

          The report is requested once without saving first; a failure of that
          request is logged by ``request_and_save_file`` and does not stop the
          export. The file is left untouched when it exists, is newer than the
          report's ``modified`` timestamp and ``overwrite`` is false.

          Args:
              report_id: Identifier passed to the API client.
              overwrite: Rewrite the file even when it is up to date.
          """
          report = self.api.get_report(report_id)
          if "error" in report:
              return

          params = self.get_params(report, {})
          request = ReportRequest(report["id"], params, report["filename"], report["format"])
          # test if execution of report is possible
          self.request_and_save_file(request, save=False)

          target = Path(f"{self.cfg.cognos11.specs_dir}/{report['path']}/{report['name']}.xml")
          modified = datetime.fromisoformat(report["modified"]).timestamp()
          if not overwrite and target.exists() and target.stat().st_mtime > modified:
              return

          unstubbed_report = self.api.request_unstubbed(report_id)
          if not unstubbed_report:
              return
          print(f"{report['path']}/{report['name']}")
          target.parent.mkdir(parents=True, exist_ok=True)
          # Explicit encoding: the platform default cannot represent every
          # character and would make the output depend on the machine.
          with target.open("w", encoding="utf-8") as f:
              f.write(unstubbed_report)

      def get_folder_pdf_request_plan(self, folder=""):
          """Build the request plan for every entry of a folder.

          Args:
              folder: Folder argument, expanded with ``get_folder(folder, "PDF")``.

          Returns:
              One list of ``ReportRequest`` objects per folder entry, as
              produced by ``export_pdf``.
          """
          folder = self.get_folder(folder, "PDF")
          reports = self.api.get_reports_in_folder(folder, True)
          return [self.export_pdf(entry["id"]) for entry in reports]

      def export_pdf(self, report_id, folder=None):
          """Build all requests needed to render one report.

          ``report["filename"]`` is used as a ``str.format`` template with one
          placeholder per entry of ``report["params"]``. A request without
          optional parameters is created first (``_Summe``), followed by one
          request per optional value, and for two parameters one request per
          value combination.

          Args:
              report_id: Identifier passed to the API client. A shortcut is
                  resolved to its target while keeping the shortcut's own
                  name, description, path, type and filename.
              folder: Unused; kept for interface compatibility.

          Returns:
              A list of ``ReportRequest`` objects. The list is empty when the
              report has no ``meta`` entry or more than two parameters.
          """
          report = self.api.get_report(report_id)
          if report["type"] == "shortcut":
              report_link = report
              report = self.api.get_report(report["target_id"]).copy()
              for k in ("name", "description", "path", "type", "filename"):
                  report[k] = report_link[k]
              # Debug dump of the resolved shortcut; the handle is closed now.
              with open("dump.json", "w") as fh:
                  json.dump(report, fh, indent=2)

          if "meta" not in report:
              logging.warning(report["name"] + " is not accessible!")
              return []

          def build(filename, optional_params):
              # One request for this report with the given optional parameters.
              params = self.get_params(report, optional_params)
              return ReportRequest(report["id"], params, filename, report["format"])

          template = report["filename"]
          keys = report["params"]

          if len(keys) == 0:
              return [build(template, {})]

          if len(keys) == 1:
              key1 = keys[0]
              result = [build(template.format("_Summe"), {})]
              for k1, v1 in report["meta"]["optional"][key1].items():
                  result.append(build(template.format(v1), {key1: {k1: v1}}))
              return result

          if len(keys) == 2:
              key1, key2 = keys
              # An empty second placeholder leaves "_." before the suffix.
              summary_name = template.format("_Summe", "").replace("_.", ".")
              result = [build(summary_name, {})]
              for k1, v1 in report["meta"]["optional"][key1].items():
                  result.append(build(template.format(v1, "_Summe"), {key1: {k1: v1}}))
                  for k2, v2 in report["meta"]["optional"][key2].items():
                      result.append(
                          build(template.format(v1, v2), {key1: {k1: v1}, key2: {k2: v2}})
                      )
              return result

          # Always return a list so request plans stay iterable.
          logging.warning(report["name"] + " has more than two parameters and is skipped!")
          return []

      def get_params(self, report, optional_params):
          """Merge required and optional parameters into a new dict.

          Args:
              report: Mapping that contains ``report["meta"]["required"]``.
              optional_params: Entries added on top of the required ones.

          Returns:
              A shallow copy of the required parameters updated with
              ``optional_params``.
          """
          params = report["meta"]["required"].copy()
          params.update(optional_params)
          return params

      def execute_request_plan(self, req_plan: "list[list[ReportRequest]]"):
          """Request and save every file of a request plan.

          Each file name is printed relative to the configured
          ``reportoutput_dir`` before the request is sent.
          """
          output_root = self.cfg.cognos11.reportoutput_dir
          for req_group in req_plan:
              for report_req in req_group:
                  print(Path(report_req.filename).relative_to(output_root))
                  self.request_and_save_file(report_req)

      def request_and_save_file(self, report_request: "ReportRequest", save=True):
          """Request one file and optionally write it to disk.

          Args:
              report_request: Request to send through the API client.
              save: When false the response is only requested, not written.

          Returns:
              True when the client reported status 200, otherwise False.
          """
          logging.debug(report_request.filename)
          logging.debug(report_request.params)
          status_code, content = self.api.request_file(
              report_request.report_id,
              report_request.params,
              report_request.report_format,
          )
          if status_code != 200:
              # Record the failure; it used to be dropped without a trace.
              logging.warning(
                  "Request for %s failed with status %s", report_request.filename, status_code
              )
              return False
          if save:
              target = Path(report_request.filename)
              # Path.parent also works for a bare file name, where
              # os.makedirs("") would raise.
              target.parent.mkdir(parents=True, exist_ok=True)
              with target.open("wb") as f:
                  f.write(content)
          return True

      def get_merge_group(self, req_plan: "list[list[ReportRequest]]"):
          """Group the PDF files of multi-request groups for merging.

          For every group with more than one request two entries are created:
          ``*_Summe_und_Schichten`` with all PDF files of the group and
          ``*_nur_Schichten`` with the PDF files whose name lacks ``_Summe``.
          Both names are derived from the first PDF file of the group.

          Returns:
              Mapping from merged file name to the list of source file names.
          """
          res = {}
          for req_group in req_plan:
              if len(req_group) <= 1:
                  continue
              files = [r.filename for r in req_group if r.report_format == "PDF"]
              if not files:
                  # Nothing to merge; avoids an IndexError on files[0].
                  continue
              merged_all = files[0].replace("_Summe", "_Summe_und_Schichten")
              res[merged_all] = files
              merged_parts = merged_all.replace("_Summe_und_Schichten", "_nur_Schichten")
              res[merged_parts] = [name for name in files if "_Summe" not in name]
          return res

      def export_errors(self):
          """Write all reports that carry an ``error`` entry to a JSON file.

          The file is ``c11_report_errors.json`` in the configured ``logs_dir``.
          """
          reports = self.api.get_reports_in_folder("Team Content", recursive=True, specs=True)
          errors = [r for r in reports if "error" in r]
          filename = self.cfg.cognos11.logs_dir + "/c11_report_errors.json"
          with open(filename, "w") as fh:
              json.dump(errors, fh, indent=2)

      def template(self, req_plan: "list[list[ReportRequest]]", merge_group: dict[str, list[str]]):
          """Write one ``<folder>.template.csv`` per output folder.

          Merged file names come first, followed by the planned files. All
          names are relative to ``reportoutput_dir`` and grouped by the name
          of their parent folder.

          Returns:
              Always True.
          """
          output_root = self.cfg.cognos11.reportoutput_dir
          planned = [request.filename for req_group in req_plan for request in req_group]

          mail_groups = defaultdict(list)
          for path in [*merge_group, *planned]:
              filename = str(Path(path).relative_to(output_root))
              mail_groups[Path(filename).parent.name].append(filename)

          for group, file_list in mail_groups.items():
              with open(
                  self.cfg.cognos11.config_dir + f"/{group}.template.csv",
                  "w",
                  encoding="latin-1",
              ) as fwh:
                  fwh.write("Datei;Empfaenger\n")
                  for filename in file_list:
                      fwh.write(filename + ";\n")
          return True

      def mail_template(self, folder: str):
          """Write template CSV files for a folder and all of its subfolders.

          For every folder below ``reportoutput_dir`` that contains ``.pdf`` or
          ``.xls`` files a ``*.template.csv`` is written to ``config_dir``.
          Subfolders named ``leer`` are not visited.

          Args:
              folder: Backslash-separated folder relative to
                  ``reportoutput_dir``; empty for the root.
          """
          current_dir = Path(self.cfg.cognos11.reportoutput_dir + "\\" + folder)
          subfolders = {f.name for f in current_dir.glob("*") if f.is_dir()} - {".", "..", "leer"}

          prefix = folder + "\\" if folder != "" else ""
          export_files = [
              f"{prefix}{f.name};" for f in current_dir.glob("*.*") if f.suffix in (".pdf", ".xls")
          ]
          if export_files:
              template_name = folder.replace("\\", "_") if folder != "" else "Versand"
              template_file = Path(self.cfg.cognos11.config_dir + "\\" + template_name + ".template.csv")
              print(template_file)
              with template_file.open("w") as fwh:
                  fwh.write("Report;Empfaenger\n")
                  fwh.write("\n".join(export_files))

          # Sorted so that the traversal order is the same on every run.
          for name in sorted(subfolders):
              subfolder = folder + "\\" + name if folder != "" else name
              self.mail_template(subfolder)
  if __name__ == "__main__":
      # Script entry point: log in once, hand the client to the exporter and
      # write the list of reports that carry an error entry.
      client = c11_api()
      client.login()
      exporter = c11_export(None, client)
      # Other runs used before, kept for reference:
      #   exporter.export_folder('Team Content/Verkauf/1. Gesamtverkauf', 'PDF')
      #   exporter.export_folder('Team Content/Aftersales/1. Service')
      exporter.export_errors()