import csv import logging import os import xml.etree.ElementTree as ET from dataclasses import dataclass from datetime import datetime from pathlib import Path from typing import Callable from xml.dom import minidom import numpy as np import pandas as pd ACCOUNT_INFO = [ "Account", "Make", "Site", "Origin", "SalesChannel", "CostCarrier", "CostAccountingString", ] TRANSLATE = [ "Konto_Nr_Händler", "Konto_Nr_SKR51", "Marke", "Standort", "Konto_Nr", "Kostenstelle", "Absatzkanal", "Kostenträger", "Kontoart", "Konto_1", "KRM", "IsNumeric", ] @dataclass class GchrExportConfig: main_site: str current_year: str current_month: str makes_used: dict[str, str] sites_used: dict[str, str] first_month: str period_no: str bookkeep_filter: dict[str, str] extraction_date: datetime export_file: str bookkeep_records = dict[str, list[str]] header: dict[str, str] | None = None @dataclass class GchrConfig: first_month_of_financial_year: str data_path: str gcstruct_path: str export_path: str export_fn = Callable[[GchrExportConfig], None] class GCHR: booking_date: datetime df_bookings: pd.DataFrame = None df_translate: pd.DataFrame = None df_translate2: pd.DataFrame = None makes: dict[str, str] = None sites: dict[str, str] = None current_year: str current_month: str def __init__(self, base_dir: str) -> None: self.base_dir = base_dir self.account_translation = f"{self.base_dir}/data/Kontenrahmen_uebersetzt.csv" self.account_bookings = list(Path(self.base_dir).joinpath("data").glob("GuV_Bilanz_Salden*.csv")) self.first_month_of_financial_year = "10" pd.set_option("display.max_rows", 500) pd.set_option("display.float_format", lambda x: "%.2f" % x) def set_bookkeep_period(self, year: str, month: str) -> None: self.current_year = year self.current_month = month period = f"{year}-{month}" prot_file = f"{self.export_info_dir}/protokoll_{period}.log" logging.basicConfig( filename=prot_file, filemode="w", encoding="utf-8", level=logging.DEBUG, force=True, ) self.debug_file = f"{self.export_info_dir}/debug_{period}.csv" self.account_ignored = f"{self.export_info_dir}/ignoriert_{period}.csv" # self.account_invalid = f"{self.export_info_dir}/ungueltig_{period}.csv" self.last_year = str(int(self.current_year) - 1) self.last_year2 = str(int(self.current_year) - 2) self.next_year = str(int(self.current_year) + 1) @staticmethod def header(export_cfg: GchrExportConfig) -> dict[str, str]: return { "Country": "DE", "MainBmCode": export_cfg.main_site, "Month": export_cfg.current_month, "Year": export_cfg.current_year, "Currency": "EUR", "NumberOfMakes": len(export_cfg.makes_used), "NumberOfSites": len(export_cfg.sites_used), "ExtractionDate": export_cfg.extraction_date.strftime("%d.%m.%Y"), "ExtractionTime": export_cfg.extraction_date.strftime("%H:%M:%S"), "BeginFiscalYear": export_cfg.first_month, } @property def bookkeep_filter(self) -> dict[str, str]: period = [self.current_year + str(i).zfill(2) for i in range(1, 13)] if self.first_month_of_financial_year != "01": if self.first_month_of_financial_year > self.current_month: period = [self.last_year + str(i).zfill(2) for i in range(1, 13)] + period else: period = period + [self.next_year + str(i).zfill(2) for i in range(1, 13)] fm = int(self.first_month_of_financial_year) period = period[fm - 1 : fm + 12] period = [self.current_year + "00"] + period rename_to = ["OpeningBalance"] + ["Period" + str(i).zfill(2) for i in range(1, 13)] return dict(zip(period, rename_to)) def extract_acct_info(self, df: pd.DataFrame) -> pd.DataFrame: acct_info = [ "Marke", "Standort", "Konto_Nr", "Kostenstelle", "Absatzkanal", "Kostenträger", ] df["HasFiveDashes"] = df["Konto_Nr_SKR51"].str.count("-") == 5 df["Invalid"] = "XX-XX-XXXX-XX-XX-XX" df["Konto_Nr_SKR51"] = np.where( df["HasFiveDashes"], df["Konto_Nr_SKR51"], df["Invalid"], ) df[acct_info] = df["Konto_Nr_SKR51"].str.split(pat="-", n=6, expand=True) return df def export_all_periods(self, overwrite=False, today=None) -> None: dt = datetime.now() if today is not None: dt = datetime.fromisoformat(today) prev = str(dt.year - 1) periods = [(prev, str(x).zfill(2)) for x in range(dt.month, 13)] + [ (str(dt.year), str(x).zfill(2)) for x in range(1, dt.month) ] for year, month in periods: filename = self.export_filename_for_period(year, month) if overwrite or not Path(filename).exists(): os.makedirs(Path(filename).parent.joinpath("info"), exist_ok=True) self.export_period(year, month) def export_period(self, year: str, month: str) -> str: self.set_bookkeep_period(year, month) # Übersetzungstabelle laden self.get_translation() # Kontensalden laden df_bookings = self.filter_bookings() all_periods = set(df_bookings["Bookkeep Period"].to_list()) bookkeep_period_date = datetime(int(year), int(month), 28) if df_bookings.shape[0] == 0 or len(all_periods) <= 1 or self.booking_date < bookkeep_period_date: logging.error("ABBRUCH!!! Keine Daten vorhanden!") return False filter_to = self.current_year + self.current_month period_no = list(self.bookkeep_filter.keys()).index(filter_to) + 1 logging.info("df_bookings: " + str(df_bookings.shape)) # Join auf Übersetzung df_combined = df_bookings.merge(self.df_translate, how="inner", on="Konto_Nr_Händler") logging.info(f"df_combined: {df_combined.shape}") df_pivot = df_combined.pivot_table( index=["Konto_Nr_SKR51"], columns=["period"], values="amount", aggfunc="sum", margins=True, margins_name="CumulatedYear", ) df_pivot.drop(index="CumulatedYear", inplace=True) logging.info("df_pivot: " + str(df_pivot.shape)) df = df_pivot.merge(self.df_translate2, how="inner", on="Konto_Nr_SKR51") makes_used = {} for m in sorted(list(set(df["Marke"].to_list()))): if m not in self.makes: continue makes_used[m] = self.makes[m] sites_used = {} for s in sorted(list(set((df["Marke"] + "-" + df["Standort"]).to_list()))): if s not in self.sites: continue sites_used[s] = self.sites[s] from_label = ["Marke", "Standort", "Konto_Nr", "Kostenstelle", "Absatzkanal", "Kostenträger", "KRM"] to_label = ["Make", "Site", "Account", "Origin", "SalesChannel", "CostCarrier", "CostAccountingString"] col_dict = dict(zip(from_label, to_label)) df = df.rename(columns=col_dict) export_csv = self.export_filename[:-4] + ".csv" df.to_csv(export_csv, decimal=",", sep=";", encoding="latin-1", index=False) df = df[df["IsNumeric"] != False].groupby(ACCOUNT_INFO, as_index=False).aggregate("sum") # Infos ergänzen df["Decimals"] = 2 # df.sort_values(by=["Konto_Nr_SKR51"], inplace=True) logging.info(df.shape) main_sites = [self.sites[s] for s in sites_used if s in self.sites and self.sites[s] != "0000"] for i, main_site in enumerate(main_sites): filename = self.export_filename if i > 0: filename = f"{filename[:-4]}_{main_site}.xml" export_cfg = GchrExportConfig( main_site, year, month, makes_used, sites_used, self.first_month_of_financial_year, period_no, self.bookkeep_filter, filename, df.to_dict(orient="records"), ) export_cfg.header = self.header(export_cfg) self.export_skr51_xml(export_cfg) # Join auf Übersetzung - nicht zugeordnet df_ignored = df_bookings.merge(self.df_translate, how="left", on="Konto_Nr_Händler") df_ignored = df_ignored[df_ignored["Konto_Nr_SKR51"].isna()] if not df_ignored.empty: df_ignored = df_ignored.pivot_table( index=["Konto_Nr_Händler"], columns=["period"], values="amount", aggfunc="sum", margins=True, margins_name="CumulatedYear", ) df_ignored.to_csv(self.account_ignored, decimal=",", sep=";", encoding="latin-1") return self.export_filename def get_translation(self) -> pd.DataFrame: if self.df_translate is None: df_translate_import = pd.read_csv( self.account_translation, decimal=",", sep=";", encoding="latin-1", converters={i: str for i in range(0, 200)}, ).reset_index() df_makes = df_translate_import[["Marke", "Marke_HBV"]].copy().drop_duplicates() df_makes = df_makes[df_makes["Marke_HBV"] != "0000"] self.makes = dict([(e["Marke"], e["Marke_HBV"]) for e in df_makes.to_dict(orient="records")]) self.makes["99"] = "0000" df_sites = df_translate_import[["Marke", "Standort", "Standort_HBV"]].copy().drop_duplicates() df_sites["Standort_HBV"] = np.where( df_sites["Standort_HBV"].str.len() != 6, "0000", df_sites["Standort_HBV"] ) self.sites = dict( [(e["Marke"] + "-" + e["Standort"], e["Standort_HBV"]) for e in df_sites.to_dict(orient="records")] ) df_prepared = self.prepare_translation(df_translate_import) self.df_translate = self.special_translation(df_prepared) self.df_translate2 = ( self.df_translate.drop(columns=["Konto_Nr_Händler"]) .copy() .drop_duplicates() .set_index("Konto_Nr_SKR51") ) return self.df_translate def prepare_translation(self, df_translate_import: pd.DataFrame): df_translate = df_translate_import[ [ "Konto_Nr_Händler", "Konto_Nr_SKR51", ] ].drop_duplicates() logging.info(df_translate.shape) row = { "Konto_Nr_Händler": "01-01-0861-00-00-00", "Konto_Nr_SKR51": "01-01-0861-00-00-00", } df_translate = pd.concat([df_translate, pd.DataFrame.from_records([row])]) df_translate.set_index("Konto_Nr_Händler") return df_translate def special_translation(self, df: pd.DataFrame) -> pd.DataFrame: df["Konto_Nr_Händler"] = df["Konto_Nr_Händler"].str.upper() df["Konto_Nr_SKR51"] = df["Konto_Nr_SKR51"].str.upper() df = self.extract_acct_info(df) df["Konto_Nr"] = df["Konto_Nr"].str.upper() logging.info(df.shape) logging.info(df.columns) logging.info(df.head()) logging.info("df: " + str(df.shape)) df["Bilanz"] = df["Konto_Nr"].str.match(r"^[013]") df["Kontoart"] = np.where(df["Bilanz"], "1", "2") df["Kontoart"] = np.where(df["Konto_Nr"].str.contains("_STK"), "3", df["Kontoart"]) df["Kontoart"] = np.where(df["Konto_Nr"].str.match(r"^[9]"), "3", df["Kontoart"]) df["Konto_1"] = df["Konto_Nr"].str.slice(0, 1) # fehlende Marken- und Standortzuordnung df["Marke"] = np.where(df["Marke"].isin(self.makes.keys()), df["Marke"], "99") df["Marke_Standort"] = df["Marke"] + "-" + df["Standort"] df["Standort"] = np.where(df["Marke_Standort"].isin(self.sites.keys()), df["Standort"], "01") df_debug = df.drop(columns=["Bilanz"]) logging.info(df_debug.groupby(["Kontoart"]).aggregate("sum")) logging.info(df_debug.groupby(["Kontoart", "Konto_1"]).aggregate("sum")) logging.info(df_debug.groupby(["Konto_Nr"]).aggregate("sum")) df_debug.groupby(["Konto_Nr"]).aggregate("sum").to_csv( self.debug_file, decimal=",", sep=";", encoding="latin-1" ) # Bereinigung GW-Kostenträger df["NW_Verkauf_1"] = (df["Konto_Nr"].str.match(r"^[78]0")) & (df["Kostenstelle"].str.match(r"^[^1]\d")) df["Kostenstelle"] = np.where(df["NW_Verkauf_1"] == True, "11", df["Kostenstelle"]) df["Konto_7010"] = df["Konto_Nr"].str.match(r"^[78]01[01]") df["Kostenstelle"] = np.where(df["Konto_7010"] == True, "14", df["Kostenstelle"]) df["GW_Verkauf_2"] = (df["Konto_Nr"].str.match(r"^[78]1")) & (df["Kostenstelle"].str.match(r"^[^2]\d")) df["Kostenstelle"] = np.where(df["GW_Verkauf_2"] == True, "21", df["Kostenstelle"]) df["GW_Verkauf_3"] = (df["Konto_Nr"].str.match(r"^[78]3")) & (df["Kostenstelle"].str.match(r"^[^3]\d")) df["Kostenstelle"] = np.where(df["GW_Verkauf_3"] == True, "31", df["Kostenstelle"]) df["GW_Verkauf_4"] = (df["Konto_Nr"].str.match(r"^[78]4")) & (df["Kostenstelle"].str.match(r"^[^4]\d")) df["Kostenstelle"] = np.where(df["GW_Verkauf_4"] == True, "41", df["Kostenstelle"]) df["GW_Verkauf_x420"] = df["Konto_Nr"].str.match(r"^[78]420") df["Kostenstelle"] = np.where(df["GW_Verkauf_x420"] == True, "42", df["Kostenstelle"]) df["GW_Verkauf_5"] = (df["Konto_Nr"].str.match(r"^[78]5")) & (df["Kostenstelle"].str.match(r"^[^5]\d")) df["Kostenstelle"] = np.where(df["GW_Verkauf_5"] == True, "51", df["Kostenstelle"]) df["GW_Verkauf_50"] = (df["Konto_Nr"].str.match(r"^[78]")) & (df["Kostenstelle"].str.match(r"^2")) df["Kostenträger"] = np.where(df["GW_Verkauf_50"] == True, "52", df["Kostenträger"]) df["Kostenträger"] = np.where( (df["GW_Verkauf_50"] == True) & (df["Marke"] == "01"), "50", df["Kostenträger"], ) df["NW_Verkauf_00"] = ( (df["Konto_Nr"].str.match(r"^[78]2")) & (df["Kostenstelle"].str.match(r"^1")) & (df["Kostenträger"].str.match(r"^[^01234]")) ) df["Kostenträger"] = np.where(df["NW_Verkauf_00"] == True, "00", df["Kostenträger"]) df["GW_Stk_50"] = (df["Konto_Nr"].str.match(r"^9130")) & (df["Kostenstelle"].str.match(r"^2")) df["Kostenträger"] = np.where(df["GW_Stk_50"] == True, "52", df["Kostenträger"]) df["Kostenträger"] = np.where((df["GW_Stk_50"] == True) & (df["Marke"] == "01"), "50", df["Kostenträger"]) df["Kostenträger"] = np.where(df["Bilanz"] == True, "00", df["Kostenträger"]) df["Konto_5er"] = (df["Konto_Nr"].str.match("^5")) | (df["Konto_Nr"].str.match("^9143")) df["Absatzkanal"] = np.where(df["Konto_5er"] == True, "99", df["Absatzkanal"]) df["Konto_5005"] = (df["Konto_Nr"].str.match("^5005")) & (df["Kostenstelle"].str.match(r"^[^12]")) df["Kostenstelle"] = np.where(df["Konto_5005"] == True, "20", df["Kostenstelle"]) df["Kostenträger"] = np.where(df["Konto_5005"] == True, "50", df["Kostenträger"]) df["Konto_5007"] = (df["Konto_Nr"].str.match("^5007")) & (df["Kostenstelle"].str.match(r"^([^4]|42)")) df["Kostenstelle"] = np.where(df["Konto_5007"] == True, "41", df["Kostenstelle"]) df["Kostenträger"] = np.where(df["Konto_5007"] == True, "70", df["Kostenträger"]) df["Konto_914er"] = (df["Konto_Nr"].str.match("^914[34]")) & (df["Kostenträger"].str.match(r"^[^7]")) df["Kostenträger"] = np.where(df["Konto_914er"] == True, "70", df["Kostenträger"]) df["Teile_30_60"] = ( (df["Konto_Nr"].str.match(r"^[578]")) & (df["Kostenstelle"].str.match(r"^[3]")) & (df["Kostenträger"].str.match(r"^[^6]")) ) df["Kostenträger"] = np.where(df["Teile_30_60"] == True, "60", df["Kostenträger"]) df["Service_40_70"] = ( (df["Konto_Nr"].str.match(r"^[578]")) & (df["Kostenstelle"].str.match(r"^[4]")) & (df["Kostenträger"].str.match(r"^[^7]")) ) df["Kostenträger"] = np.where(df["Service_40_70"] == True, "70", df["Kostenträger"]) df["KRM"] = df["Marke"] + df["Standort"] + df["Kostenstelle"] + df["Absatzkanal"] + df["Kostenträger"] df["Konto_Nr_SKR51"] = ( (df["Marke"] + "-" + df["Standort"] + "-" + df["Konto_Nr"]) + "-" + (df["Kostenstelle"] + "-" + df["Absatzkanal"] + "-" + df["Kostenträger"]) ) df["IsNumeric"] = ( (df["KRM"].str.isdigit()) & (df["Konto_Nr"].str.isdigit()) & (df["Konto_Nr"].str.len() == 4) # & (df["Konto_Nr_SKR51"].str.len() == 19) ) df_invalid = df[df["IsNumeric"] == False] df_invalid.to_csv(self.export_invalid_filename, decimal=",", sep=";", encoding="latin-1", index=False) return df[df["IsNumeric"] == True][TRANSLATE] def load_bookings_from_file(self) -> None: df2 = [] timestamps = [] for csv_file in self.account_bookings: df2.append( pd.read_csv( csv_file, decimal=",", sep=";", encoding="latin-1", converters={0: str, 1: str}, ) ) timestamps.append(Path(csv_file).stat().st_mtime) self.booking_date = datetime.fromtimestamp(max(timestamps)) self.df_bookings = pd.concat(df2) self.df_bookings["amount"] = (self.df_bookings["Debit Amount"] + self.df_bookings["Credit Amount"]).round(2) def filter_bookings(self) -> pd.DataFrame: if self.df_bookings is None: self.load_bookings_from_file() # Kontensalden auf gegebenen Monat filtern filter_from = self.current_year + self.first_month_of_financial_year filter_prev = self.last_year + self.first_month_of_financial_year if self.first_month_of_financial_year > self.current_month: filter_from = self.last_year + self.first_month_of_financial_year filter_prev = self.last_year2 + self.first_month_of_financial_year filter_to = self.current_year + self.current_month filter_opening = self.current_year + "00" filter_prev_opening = self.last_year + "00" prev_year_closed = True df_opening_balance = self.df_bookings[(self.df_bookings["Bookkeep Period"] == filter_opening)] if df_opening_balance.shape[0] == 0: df_opening_balance = self.df_bookings[ (self.df_bookings["Bookkeep Period"] == filter_prev_opening) | ( (self.df_bookings["Bookkeep Period"] >= filter_prev) & (self.df_bookings["Bookkeep Period"] < filter_from) ) ].copy() df_opening_balance["Bookkeep Period"] = filter_opening prev_year_closed = False df_opening_balance = df_opening_balance[(df_opening_balance["Konto_Nr_Händler"].str.contains(r"-[013]\d\d+-"))] opening_balance = df_opening_balance["amount"].aggregate("sum").round(2) logging.info("Gewinn/Verlustvortrag") logging.info(opening_balance) if not prev_year_closed: row = { "Konto_Nr_Händler": "01-01-0861-00-00-00", "Bookkeep Period": filter_opening, "Debit Amount": opening_balance * -1, "Credit Amount": 0, "Debit Quantity": 0, "Credit Quantity": 0, "amount": opening_balance * -1, } df_opening_balance = pd.concat([df_opening_balance, pd.DataFrame.from_records([row])]) df_filtered = self.df_bookings[ (self.df_bookings["Bookkeep Period"] >= filter_from) & (self.df_bookings["Bookkeep Period"] <= filter_to) ] # Buchungen kopieren und als Statistikkonten anhängen df_stats = df_filtered.copy() # df_stats = df_stats[df_stats['Konto_Nr_Händler'].str.match(r'-[24578]\d\d\d-')] df_stats["Konto_Nr_Händler"] = df_stats["Konto_Nr_Händler"].str.replace(r"-(\d\d\d+)-", r"-\1_STK-", regex=True) df_stats["amount"] = (df_filtered["Debit Quantity"] + df_filtered["Credit Quantity"]).round(2) df_combined = pd.concat([df_opening_balance, df_filtered, df_stats]) # Spalten konvertieren df_combined["period"] = df_combined["Bookkeep Period"].apply(lambda x: self.bookkeep_filter[x]) return df_combined[df_combined["amount"] != 0.00] @property def export_filename(self) -> str: return self.export_filename_for_period(self.current_year, self.current_month) @property def export_info_dir(self) -> str: return f"{self.base_dir}/Export/{self.current_year}/info/" @property def export_invalid_filename(self) -> str: return f"{self.base_dir}/Export/ungueltig.csv" def export_filename_for_period(self, year: str, month: str) -> str: return f"{self.base_dir}/Export/{year}/export_{year}-{month}.xml" @staticmethod def export_skr51_xml(export_cfg: GchrExportConfig): record_elements = ( ACCOUNT_INFO + ["Decimals"] + list(export_cfg.bookkeep_filter.values())[: export_cfg.period_no] + ["CumulatedYear"] ) root = ET.Element("HbvData") h = ET.SubElement(root, "Header") for k, v in export_cfg.header.items(): ET.SubElement(h, k).text = str(v) make_list = ET.SubElement(root, "MakeList") for make, make_code in export_cfg.makes_used.items(): e = ET.SubElement(make_list, "MakeListEntry") ET.SubElement(e, "Make").text = make ET.SubElement(e, "MakeCode").text = make_code bm_code_list = ET.SubElement(root, "BmCodeList") for s, bmcode in export_cfg.sites_used.items(): make, site = s.split("-") e = ET.SubElement(bm_code_list, "BmCodeEntry") ET.SubElement(e, "Make").text = make ET.SubElement(e, "Site").text = site ET.SubElement(e, "BmCode").text = bmcode record_list = ET.SubElement(root, "RecordList") for row in export_cfg.bookkeep_records: record = ET.SubElement(record_list, "Record") for e in record_elements: child = ET.SubElement(record, e) field = row.get(e, 0.0) if str(field) == "nan": field = "0" elif type(field) is float: field = "{:.0f}".format(field * 100) child.text = str(field) with open(export_cfg.export_file, "w", encoding="utf-8") as fwh: fwh.write(minidom.parseString(ET.tostring(root)).toprettyxml(indent=" ")) def convert_to_row(self, node: list[ET.Element]) -> list[str]: return [child.text for child in node] def convert_xml_to_csv(self, xmlfile: str, csvfile: str) -> bool: with open(xmlfile) as frh: record_list = ET.parse(frh).getroot().find("RecordList") header = [child.tag for child in record_list.find("Record")] bookings = [self.convert_to_row(node) for node in record_list.findall("Record")] with open(csvfile, "w") as fwh: cwh = csv.writer(fwh, delimiter=";") cwh.writerow(header) cwh.writerows(bookings) return True def convert_csv_to_xml(self, csvfile: str, xmlfile: str) -> None: self.makes = {"01": "1844"} self.sites = {"01-01": "1844"} with open(csvfile, "r", encoding="latin-1") as frh: csv_reader = csv.DictReader(frh, delimiter=";") self.export_skr51_xml(csv_reader, self.bookkeep_filter(), 1, list(self.sites.values())[0], xmlfile) def gchr_local() -> None: base_dir = os.getcwd() + "/../GCHR2_Testdaten/Kunden" for path in Path(base_dir).glob("*"): if path.is_dir(): print(path.name) gchr_export(str(path)) def gchr_export(base_dir: str) -> None: gchr = GCHR(base_dir) # gchr.export_all_periods(overwrite=True, today="2022-08-01") gchr.export_all_periods() if __name__ == "__main__": gchr_local() # import cProfile # cProfile.run( # "gchr_local()", # "gchr_local.prof", # )