ift_convert.py 3.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. import csv
  2. import re
  3. from pathlib import Path
  4. from itertools import chain
  5. class IFTConverter:
  6. def __init__(self, base_dir):
  7. self.config = {
  8. "_A_": {
  9. "1": self.import_config(base_dir + "actuals_header.txt"),
  10. "2": self.import_config(base_dir + "actuals_item.txt"),
  11. },
  12. "_B_": {"010": self.import_config(base_dir + "budget_line.txt")},
  13. "_C_": {"010": self.import_config(base_dir + "commitment_line.txt")},
  14. "_K_": {
  15. "1": self.import_config(base_dir + "controllingdoc_header.txt"),
  16. "2": self.import_config(base_dir + "controllingdoc_item.txt"),
  17. },
  18. "_P_": {"010": self.import_config(base_dir + "plan_line.txt")},
  19. }
  20. self.is_number = re.compile(r"\d+\.\d+\-?$")
  21. def import_config(self, filename):
  22. with open(filename, "r") as frh:
  23. return [
  24. (int(line["start"]) - 1, int(line["start"]) + int(line["length"]) - 1)
  25. for line in csv.DictReader(frh, delimiter="\t")
  26. ]
  27. def convert_dir(self, path):
  28. source_path = Path(path)
  29. target_path = source_path.parent.joinpath("staging")
  30. for filename in source_path.glob("*"):
  31. print(filename.name)
  32. if filename.name.count("_") < 2:
  33. print("-> wrong file format")
  34. elif filename.stat().st_size == 0:
  35. print("-> file is empty!")
  36. else:
  37. self.convert_file(filename, target_path)
  38. def convert_file(self, source, target_path):
  39. cfg = self.conversion_config(source.name)
  40. target = target_path.joinpath(source.name + ".csv")
  41. content = {}
  42. last_key = list(cfg.keys())[-1]
  43. with open(source, "r", encoding="utf-8", errors="ignore") as frh:
  44. with open(target, "w", encoding="utf-8") as fwh:
  45. for line in frh.readlines():
  46. for key, rules in cfg.items():
  47. if line.startswith(key):
  48. content[key] = self.convert_line(line, rules)
  49. break
  50. if line.startswith(last_key):
  51. fwh.write("\t".join(chain(*content.values())) + "\n")
  52. def convert_line(self, line, rules):
  53. return [self.convert_field(line, rule) for rule in rules]
  54. def convert_field(self, line, rule):
  55. field = line[rule[0] : rule[1]].strip()
  56. if self.is_number.search(field):
  57. field = field.replace(",", "")
  58. if field[-1] == "-":
  59. field = "-" + field[:-1]
  60. return field
  61. def conversion_config(self, filename):
  62. for key, cfg in self.config.items():
  63. if key in filename:
  64. return cfg
  65. return {"0": []}
  66. def main():
  67. ift_conv = IFTConverter("E:\\GlobalCube\\Tasks\\Import\\config\\IFT\\")
  68. ift_conv.convert_dir("E:\\IFT\\prod")
  69. if __name__ == "__main__":
  70. main()