c11_export.py 12 KB


  1. import json
  2. import logging
  3. import os
  4. import time
  5. from collections import defaultdict
  6. from dataclasses import dataclass
  7. from datetime import datetime
  8. from pathlib import Path
  9. import config
  10. from cognos11.c11_api import c11_api, convert_filename
  11. @dataclass
  12. class ReportRequest:
  13. report_id: str
  14. params: dict
  15. filename: str
  16. report_format: str
  17. class c11_export:
  18. api: c11_api
  19. cfg: config.Config = None
  20. def __init__(self, cfg=None, api=None):
  21. self.cfg = cfg
  22. if cfg is None:
  23. self.cfg = config.Config()
  24. self.api = api
  25. if api is None:
  26. # if self.cfg.cognos11.
  27. self.api = c11_api(cfg).login()
  28. now = datetime.now().strftime("%Y%m%d_%H%M%S")
  29. prot_file = f"{self.cfg.cognos11.logs_dir}\\c11_export_{now}.log"
  30. os.makedirs(self.cfg.cognos11.logs_dir, exist_ok=True)
  31. logging.basicConfig(
  32. filename=prot_file,
  33. filemode="w",
  34. encoding="utf-8",
  35. level=logging.INFO,
  36. force=True,
  37. )
  38. @staticmethod
  39. def get_folder(folder, format="PDF"):
  40. if folder == "":
  41. folder = "Team Content" if format == "XML" else "Team Content/ReportOutput"
  42. elif not folder.startswith("Team Content"):
  43. folder = "Team Content/ReportOutput/" + folder
  44. return folder
  45. def export_folder(self, folder="", overwrite=False) -> None:
  46. folder = self.get_folder(folder, "XML")
  47. reports = self.api.get_reports_in_folder(folder, True)
  48. for r in reports:
  49. if self.is_obsolete_or_ignored(r):
  50. continue
  51. self.export_unstubbed(r["id"], overwrite)
  52. @staticmethod
  53. def is_obsolete_or_ignored(report):
  54. if report["type"] != "report":
  55. return True
  56. path: str = report["path"]
  57. if "_/" in path or path.endswith("_"):
  58. return True
  59. for prefix in ["Team Content/Templates", "Team Content/Beispiele", "Team Content/Kalender"]:
  60. if path.startswith(prefix):
  61. return True
  62. name: str = report["name"]
  63. if name.endswith("_") or "Kopie" in name:
  64. return True
  65. return False
  66. def export_unstubbed(self, report_id, overwrite=False):
  67. report = self.api.get_report(report_id)
  68. if "error" in report:
  69. return
  70. params = self.get_params(report, {})
  71. request = ReportRequest(report["id"], params, report["filename"], report["format"])
  72. # test if execution of report is possible
  73. self.request_and_save_file(request, save=False)
  74. filename = f"{self.cfg.cognos11.specs_dir}\\{report['path']}\\{report['name']}.xml"
  75. modified = datetime.fromisoformat(report["modified"]).timestamp()
  76. if not overwrite and Path(filename).exists() and Path(filename).stat().st_mtime > modified:
  77. return
  78. unstubbed_report = self.api.request_unstubbed(report_id)
  79. if unstubbed_report:
  80. print(f"{report['path']}/{report['name']}")
  81. os.makedirs(os.path.dirname(filename), exist_ok=True)
  82. with open(filename, "w") as f:
  83. f.write(unstubbed_report)
  84. def get_folder_pdf_request_plan(self, folder=""):
  85. folder = self.get_folder(folder, "PDF")
  86. reports = self.api.get_reports_in_folder(folder, True)
  87. return [self.export_pdf(r["id"]) for r in reports]
  88. def export_pdf(self, report_id, folder=None):
  89. report = self.api.get_report(report_id)
  90. if report["type"] == "shortcut":
  91. report_link = report
  92. report = self.api.get_report(report["target_id"]).copy()
  93. for k in ("name", "description", "path", "type", "filename"):
  94. report[k] = report_link[k]
  95. json.dump(report, open("dump.json", "w"), indent=2)
  96. if "meta" not in report:
  97. logging.warning(report["name"] + " is not accessible!")
  98. return []
  99. if len(report["params"]) == 0:
  100. filename = convert_filename(report["filename"])
  101. params = self.get_params(report, {})
  102. return [ReportRequest(report["id"], params, filename, report["format"])]
  103. result = []
  104. if len(report["params"]) == 1:
  105. filename = convert_filename(report["filename"].format("_Summe"))
  106. params = self.get_params(report, {})
  107. result.append(ReportRequest(report["id"], params, filename, report["format"]))
  108. key1 = report["params"][0]
  109. for k1, v1 in report["meta"]["optional"][key1].items():
  110. filename = convert_filename(report["filename"].format(v1))
  111. params = self.get_params(report, {key1: {k1: v1}})
  112. result.append(ReportRequest(report["id"], params, filename, report["format"]))
  113. return result
  114. if len(report["params"]) == 2:
  115. filename = convert_filename(report["filename"].format("_Summe", "").replace("_.", "."))
  116. params = self.get_params(report, {})
  117. result.append(ReportRequest(report["id"], params, filename, report["format"]))
  118. key1, key2 = report["params"]
  119. for k1, v1 in report["meta"]["optional"][key1].items():
  120. filename = convert_filename(report["filename"].format(v1, "_Summe"))
  121. params = self.get_params(report, {key1: {k1: v1}})
  122. result.append(ReportRequest(report["id"], params, filename, report["format"]))
  123. for k2, v2 in report["meta"]["optional"][key2].items():
  124. filename = convert_filename(report["filename"].format(v1, v2))
  125. params = self.get_params(report, {key1: {k1: v1}, key2: {k2: v2}})
  126. result.append(ReportRequest(report["id"], params, filename, report["format"]))
  127. return result
  128. def get_params(self, report, optional_params):
  129. params = report["meta"]["required"].copy()
  130. params.update(optional_params)
  131. return params
  132. def filter_request_plan(self, req_plan: list[list[ReportRequest]], mail_csv: str):
  133. if not Path(mail_csv).exists() or not Path(mail_csv).is_file():
  134. return req_plan
  135. print("Filtern auf Empfaenger: " + mail_csv)
  136. required_files = []
  137. required_full_export = []
  138. with open(Path(mail_csv), "r", encoding="latin-1") as rfh:
  139. for row in rfh.readlines():
  140. if ";" not in row:
  141. continue
  142. filename, mailto = row.split(";", 2)
  143. if "@" not in mailto:
  144. continue
  145. required_files.append(str(Path(self.cfg.cognos11.reportoutput_dir + "\\" + filename).resolve()))
  146. if "_Schichten." in filename and "erstellte_Schichten" not in filename:
  147. filename2 = filename.replace("__Summe_und_Schichten", "__Summe").replace(
  148. "__nur_Schichten", "__Summe"
  149. )
  150. required_full_export.append(
  151. str(Path(self.cfg.cognos11.reportoutput_dir + "\\" + filename2).resolve())
  152. )
  153. res = []
  154. for req_group in req_plan:
  155. if len(req_group) > 0 and str(Path(req_group[0].filename).resolve()) in required_full_export:
  156. res.append(req_group)
  157. else:
  158. res.append([req for req in req_group if str(Path(req.filename).resolve()) in required_files])
  159. return res
  160. def execute_request_plan(self, req_plan: list[list[ReportRequest]]):
  161. failed_requests = []
  162. for req_group in req_plan:
  163. for report_req in req_group:
  164. print(Path(report_req.filename).relative_to(self.cfg.cognos11.reportoutput_dir))
  165. if not self.request_and_save_file(report_req):
  166. failed_requests.append(report_req)
  167. if len(failed_requests) == 0:
  168. return
  169. time.sleep(20) # wait for a while before retrying
  170. failed_again = []
  171. print("Zweiter Versuch bei folgenden Berichten:")
  172. for report_req in failed_requests:
  173. print(Path(report_req.filename).relative_to(self.cfg.cognos11.reportoutput_dir))
  174. if not self.request_and_save_file(report_req):
  175. failed_again.append(report_req)
  176. if len(failed_again) == 0:
  177. return
  178. print("Die folgenden Berichte konnten nicht erstellt werden:")
  179. for report_req in failed_again:
  180. print(Path(report_req.filename).relative_to(self.cfg.cognos11.reportoutput_dir))
  181. def request_and_save_file(self, report_request: ReportRequest, save=True):
  182. logging.debug(report_request.filename)
  183. logging.debug(report_request.params)
  184. os.makedirs(os.path.dirname(report_request.filename), exist_ok=True)
  185. Path(report_request.filename).unlink(missing_ok=True)
  186. status_code, content = self.api.request_file(
  187. report_request.report_id,
  188. report_request.params,
  189. report_request.report_format,
  190. )
  191. if status_code != 200:
  192. return False
  193. if save:
  194. try:
  195. with open(report_request.filename, "wb") as f:
  196. f.write(content)
  197. except PermissionError:
  198. print("--> not accessible!")
  199. return False
  200. return True
  201. def get_merge_group(self, req_plan: list[list[ReportRequest]]):
  202. res = {}
  203. for req_group in req_plan:
  204. files = [r.filename for r in req_group if r.report_format == "PDF"]
  205. if len(files) <= 1:
  206. continue
  207. filename = files[0].replace("_Summe", "_Summe_und_Schichten")
  208. res[filename] = files
  209. files2 = [r for r in files if "_Summe" not in r]
  210. filename2 = filename.replace("_Summe_und_Schichten", "_nur_Schichten")
  211. res[filename2] = files2
  212. filename3 = filename.replace("_Summe_und_Schichten", "_nur_erstellte_Schichten")
  213. res[filename3] = files2
  214. return res
  215. def export_errors(self):
  216. reports = self.api.get_reports_in_folder("Team Content", recursive=True, specs=True)
  217. errors = [r for r in reports if "error" in r and not self.is_obsolete_or_ignored(r)]
  218. filename = self.cfg.cognos11.logs_dir + "\\c11_report_errors.json"
  219. json.dump(errors, open(filename, "w"), indent=2)
  220. def template(self, req_plan: list[list[ReportRequest]], merge_group: dict[str, list[str]]):
  221. mail_groups = defaultdict(list)
  222. for report in merge_group.keys():
  223. filename = str(Path(report).relative_to(self.cfg.cognos11.reportoutput_dir))
  224. folder = Path(filename).parent.name
  225. mail_groups[folder].append(filename)
  226. for req_group in req_plan:
  227. for report in req_group:
  228. filename = str(Path(report.filename).relative_to(self.cfg.cognos11.reportoutput_dir))
  229. folder = Path(filename).parent.name
  230. mail_groups[folder].append(filename)
  231. for group, file_list in mail_groups.items():
  232. with open(
  233. self.cfg.cognos11.config_dir + f"\\{group}.template.csv",
  234. "w",
  235. encoding="latin-1",
  236. ) as fwh:
  237. fwh.write("Datei;Empfaenger\n")
  238. for filename in file_list:
  239. fwh.write(filename + ";\n")
  240. return True
  241. def mail_template(self, folder: str):
  242. current_dir = Path(self.cfg.cognos11.reportoutput_dir + "\\" + folder)
  243. subfolders = set([f.name for f in current_dir.glob("*") if f.is_dir()]).difference([".", "..", "leer"])
  244. folder2 = folder + "\\" if folder != "" else ""
  245. export_files = [f"{folder2}{f.name};" for f in current_dir.glob("*.*") if f.suffix in [".pdf", ".xls"]]
  246. if len(export_files) > 0:
  247. folder2 = folder.replace("\\", "_") if folder != "" else "Versand"
  248. template_file = Path(self.cfg.cognos11.config_dir + "\\" + folder2 + ".template.csv")
  249. print(template_file)
  250. with template_file.open("w") as fwh:
  251. fwh.write("Report;Empfaenger\n")
  252. fwh.write("\n".join(export_files))
  253. for f in subfolders:
  254. subfolder = folder + "\\" + f if folder != "" else f
  255. self.mail_template(subfolder)
  256. if __name__ == "__main__":
  257. api = c11_api()
  258. api.login()
  259. pdf = c11_export(None, api)
  260. # pdf.export_folder('Team Content/Verkauf/1. Gesamtverkauf', 'PDF')
  261. # pdf.export_folder('Team Content/Aftersales/1. Service')
  262. pdf.export_errors()