c11_export.py 11 KB


  1. from collections import defaultdict
  2. from pathlib import Path
  3. from cognos11.c11_api import c11_api
  4. import config
  5. import json
  6. import logging
  7. import os
  8. from datetime import datetime
  9. from dataclasses import dataclass
  10. @dataclass
  11. class ReportRequest:
  12. report_id: str
  13. params: dict
  14. filename: str
  15. report_format: str
  16. class c11_export:
  17. api: c11_api
  18. cfg: config.Config = None
  19. def __init__(self, cfg=None, api=None):
  20. self.cfg = cfg
  21. if cfg is None:
  22. self.cfg = config.Config()
  23. self.api = api
  24. if api is None:
  25. # if self.cfg.cognos11.
  26. self.api = c11_api(cfg).login()
  27. now = datetime.now().strftime("%Y%m%d_%H%M%S")
  28. prot_file = f"{self.cfg.cognos11.logs_dir}/c11_export_{now}.log"
  29. os.makedirs(self.cfg.cognos11.logs_dir, exist_ok=True)
  30. logging.basicConfig(
  31. filename=prot_file,
  32. filemode="w",
  33. encoding="utf-8",
  34. level=logging.INFO,
  35. force=True,
  36. )
  37. @staticmethod
  38. def get_folder(folder, format="PDF"):
  39. if folder == "":
  40. folder = "Team Content" if format == "XML" else "Team Content/ReportOutput"
  41. elif not folder.startswith("Team Content"):
  42. folder = "Team Content/ReportOutput/" + folder
  43. return folder
  44. def export_folder(self, folder="", overwrite=False) -> None:
  45. folder = self.get_folder(folder, "XML")
  46. reports = self.api.get_reports_in_folder(folder, True)
  47. for r in reports:
  48. if self.is_obsolete_or_ignored(r):
  49. continue
  50. self.export_unstubbed(r["id"], overwrite)
  51. @staticmethod
  52. def is_obsolete_or_ignored(report):
  53. if report["type"] != "report":
  54. return True
  55. path: str = report["path"]
  56. if "_/" in path or path.endswith("_"):
  57. return True
  58. for prefix in ["Team Content/Templates", "Team Content/Beispiele", "Team Content/Kalender"]:
  59. if path.startswith(prefix):
  60. return True
  61. name: str = report["name"]
  62. if name.endswith("_") or "Kopie" in name:
  63. return True
  64. return False
  65. def export_unstubbed(self, report_id, overwrite=False):
  66. report = self.api.get_report(report_id)
  67. if "error" in report:
  68. return
  69. params = self.get_params(report, {})
  70. request = ReportRequest(report["id"], params, report["filename"], report["format"])
  71. # test if execution of report is possible
  72. self.request_and_save_file(request, save=False)
  73. filename = f"{self.cfg.cognos11.specs_dir}/{report['path']}/{report['name']}.xml"
  74. modified = datetime.fromisoformat(report["modified"]).timestamp()
  75. if not overwrite and Path(filename).exists() and Path(filename).stat().st_mtime > modified:
  76. return
  77. unstubbed_report = self.api.request_unstubbed(report_id)
  78. if unstubbed_report:
  79. print(f"{report['path']}/{report['name']}")
  80. os.makedirs(os.path.dirname(filename), exist_ok=True)
  81. with open(filename, "w") as f:
  82. f.write(unstubbed_report)
  83. def get_folder_pdf_request_plan(self, folder=""):
  84. folder = self.get_folder(folder, "PDF")
  85. reports = self.api.get_reports_in_folder(folder, True)
  86. return [self.export_pdf(r["id"]) for r in reports]
  87. def export_pdf(self, report_id, folder=None):
  88. report = self.api.get_report(report_id)
  89. if report["type"] == "shortcut":
  90. report_link = report
  91. report = self.api.get_report(report["target_id"]).copy()
  92. for k in ("name", "description", "path", "type", "filename"):
  93. report[k] = report_link[k]
  94. json.dump(report, open("dump.json", "w"), indent=2)
  95. if "meta" not in report:
  96. logging.warning(report["name"] + " is not accessible!")
  97. return []
  98. if len(report["params"]) == 0:
  99. filename = report["filename"]
  100. params = self.get_params(report, {})
  101. return [ReportRequest(report["id"], params, filename, report["format"])]
  102. result = []
  103. if len(report["params"]) == 1:
  104. filename = report["filename"].format("_Summe")
  105. params = self.get_params(report, {})
  106. result.append(ReportRequest(report["id"], params, filename, report["format"]))
  107. key1 = report["params"][0]
  108. for k1, v1 in report["meta"]["optional"][key1].items():
  109. filename = report["filename"].format(v1)
  110. params = self.get_params(report, {key1: {k1: v1}})
  111. result.append(ReportRequest(report["id"], params, filename, report["format"]))
  112. return result
  113. if len(report["params"]) == 2:
  114. filename = report["filename"].format("_Summe", "").replace("_.", ".")
  115. params = self.get_params(report, {})
  116. result.append(ReportRequest(report["id"], params, filename, report["format"]))
  117. key1, key2 = report["params"]
  118. for k1, v1 in report["meta"]["optional"][key1].items():
  119. filename = report["filename"].format(v1, "_Summe")
  120. params = self.get_params(report, {key1: {k1: v1}})
  121. result.append(ReportRequest(report["id"], params, filename, report["format"]))
  122. for k2, v2 in report["meta"]["optional"][key2].items():
  123. filename = report["filename"].format(v1, v2)
  124. params = self.get_params(report, {key1: {k1: v1}, key2: {k2: v2}})
  125. result.append(ReportRequest(report["id"], params, filename, report["format"]))
  126. return result
  127. def get_params(self, report, optional_params):
  128. params = report["meta"]["required"].copy()
  129. params.update(optional_params)
  130. return params
  131. def filter_request_plan(self, req_plan: list[list[ReportRequest]], mail_csv: str):
  132. if not Path(mail_csv).exists() or not Path(mail_csv).is_file():
  133. return req_plan
  134. required_files = []
  135. required_full_export = []
  136. with open(Path(mail_csv), "r", encoding="latin-1") as rfh:
  137. for row in rfh.readlines():
  138. filename, mailto = row.split(";")
  139. if "@" in mailto:
  140. required_files.append(str(Path(self.cfg.cognos11.reportoutput_dir + '/' + filename).resolve()))
  141. if "_Schichten." in filename:
  142. filename2 = filename.replace("__Summe_und_Schichten", "__Summe").replace("__nur_Schichten", "__Summe")
  143. required_full_export.append(str(Path(self.cfg.cognos11.reportoutput_dir + '/' + filename2).resolve()))
  144. res = []
  145. for req_group in req_plan:
  146. if len(req_group) > 0 and str(Path(req_group[0].filename).resolve()) in required_full_export:
  147. res.append(req_group)
  148. else:
  149. res.append([req for req in req_group if str(Path(req.filename).resolve()) in required_files])
  150. return res
  151. def execute_request_plan(self, req_plan: list[list[ReportRequest]]):
  152. for req_group in req_plan:
  153. for report_req in req_group:
  154. print(Path(report_req.filename).relative_to(self.cfg.cognos11.reportoutput_dir))
  155. self.request_and_save_file(report_req)
  156. def request_and_save_file(self, report_request: ReportRequest, save=True):
  157. logging.debug(report_request.filename)
  158. logging.debug(report_request.params)
  159. status_code, content = self.api.request_file(
  160. report_request.report_id,
  161. report_request.params,
  162. report_request.report_format,
  163. )
  164. if status_code != 200:
  165. return
  166. if not save:
  167. return
  168. os.makedirs(os.path.dirname(report_request.filename), exist_ok=True)
  169. try:
  170. with open(report_request.filename, "wb") as f:
  171. f.write(content)
  172. except PermissionError:
  173. print("--> not accessible!")
  174. def get_merge_group(self, req_plan: list[list[ReportRequest]]):
  175. res = {}
  176. for req_group in req_plan:
  177. if len(req_group) <= 1:
  178. continue
  179. files = [r.filename for r in req_group if r.report_format == "PDF"]
  180. filename = files[0].replace("_Summe", "_Summe_und_Schichten")
  181. res[filename] = files
  182. files2 = [r for r in files if "_Summe" not in r]
  183. filename2 = filename.replace("_Summe_und_Schichten", "_nur_Schichten")
  184. res[filename2] = files2
  185. return res
  186. def export_errors(self):
  187. reports = self.api.get_reports_in_folder("Team Content", recursive=True, specs=True)
  188. errors = [r for r in reports if "error" in r and not self.is_obsolete_or_ignored(r)]
  189. filename = self.cfg.cognos11.logs_dir + "/c11_report_errors.json"
  190. json.dump(errors, open(filename, "w"), indent=2)
  191. def template(self, req_plan: list[list[ReportRequest]], merge_group: dict[str, list[str]]):
  192. mail_groups = defaultdict(list)
  193. for report in merge_group.keys():
  194. filename = str(Path(report).relative_to(self.cfg.cognos11.reportoutput_dir))
  195. folder = Path(filename).parent.name
  196. mail_groups[folder].append(filename)
  197. for req_group in req_plan:
  198. for report in req_group:
  199. filename = str(Path(report.filename).relative_to(self.cfg.cognos11.reportoutput_dir))
  200. folder = Path(filename).parent.name
  201. mail_groups[folder].append(filename)
  202. for group, file_list in mail_groups.items():
  203. with open(
  204. self.cfg.cognos11.config_dir + f"/{group}.template.csv",
  205. "w",
  206. encoding="latin-1",
  207. ) as fwh:
  208. fwh.write("Datei;Empfaenger\n")
  209. for filename in file_list:
  210. fwh.write(filename + ";\n")
  211. return True
  212. def mail_template(self, folder: str):
  213. current_dir = Path(self.cfg.cognos11.reportoutput_dir + "\\" + folder)
  214. subfolders = set([f.name for f in current_dir.glob("*") if f.is_dir()]).difference([".", "..", "leer"])
  215. folder2 = folder + "\\" if folder != "" else ""
  216. export_files = [f"{folder2}{f.name};" for f in current_dir.glob("*.*") if f.suffix in [".pdf", ".xls"]]
  217. if len(export_files) > 0:
  218. folder2 = folder.replace("\\", "_") if folder != "" else "Versand"
  219. template_file = Path(self.cfg.cognos11.config_dir + "\\" + folder2 + ".template.csv")
  220. print(template_file)
  221. with template_file.open("w") as fwh:
  222. fwh.write("Report;Empfaenger\n")
  223. fwh.write("\n".join(export_files))
  224. for f in subfolders:
  225. subfolder = folder + "\\" + f if folder != "" else f
  226. self.mail_template(subfolder)
  227. if __name__ == "__main__":
  228. api = c11_api()
  229. api.login()
  230. pdf = c11_export(None, api)
  231. # pdf.export_folder('Team Content/Verkauf/1. Gesamtverkauf', 'PDF')
  232. # pdf.export_folder('Team Content/Aftersales/1. Service')
  233. pdf.export_errors()