1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192 |
- import csv
- import re
- from pathlib import Path
- from itertools import chain
- class IFTConverter:
- def __init__(self, base_dir):
- self.config = {
- '_A_': {
- '1': self.import_config(base_dir + 'actuals_header.txt'),
- '2': self.import_config(base_dir + 'actuals_item.txt')
- },
- '_B_': {
- '010': self.import_config(base_dir + 'budget_line.txt')
- },
- '_C_': {
- '010': self.import_config(base_dir + 'commitment_line.txt')
- },
- '_K_': {
- '1': self.import_config(base_dir + 'controllingdoc_header.txt'),
- '2': self.import_config(base_dir + 'controllingdoc_item.txt')
- },
- '_P_': {
- '010': self.import_config(base_dir + 'plan_line.txt')
- }
- }
- self.is_number = re.compile(r'\d+\.\d+\-?$')
- def import_config(self, filename):
- with open(filename, 'r') as frh:
- return [(int(line['start']) - 1, int(line['start']) + int(line['length']) - 1)
- for line in csv.DictReader(frh, delimiter='\t')]
- def convert_dir(self, path):
- source_path = Path(path)
- target_path = source_path.parent.joinpath('staging')
- for filename in source_path.glob('*'):
- print(filename.name)
- if filename.name.count('_') < 2:
- print('-> wrong file format')
- elif filename.stat().st_size == 0:
- print('-> file is empty!')
- else:
- self.convert_file(filename, target_path)
- def convert_file(self, source, target_path):
- cfg = self.conversion_config(source.name)
- target = target_path.joinpath(source.name + '.csv')
- content = {}
- last_key = list(cfg.keys())[-1]
- with open(source, 'r', encoding='utf-8', errors='ignore') as frh:
- with open(target, 'w', encoding='utf-8') as fwh:
- for line in frh.readlines():
- for key, rules in cfg.items():
- if line.startswith(key):
- content[key] = self.convert_line(line, rules)
- break
- if line.startswith(last_key):
- fwh.write('\t'.join(chain(*content.values())) + '\n')
- def convert_line(self, line, rules):
- return [self.convert_field(line, rule) for rule in rules]
- def convert_field(self, line, rule):
- field = line[rule[0]:rule[1]].strip()
- if self.is_number.search(field):
- field = field.replace(',', '')
- if field[-1] == '-':
- field = '-' + field[:-1]
- return field
- def conversion_config(self, filename):
- for key, cfg in self.config.items():
- if key in filename:
- return cfg
- return {
- '0': []
- }
- def main():
- ift_conv = IFTConverter('E:\\GlobalCube\\Tasks\\Import\\config\\IFT\\')
- ift_conv.convert_dir('E:\\IFT\\prod')
- if __name__ == '__main__':
- main()
|