import csv
import logging
import os
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from xml.dom import minidom

import numpy as np
import pandas as pd

# Columns that identify one record in the XML export (see GCHR.export_skr51_xml).
ACCOUNT_INFO = [
    "Account",
    "Make",
    "Site",
    "Origin",
    "SalesChannel",
    "CostCarrier",
    "CostAccountingString",
]

# Columns kept in the translation table returned by GCHR.special_translation.
TRANSLATE = [
    "Konto_Nr_Händler",
    "Konto_Nr_SKR51",
    "Marke",
    "Standort",
    "Konto_Nr",
    "Kostenstelle",
    "Absatzkanal",
    "Kostenträger",
    "Kontoart",
    "Konto_1",
    "KRM",
    "IsNumeric",
]


@dataclass
class GchrConfig:
    """Configuration holder; currently only the first month of the financial year."""

    first_month_of_financial_year: str


class GCHR:
    """Translate dealer account balances to SKR51 accounts and export them as XML.

    Input files are read from ``<base_dir>/data``; results are written below
    ``<base_dir>/Export``.
    """

    # Set by load_bookings_from_file (newest mtime of the booking files).
    booking_date: datetime
    df_bookings: pd.DataFrame = None
    df_translate: pd.DataFrame = None
    df_translate2: pd.DataFrame = None
    makes: dict[str, str] = None
    sites: dict[str, str] = None

    def __init__(self, base_dir) -> None:
        """Remember the input locations below *base_dir* and set pandas display options."""
        self.base_dir = base_dir

        self.account_translation = f"{self.base_dir}/data/Kontenrahmen_uebersetzt.csv"
        self.account_bookings = list(Path(self.base_dir).joinpath("data").glob("GuV_Bilanz_Salden*.csv"))
        self.first_month_of_financial_year = "10"

        pd.set_option("display.max_rows", 500)
        pd.set_option("display.float_format", lambda x: "%.2f" % x)

    def set_bookkeep_period(self, year, month):
        """Select the period to export and point logging at its protocol file.

        *year* and *month* are strings (month zero-padded, e.g. ``"08"``).
        The logging configuration is replaced (``force=True``) so that each
        period gets its own protocol file.
        """
        self.current_year = year
        self.current_month = month
        period = f"{year}-{month}"
        prot_file = f"{self.export_info_dir}/protokoll_{period}.log"
        logging.basicConfig(
            filename=prot_file,
            filemode="w",
            encoding="utf-8",
            level=logging.DEBUG,
            force=True,
        )
        self.debug_file = f"{self.export_info_dir}/debug_{period}.csv"
        self.account_ignored = f"{self.export_info_dir}/ignoriert_{period}.csv"
        # self.account_invalid = f"{self.export_info_dir}/ungueltig_{period}.csv"

        self.last_year = str(int(self.current_year) - 1)
        self.last_year2 = str(int(self.current_year) - 2)
        self.next_year = str(int(self.current_year) + 1)

    def header(self, makes_used, sites_used, main_site):
        """Return the fields of the XML ``Header`` element as a dict."""
        return {
            "Country": "DE",
            "MainBmCode": main_site,
            "Month": self.current_month,
            "Year": self.current_year,
            "Currency": "EUR",
            "NumberOfMakes": len(makes_used),
            "NumberOfSites": len(sites_used),
            "ExtractionDate": self.booking_date.strftime("%d.%m.%Y"),
            "ExtractionTime": self.booking_date.strftime("%H:%M:%S"),
            "BeginFiscalYear": self.first_month_of_financial_year,
        }

    @property
    def bookkeep_filter(self):
        """Map bookkeeping periods (``YYYYMM``) to export column names.

        The first key is ``<current_year>00`` (``OpeningBalance``); the
        following keys are the months of the financial year containing the
        current month, mapped to ``Period01`` .. ``Period12``.
        """
        period = [self.current_year + str(i).zfill(2) for i in range(1, 13)]
        if self.first_month_of_financial_year != "01":
            # Zero-padded month strings compare correctly as text.
            if self.first_month_of_financial_year > self.current_month:
                period = [self.last_year + str(i).zfill(2) for i in range(1, 13)] + period
            else:
                period = period + [self.next_year + str(i).zfill(2) for i in range(1, 13)]
            fm = int(self.first_month_of_financial_year)
            # One element more than needed; zip() below truncates to 12 periods.
            period = period[fm - 1 : fm + 12]
        period = [self.current_year + "00"] + period
        rename_to = ["OpeningBalance"] + ["Period" + str(i).zfill(2) for i in range(1, 13)]
        return dict(zip(period, rename_to))

    def extract_acct_info(self, df: pd.DataFrame):
        """Split ``Konto_Nr_SKR51`` into its six dash-separated parts.

        Account strings that do not contain exactly five dashes are replaced
        by the placeholder ``XX-XX-XXXX-XX-XX-XX`` first.  *df* is modified in
        place and also returned.
        """
        acct_info = [
            "Marke",
            "Standort",
            "Konto_Nr",
            "Kostenstelle",
            "Absatzkanal",
            "Kostenträger",
        ]
        df["HasFiveDashes"] = df["Konto_Nr_SKR51"].str.count("-") == 5
        df["Invalid"] = "XX-XX-XXXX-XX-XX-XX"
        df["Konto_Nr_SKR51"] = np.where(
            df["HasFiveDashes"],
            df["Konto_Nr_SKR51"],
            df["Invalid"],
        )
        df[acct_info] = df["Konto_Nr_SKR51"].str.split(pat="-", n=6, expand=True)
        return df

    def export_all_periods(self, overwrite=False, today=None):
        """Export the twelve months preceding the current month.

        *today* may be an ISO date string that replaces the current date.
        Periods whose export file already exists are skipped unless
        *overwrite* is true.
        """
        dt = datetime.now()
        if today is not None:
            dt = datetime.fromisoformat(today)
        prev = str(dt.year - 1)
        periods = [(prev, str(x).zfill(2)) for x in range(dt.month, 13)] + [
            (str(dt.year), str(x).zfill(2)) for x in range(1, dt.month)
        ]
        for year, month in periods:
            filename = self.export_filename_for_period(year, month)
            if overwrite or not Path(filename).exists():
                os.makedirs(Path(filename).parent.joinpath("info"), exist_ok=True)
                self.export_period(year, month)

    def export_period(self, year, month):
        """Export one bookkeeping period.

        Returns the export file name, or ``False`` when there is no usable
        data for the period.
        """
        self.set_bookkeep_period(year, month)
        # Load the translation table
        self.get_translation()

        # Load the account balances
        df_bookings = self.filter_bookings()
        all_periods = set(df_bookings["Bookkeep Period"].to_list())
        bookkeep_period_date = datetime(int(year), int(month), 28)

        if df_bookings.shape[0] == 0 or len(all_periods) <= 1 or self.booking_date < bookkeep_period_date:
            logging.error("ABBRUCH!!! Keine Daten vorhanden!")
            return False

        filter_to = self.current_year + self.current_month
        period_no = list(self.bookkeep_filter.keys()).index(filter_to) + 1

        logging.info("df_bookings: " + str(df_bookings.shape))
        # Join with the translation table
        df_combined = df_bookings.merge(self.df_translate, how="inner", on="Konto_Nr_Händler")
        logging.info(f"df_combined: {df_combined.shape}")

        df_pivot = df_combined.pivot_table(
            index=["Konto_Nr_SKR51"],
            columns=["period"],
            values="amount",
            aggfunc="sum",
            margins=True,
            margins_name="CumulatedYear",
        )
        # Keep the per-row margin column, drop the total row.
        df_pivot.drop(index="CumulatedYear", inplace=True)

        logging.info("df_pivot: " + str(df_pivot.shape))

        df = df_pivot.merge(self.df_translate2, how="inner", on="Konto_Nr_SKR51")

        makes_used = sorted(set(df["Marke"].to_list()))
        sites_used = sorted(set((df["Marke"] + "-" + df["Standort"]).to_list()))

        from_label = ["Marke", "Standort", "Konto_Nr", "Kostenstelle", "Absatzkanal", "Kostenträger", "KRM"]
        to_label = ["Make", "Site", "Account", "Origin", "SalesChannel", "CostCarrier", "CostAccountingString"]
        col_dict = dict(zip(from_label, to_label))
        df = df.rename(columns=col_dict)

        export_csv = self.export_filename[:-4] + ".csv"

        df.to_csv(export_csv, decimal=",", sep=";", encoding="latin-1", index=False)
        df = df[df["IsNumeric"] != False].groupby(ACCOUNT_INFO, as_index=False).aggregate("sum")

        # Add supplementary info
        df["Decimals"] = 2
        # df.sort_values(by=["Konto_Nr_SKR51"], inplace=True)
        logging.info(df.shape)
        main_sites = [self.sites[s] for s in sites_used if s in self.sites and self.sites[s] != "0000"]

        for i, main_site in enumerate(main_sites):
            filename = self.export_filename
            if i > 0:
                # Every further main site gets its own file with a suffix.
                filename = f"{filename[:-4]}_{main_site}.xml"
            self.export_skr51_xml(
                df.to_dict(orient="records"),
                self.bookkeep_filter,
                period_no,
                makes_used,
                sites_used,
                main_site,
                filename,
            )

        # Join with the translation table - accounts without an assignment
        df_ignored = df_bookings.merge(self.df_translate, how="left", on="Konto_Nr_Händler")
        df_ignored = df_ignored[df_ignored["Konto_Nr_SKR51"].isna()]
        if not df_ignored.empty:
            df_ignored = df_ignored.pivot_table(
                index=["Konto_Nr_Händler"],
                columns=["period"],
                values="amount",
                aggfunc="sum",
                margins=True,
                margins_name="CumulatedYear",
            )
            df_ignored.to_csv(self.account_ignored, decimal=",", sep=";", encoding="latin-1")
        return self.export_filename

    def get_translation(self):
        """Load the translation table once and derive ``makes``/``sites`` from it.

        Returns ``self.df_translate``; subsequent calls reuse the cached table.
        """
        if self.df_translate is None:
            df_translate_import = pd.read_csv(
                self.account_translation,
                decimal=",",
                sep=";",
                encoding="latin-1",
                converters={i: str for i in range(0, 200)},
            ).reset_index()

            df_makes = df_translate_import[["Marke", "Marke_HBV"]].copy().drop_duplicates()
            df_makes = df_makes[df_makes["Marke_HBV"] != "0000"]
            self.makes = {e["Marke"]: e["Marke_HBV"] for e in df_makes.to_dict(orient="records")}
            self.makes["99"] = "0000"

            df_sites = df_translate_import[["Marke", "Standort", "Standort_HBV"]].copy().drop_duplicates()
            # Site codes that are not six characters long are replaced by "0000".
            df_sites["Standort_HBV"] = np.where(
                df_sites["Standort_HBV"].str.len() != 6, "0000", df_sites["Standort_HBV"]
            )
            self.sites = {
                e["Marke"] + "-" + e["Standort"]: e["Standort_HBV"] for e in df_sites.to_dict(orient="records")
            }

            df_prepared = self.prepare_translation(df_translate_import)
            self.df_translate = self.special_translation(df_prepared)
            self.df_translate2 = (
                self.df_translate.drop(columns=["Konto_Nr_Händler"])
                .copy()
                .drop_duplicates()
                .set_index("Konto_Nr_SKR51")
            )
        return self.df_translate

    def prepare_translation(self, df_translate_import: pd.DataFrame):
        """Return the unique dealer-account/SKR51 pairs plus the fixed 0861 row.

        The extra row maps ``01-01-0861-00-00-00`` to itself; filter_bookings
        books the carried-forward result on that account.
        """
        df_translate = df_translate_import[
            [
                "Konto_Nr_Händler",
                "Konto_Nr_SKR51",
            ]
        ].drop_duplicates()
        logging.info(df_translate.shape)

        row = {
            "Konto_Nr_Händler": "01-01-0861-00-00-00",
            "Konto_Nr_SKR51": "01-01-0861-00-00-00",
        }
        df_translate = pd.concat([df_translate, pd.DataFrame.from_records([row])])
        # "Konto_Nr_Händler" stays a regular column; the merges join on it.
        return df_translate

    def special_translation(self, df: pd.DataFrame):
        """Normalise the translation table and apply the cost-accounting rules.

        Writes a debug CSV (``self.debug_file``) and the rows that fail the
        numeric check (``self.export_invalid_filename``).  Returns only the
        valid rows, restricted to the ``TRANSLATE`` columns.

        The rules below are order dependent: later rules read columns that
        earlier rules have already rewritten.  The ``== True`` comparisons are
        kept on purpose so that missing match results count as "no match".
        """
        df["Konto_Nr_Händler"] = df["Konto_Nr_Händler"].str.upper()
        df["Konto_Nr_SKR51"] = df["Konto_Nr_SKR51"].str.upper()
        df = self.extract_acct_info(df)
        df["Konto_Nr"] = df["Konto_Nr"].str.upper()
        logging.info(df.shape)
        logging.info(df.columns)
        logging.info(df.head())

        logging.info("df: " + str(df.shape))
        df["Bilanz"] = df["Konto_Nr"].str.match(r"^[013]")
        df["Kontoart"] = np.where(df["Bilanz"], "1", "2")
        df["Kontoart"] = np.where(df["Konto_Nr"].str.contains("_STK"), "3", df["Kontoart"])
        df["Kontoart"] = np.where(df["Konto_Nr"].str.match(r"^[9]"), "3", df["Kontoart"])
        df["Konto_1"] = df["Konto_Nr"].str.slice(0, 1)

        # Missing make and site assignment
        df["Marke"] = np.where(df["Marke"].isin(self.makes.keys()), df["Marke"], "99")
        df["Marke_Standort"] = df["Marke"] + "-" + df["Standort"]
        df["Standort"] = np.where(df["Marke_Standort"].isin(self.sites.keys()), df["Standort"], "01")

        df_debug = df.drop(columns=["Bilanz"])
        logging.info(df_debug.groupby(["Kontoart"]).aggregate("sum"))
        logging.info(df_debug.groupby(["Kontoart", "Konto_1"]).aggregate("sum"))
        df_debug_by_account = df_debug.groupby(["Konto_Nr"]).aggregate("sum")
        logging.info(df_debug_by_account)
        df_debug_by_account.to_csv(self.debug_file, decimal=",", sep=";", encoding="latin-1")

        # Clean-up of the GW cost carriers
        df["NW_Verkauf_1"] = (df["Konto_Nr"].str.match(r"^[78]0")) & (df["Kostenstelle"].str.match(r"^[^1]\d"))
        df["Kostenstelle"] = np.where(df["NW_Verkauf_1"] == True, "11", df["Kostenstelle"])

        df["Konto_7010"] = df["Konto_Nr"].str.match(r"^[78]01[01]")
        df["Kostenstelle"] = np.where(df["Konto_7010"] == True, "14", df["Kostenstelle"])

        df["GW_Verkauf_2"] = (df["Konto_Nr"].str.match(r"^[78]1")) & (df["Kostenstelle"].str.match(r"^[^2]\d"))
        df["Kostenstelle"] = np.where(df["GW_Verkauf_2"] == True, "21", df["Kostenstelle"])

        df["GW_Verkauf_3"] = (df["Konto_Nr"].str.match(r"^[78]3")) & (df["Kostenstelle"].str.match(r"^[^3]\d"))
        df["Kostenstelle"] = np.where(df["GW_Verkauf_3"] == True, "31", df["Kostenstelle"])

        df["GW_Verkauf_4"] = (df["Konto_Nr"].str.match(r"^[78]4")) & (df["Kostenstelle"].str.match(r"^[^4]\d"))
        df["Kostenstelle"] = np.where(df["GW_Verkauf_4"] == True, "41", df["Kostenstelle"])

        df["GW_Verkauf_x420"] = df["Konto_Nr"].str.match(r"^[78]420")
        df["Kostenstelle"] = np.where(df["GW_Verkauf_x420"] == True, "42", df["Kostenstelle"])

        df["GW_Verkauf_5"] = (df["Konto_Nr"].str.match(r"^[78]5")) & (df["Kostenstelle"].str.match(r"^[^5]\d"))
        df["Kostenstelle"] = np.where(df["GW_Verkauf_5"] == True, "51", df["Kostenstelle"])

        df["GW_Verkauf_50"] = (df["Konto_Nr"].str.match(r"^[78]")) & (df["Kostenstelle"].str.match(r"^2"))
        df["Kostenträger"] = np.where(df["GW_Verkauf_50"] == True, "52", df["Kostenträger"])
        df["Kostenträger"] = np.where(
            (df["GW_Verkauf_50"] == True) & (df["Marke"] == "01"),
            "50",
            df["Kostenträger"],
        )

        df["NW_Verkauf_00"] = (
            (df["Konto_Nr"].str.match(r"^[78]2"))
            & (df["Kostenstelle"].str.match(r"^1"))
            & (df["Kostenträger"].str.match(r"^[^01234]"))
        )
        df["Kostenträger"] = np.where(df["NW_Verkauf_00"] == True, "00", df["Kostenträger"])

        df["GW_Stk_50"] = (df["Konto_Nr"].str.match(r"^9130")) & (df["Kostenstelle"].str.match(r"^2"))
        df["Kostenträger"] = np.where(df["GW_Stk_50"] == True, "52", df["Kostenträger"])
        df["Kostenträger"] = np.where((df["GW_Stk_50"] == True) & (df["Marke"] == "01"), "50", df["Kostenträger"])

        df["Kostenträger"] = np.where(df["Bilanz"] == True, "00", df["Kostenträger"])

        df["Konto_5er"] = (df["Konto_Nr"].str.match("^5")) | (df["Konto_Nr"].str.match("^9143"))
        df["Absatzkanal"] = np.where(df["Konto_5er"] == True, "99", df["Absatzkanal"])

        df["Konto_5005"] = (df["Konto_Nr"].str.match("^5005")) & (df["Kostenstelle"].str.match(r"^[^12]"))
        df["Kostenstelle"] = np.where(df["Konto_5005"] == True, "20", df["Kostenstelle"])
        df["Kostenträger"] = np.where(df["Konto_5005"] == True, "50", df["Kostenträger"])

        df["Konto_5007"] = (df["Konto_Nr"].str.match("^5007")) & (df["Kostenstelle"].str.match(r"^([^4]|42)"))
        df["Kostenstelle"] = np.where(df["Konto_5007"] == True, "41", df["Kostenstelle"])
        df["Kostenträger"] = np.where(df["Konto_5007"] == True, "70", df["Kostenträger"])

        df["Konto_914er"] = (df["Konto_Nr"].str.match("^914[34]")) & (df["Kostenträger"].str.match(r"^[^7]"))
        df["Kostenträger"] = np.where(df["Konto_914er"] == True, "70", df["Kostenträger"])

        df["Teile_30_60"] = (
            (df["Konto_Nr"].str.match(r"^[578]"))
            & (df["Kostenstelle"].str.match(r"^[3]"))
            & (df["Kostenträger"].str.match(r"^[^6]"))
        )
        df["Kostenträger"] = np.where(df["Teile_30_60"] == True, "60", df["Kostenträger"])

        df["Service_40_70"] = (
            (df["Konto_Nr"].str.match(r"^[578]"))
            & (df["Kostenstelle"].str.match(r"^[4]"))
            & (df["Kostenträger"].str.match(r"^[^7]"))
        )
        df["Kostenträger"] = np.where(df["Service_40_70"] == True, "70", df["Kostenträger"])

        # Rebuild the combined keys from the (possibly rewritten) parts.
        df["KRM"] = df["Marke"] + df["Standort"] + df["Kostenstelle"] + df["Absatzkanal"] + df["Kostenträger"]
        df["Konto_Nr_SKR51"] = (
            (df["Marke"] + "-" + df["Standort"] + "-" + df["Konto_Nr"])
            + "-"
            + (df["Kostenstelle"] + "-" + df["Absatzkanal"] + "-" + df["Kostenträger"])
        )
        df["IsNumeric"] = (
            (df["KRM"].str.isdigit())
            & (df["Konto_Nr"].str.isdigit())
            & (df["Konto_Nr"].str.len() == 4)
            # & (df["Konto_Nr_SKR51"].str.len() == 19)
        )
        df_invalid = df[df["IsNumeric"] == False]
        df_invalid.to_csv(self.export_invalid_filename, decimal=",", sep=";", encoding="latin-1", index=False)
        return df[df["IsNumeric"] == True][TRANSLATE]

    def load_bookings_from_file(self):
        """Read all booking CSV files into ``self.df_bookings``.

        ``self.booking_date`` becomes the newest modification time of the
        files; ``amount`` is the rounded sum of debit and credit amount.
        """
        df2 = []
        timestamps = []

        for csv_file in self.account_bookings:
            df2.append(
                pd.read_csv(
                    csv_file,
                    decimal=",",
                    sep=";",
                    encoding="latin-1",
                    converters={0: str, 1: str},
                )
            )
            timestamps.append(Path(csv_file).stat().st_mtime)
        self.booking_date = datetime.fromtimestamp(max(timestamps))
        self.df_bookings = pd.concat(df2)
        self.df_bookings["amount"] = (self.df_bookings["Debit Amount"] + self.df_bookings["Credit Amount"]).round(2)

    def filter_bookings(self):
        """Return opening balance, bookings and statistics rows for the period.

        Bookings are loaded on first use.  If no opening balance exists for
        the current year, it is derived from the previous year's opening
        balance and bookings, and the result is booked on account
        ``01-01-0861-00-00-00``.  Rows with a zero amount are dropped.
        """
        if self.df_bookings is None:
            self.load_bookings_from_file()

        # Filter the account balances to the given month
        filter_from = self.current_year + self.first_month_of_financial_year
        filter_prev = self.last_year + self.first_month_of_financial_year

        if self.first_month_of_financial_year > self.current_month:
            filter_from = self.last_year + self.first_month_of_financial_year
            filter_prev = self.last_year2 + self.first_month_of_financial_year
        filter_to = self.current_year + self.current_month
        filter_opening = self.current_year + "00"
        filter_prev_opening = self.last_year + "00"
        prev_year_closed = True

        df_opening_balance = self.df_bookings[(self.df_bookings["Bookkeep Period"] == filter_opening)]
        if df_opening_balance.shape[0] == 0:
            df_opening_balance = self.df_bookings[
                (self.df_bookings["Bookkeep Period"] == filter_prev_opening)
                | (
                    (self.df_bookings["Bookkeep Period"] >= filter_prev)
                    & (self.df_bookings["Bookkeep Period"] < filter_from)
                )
            ].copy()
            df_opening_balance["Bookkeep Period"] = filter_opening
            prev_year_closed = False

        df_opening_balance = df_opening_balance[(df_opening_balance["Konto_Nr_Händler"].str.contains(r"-[013]\d\d+-"))]
        opening_balance = df_opening_balance["amount"].aggregate("sum").round(2)
        logging.info("Gewinn/Verlustvortrag")
        logging.info(opening_balance)

        if not prev_year_closed:
            row = {
                "Konto_Nr_Händler": "01-01-0861-00-00-00",
                "Bookkeep Period": filter_opening,
                "Debit Amount": opening_balance * -1,
                "Credit Amount": 0,
                "Debit Quantity": 0,
                "Credit Quantity": 0,
                "amount": opening_balance * -1,
            }
            df_opening_balance = pd.concat([df_opening_balance, pd.DataFrame.from_records([row])])

        df_filtered = self.df_bookings[
            (self.df_bookings["Bookkeep Period"] >= filter_from) & (self.df_bookings["Bookkeep Period"] <= filter_to)
        ]

        # Copy the bookings and append them as statistics accounts
        df_stats = df_filtered.copy()
        # df_stats = df_stats[df_stats['Konto_Nr_Händler'].str.match(r'-[24578]\d\d\d-')]
        df_stats["Konto_Nr_Händler"] = df_stats["Konto_Nr_Händler"].str.replace(r"-(\d\d\d+)-", r"-\1_STK-", regex=True)
        df_stats["amount"] = (df_filtered["Debit Quantity"] + df_filtered["Credit Quantity"]).round(2)

        df_combined = pd.concat([df_opening_balance, df_filtered, df_stats])

        # Convert columns
        # Build the period mapping once instead of once per row; an unknown
        # period still raises KeyError.
        period_names = self.bookkeep_filter
        df_combined["period"] = df_combined["Bookkeep Period"].apply(lambda x: period_names[x])
        return df_combined[df_combined["amount"] != 0.00]

    @property
    def export_filename(self):
        """Export file name for the currently selected period."""
        return self.export_filename_for_period(self.current_year, self.current_month)

    @property
    def export_info_dir(self):
        """Directory for protocol and debug files of the current year."""
        return f"{self.base_dir}/Export/{self.current_year}/info/"

    @property
    def export_invalid_filename(self):
        """File that receives the translation rows failing the numeric check."""
        return f"{self.base_dir}/Export/ungueltig.csv"

    def export_filename_for_period(self, year, month):
        """Return the XML export file name for *year* and *month*."""
        return f"{self.base_dir}/Export/{year}/export_{year}-{month}.xml"

    def export_skr51_xml(self, records, bk_filter, period_no, makes_used, sites_used, main_site, filename):
        """Write *records* as an ``HbvData`` XML document to *filename*.

        *records* is an iterable of mappings.  Each record contains the
        ``ACCOUNT_INFO`` fields, ``Decimals``, the first *period_no* column
        names of *bk_filter* and ``CumulatedYear``.  Missing and NaN values
        are written as ``0``; float values are written multiplied by 100
        without decimals.
        """
        record_elements = ACCOUNT_INFO + ["Decimals"] + list(bk_filter.values())[:period_no] + ["CumulatedYear"]
        root = ET.Element("HbvData")
        h = ET.SubElement(root, "Header")
        for k, v in self.header(makes_used, sites_used, main_site).items():
            ET.SubElement(h, k).text = str(v)

        make_list = ET.SubElement(root, "MakeList")
        for make in makes_used:
            if make not in self.makes:
                continue
            e = ET.SubElement(make_list, "MakeListEntry")
            ET.SubElement(e, "Make").text = make
            ET.SubElement(e, "MakeCode").text = self.makes[make]

        bm_code_list = ET.SubElement(root, "BmCodeList")
        for s in sites_used:
            make, site = s.split("-")
            if s not in self.sites:
                continue
            e = ET.SubElement(bm_code_list, "BmCodeEntry")
            ET.SubElement(e, "Make").text = make
            ET.SubElement(e, "Site").text = site
            ET.SubElement(e, "BmCode").text = self.sites[s]

        record_list = ET.SubElement(root, "RecordList")
        for row in records:
            record = ET.SubElement(record_list, "Record")
            for e in record_elements:
                child = ET.SubElement(record, e)
                field = row.get(e, 0.0)
                if str(field) == "nan":
                    field = "0"
                elif isinstance(field, float):
                    field = "{:.0f}".format(field * 100)
                child.text = str(field)

        with open(filename, "w", encoding="utf-8") as fwh:
            fwh.write(minidom.parseString(ET.tostring(root)).toprettyxml(indent="  "))

    def convert_to_row(self, node):
        """Return the text of every child element of *node* as a list."""
        return [child.text for child in node]

    def convert_xml_to_csv(self, xmlfile, csvfile):
        """Write the ``RecordList`` of *xmlfile* as a semicolon-separated CSV.

        The column names are taken from the first record.  The file is written
        as latin-1, the encoding convert_csv_to_xml reads.  Returns ``True``.
        """
        record_list = ET.parse(xmlfile).getroot().find("RecordList")
        header = [child.tag for child in record_list.find("Record")]
        bookings = [self.convert_to_row(node) for node in record_list.findall("Record")]
        # newline="" lets the csv module control line endings (no blank rows
        # on Windows).
        with open(csvfile, "w", encoding="latin-1", newline="") as fwh:
            cwh = csv.writer(fwh, delimiter=";")
            cwh.writerow(header)
            cwh.writerows(bookings)
        return True

    def convert_csv_to_xml(self, csvfile, xmlfile):
        """Convert a semicolon-separated record CSV back into an export XML.

        Overwrites ``self.makes`` and ``self.sites`` with a single hard-coded
        make/site.  The bookkeeping period must already be selected (see
        set_bookkeep_period) because the header and the period names depend
        on it.  If no booking date is known yet, the modification time of
        *csvfile* is used.  Only the opening balance column is exported
        (``period_no`` is 1).
        """
        self.makes = {"01": "1844"}
        self.sites = {"01-01": "1844"}
        if getattr(self, "booking_date", None) is None:
            self.booking_date = datetime.fromtimestamp(Path(csvfile).stat().st_mtime)

        makes_used = list(self.makes)
        sites_used = list(self.sites)
        main_site = list(self.sites.values())[0]
        with open(csvfile, "r", encoding="latin-1", newline="") as frh:
            csv_reader = csv.DictReader(frh, delimiter=";")
            # bookkeep_filter is a property (a dict), not a method.
            self.export_skr51_xml(
                csv_reader,
                self.bookkeep_filter,
                1,
                makes_used,
                sites_used,
                main_site,
                xmlfile,
            )


def gchr_local():
    """Run the export for every customer directory of the local test data."""
    base_dir = os.getcwd() + "/../GCHR2_Testdaten/Kunden"
    for path in Path(base_dir).glob("*"):
        if path.is_dir():
            print(path.name)
            gchr_export(str(path))


def gchr_export(base_dir):
    """Export all missing periods for the customer data in *base_dir*."""
    gchr = GCHR(base_dir)
    # gchr.export_all_periods(overwrite=True, today="2022-08-01")
    gchr.export_all_periods()


if __name__ == "__main__":
    gchr_local()
    # import cProfile
    # cProfile.run(
    #     "gchr_local()",
    #     "gchr_local.prof",
    # )