"""Export monthly account balances as an HbvData XML file.

The script reads an account translation table and a table of account
balances (both semicolon-separated, latin-1 encoded CSV files below
``base_dir``), joins them, pivots the amounts by booking period and writes
the result as CSV and as XML.  Helper functions convert between the XML
record list and a flat CSV file.
"""
import csv
import logging
import xml.etree.ElementTree as ET
from datetime import datetime
from xml.dom import minidom

import numpy as np
import pandas as pd

base_dir = '/home/robert/projekte/python/gcstruct/Luchtenberg/'
account_translation = base_dir + 'Kontenrahmen_kombiniert.csv'
account_bookings = base_dir + 'GuV_Bilanz_Salden.csv'

first_month_of_financial_year = '01'
current_year = '2022'
current_month = '01'

account_ignored = base_dir + f'ignoriert_{current_year}-{current_month}.csv'
account_invalid = base_dir + f'ungueltig_{current_year}-{current_month}.csv'

last_year = str(int(current_year) - 1)
last_year2 = str(int(current_year) - 2)
next_year = str(int(current_year) + 1)

pd.set_option('display.max_rows', 500)
pd.set_option('display.float_format', lambda x: '%.2f' % x)


def _configure_logging():
    """Route root-logger output to the monthly protocol file.

    This is called by ``export_month()`` rather than at import time, because
    ``logging.basicConfig(filename=...)`` opens the file immediately and
    would make a plain ``import`` fail when ``base_dir`` does not exist.
    ``basicConfig`` does nothing if the root logger already has handlers, so
    calling this repeatedly is harmless.
    """
    logging.basicConfig(
        filename=base_dir + f'protokoll_{current_year}-{current_month}.log',
        encoding='utf-8',
        level=logging.DEBUG
    )


def header(makes, sites):
    """Return the key/value pairs written below the XML ``Header`` element.

    Args:
        makes: list of make records; only its length is used.
        sites: list of site records; the first one supplies ``Marke_HBV``.

    Raises:
        IndexError: if ``sites`` is empty.
    """
    # One timestamp for both fields, so date and time cannot disagree when
    # the call happens right at midnight.
    now = datetime.now()
    return {
        'Country': 'DE',
        'MainBmCode': sites[0]['Marke_HBV'],
        'Month': current_month,
        'Year': current_year,
        'Currency': 'EUR',
        'NumberOfMakes': len(makes),
        'NumberOfSites': len(sites),
        'ExtractionDate': now.strftime('%d.%m.%Y'),
        'ExtractionTime': now.strftime('%H:%M:%S'),
        'BeginFiscalYear': first_month_of_financial_year
    }


def bookkeep_filter():
    """Map the twelve booking periods of the financial year to column names.

    Returns:
        dict whose keys are period strings (year followed by the two-digit
        month) in financial-year order and whose values are ``'Period01'``
        to ``'Period12'``.
    """
    period = [current_year + str(i).zfill(2) for i in range(1, 13)]
    if first_month_of_financial_year != '01':
        # The financial year spans two calendar years: extend the list with
        # the previous or the following year and cut out twelve months.
        if first_month_of_financial_year > current_month:
            period = [last_year + str(i).zfill(2) for i in range(1, 13)] + period
        else:
            period = period + [next_year + str(i).zfill(2) for i in range(1, 13)]
        fm = int(first_month_of_financial_year)
        period = period[fm - 1:fm + 11]
    rename_to = ['Period' + str(i).zfill(2) for i in range(1, 13)]
    return dict(zip(period, rename_to))


def extract_acct_info(df: pd.DataFrame):
    """Split the string index of ``df`` into fixed-position columns.

    The columns are added to ``df`` in place; the same frame is returned.
    """
    acct_info = ['Marke', 'Standort', 'Konto_Nr', 'Kostenstelle', 'Absatzkanal', 'Kostenträger']
    acct_pos = [(0, 2), (3, 5), (6, 10), (11, 13), (14, 16), (17, 19)]
    for info, pos in zip(acct_info, acct_pos):
        df[info] = df.index.str.slice(*pos)
    return df


def export_month():
    """Build the export for ``current_year``/``current_month``.

    Reads ``account_translation`` and ``account_bookings``, joins them on
    ``Konto_Nr_Händler`` and pivots the amounts by period.  It writes
    ``debug.csv``, the list of rows with a non-numeric cost accounting
    string, the export CSV and the export XML.  If bookings without a
    translation exist, they are written to ``account_ignored``.

    Returns:
        Path of the XML file that was written.
    """
    _configure_logging()

    # Load the translation table (all columns as text).
    df_translate = pd.read_csv(account_translation, decimal=',', sep=';', encoding='latin-1',
                               converters={i: str for i in range(0, 200)})
    logging.info(df_translate.shape)
    df_translate['duplicated'] = df_translate.duplicated()
    logging.info(df_translate[df_translate['duplicated']])
    df_translate.drop(columns=['duplicated'], inplace=True)
    df_translate.drop_duplicates(inplace=True)
    # NOTE: 'Konto_Nr_Händler' deliberately stays a regular column; the
    # merges below join on it.

    # Load the account balances.
    df_bookings = pd.read_csv(account_bookings, decimal=',', sep=';', encoding='latin-1',
                              converters={0: str, 1: str})

    # Restrict the balances to the requested month.
    filter_from = current_year + first_month_of_financial_year
    filter_prev = last_year + first_month_of_financial_year
    if first_month_of_financial_year > current_month:
        filter_from = last_year + first_month_of_financial_year
        filter_prev = last_year2 + first_month_of_financial_year
    filter_to = current_year + current_month

    df_opening_balance = df_bookings[(df_bookings['Bookkeep Period'] >= filter_prev) &
                                     (df_bookings['Bookkeep Period'] <= filter_from)]
    df_opening_balance = df_opening_balance.merge(df_translate, how='inner', on='Konto_Nr_Händler')
    # .copy() so the column assignment below works on an independent frame.
    df_opening_balance = df_opening_balance[df_opening_balance['Konto_Nr'].str.match(r'^[013]')].copy()
    df_opening_balance['amount'] = (df_opening_balance['Debit Amount'] +
                                    df_opening_balance['Credit Amount']).round(2)
    df_opening_balance.drop(columns=['Debit Amount', 'Credit Amount', 'Debit Quantity', 'Credit Quantity'],
                            inplace=True)
    # numeric_only: summing the text columns would concatenate their strings.
    df_opening_balance = df_opening_balance.groupby(['Marke', 'Standort']).sum(numeric_only=True)
    logging.info('Gewinn/Verlustvortrag')
    logging.info(df_opening_balance)
    logging.info(df_opening_balance.sum())

    df_bookings = df_bookings[(df_bookings['Bookkeep Period'] >= filter_from) &
                              (df_bookings['Bookkeep Period'] <= filter_to)].copy()
    bk_filter = bookkeep_filter()
    period_no = list(bk_filter.keys()).index(filter_to) + 1

    # Convert columns.
    df_bookings['period'] = df_bookings['Bookkeep Period'].apply(lambda x: bk_filter[x])
    df_bookings['amount'] = (df_bookings['Debit Amount'] + df_bookings['Credit Amount']).round(2)
    df_bookings['quantity'] = (df_bookings['Debit Quantity'] + df_bookings['Credit Quantity']).round(2)
    logging.info('df_bookings: ' + str(df_bookings.shape))

    # Join with the translation table.
    df_combined = df_bookings.merge(df_translate, how='inner', on='Konto_Nr_Händler')
    logging.info('df_combined: ' + str(df_combined.shape))
    makes = df_combined[['Marke', 'Marke_HBV']].drop_duplicates().sort_values(by=['Marke'])
    sites = df_combined[['Marke', 'Standort', 'Marke_HBV']].drop_duplicates().sort_values(by=['Marke', 'Standort'])

    # df_combined.to_csv(account_invalid, decimal=',', sep=';', encoding='latin-1', index=False)

    # Group by account and period.
    # df_grouped = df_combined.groupby(['Konto_Nr_SKR51', 'period']).sum()
    df = df_combined.pivot_table(index=['Konto_Nr_SKR51'], columns=['period'], values='amount',
                                 aggfunc='sum', margins=True, margins_name='CumulatedYear')
    logging.info('df_pivot: ' + str(df.shape))
    df = extract_acct_info(df)
    # df = df_translate.reset_index(drop=True).drop(columns=['Kostenträger_Ebene']).drop_duplicates()
    logging.info(df.shape)
    logging.info(df.columns)
    logging.info(df.head())
    # df = df.merge(df_translate, how='inner', on='Konto_Nr_SKR51')
    logging.info('df: ' + str(df.shape))

    df['Bilanz'] = (df['Konto_Nr'].str.match(r'^[013]'))
    df['Kontoart'] = np.where(df['Bilanz'], '1', '2')
    df['Konto_1'] = (df['Konto_Nr'].str.slice(0, 1))
    df_debug = df.drop(columns=['Bilanz'])
    logging.info(df_debug.groupby(['Kontoart']).sum(numeric_only=True))
    logging.info(df_debug.groupby(['Konto_1']).sum(numeric_only=True))
    debug_by_account = df_debug.groupby(['Konto_Nr']).sum(numeric_only=True)
    logging.info(debug_by_account)
    debug_by_account.to_csv(base_dir + 'debug.csv', decimal=',', sep=';', encoding='latin-1')

    # Clean-up of the GW cost centres / cost carriers.
    # Each rule: accounts starting with 7 or 8 followed by `account_digit`
    # whose cost centre does not start with `origin_digit` are moved to cost
    # centre `origin_digit` + '1'.  The flag columns are kept because they
    # are part of the exported CSV.
    gw_rules = [('0', '1'), ('1', '2'), ('3', '3'), ('4', '4'), ('5', '5')]
    for rule_no, (account_digit, origin_digit) in enumerate(gw_rules, start=1):
        flag = f'GW_Verkauf_{rule_no}'
        df[flag] = (df['Konto_Nr'].str.match(rf'^[78]{account_digit}')) & \
                   (df['Kostenstelle'].str.match(rf'^[^{origin_digit}]\d'))
        df['Kostenstelle'] = np.where(df[flag], origin_digit + '1', df['Kostenstelle'])

    df['GW_Verkauf_50'] = (df['Konto_Nr'].str.match(r'^[78]')) & (df['Kostenstelle'].str.match(r'^2'))
    df['Kostenträger'] = np.where(df['GW_Verkauf_50'], '52', df['Kostenträger'])
    df['Kostenträger'] = np.where(df['GW_Verkauf_50'] & (df['Marke'] == '01'), '50', df['Kostenträger'])

    df['Kostenträger'] = np.where(df['Bilanz'], '00', df['Kostenträger'])

    df['Konto_5er'] = (df['Konto_Nr'].str.match('^5'))
    df['Absatzkanal'] = np.where(df['Konto_5er'], '99', df['Absatzkanal'])

    df['Teile_30_60'] = (df['Konto_Nr'].str.match(r'^[578]')) & \
                        (df['Kostenstelle'].str.match(r'^[3]')) & \
                        (df['Kostenträger'].str.match(r'^[^6]'))
    df['Kostenträger'] = np.where(df['Teile_30_60'], '60', df['Kostenträger'])

    df['Service_40_70'] = (df['Konto_Nr'].str.match(r'^[578]')) & \
                          (df['Kostenstelle'].str.match(r'^[4]')) & \
                          (df['Kostenträger'].str.match(r'^[^7]'))
    df['Kostenträger'] = np.where(df['Service_40_70'], '70', df['Kostenträger'])

    from_label = ['Marke', 'Standort', 'Konto_Nr', 'Kostenstelle', 'Absatzkanal', 'Kostenträger']
    to_label = ['Make', 'Site', 'Account', 'Origin', 'SalesChannel', 'CostCarrier']
    label_map = dict(zip(from_label, to_label))
    df = df.rename(columns=label_map)
    makes = makes.rename(columns=label_map).to_dict(orient='records')
    sites = sites.rename(columns=label_map).to_dict(orient='records')

    df['CostAccountingString'] = df['Make'] + df['Site'] + df['Origin'] + \
        df['SalesChannel'] + df['CostCarrier']
    df['IsNumeric'] = df['CostAccountingString'].str.isdigit()

    df_invalid = df[~df['IsNumeric']]
    df_invalid.to_csv(account_invalid + '.2.csv', decimal=',', sep=';', encoding='latin-1', index=False)

    export_csv = base_dir + 'export_' + current_year + '-' + current_month + '.csv'
    xmlfile = export_csv[:-4] + '.xml'

    df.to_csv(export_csv, decimal=',', sep=';', encoding='latin-1', index=False)
    # Only the grouping keys and the numeric period columns are needed for
    # the XML records, so the text helper columns are not aggregated.
    df = df[df['IsNumeric']].groupby(['Account', 'Make', 'Site', 'Origin', 'SalesChannel', 'CostCarrier',
                                      'CostAccountingString'], as_index=False).sum(numeric_only=True)
    # Add constant record fields.
    df['Decimals'] = 2
    df['OpeningBalance'] = 0.0
    logging.info(df.shape)
    export_file = export_xml(df.to_dict(orient='records'), bk_filter, period_no, makes, sites, xmlfile)

    # Join with the translation table - bookings without a translation.
    df_ignored = df_bookings.merge(df_translate, how='left', on='Konto_Nr_Händler')
    df_ignored = df_ignored[df_ignored['Konto_Nr_SKR51'].isna()]
    # [['Konto_Nr_Händler', 'Bookkeep Period', 'amount', 'quantity']]
    if not df_ignored.empty:
        df_ignored = df_ignored.pivot_table(index=['Konto_Nr_Händler'], columns=['period'], values='amount',
                                            aggfunc='sum', margins=True, margins_name='CumulatedYear')
        df_ignored.to_csv(account_ignored, decimal=',', sep=';', encoding='latin-1')
    return export_file


def _format_field(field):
    """Return the XML text for one record value.

    NaN becomes ``'0'``; floats are multiplied by 100 and written without
    decimals; everything else is converted with ``str``.
    """
    if str(field) == 'nan':
        return '0'
    # isinstance also covers float subclasses such as numpy.float64, which
    # an exact type comparison would write unscaled.
    if isinstance(field, float):
        return '{:.0f}'.format(field * 100)
    return str(field)


def export_xml(records, bk_filter, period_no, makes, sites, export_file):
    """Write header, make list, site list and records as pretty-printed XML.

    Args:
        records: iterable of dict-like rows (anything offering ``.get``).
        bk_filter: mapping as returned by ``bookkeep_filter()``; its first
            ``period_no`` values name the period elements of each record.
        period_no: number of period elements to write per record.
        makes: list of dicts with the keys ``Make`` and ``Marke_HBV``.
        sites: list of dicts with the keys ``Make``, ``Site``, ``Marke_HBV``.
        export_file: path of the XML file to write (UTF-8).

    Returns:
        ``export_file``.
    """
    record_elements = ['Account', 'Make', 'Site', 'Origin', 'SalesChannel', 'CostCarrier', 'CostAccountingString',
                       'Decimals', 'OpeningBalance'] + list(bk_filter.values())[:period_no] + ['CumulatedYear']
    root = ET.Element('HbvData')
    header_node = ET.SubElement(root, 'Header')
    for key, value in header(makes, sites).items():
        ET.SubElement(header_node, key).text = str(value)

    make_list = ET.SubElement(root, 'MakeList')
    for make in makes:
        entry = ET.SubElement(make_list, 'MakeListEntry')
        ET.SubElement(entry, 'Make').text = make['Make']
        ET.SubElement(entry, 'MakeCode').text = make['Marke_HBV']

    bm_code_list = ET.SubElement(root, 'BmCodeList')
    for site in sites:
        entry = ET.SubElement(bm_code_list, 'BmCodeEntry')
        ET.SubElement(entry, 'Make').text = site['Make']
        ET.SubElement(entry, 'Site').text = site['Site']
        ET.SubElement(entry, 'BmCode').text = site['Marke_HBV']

    record_list = ET.SubElement(root, 'RecordList')
    for row in records:
        record = ET.SubElement(record_list, 'Record')
        for element_name in record_elements:
            # A missing field is written like a zero amount.
            ET.SubElement(record, element_name).text = _format_field(row.get(element_name, 0.0))

    with open(export_file, 'w', encoding='utf-8') as fwh:
        fwh.write(minidom.parseString(ET.tostring(root)).toprettyxml(indent=' '))

    # with open(export_file, 'wb') as fwh:
    #     fwh.write(ET.tostring(root))
    return export_file


def convert_to_row(node):
    """Return the text of every direct child of ``node`` as a list."""
    return [child.text for child in node]


def convert_xml_to_csv(xmlfile, csvfile):
    """Write the ``Record`` elements of ``xmlfile`` as semicolon-separated CSV.

    The tags of the first record form the header row.  If the record list is
    empty, an empty file is written.  The file is encoded as latin-1, the
    encoding ``convert_csv_to_xml`` reads.

    Returns:
        ``True``.
    """
    record_list = ET.parse(xmlfile).getroot().find('RecordList')
    records = record_list.findall('Record')

    # newline='' is what the csv module expects; otherwise extra blank lines
    # appear on platforms that translate line endings.
    with open(csvfile, 'w', encoding='latin-1', newline='') as fwh:
        cwh = csv.writer(fwh, delimiter=';')
        if records:
            cwh.writerow([child.tag for child in records[0]])
            cwh.writerows(convert_to_row(node) for node in records)
    return True


def convert_csv_to_xml(csvfile, xmlfile):
    """Write the rows of a semicolon-separated CSV file as XML.

    Uses one fixed make/site pair and writes a single period per record.
    """
    makes = [{'Make': '01', 'Marke_HBV': '1844'}]
    sites = [{'Make': '01', 'Site': '01', 'Marke_HBV': '1844'}]
    with open(csvfile, 'r', encoding='latin-1') as frh:
        csv_reader = csv.DictReader(frh, delimiter=';')
        export_xml(csv_reader, bookkeep_filter(), 1, makes, sites, xmlfile)


if __name__ == '__main__':
    export_file = export_month()
    # convert_xml_to_csv(export_file, export_file[:-4] + '.csv')
    # convert_xml_to_csv('DE2119_2105_150621_080944.xml', 'DE2119_2105_150621_080944.csv')
    # convert_csv_to_xml(base_dir + '../SKR51/maximum.csv', base_dir + '../maximum_2022-01.xml')