c11_export.py 8.0 KB


  1. from pathlib import Path
  2. from cognos11.c11_api import c11_api
  3. import config
  4. import json
  5. import logging
  6. import os
  7. from datetime import datetime
  8. from dataclasses import dataclass
@dataclass
class ReportRequest:
    """One planned file request: which report, with which parameters, saved where.

    Instances are created by ``c11_export.export_pdf`` and consumed by
    ``c11_export.request_and_save_file``.
    """

    # Report identifier; passed as the first argument to ``api.request_file``.
    report_id: str
    # Prompt parameters; built in this module as ``{parameter_name: {key: value}}``.
    params: dict
    # Path of the file the response content is written to.
    filename: str
    # Output format forwarded to ``api.request_file`` (taken from ``report["format"]``).
    report_format: str
class c11_export:
    """Export helper built on top of ``c11_api``.

    Bundles the export operations of this module: writing unstubbed report
    specifications as XML, planning and executing report file requests,
    collecting reports that carry an ``error`` entry, and writing mail
    template CSV files for exported reports.
    """

    # API client used for all requests; assigned in ``__init__``.
    api: c11_api
    # Configuration object; ``__init__`` replaces a missing one with ``config.Config()``.
    cfg: config.Config = None
  18. def __init__(self, cfg=None, api=None):
  19. self.cfg = cfg
  20. if cfg is None:
  21. self.cfg = config.Config()
  22. self.api = api
  23. if api is None:
  24. # if self.cfg.cognos11.
  25. self.api = c11_api(cfg).login()
  26. now = datetime.now().strftime("%Y%m%d_%H%M%S")
  27. prot_file = f"{self.cfg.cognos11.logs_dir}/error_{now}.log"
  28. os.makedirs(self.cfg.cognos11.logs_dir, exist_ok=True)
  29. logging.basicConfig(
  30. filename=prot_file,
  31. filemode="w",
  32. encoding="utf-8",
  33. level=logging.DEBUG,
  34. force=True,
  35. )
  36. @staticmethod
  37. def get_folder(folder, format="PDF"):
  38. if folder == "":
  39. folder = "Team Content" if format == "XML" else "Team Content/ReportOutput"
  40. elif not folder.startswith("Team Content"):
  41. folder = "Team Content/ReportOutput/" + folder
  42. return folder
  43. def export_folder(self, folder="") -> None:
  44. folder = self.get_folder(folder, "XML")
  45. reports = self.api.get_reports_in_folder(folder, True)
  46. for r in reports:
  47. print(r["name"])
  48. self.export_unstubbed(r["id"])
  49. def export_unstubbed(self, report_id):
  50. report = self.api.get_report(report_id)
  51. unstubbed_report = self.api.request_unstubbed(report_id)
  52. if unstubbed_report:
  53. filename = (
  54. f"{self.cfg.cognos11.specs_dir}/{report['path']}/{report['name']}.xml"
  55. )
  56. os.makedirs(os.path.dirname(filename), exist_ok=True)
  57. with open(filename, "w") as f:
  58. f.write(unstubbed_report)
  59. def get_folder_pdf_request_plan(self, folder=""):
  60. folder = self.get_folder(folder, "PDF")
  61. reports = self.api.get_reports_in_folder(folder, True)
  62. return [self.export_pdf(r) for r in reports]
  63. def export_pdf(self, report_id, folder=None):
  64. report = self.api.get_report(report_id)
  65. if report["type"] == "shortcut":
  66. report_link = report
  67. report = self.api.get_report(report["target_id"]).copy()
  68. for k in ("name", "description", "path", "type", "filename"):
  69. report[k] = report_link[k]
  70. json.dump(report, open("dump.json", "w"), indent=2)
  71. if "meta" not in report:
  72. logging.warning(report["name"] + " is not accessible!")
  73. return False
  74. params = {}
  75. if len(report["meta"]["required"]) > 0:
  76. if set(report["meta"]["required"].keys()) != {"p_Von", "p_Bis"}:
  77. return False
  78. params["p_Von"] = {"2022-10-12": "12.10.2022"}
  79. params["p_Bis"] = {"2022-10-12": "12.10.2022"}
  80. # params["p_Zeitraum"] = {"Einzelne Monate": "Einzelne Monate"}
  81. for k, v in report["meta"]["optional"].items():
  82. if k in ["p_Zeit", "p_Auswahl_Monate", "p_12_Monate"]:
  83. for k1, v1 in reversed(v.items()):
  84. if v1 != "Invalid Dates":
  85. params[k] = {k1: v1}
  86. continue # use last element only
  87. if len(report["params"]) == 0:
  88. filename = report["filename"]
  89. return [ReportRequest(report["id"], params, filename, report["format"])]
  90. result = []
  91. if len(report["params"]) == 1:
  92. filename = report["filename"].format("_Summe")
  93. result.append(
  94. ReportRequest(report["id"], params, filename, report["format"])
  95. )
  96. key1 = report["params"][0]
  97. for k1, v1 in report["meta"]["optional"][key1].items():
  98. filename = report["filename"].format(v1)
  99. params[key1] = {k1: v1}
  100. result.append(
  101. ReportRequest(report["id"], params, filename, report["format"])
  102. )
  103. return result
  104. if len(report["params"]) == 2:
  105. filename = report["filename"].format("_Summe", "").replace("_.", ".")
  106. result.append(
  107. ReportRequest(report["id"], params, filename, report["format"])
  108. )
  109. key1, key2 = report["params"]
  110. for k1, v1 in report["meta"]["optional"][key1].items():
  111. filename = report["filename"].format(v1, "_Summe")
  112. params = {}
  113. params[key1] = {k1: v1}
  114. result.append(
  115. ReportRequest(report["id"], params, filename, report["format"])
  116. )
  117. for k2, v2 in report["meta"]["optional"][key2].items():
  118. filename = report["filename"].format(v1, v2)
  119. params[key2] = {k2: v2}
  120. result.append(
  121. ReportRequest(report["id"], params, filename, report["format"])
  122. )
  123. return result
  124. def execute_request_plan(self, req_plan):
  125. for req_group in req_plan:
  126. for report_req in req_group:
  127. self.request_and_save_file(report_req)
  128. def request_and_save_file(self, report_request: ReportRequest):
  129. logging.debug(report_request.filename)
  130. logging.debug(report_request.params)
  131. status_code, content = self.api.request_file(
  132. report_request.report_id,
  133. report_request.params,
  134. report_request.report_format,
  135. )
  136. if status_code == 200:
  137. os.makedirs(os.path.dirname(report_request.filename), exist_ok=True)
  138. with open(report_request.filename, "wb") as f:
  139. f.write(content)
  140. else:
  141. logging.warning(content)
  142. def get_merge_group(self, req_plan):
  143. res = {}
  144. for req_group in req_plan:
  145. if len(req_group) == 1:
  146. continue
  147. files = [r.filename for r in req_group]
  148. filename = files[0].replace("_Summe", "_Summe_und_Schichten")
  149. res[filename] = files
  150. files2 = [r for r in files if "_Summe" not in r]
  151. filename2 = filename.replace("_Summe_und_Schichten", "_nur_Schichten")
  152. res[filename2] = files2
  153. return res
  154. def export_errors(self):
  155. reports = self.api.get_reports_in_folder(
  156. "Team Content", recursive=True, specs=True
  157. )
  158. errors = [r for r in reports if "error" in r]
  159. filename = self.cfg.cognos11.logs_dir + "/c11_report_errors.json"
  160. json.dump(errors, open(filename, "w"), indent=2)
  161. def mail_template(self, folder: str):
  162. current_dir = Path(self.cfg.cognos11.reportoutput_dir + "\\" + folder)
  163. subfolders = set(
  164. [f.name for f in current_dir.glob("*") if f.is_dir()]
  165. ).difference([".", "..", "leer"])
  166. folder2 = folder + "\\" if folder != "" else ""
  167. export_files = [
  168. f"{folder2}{f.name};"
  169. for f in current_dir.glob("*.*")
  170. if f.suffix in [".pdf", ".xls"]
  171. ]
  172. if len(export_files) > 0:
  173. folder2 = folder.replace("\\", "_") if folder != "" else "Versand"
  174. template_file = Path(
  175. self.cfg.cognos11.config_dir + "\\" + folder2 + ".template.csv"
  176. )
  177. print(template_file)
  178. with template_file.open("w") as fwh:
  179. fwh.write("Report;Empfaenger\n")
  180. fwh.write("\n".join(export_files))
  181. for f in subfolders:
  182. subfolder = folder + "\\" + f if folder != "" else f
  183. self.mail_template(subfolder)
  184. if __name__ == "__main__":
  185. api = c11_api()
  186. api.login()
  187. pdf = c11_export(None, api)
  188. # pdf.export_folder('Team Content/Verkauf/1. Gesamtverkauf', 'PDF')
  189. # pdf.export_folder('Team Content/Aftersales/1. Service')
  190. pdf.export_errors()