c11_export.py 8.1 KB


  1. from pathlib import Path
  2. from cognos11.c11_api import c11_api
  3. import config
  4. import json
  5. import logging
  6. import os
  7. from datetime import datetime
  8. from dataclasses import dataclass
  9. @dataclass
  10. class ReportRequest:
  11. report_id: str
  12. params: dict
  13. filename: str
  14. report_format: str
  15. class c11_export:
  16. api: c11_api
  17. cfg: config.Config = None
  18. def __init__(self, cfg=None, api=None):
  19. self.cfg = cfg
  20. if cfg is None:
  21. self.cfg = config.Config()
  22. self.api = api
  23. if api is None:
  24. # if self.cfg.cognos11.
  25. self.api = c11_api(cfg).login()
  26. now = datetime.now().strftime("%Y%m%d_%H%M%S")
  27. prot_file = f"{self.cfg.cognos11.logs_dir}/c11_export_{now}.log"
  28. os.makedirs(self.cfg.cognos11.logs_dir, exist_ok=True)
  29. logging.basicConfig(
  30. filename=prot_file,
  31. filemode="w",
  32. encoding="utf-8",
  33. level=logging.INFO,
  34. force=True,
  35. )
  36. @staticmethod
  37. def get_folder(folder, format="PDF"):
  38. if folder == "":
  39. folder = "Team Content" if format == "XML" else "Team Content/ReportOutput"
  40. elif not folder.startswith("Team Content"):
  41. folder = "Team Content/ReportOutput/" + folder
  42. return folder
  43. def export_folder(self, folder="") -> None:
  44. folder = self.get_folder(folder, "XML")
  45. reports = self.api.get_reports_in_folder(folder, True)
  46. for r in reports:
  47. self.export_unstubbed(r["id"])
  48. def export_unstubbed(self, report_id):
  49. report = self.api.get_report(report_id)
  50. if "error" in report:
  51. return
  52. params = self.get_params(report, {})
  53. request = ReportRequest(
  54. report["id"], params, report["filename"], report["format"]
  55. )
  56. # test if execution of report is possible
  57. self.request_and_save_file(request, save=False)
  58. filename = (
  59. f"{self.cfg.cognos11.specs_dir}/{report['path']}/{report['name']}.xml"
  60. )
  61. modified = datetime.fromisoformat(report["modified"]).timestamp()
  62. if Path(filename).exists() and Path(filename).stat().st_mtime > modified:
  63. return
  64. unstubbed_report = self.api.request_unstubbed(report_id)
  65. if unstubbed_report:
  66. print(f"{report['path']}/{report['name']}")
  67. os.makedirs(os.path.dirname(filename), exist_ok=True)
  68. with open(filename, "w") as f:
  69. f.write(unstubbed_report)
  70. def get_folder_pdf_request_plan(self, folder=""):
  71. folder = self.get_folder(folder, "PDF")
  72. reports = self.api.get_reports_in_folder(folder, True)
  73. return [self.export_pdf(r["id"]) for r in reports]
  74. def export_pdf(self, report_id, folder=None):
  75. report = self.api.get_report(report_id)
  76. if report["type"] == "shortcut":
  77. report_link = report
  78. report = self.api.get_report(report["target_id"]).copy()
  79. for k in ("name", "description", "path", "type", "filename"):
  80. report[k] = report_link[k]
  81. json.dump(report, open("dump.json", "w"), indent=2)
  82. if "meta" not in report:
  83. logging.warning(report["name"] + " is not accessible!")
  84. return []
  85. if len(report["params"]) == 0:
  86. filename = report["filename"]
  87. params = self.get_params(report, {})
  88. return [ReportRequest(report["id"], params, filename, report["format"])]
  89. result = []
  90. if len(report["params"]) == 1:
  91. filename = report["filename"].format("_Summe")
  92. params = self.get_params(report, {})
  93. result.append(
  94. ReportRequest(report["id"], params, filename, report["format"])
  95. )
  96. key1 = report["params"][0]
  97. for k1, v1 in report["meta"]["optional"][key1].items():
  98. filename = report["filename"].format(v1)
  99. params = self.get_params(report, {key1: {k1: v1}})
  100. result.append(
  101. ReportRequest(report["id"], params, filename, report["format"])
  102. )
  103. return result
  104. if len(report["params"]) == 2:
  105. filename = report["filename"].format("_Summe", "").replace("_.", ".")
  106. params = self.get_params(report, {})
  107. result.append(
  108. ReportRequest(report["id"], params, filename, report["format"])
  109. )
  110. key1, key2 = report["params"]
  111. for k1, v1 in report["meta"]["optional"][key1].items():
  112. filename = report["filename"].format(v1, "_Summe")
  113. params = self.get_params(report, {key1: {k1: v1}})
  114. result.append(
  115. ReportRequest(report["id"], params, filename, report["format"])
  116. )
  117. for k2, v2 in report["meta"]["optional"][key2].items():
  118. filename = report["filename"].format(v1, v2)
  119. params = self.get_params(report, {key1: {k1: v1}, key2: {k2: v2}})
  120. result.append(
  121. ReportRequest(report["id"], params, filename, report["format"])
  122. )
  123. return result
  124. def get_params(self, report, optional_params):
  125. params = report["meta"]["required"].copy()
  126. params.update(optional_params)
  127. return params
  128. def execute_request_plan(self, req_plan):
  129. for req_group in req_plan:
  130. for report_req in req_group:
  131. self.request_and_save_file(report_req)
  132. def request_and_save_file(self, report_request: ReportRequest, save=True):
  133. logging.debug(report_request.filename)
  134. logging.debug(report_request.params)
  135. status_code, content = self.api.request_file(
  136. report_request.report_id,
  137. report_request.params,
  138. report_request.report_format,
  139. )
  140. if status_code != 200:
  141. return
  142. if not save:
  143. return
  144. os.makedirs(os.path.dirname(report_request.filename), exist_ok=True)
  145. with open(report_request.filename, "wb") as f:
  146. f.write(content)
  147. def get_merge_group(self, req_plan):
  148. res = {}
  149. for req_group in req_plan:
  150. if len(req_group) == 1:
  151. continue
  152. files = [r.filename for r in req_group]
  153. filename = files[0].replace("_Summe", "_Summe_und_Schichten")
  154. res[filename] = files
  155. files2 = [r for r in files if "_Summe" not in r]
  156. filename2 = filename.replace("_Summe_und_Schichten", "_nur_Schichten")
  157. res[filename2] = files2
  158. return res
  159. def export_errors(self):
  160. reports = self.api.get_reports_in_folder(
  161. "Team Content", recursive=True, specs=True
  162. )
  163. errors = [r for r in reports if "error" in r]
  164. filename = self.cfg.cognos11.logs_dir + "/c11_report_errors.json"
  165. json.dump(errors, open(filename, "w"), indent=2)
  166. def mail_template(self, folder: str):
  167. current_dir = Path(self.cfg.cognos11.reportoutput_dir + "\\" + folder)
  168. subfolders = set(
  169. [f.name for f in current_dir.glob("*") if f.is_dir()]
  170. ).difference([".", "..", "leer"])
  171. folder2 = folder + "\\" if folder != "" else ""
  172. export_files = [
  173. f"{folder2}{f.name};"
  174. for f in current_dir.glob("*.*")
  175. if f.suffix in [".pdf", ".xls"]
  176. ]
  177. if len(export_files) > 0:
  178. folder2 = folder.replace("\\", "_") if folder != "" else "Versand"
  179. template_file = Path(
  180. self.cfg.cognos11.config_dir + "\\" + folder2 + ".template.csv"
  181. )
  182. print(template_file)
  183. with template_file.open("w") as fwh:
  184. fwh.write("Report;Empfaenger\n")
  185. fwh.write("\n".join(export_files))
  186. for f in subfolders:
  187. subfolder = folder + "\\" + f if folder != "" else f
  188. self.mail_template(subfolder)
  189. if __name__ == "__main__":
  190. api = c11_api()
  191. api.login()
  192. pdf = c11_export(None, api)
  193. # pdf.export_folder('Team Content/Verkauf/1. Gesamtverkauf', 'PDF')
  194. # pdf.export_folder('Team Content/Aftersales/1. Service')
  195. pdf.export_errors()