c11_export.py 10 KB


  1. from collections import defaultdict
  2. from pathlib import Path
  3. from cognos11.c11_api import c11_api
  4. import config
  5. import json
  6. import logging
  7. import os
  8. from datetime import datetime
  9. from dataclasses import dataclass
  10. @dataclass
  11. class ReportRequest:
  12. report_id: str
  13. params: dict
  14. filename: str
  15. report_format: str
  16. class c11_export:
  17. api: c11_api
  18. cfg: config.Config = None
  19. def __init__(self, cfg=None, api=None):
  20. self.cfg = cfg
  21. if cfg is None:
  22. self.cfg = config.Config()
  23. self.api = api
  24. if api is None:
  25. # if self.cfg.cognos11.
  26. self.api = c11_api(cfg).login()
  27. now = datetime.now().strftime("%Y%m%d_%H%M%S")
  28. prot_file = f"{self.cfg.cognos11.logs_dir}/c11_export_{now}.log"
  29. os.makedirs(self.cfg.cognos11.logs_dir, exist_ok=True)
  30. logging.basicConfig(
  31. filename=prot_file,
  32. filemode="w",
  33. encoding="utf-8",
  34. level=logging.INFO,
  35. force=True,
  36. )
  37. @staticmethod
  38. def get_folder(folder, format="PDF"):
  39. if folder == "":
  40. folder = "Team Content" if format == "XML" else "Team Content/ReportOutput"
  41. elif not folder.startswith("Team Content"):
  42. folder = "Team Content/ReportOutput/" + folder
  43. return folder
  44. def export_folder(self, folder="", overwrite=False) -> None:
  45. folder = self.get_folder(folder, "XML")
  46. reports = self.api.get_reports_in_folder(folder, True)
  47. for r in reports:
  48. if self.is_obsolete_or_ignored(r):
  49. continue
  50. self.export_unstubbed(r["id"], overwrite)
  51. @staticmethod
  52. def is_obsolete_or_ignored(report):
  53. if report["type"] != "report":
  54. return True
  55. path: str = report["path"]
  56. if "_/" in path or path.endswith("_"):
  57. return True
  58. for prefix in ["Team Content/Templates", "Team Content/Beispiele", "Team Content/Kalender"]:
  59. if path.startswith(prefix):
  60. return True
  61. name: str = report["name"]
  62. if name.endswith("_") or "Kopie" in name:
  63. return True
  64. return False
  65. def export_unstubbed(self, report_id, overwrite=False):
  66. report = self.api.get_report(report_id)
  67. if "error" in report:
  68. return
  69. params = self.get_params(report, {})
  70. request = ReportRequest(report["id"], params, report["filename"], report["format"])
  71. # test if execution of report is possible
  72. self.request_and_save_file(request, save=False)
  73. filename = f"{self.cfg.cognos11.specs_dir}/{report['path']}/{report['name']}.xml"
  74. modified = datetime.fromisoformat(report["modified"]).timestamp()
  75. if not overwrite and Path(filename).exists() and Path(filename).stat().st_mtime > modified:
  76. return
  77. unstubbed_report = self.api.request_unstubbed(report_id)
  78. if unstubbed_report:
  79. print(f"{report['path']}/{report['name']}")
  80. os.makedirs(os.path.dirname(filename), exist_ok=True)
  81. with open(filename, "w") as f:
  82. f.write(unstubbed_report)
  83. def get_folder_pdf_request_plan(self, folder=""):
  84. folder = self.get_folder(folder, "PDF")
  85. reports = self.api.get_reports_in_folder(folder, True)
  86. return [self.export_pdf(r["id"]) for r in reports]
  87. def export_pdf(self, report_id, folder=None):
  88. report = self.api.get_report(report_id)
  89. if report["type"] == "shortcut":
  90. report_link = report
  91. report = self.api.get_report(report["target_id"]).copy()
  92. for k in ("name", "description", "path", "type", "filename"):
  93. report[k] = report_link[k]
  94. json.dump(report, open("dump.json", "w"), indent=2)
  95. if "meta" not in report:
  96. logging.warning(report["name"] + " is not accessible!")
  97. return []
  98. if len(report["params"]) == 0:
  99. filename = report["filename"]
  100. params = self.get_params(report, {})
  101. return [ReportRequest(report["id"], params, filename, report["format"])]
  102. result = []
  103. if len(report["params"]) == 1:
  104. filename = report["filename"].format("_Summe")
  105. params = self.get_params(report, {})
  106. result.append(ReportRequest(report["id"], params, filename, report["format"]))
  107. key1 = report["params"][0]
  108. for k1, v1 in report["meta"]["optional"][key1].items():
  109. filename = report["filename"].format(v1)
  110. params = self.get_params(report, {key1: {k1: v1}})
  111. result.append(ReportRequest(report["id"], params, filename, report["format"]))
  112. return result
  113. if len(report["params"]) == 2:
  114. filename = report["filename"].format("_Summe", "").replace("_.", ".")
  115. params = self.get_params(report, {})
  116. result.append(ReportRequest(report["id"], params, filename, report["format"]))
  117. key1, key2 = report["params"]
  118. for k1, v1 in report["meta"]["optional"][key1].items():
  119. filename = report["filename"].format(v1, "_Summe")
  120. params = self.get_params(report, {key1: {k1: v1}})
  121. result.append(ReportRequest(report["id"], params, filename, report["format"]))
  122. for k2, v2 in report["meta"]["optional"][key2].items():
  123. filename = report["filename"].format(v1, v2)
  124. params = self.get_params(report, {key1: {k1: v1}, key2: {k2: v2}})
  125. result.append(ReportRequest(report["id"], params, filename, report["format"]))
  126. return result
  127. def get_params(self, report, optional_params):
  128. params = report["meta"]["required"].copy()
  129. params.update(optional_params)
  130. return params
  131. def filter_request_plan(req_plan: list[list[ReportRequest]], mail_csv: str):
  132. if not Path(mail_csv).exists():
  133. return req_plan
  134. required_files = []
  135. with open(Path(mail_csv), "r", encoding="latin-1") as rfh:
  136. for row in rfh.readlines():
  137. filename, mailto = row.removesuffix("\r\n").split(";")
  138. if mailto == "":
  139. continue
  140. required_files.append(filename)
  141. res = []
  142. for req_group in required_files:
  143. res.append([req for req in req_group if req.filename in required_files])
  144. return res
  145. def execute_request_plan(self, req_plan: list[list[ReportRequest]]):
  146. for req_group in req_plan:
  147. for report_req in req_group:
  148. print(Path(report_req.filename).relative_to(self.cfg.cognos11.reportoutput_dir))
  149. self.request_and_save_file(report_req)
  150. def request_and_save_file(self, report_request: ReportRequest, save=True):
  151. logging.debug(report_request.filename)
  152. logging.debug(report_request.params)
  153. status_code, content = self.api.request_file(
  154. report_request.report_id,
  155. report_request.params,
  156. report_request.report_format,
  157. )
  158. if status_code != 200:
  159. return
  160. if not save:
  161. return
  162. os.makedirs(os.path.dirname(report_request.filename), exist_ok=True)
  163. with open(report_request.filename, "wb") as f:
  164. f.write(content)
  165. def get_merge_group(self, req_plan: list[list[ReportRequest]]):
  166. res = {}
  167. for req_group in req_plan:
  168. if len(req_group) <= 1:
  169. continue
  170. files = [r.filename for r in req_group if r.report_format == "PDF"]
  171. filename = files[0].replace("_Summe", "_Summe_und_Schichten")
  172. res[filename] = files
  173. files2 = [r for r in files if "_Summe" not in r]
  174. filename2 = filename.replace("_Summe_und_Schichten", "_nur_Schichten")
  175. res[filename2] = files2
  176. return res
  177. def export_errors(self):
  178. reports = self.api.get_reports_in_folder("Team Content", recursive=True, specs=True)
  179. errors = [r for r in reports if "error" in r and not self.is_obsolete_or_ignored(r)]
  180. filename = self.cfg.cognos11.logs_dir + "/c11_report_errors.json"
  181. json.dump(errors, open(filename, "w"), indent=2)
  182. def template(self, req_plan: list[list[ReportRequest]], merge_group: dict[str, list[str]]):
  183. mail_groups = defaultdict(list)
  184. for report in merge_group.keys():
  185. filename = str(Path(report).relative_to(self.cfg.cognos11.reportoutput_dir))
  186. folder = Path(filename).parent.name
  187. mail_groups[folder].append(filename)
  188. for req_group in req_plan:
  189. for report in req_group:
  190. filename = str(Path(report.filename).relative_to(self.cfg.cognos11.reportoutput_dir))
  191. folder = Path(filename).parent.name
  192. mail_groups[folder].append(filename)
  193. for group, file_list in mail_groups.items():
  194. with open(
  195. self.cfg.cognos11.config_dir + f"/{group}.template.csv",
  196. "w",
  197. encoding="latin-1",
  198. ) as fwh:
  199. fwh.write("Datei;Empfaenger\n")
  200. for filename in file_list:
  201. fwh.write(filename + ";\n")
  202. return True
  203. def mail_template(self, folder: str):
  204. current_dir = Path(self.cfg.cognos11.reportoutput_dir + "\\" + folder)
  205. subfolders = set([f.name for f in current_dir.glob("*") if f.is_dir()]).difference([".", "..", "leer"])
  206. folder2 = folder + "\\" if folder != "" else ""
  207. export_files = [f"{folder2}{f.name};" for f in current_dir.glob("*.*") if f.suffix in [".pdf", ".xls"]]
  208. if len(export_files) > 0:
  209. folder2 = folder.replace("\\", "_") if folder != "" else "Versand"
  210. template_file = Path(self.cfg.cognos11.config_dir + "\\" + folder2 + ".template.csv")
  211. print(template_file)
  212. with template_file.open("w") as fwh:
  213. fwh.write("Report;Empfaenger\n")
  214. fwh.write("\n".join(export_files))
  215. for f in subfolders:
  216. subfolder = folder + "\\" + f if folder != "" else f
  217. self.mail_template(subfolder)
  218. if __name__ == "__main__":
  219. api = c11_api()
  220. api.login()
  221. pdf = c11_export(None, api)
  222. # pdf.export_folder('Team Content/Verkauf/1. Gesamtverkauf', 'PDF')
  223. # pdf.export_folder('Team Content/Aftersales/1. Service')
  224. pdf.export_errors()