12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485 |
- import csv
- import re
- from pathlib import Path
- from itertools import chain
- class IFTConverter:
- def __init__(self, base_dir):
- self.config = {
- "_A_": {
- "1": self.import_config(base_dir + "actuals_header.txt"),
- "2": self.import_config(base_dir + "actuals_item.txt"),
- },
- "_B_": {"010": self.import_config(base_dir + "budget_line.txt")},
- "_C_": {"010": self.import_config(base_dir + "commitment_line.txt")},
- "_K_": {
- "1": self.import_config(base_dir + "controllingdoc_header.txt"),
- "2": self.import_config(base_dir + "controllingdoc_item.txt"),
- },
- "_P_": {"010": self.import_config(base_dir + "plan_line.txt")},
- }
- self.is_number = re.compile(r"\d+\.\d+\-?$")
- def import_config(self, filename):
- with open(filename, "r") as frh:
- return [
- (int(line["start"]) - 1, int(line["start"]) + int(line["length"]) - 1)
- for line in csv.DictReader(frh, delimiter="\t")
- ]
- def convert_dir(self, path):
- source_path = Path(path)
- target_path = source_path.parent.joinpath("staging")
- for filename in source_path.glob("*"):
- print(filename.name)
- if filename.name.count("_") < 2:
- print("-> wrong file format")
- elif filename.stat().st_size == 0:
- print("-> file is empty!")
- else:
- self.convert_file(filename, target_path)
- def convert_file(self, source, target_path):
- cfg = self.conversion_config(source.name)
- target = target_path.joinpath(source.name + ".csv")
- content = {}
- last_key = list(cfg.keys())[-1]
- with open(source, "r", encoding="utf-8", errors="ignore") as frh:
- with open(target, "w", encoding="utf-8") as fwh:
- for line in frh.readlines():
- for key, rules in cfg.items():
- if line.startswith(key):
- content[key] = self.convert_line(line, rules)
- break
- if line.startswith(last_key):
- fwh.write("\t".join(chain(*content.values())) + "\n")
- def convert_line(self, line, rules):
- return [self.convert_field(line, rule) for rule in rules]
- def convert_field(self, line, rule):
- field = line[rule[0] : rule[1]].strip()
- if self.is_number.search(field):
- field = field.replace(",", "")
- if field[-1] == "-":
- field = "-" + field[:-1]
- return field
- def conversion_config(self, filename):
- for key, cfg in self.config.items():
- if key in filename:
- return cfg
- return {"0": []}
- def main():
- ift_conv = IFTConverter("E:\\GlobalCube\\Tasks\\Import\\config\\IFT\\")
- ift_conv.convert_dir("E:\\IFT\\prod")
- if __name__ == "__main__":
- main()
|