123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233 |
- from webservice.plan_values2 import VALUES2_MAPPING
- import json
- from pathlib import Path
class PlannerLoad:
    """Load planner input files and expand a planning structure.

    The class reads JSON files from ``base_dir`` (see ``load_values``) and
    converts a flat list of structure nodes into a list in which every
    account referenced by a node becomes a child node of its own
    (see ``convert_file``).  Each node carries a ``values2`` dict that maps a
    department id (as string) to a list of 30 numbers, and an ``options``
    dict with per-department flags.
    """

    base_dir: Path
    # The value tables stay None until load_values() (or a caller) sets them.
    # All update_* methods treat None as "nothing to apply".
    account_values: dict = None
    account_info: dict = None
    plan_values: dict = None
    marketing_values: dict = None
    new_structure: list
    structure: list

    config = {
        "department": [1, 2, 3, 10, 30, 40, 50, 55, 81, 82],
    }

    def __init__(self, base_dir: str):
        """Remember the directory that contains the JSON input files."""
        self.base_dir = Path(base_dir)

    def new_file(self, year: str):
        """Load all inputs for ``year`` and convert the ``Struktur_FB`` structure."""
        self.load_values(year)
        return self.convert_file(self.new_structure)

    def _read_json(self, file_name: str):
        """Return the parsed content of ``file_name`` inside ``base_dir``."""
        with open(self.base_dir.joinpath(file_name), "r") as frh:
            return json.load(frh)

    def load_values(self, year: str):
        """Read account, planning and marketing values plus ``gcstruct.json``.

        Raises whatever ``open``/``json.load`` raise for missing or invalid
        files, and ``KeyError`` if an expected top-level key is absent.
        """
        self.account_values = self._read_json(f"accounts_{year}.json")["values"]
        self.plan_values = self._read_json(f"planning_{year}.json")["values"]
        self.marketing_values = self._read_json(f"marketing_{year}.json")

        gcstruct = self._read_json("gcstruct.json")
        # Index the account master data by account number for O(1) lookup.
        self.account_info = {a["Konto_Nr"]: a for a in gcstruct["accounts"]}
        self.new_structure = gcstruct["flat"]["Struktur_FB"]

    def set_structure(self, structure: list):
        """Replace the current structure with an already converted one."""
        self.structure = structure

    def convert_file(self, structure_source):
        """Expand ``structure_source`` into ``self.structure`` and return it.

        Every source node is appended as-is (its ``accounts`` list is emptied
        and ``planlevel`` is set to whether it had accounts).  Each of its
        accounts is appended right after it as a new child node.  An account
        may be given as an account number (str), which is looked up in
        ``self.account_info``, or as a dict with ``id``, ``text``,
        ``costcenter``, ``values`` and ``values2``.

        NOTE: the nodes (and account dicts) of ``structure_source`` are
        modified in place.
        """
        self.structure = []
        for s in structure_source:
            accts = s["accounts"]
            s["accounts"] = []
            s["planlevel"] = len(accts) > 0
            self.structure.append(s)

            for a in accts:
                if isinstance(a, str):
                    info = self.account_info[a]
                    acct = {
                        "text": a + " - " + info["Konto_Bezeichnung"],
                        "id": a,
                        "costcenter": info["Konto_KST"],
                        "values": [],
                        "values2": None,
                    }
                else:
                    acct = a
                    a = acct["id"]
                # The child id is the parent id with the account text placed
                # into the first empty ";;" slot.
                acct["id"] = s["id"].replace(";;", ";" + acct["text"] + ";", 1)
                s["children"].append(acct["id"])
                new_account = {
                    "id": acct["id"],
                    "text": acct["text"],
                    "children": [],
                    "children2": [],
                    "parents": [s["id"]] + s["parents"],
                    "accounts": [a],
                    "costcenter": acct["costcenter"],
                    "level": s["level"] + 1,
                    "drilldown": False,
                    "form": s["form"],
                    "accountlevel": False,
                    "absolute": True,
                    "seasonal": True,
                    "status": "0",
                    "values": acct["values"],
                    "values2": acct["values2"],
                }
                self.structure.append(new_account)

        for s in self.structure:
            self.get_values2(s)
            s["options"] = self.get_options(s)
            # These flags now live per department inside "options".
            del s["accountlevel"]
            del s["absolute"]
            del s["seasonal"]
            del s["status"]
            s["sumvalues"] = [0] * 30
        return self.structure

    def get_values2(self, s):
        """Ensure ``s["values2"]`` has 30-entry lists, then apply loaded values.

        A missing, ``None`` or empty ``values2`` is replaced by zero-filled
        lists for every configured department.  Existing lists shorter than
        30 entries are re-mapped through ``VALUES2_MAPPING`` (out-of-range
        source indices become 0).
        """
        if not s.get("values2"):
            s["values2"] = {str(d): [0] * 30 for d in self.config["department"]}
        else:
            for d in s["values2"].keys():
                old = s["values2"][d]
                if len(old) < 30:
                    s["values2"][d] = [old[i] if 0 <= i < len(old) else 0 for i in VALUES2_MAPPING]
        self.update_account_values(s)
        self.update_plan_values(s)
        self.update_marketing_values(s)

    def update_values(self, value_type):
        """Re-apply one kind of loaded values to every node of ``self.structure``.

        ``value_type`` is ``"accounts"``, ``"plan"`` or ``"marketing"``
        (anything else raises ``KeyError``).  For ``"marketing"`` the direct
        parent of every updated node is additionally reset: ``form`` becomes
        ``"9"``, every department status becomes ``"0"`` and the first 15
        value slots are zeroed.

        Returns ``self.structure``.
        """
        v_types = {
            "accounts": self.update_account_values,
            "plan": self.update_plan_values,
            "marketing": self.update_marketing_values,
        }
        update = v_types[value_type]
        parents = []
        for s in self.structure:
            # Root nodes have no parents; skip them instead of raising IndexError.
            if update(s) and s["parents"]:
                parents.append(s["parents"][0])

        if value_type == "marketing":
            for p_id in set(parents):
                parent = self.get_structure_by_id(p_id)
                if parent is None:
                    # The parent id is not part of the current structure.
                    continue
                parent["form"] = "9"
                for d in parent["options"].keys():
                    parent["options"][d]["status"] = "0"
                for d in parent["values2"].keys():
                    for i in range(15):
                        parent["values2"][d][i] = 0.0
        return self.structure

    def get_structure_by_id(self, id):
        """Return the first node whose ``id`` equals ``id``, or ``None``."""
        for s in self.structure:
            if s["id"] == id:
                return s
        return None

    def update_account_values(self, s):
        """Copy account values for the node's first account into ``values2``.

        Values are written from slot 20 onwards.  Returns ``False`` when the
        node has no accounts or no account values are loaded; otherwise
        returns the node's ``values2`` dict (kept for backward compatibility,
        callers only use its truthiness).
        """
        if len(s["accounts"]) == 0:
            return False
        if self.account_values is None:
            return False
        a_values = self.account_values.get(s["accounts"][0], dict())
        for d in s["values2"].keys():
            if d in a_values:
                for i, v in enumerate(a_values[d], 20):
                    s["values2"][d][i] = v
        return s["values2"]

    def update_plan_values(self, s):
        """Copy plan values stored under the node's id into ``values2``.

        Values are written from slot 28 onwards.  Returns ``False`` only when
        no plan values are loaded, ``True`` otherwise (even if nothing matched).
        """
        if self.plan_values is None:
            return False
        p_values = self.plan_values.get(s["id"], dict())
        for d in s["values2"].keys():
            if d in p_values:
                for i, v in enumerate(p_values[d], 28):
                    s["values2"][d][i] = v
        return True

    def update_marketing_values(self, s):
        """Copy marketing values for the node's first account into ``values2``.

        Values are written from slot 1 onwards; slots 0 and 14 receive their
        sum, and the department's ``seasonal`` option (if options exist) is
        switched off.  Returns ``True`` only if marketing values exist for
        the account.
        """
        if len(s["accounts"]) == 0:
            return False
        if self.marketing_values is None:
            return False
        m_values = self.marketing_values.get(s["accounts"][0], dict())
        if not m_values:
            return False
        for d in s["values2"].keys():
            if d in m_values:
                for i, v in enumerate(m_values[d], 1):
                    s["values2"][d][i] = v
                total = sum(m_values[d])
                s["values2"][d][0] = total
                s["values2"][d][14] = total
                if "options" in s:
                    s["options"][d]["seasonal"] = False
        return True

    def convert_values2(self, s):
        """Build a fresh department -> 30-slot mapping for node ``s``.

        Existing 30-entry series are reused as-is (same list object); other
        series are re-mapped through ``VALUES2_MAPPING``.  Plan values
        (from slot 28) and account values (from slot 20) are then applied.
        The node itself is not reassigned; the mapping is returned.
        """
        a_values = {}
        if s["accounts"] and self.account_values is not None:
            a_values = self.account_values.get(s["accounts"][0], dict())
        p_values = {}
        if self.plan_values is not None:
            p_values = self.plan_values.get(s["id"], dict())
        source = s.get("values2") or {}

        values = {str(d): [0] * 30 for d in self.config["department"]}
        for d in values.keys():
            if d in source:
                old = source[d]
                if len(old) == 30:
                    values[d] = old
                elif isinstance(old, dict):
                    values[d] = [old.get(i, 0) for i in VALUES2_MAPPING]
                else:
                    # List-shaped series: same bounds-checked re-mapping as get_values2.
                    values[d] = [old[i] if 0 <= i < len(old) else 0 for i in VALUES2_MAPPING]
            if d in p_values:
                for i, v in enumerate(p_values[d], 28):
                    values[d][i] = v
            if d in a_values:
                for i, v in enumerate(a_values[d], 20):
                    values[d][i] = v
        return values

    def get_options(self, s):
        """Return per-department options derived from the node's flags.

        ``absolute``/``seasonal`` (bool) and ``status`` (str) act as defaults
        for every department; when one of them is a dict instead, it supplies
        per-department overrides.  A scalar flag on ``s`` is replaced by an
        empty dict as a side effect.
        """
        status = "0"
        seasonal = True
        absolute = True
        planlevel = False
        if isinstance(s["absolute"], bool):
            absolute = s["absolute"]
            s["absolute"] = {}
        if isinstance(s["seasonal"], bool):
            seasonal = s["seasonal"]
            s["seasonal"] = {}
        if isinstance(s["status"], str):
            status = s["status"]
            s["status"] = {}
        if "planlevel" in s:
            planlevel = s["planlevel"]

        opts = {
            str(d): {"absolute": absolute, "seasonal": seasonal, "status": status, "planlevel": planlevel}
            for d in self.config["department"]
        }
        for d in opts.keys():
            if d in s["absolute"]:
                opts[d]["absolute"] = s["absolute"][d]
            if d in s["seasonal"]:
                opts[d]["seasonal"] = s["seasonal"][d]
            if d in s["status"]:
                opts[d]["status"] = s["status"][d]
        return opts
if __name__ == "__main__":
    # Manual smoke test: convert an exported plan and print one node.
    planner_dir = Path(__file__).parent.parent.resolve() / "export"
    p_load = PlannerLoad(planner_dir)
    # Use a context manager so the file handle is closed deterministically.
    with open(planner_dir / "2022_V1.json", "r") as frh:
        exported_structure = json.load(frh)
    print(p_load.convert_file(exported_structure)[12])
|