# c11_export.py (11 KB)


  1. import json
  2. import logging
  3. import os
  4. from collections import defaultdict
  5. from dataclasses import dataclass
  6. from datetime import datetime
  7. from pathlib import Path
  8. import config
  9. from cognos11.c11_api import c11_api
  10. @dataclass
  11. class ReportRequest:
  12. report_id: str
  13. params: dict
  14. filename: str
  15. report_format: str
  16. class c11_export:
  17. api: c11_api
  18. cfg: config.Config = None
  19. def __init__(self, cfg=None, api=None):
  20. self.cfg = cfg
  21. if cfg is None:
  22. self.cfg = config.Config()
  23. self.api = api
  24. if api is None:
  25. # if self.cfg.cognos11.
  26. self.api = c11_api(cfg).login()
  27. now = datetime.now().strftime("%Y%m%d_%H%M%S")
  28. prot_file = f"{self.cfg.cognos11.logs_dir}/c11_export_{now}.log"
  29. os.makedirs(self.cfg.cognos11.logs_dir, exist_ok=True)
  30. logging.basicConfig(
  31. filename=prot_file,
  32. filemode="w",
  33. encoding="utf-8",
  34. level=logging.INFO,
  35. force=True,
  36. )
  37. @staticmethod
  38. def get_folder(folder, format="PDF"):
  39. if folder == "":
  40. folder = "Team Content" if format == "XML" else "Team Content/ReportOutput"
  41. elif not folder.startswith("Team Content"):
  42. folder = "Team Content/ReportOutput/" + folder
  43. return folder
  44. def export_folder(self, folder="", overwrite=False) -> None:
  45. folder = self.get_folder(folder, "XML")
  46. reports = self.api.get_reports_in_folder(folder, True)
  47. for r in reports:
  48. if self.is_obsolete_or_ignored(r):
  49. continue
  50. self.export_unstubbed(r["id"], overwrite)
  51. @staticmethod
  52. def is_obsolete_or_ignored(report):
  53. if report["type"] != "report":
  54. return True
  55. path: str = report["path"]
  56. if "_/" in path or path.endswith("_"):
  57. return True
  58. for prefix in ["Team Content/Templates", "Team Content/Beispiele", "Team Content/Kalender"]:
  59. if path.startswith(prefix):
  60. return True
  61. name: str = report["name"]
  62. if name.endswith("_") or "Kopie" in name:
  63. return True
  64. return False
  65. def export_unstubbed(self, report_id, overwrite=False):
  66. report = self.api.get_report(report_id)
  67. if "error" in report:
  68. return
  69. params = self.get_params(report, {})
  70. request = ReportRequest(report["id"], params, report["filename"], report["format"])
  71. # test if execution of report is possible
  72. self.request_and_save_file(request, save=False)
  73. filename = f"{self.cfg.cognos11.specs_dir}/{report['path']}/{report['name']}.xml"
  74. modified = datetime.fromisoformat(report["modified"]).timestamp()
  75. if not overwrite and Path(filename).exists() and Path(filename).stat().st_mtime > modified:
  76. return
  77. unstubbed_report = self.api.request_unstubbed(report_id)
  78. if unstubbed_report:
  79. print(f"{report['path']}/{report['name']}")
  80. os.makedirs(os.path.dirname(filename), exist_ok=True)
  81. with open(filename, "w") as f:
  82. f.write(unstubbed_report)
  83. def get_folder_pdf_request_plan(self, folder=""):
  84. folder = self.get_folder(folder, "PDF")
  85. reports = self.api.get_reports_in_folder(folder, True)
  86. return [self.export_pdf(r["id"]) for r in reports]
  87. def export_pdf(self, report_id, folder=None):
  88. report = self.api.get_report(report_id)
  89. if report["type"] == "shortcut":
  90. report_link = report
  91. report = self.api.get_report(report["target_id"]).copy()
  92. for k in ("name", "description", "path", "type", "filename"):
  93. report[k] = report_link[k]
  94. json.dump(report, open("dump.json", "w"), indent=2)
  95. if "meta" not in report:
  96. logging.warning(report["name"] + " is not accessible!")
  97. return []
  98. if len(report["params"]) == 0:
  99. filename = report["filename"]
  100. params = self.get_params(report, {})
  101. return [ReportRequest(report["id"], params, filename, report["format"])]
  102. result = []
  103. if len(report["params"]) == 1:
  104. filename = report["filename"].format("_Summe")
  105. params = self.get_params(report, {})
  106. result.append(ReportRequest(report["id"], params, filename, report["format"]))
  107. key1 = report["params"][0]
  108. for k1, v1 in report["meta"]["optional"][key1].items():
  109. filename = report["filename"].format(v1)
  110. params = self.get_params(report, {key1: {k1: v1}})
  111. result.append(ReportRequest(report["id"], params, filename, report["format"]))
  112. return result
  113. if len(report["params"]) == 2:
  114. filename = report["filename"].format("_Summe", "").replace("_.", ".")
  115. params = self.get_params(report, {})
  116. result.append(ReportRequest(report["id"], params, filename, report["format"]))
  117. key1, key2 = report["params"]
  118. for k1, v1 in report["meta"]["optional"][key1].items():
  119. filename = report["filename"].format(v1, "_Summe")
  120. params = self.get_params(report, {key1: {k1: v1}})
  121. result.append(ReportRequest(report["id"], params, filename, report["format"]))
  122. for k2, v2 in report["meta"]["optional"][key2].items():
  123. filename = report["filename"].format(v1, v2)
  124. params = self.get_params(report, {key1: {k1: v1}, key2: {k2: v2}})
  125. result.append(ReportRequest(report["id"], params, filename, report["format"]))
  126. return result
  127. def get_params(self, report, optional_params):
  128. params = report["meta"]["required"].copy()
  129. params.update(optional_params)
  130. return params
  131. def filter_request_plan(self, req_plan: list[list[ReportRequest]], mail_csv: str):
  132. if not Path(mail_csv).exists() or not Path(mail_csv).is_file():
  133. return req_plan
  134. print("Filtern auf Empfaenger: " + mail_csv)
  135. required_files = []
  136. required_full_export = []
  137. with open(Path(mail_csv), "r", encoding="latin-1") as rfh:
  138. for row in rfh.readlines():
  139. if ";" not in row:
  140. continue
  141. filename, mailto = row.split(";", 2)
  142. if "@" not in mailto:
  143. continue
  144. required_files.append(str(Path(self.cfg.cognos11.reportoutput_dir + "/" + filename).resolve()))
  145. if "_Schichten." in filename and "erstellte_Schichten" not in filename:
  146. filename2 = filename.replace("__Summe_und_Schichten", "__Summe").replace(
  147. "__nur_Schichten", "__Summe"
  148. )
  149. required_full_export.append(
  150. str(Path(self.cfg.cognos11.reportoutput_dir + "/" + filename2).resolve())
  151. )
  152. res = []
  153. for req_group in req_plan:
  154. if len(req_group) > 0 and str(Path(req_group[0].filename).resolve()) in required_full_export:
  155. res.append(req_group)
  156. else:
  157. res.append([req for req in req_group if str(Path(req.filename).resolve()) in required_files])
  158. return res
  159. def execute_request_plan(self, req_plan: list[list[ReportRequest]]):
  160. for req_group in req_plan:
  161. for report_req in req_group:
  162. print(Path(report_req.filename).relative_to(self.cfg.cognos11.reportoutput_dir))
  163. self.request_and_save_file(report_req)
  164. def request_and_save_file(self, report_request: ReportRequest, save=True):
  165. logging.debug(report_request.filename)
  166. logging.debug(report_request.params)
  167. status_code, content = self.api.request_file(
  168. report_request.report_id,
  169. report_request.params,
  170. report_request.report_format,
  171. )
  172. if status_code != 200:
  173. return
  174. if not save:
  175. return
  176. os.makedirs(os.path.dirname(report_request.filename), exist_ok=True)
  177. try:
  178. with open(report_request.filename, "wb") as f:
  179. f.write(content)
  180. except PermissionError:
  181. print("--> not accessible!")
  182. def get_merge_group(self, req_plan: list[list[ReportRequest]]):
  183. res = {}
  184. for req_group in req_plan:
  185. files = [r.filename for r in req_group if r.report_format == "PDF"]
  186. if len(files) <= 1:
  187. continue
  188. filename = files[0].replace("_Summe", "_Summe_und_Schichten")
  189. res[filename] = files
  190. files2 = [r for r in files if "_Summe" not in r]
  191. filename2 = filename.replace("_Summe_und_Schichten", "_nur_Schichten")
  192. res[filename2] = files2
  193. filename3 = filename.replace("_Summe_und_Schichten", "_nur_erstellte_Schichten")
  194. res[filename3] = files2
  195. return res
  196. def export_errors(self):
  197. reports = self.api.get_reports_in_folder("Team Content", recursive=True, specs=True)
  198. errors = [r for r in reports if "error" in r and not self.is_obsolete_or_ignored(r)]
  199. filename = self.cfg.cognos11.logs_dir + "/c11_report_errors.json"
  200. json.dump(errors, open(filename, "w"), indent=2)
  201. def template(self, req_plan: list[list[ReportRequest]], merge_group: dict[str, list[str]]):
  202. mail_groups = defaultdict(list)
  203. for report in merge_group.keys():
  204. filename = str(Path(report).relative_to(self.cfg.cognos11.reportoutput_dir))
  205. folder = Path(filename).parent.name
  206. mail_groups[folder].append(filename)
  207. for req_group in req_plan:
  208. for report in req_group:
  209. filename = str(Path(report.filename).relative_to(self.cfg.cognos11.reportoutput_dir))
  210. folder = Path(filename).parent.name
  211. mail_groups[folder].append(filename)
  212. for group, file_list in mail_groups.items():
  213. with open(
  214. self.cfg.cognos11.config_dir + f"/{group}.template.csv",
  215. "w",
  216. encoding="latin-1",
  217. ) as fwh:
  218. fwh.write("Datei;Empfaenger\n")
  219. for filename in file_list:
  220. fwh.write(filename + ";\n")
  221. return True
  222. def mail_template(self, folder: str):
  223. current_dir = Path(self.cfg.cognos11.reportoutput_dir + "\\" + folder)
  224. subfolders = set([f.name for f in current_dir.glob("*") if f.is_dir()]).difference([".", "..", "leer"])
  225. folder2 = folder + "\\" if folder != "" else ""
  226. export_files = [f"{folder2}{f.name};" for f in current_dir.glob("*.*") if f.suffix in [".pdf", ".xls"]]
  227. if len(export_files) > 0:
  228. folder2 = folder.replace("\\", "_") if folder != "" else "Versand"
  229. template_file = Path(self.cfg.cognos11.config_dir + "\\" + folder2 + ".template.csv")
  230. print(template_file)
  231. with template_file.open("w") as fwh:
  232. fwh.write("Report;Empfaenger\n")
  233. fwh.write("\n".join(export_files))
  234. for f in subfolders:
  235. subfolder = folder + "\\" + f if folder != "" else f
  236. self.mail_template(subfolder)
  237. if __name__ == "__main__":
  238. api = c11_api()
  239. api.login()
  240. pdf = c11_export(None, api)
  241. # pdf.export_folder('Team Content/Verkauf/1. Gesamtverkauf', 'PDF')
  242. # pdf.export_folder('Team Content/Aftersales/1. Service')
  243. pdf.export_errors()