ift_convert.py 3.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. import csv
  2. import re
  3. from pathlib import Path
  4. from itertools import chain
  5. class IFTConverter:
  6. def __init__(self, base_dir):
  7. self.config = {
  8. '_A_': {
  9. '1': self.import_config(base_dir + 'actuals_header.txt'),
  10. '2': self.import_config(base_dir + 'actuals_item.txt')
  11. },
  12. '_B_': {
  13. '010': self.import_config(base_dir + 'budget_line.txt')
  14. },
  15. '_C_': {
  16. '010': self.import_config(base_dir + 'commitment_line.txt')
  17. },
  18. '_K_': {
  19. '1': self.import_config(base_dir + 'controllingdoc_header.txt'),
  20. '2': self.import_config(base_dir + 'controllingdoc_item.txt')
  21. },
  22. '_P_': {
  23. '010': self.import_config(base_dir + 'plan_line.txt')
  24. }
  25. }
  26. self.is_number = re.compile(r'\d+\.\d+\-?$')
  27. def import_config(self, filename):
  28. with open(filename, 'r') as frh:
  29. return [(int(line['start']) - 1, int(line['start']) + int(line['length']) - 1)
  30. for line in csv.DictReader(frh, delimiter='\t')]
  31. def convert_dir(self, path):
  32. source_path = Path(path)
  33. target_path = source_path.parent.joinpath('staging')
  34. for filename in source_path.glob('*'):
  35. print(filename.name)
  36. if filename.name.count('_') < 2:
  37. print('-> wrong file format')
  38. elif filename.stat().st_size == 0:
  39. print('-> file is empty!')
  40. else:
  41. self.convert_file(filename, target_path)
  42. def convert_file(self, source, target_path):
  43. cfg = self.conversion_config(source.name)
  44. target = target_path.joinpath(source.name + '.csv')
  45. content = {}
  46. last_key = list(cfg.keys())[-1]
  47. with open(source, 'r', encoding='utf-8', errors='ignore') as frh:
  48. with open(target, 'w', encoding='utf-8') as fwh:
  49. for line in frh.readlines():
  50. for key, rules in cfg.items():
  51. if line.startswith(key):
  52. content[key] = self.convert_line(line, rules)
  53. break
  54. if line.startswith(last_key):
  55. fwh.write('\t'.join(chain(*content.values())) + '\n')
  56. def convert_line(self, line, rules):
  57. return [self.convert_field(line, rule) for rule in rules]
  58. def convert_field(self, line, rule):
  59. field = line[rule[0]:rule[1]].strip()
  60. if self.is_number.search(field):
  61. field = field.replace(',', '')
  62. if field[-1] == '-':
  63. field = '-' + field[:-1]
  64. return field
  65. def conversion_config(self, filename):
  66. for key, cfg in self.config.items():
  67. if key in filename:
  68. return cfg
  69. return {
  70. '0': []
  71. }
  72. def main():
  73. ift_conv = IFTConverter('E:\\GlobalCube\\Tasks\\Import\\config\\IFT\\')
  74. ift_conv.convert_dir('E:\\IFT\\prod')
  75. if __name__ == '__main__':
  76. main()