planner_combine_files.py 3.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. from collections import defaultdict
  2. from plan_values2 import VALUES2_HEADER
  3. import json
  4. import csv
  5. import re
  6. from pathlib import Path
  7. class PlannerCombineFiles:
  8. def __init__(self, year, version, timestamp, username):
  9. self.year = year
  10. self.version = version
  11. self.timestamp = timestamp
  12. self.username = username
  13. self.base_dir = Path(__file__).parent.parent.resolve() / "save"
  14. self.file_list = [f.name for f in Path(self.base_dir).glob("*.*") if f.is_file()]
  15. self.file_list.sort()
  16. @property
  17. def data_file(self):
  18. file = f"data_{self.year}_{self.version}_{self.timestamp}.csv"
  19. file_search = rf"data_{self.year}_{self.version}_\d+\.csv"
  20. return self.get_filename(file, file_search)
  21. @property
  22. def user_file(self):
  23. file = f"user_{self.timestamp}.csv"
  24. file_search = rf"user_{self.username}_\d+\.json"
  25. return self.get_filename(file, file_search)
  26. @property
  27. def options_file(self):
  28. file = f"options_{self.year}_{self.version}_{self.timestamp}.json"
  29. file_search = rf"options_{self.year}_{self.version}_\d+\.json"
  30. return self.get_filename(file, file_search)
  31. @property
  32. def struct_file(self):
  33. file = f"struct_{self.year}_{self.version}_{self.timestamp}.json"
  34. file_search = rf"struct_{self.year}_{self.version}_\d+\.json"
  35. return self.get_filename(file, file_search)
  36. def get_filename(self, file, file_search):
  37. if not Path(self.base_dir + file).exists():
  38. data_files = [f for f in self.file_list if re.search(file_search, f)]
  39. if len(data_files) > 0:
  40. file = data_files[-1]
  41. return self.base_dir + file
  42. def __call__(self):
  43. self.struct = self.load_scaffold(self.struct_file)
  44. self.add_data_file(self.data_file)
  45. self.add_user_file(self.user_file)
  46. self.add_options_file(self.options_file)
  47. return self.struct
  48. def load_scaffold(self, struct_file):
  49. with open(struct_file, "r") as frh:
  50. res = json.load(frh)
  51. self.scaffold = dict([(s["id"], s) for s in res])
  52. return res
  53. def add_data_file(self, data_file):
  54. with open(data_file, "r") as frh:
  55. csv_reader = csv.reader(frh, delimiter="\t")
  56. _ = next(csv_reader)
  57. values2 = defaultdict(dict)
  58. for row in csv_reader:
  59. id = row.pop(0)
  60. department = row.pop(0)
  61. values2[id][department] = row
  62. for id, v2 in values2.items():
  63. self.scaffold[id]["values2"] = v2
  64. def add_user_file(self, user_file):
  65. with open(user_file, "r") as frh:
  66. res = json.load(frh)
  67. for id, s in self.scaffold.items():
  68. s["drilldown"] = res["drilldown"].get(id, False)
  69. def add_options_file(self, options_file):
  70. with open(options_file, "r") as frh:
  71. res = json.load(frh)
  72. for id, s in self.scaffold.items():
  73. if id in res:
  74. s["options"] = res[id]
  75. if __name__ == "__main__":
  76. pcf = PlannerCombineFiles("2023", "V1", "current", "global")
  77. res = pcf()
  78. assert res is not None