1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495 |
- from collections import defaultdict
- from plan_values2 import VALUES2_HEADER
- import json
- import csv
- import re
- from pathlib import Path
class PlannerCombineFiles:
    """Combine the saved planner files of one year/version into one structure.

    Calling an instance loads the ``struct`` JSON file and then merges the
    contents of the ``data`` CSV file, the ``user`` JSON file and the
    ``options`` JSON file into its entries.  For every kind of file the one
    carrying exactly the requested timestamp is preferred; when that file
    does not exist, the lexicographically last file with a numeric timestamp
    is used instead.
    """

    def __init__(self, year, version, timestamp, username, base_dir=None):
        """Store the lookup keys and snapshot the file names in ``base_dir``.

        Args:
            year: Year component of the data/options/struct file names.
            version: Version component of the data/options/struct file names.
            timestamp: Timestamp component of the preferred (exact) files.
            username: User component of the user file name.
            base_dir: Directory that holds the files.  Defaults to the
                ``save`` directory beside the directory containing this
                module (``<module dir>/../save``).
        """
        self.year = year
        self.version = version
        self.timestamp = timestamp
        self.username = username
        if base_dir is None:
            base_dir = Path(__file__).parent.parent.resolve() / "save"
        self.base_dir = Path(base_dir)
        # Sorted once so that the last regex match in get_filename() is the
        # lexicographically greatest file name.
        self.file_list = sorted(
            entry.name for entry in self.base_dir.glob("*.*") if entry.is_file()
        )

    @property
    def data_file(self):
        """Path of the tab-separated data file for this year/version."""
        return self._resolve(f"data_{self.year}_{self.version}", "csv")

    @property
    def user_file(self):
        """Path of the JSON user file for this username.

        The exact name uses the same ``user_<username>_<timestamp>.json``
        scheme as the fallback pattern, so both lookups target the same
        family of files (the file is read with ``json.load``).
        """
        return self._resolve(f"user_{self.username}", "json")

    @property
    def options_file(self):
        """Path of the JSON options file for this year/version."""
        return self._resolve(f"options_{self.year}_{self.version}", "json")

    @property
    def struct_file(self):
        """Path of the JSON struct file for this year/version."""
        return self._resolve(f"struct_{self.year}_{self.version}", "json")

    def _resolve(self, stem, extension):
        """Build the exact name and the fallback pattern for ``stem``."""
        exact_name = f"{stem}_{self.timestamp}.{extension}"
        # Anchored and escaped so that only whole file names of the form
        # "<stem>_<digits>.<extension>" qualify as fallbacks.
        pattern = rf"^{re.escape(stem)}_\d+\.{re.escape(extension)}$"
        return self.get_filename(exact_name, pattern)

    def get_filename(self, file, file_search):
        """Return the path of ``file`` or of its newest-named substitute.

        Args:
            file: Preferred file name, relative to ``base_dir``.
            file_search: Regular expression applied with ``re.search`` to
                the names in ``file_list`` when ``file`` does not exist.

        Returns:
            ``base_dir / file`` if that file exists; otherwise the path of
            the last name in ``file_list`` matching ``file_search``; if
            nothing matches, ``base_dir / file`` (which does not exist, so
            opening it raises ``FileNotFoundError``).
        """
        # pathlib paths are joined with "/"; "+" raises TypeError.
        exact = self.base_dir / file
        if exact.exists():
            return exact
        candidates = [name for name in self.file_list if re.search(file_search, name)]
        if candidates:
            return self.base_dir / candidates[-1]
        return exact

    def __call__(self):
        """Load the struct file, merge the other files in, and return it.

        Returns:
            The object parsed from the struct file; its entries are mutated
            in place by the ``add_*`` methods.
        """
        self.struct = self.load_scaffold(self.struct_file)
        self.add_data_file(self.data_file)
        self.add_user_file(self.user_file)
        self.add_options_file(self.options_file)
        return self.struct

    def load_scaffold(self, struct_file):
        """Parse ``struct_file`` and index its entries by their ``"id"``.

        The parsed JSON must be an iterable of mappings that each have an
        ``"id"`` key.  ``self.scaffold`` maps every id to the very same
        entry object that is contained in the returned structure.

        Returns:
            The parsed JSON content.
        """
        with open(struct_file, "r") as frh:
            entries = json.load(frh)
        self.scaffold = {entry["id"]: entry for entry in entries}
        return entries

    def add_data_file(self, data_file):
        """Attach the rows of the tab-separated ``data_file`` as ``"values2"``.

        The first row is skipped as a header.  In every other row the first
        column is the entry id, the second the department and the remaining
        columns the values; rows with fewer than two columns (for example
        blank lines) are ignored.  Each referenced scaffold entry receives a
        ``"values2"`` dict mapping department to its list of value strings.

        Raises:
            KeyError: If a row refers to an id that is not in the scaffold.
        """
        values2 = defaultdict(dict)
        # newline="" lets the csv module do its own newline handling.
        with open(data_file, "r", newline="") as frh:
            reader = csv.reader(frh, delimiter="\t")
            next(reader, None)  # skip the header; tolerate an empty file
            for row in reader:
                if len(row) < 2:
                    continue
                entry_id, department, *values = row
                values2[entry_id][department] = values
        for entry_id, by_department in values2.items():
            self.scaffold[entry_id]["values2"] = by_department

    def add_user_file(self, user_file):
        """Set ``"drilldown"`` on every scaffold entry from ``user_file``.

        The value is looked up by entry id in the ``"drilldown"`` mapping of
        the JSON file; entries without a value there get ``False``.
        """
        with open(user_file, "r") as frh:
            user_settings = json.load(frh)
        for entry_id, entry in self.scaffold.items():
            entry["drilldown"] = user_settings["drilldown"].get(entry_id, False)

    def add_options_file(self, options_file):
        """Set ``"options"`` on the scaffold entries listed in ``options_file``.

        Entries whose id is not present in the JSON file are left untouched.
        """
        with open(options_file, "r") as frh:
            options = json.load(frh)
        for entry_id, entry in self.scaffold.items():
            if entry_id in options:
                entry["options"] = options[entry_id]
if __name__ == "__main__":
    # Smoke run: combine the 2023/V1 files for the "global" user, asking for
    # the "current" timestamp, and check that a structure came back.
    pcf = PlannerCombineFiles(
        year="2023",
        version="V1",
        timestamp="current",
        username="global",
    )
    res = pcf()
    assert res is not None
|