import csv import logging import os import xml.etree.ElementTree as ET from dataclasses import dataclass from datetime import datetime from pathlib import Path from typing import Callable from xml.dom import minidom import numpy as np import pandas as pd ACCOUNT_INFO = [ "Account", "Make", "Site", "Origin", "SalesChannel", "CostCarrier", "CostAccountingString", ] TRANSLATE = [ "Konto_Nr_Händler", "Konto_Nr_SKR51", "Marke", "Standort", "Konto_Nr", "Kostenstelle", "Absatzkanal", "Kostenträger", "Kontoart", "Konto_1", "KRM", "IsNumeric", ] @dataclass class GchrExportConfig: main_site: str current_year: str current_month: str makes_used: dict[str, str] sites_used: dict[str, str] first_month: str period_no: str bookkeep_filter: dict[str, str] extraction_date: datetime export_file: str bookkeep_records = dict[str, list[str]] header: dict[str, str] | None = None @dataclass class GchrConfig: first_month_of_financial_year: str data_path: str gcstruct_path: str export_path: str export_fn = Callable[[GchrExportConfig], None] class GCHR: booking_date: datetime _df_bookings: pd.DataFrame = None _df_translate: pd.DataFrame = None df_translate2: pd.DataFrame = None makes: dict[str, str] = None sites: dict[str, str] = None current_year: str current_month: str def __init__(self, base_dir: str) -> None: self.base_dir = base_dir self.account_translation = f"{self.base_dir}/data/Kontenrahmen_uebersetzt.csv" self.account_bookings = list(Path(self.base_dir).joinpath("data").glob("GuV_Bilanz_Salden*.csv")) self.first_month_of_financial_year = "10" pd.set_option("display.max_rows", 500) pd.set_option("display.float_format", lambda x: "%.2f" % x) def set_bookkeep_period(self, year: str, month: str) -> None: self.current_year = year self.current_month = month self.period = f"{year}-{month}" prot_file = f"{self.export_info_dir}/protokoll_{self.period}.log" logging.basicConfig( filename=prot_file, filemode="w", encoding="utf-8", level=logging.DEBUG, force=True, ) @property def debug_file(self) -> str: return f"{self.export_info_dir}/debug_{self.period}.csv" @property def account_ignored(self) -> str: return f"{self.export_info_dir}/ignoriert_{self.period}.csv" # self.account_invalid = f"{self.export_info_dir}/ungueltig_{self.period}.csv" @property def last_year(self) -> str: return str(int(self.current_year) - 1) @property def last_year2(self) -> str: return str(int(self.current_year) - 2) @property def next_year(self) -> str: return str(int(self.current_year) + 1) @staticmethod def header(export_cfg: GchrExportConfig) -> dict[str, str]: return { "Country": "DE", "MainBmCode": export_cfg.main_site, "Month": export_cfg.current_month, "Year": export_cfg.current_year, "Currency": "EUR", "NumberOfMakes": len(export_cfg.makes_used), "NumberOfSites": len(export_cfg.sites_used), "ExtractionDate": export_cfg.extraction_date.strftime("%d.%m.%Y"), "ExtractionTime": export_cfg.extraction_date.strftime("%H:%M:%S"), "BeginFiscalYear": export_cfg.first_month, } @property def bookkeep_filter(self) -> dict[str, str]: period = [self.current_year + str(i).zfill(2) for i in range(1, 13)] if self.first_month_of_financial_year != "01": if self.first_month_of_financial_year > self.current_month: period = [self.last_year + str(i).zfill(2) for i in range(1, 13)] + period else: period = period + [self.next_year + str(i).zfill(2) for i in range(1, 13)] fm = int(self.first_month_of_financial_year) period = period[fm - 1 : fm + 12] period = [self.current_year + "00"] + period rename_to = ["OpeningBalance"] + ["Period" + str(i).zfill(2) for i in range(1, 13)] return dict(zip(period, rename_to)) @staticmethod def extract_acct_info(df: pd.DataFrame) -> pd.DataFrame: acct_info = [ "Marke", "Standort", "Konto_Nr", "Kostenstelle", "Absatzkanal", "Kostenträger", ] df["HasFiveDashes"] = df["Konto_Nr_SKR51"].str.count("-") == 5 df["Invalid"] = "XX-XX-XXXX-XX-XX-XX" df["Konto_Nr_SKR51"] = np.where( df["HasFiveDashes"], df["Konto_Nr_SKR51"], df["Invalid"], ) df[acct_info] = df["Konto_Nr_SKR51"].str.split(pat="-", n=6, expand=True) return df def export_all_periods(self, overwrite=False, today=None) -> None: dt = datetime.now() if today is not None: dt = datetime.fromisoformat(today) prev = str(dt.year - 1) periods = [(prev, str(x).zfill(2)) for x in range(dt.month, 13)] + [ (str(dt.year), str(x).zfill(2)) for x in range(1, dt.month) ] for year, month in periods: filename = self.export_filename_for_period(year, month) if overwrite or not Path(filename).exists(): os.makedirs(Path(filename).parent.joinpath("info"), exist_ok=True) self.export_period(year, month) def export_period(self, year: str, month: str) -> str: self.set_bookkeep_period(year, month) # Kontensalden laden df_bookings = self.filter_bookings() all_periods = set(df_bookings["Bookkeep Period"].to_list()) bookkeep_period_date = datetime(int(year), int(month), 28) if df_bookings.shape[0] == 0 or len(all_periods) <= 1 or self.booking_date < bookkeep_period_date: logging.error("ABBRUCH!!! Keine Daten vorhanden!") return False filter_to = self.current_year + self.current_month period_no = list(self.bookkeep_filter.keys()).index(filter_to) + 1 logging.info("df_bookings: " + str(df_bookings.shape)) # Join auf Übersetzung df_combined = df_bookings.merge(self._df_translate, how="inner", on="Konto_Nr_Händler") logging.info(f"df_combined: {df_combined.shape}") df_pivot = df_combined.pivot_table( index=["Konto_Nr_SKR51"], columns=["period"], values="amount", aggfunc="sum", margins=True, margins_name="CumulatedYear", ) df_pivot.drop(index="CumulatedYear", inplace=True) logging.info("df_pivot: " + str(df_pivot.shape)) df = df_pivot.merge(self.df_translate2, how="inner", on="Konto_Nr_SKR51") makes_used = {} for m in sorted(list(set(df["Marke"].to_list()))): if m not in self.makes: continue makes_used[m] = self.makes[m] sites_used = {} for s in sorted(list(set((df["Marke"] + "-" + df["Standort"]).to_list()))): if s not in self.sites: continue sites_used[s] = self.sites[s] from_label = ["Marke", "Standort", "Konto_Nr", "Kostenstelle", "Absatzkanal", "Kostenträger", "KRM"] to_label = ["Make", "Site", "Account", "Origin", "SalesChannel", "CostCarrier", "CostAccountingString"] col_dict = dict(zip(from_label, to_label)) df = df.rename(columns=col_dict) export_csv = self.export_filename[:-4] + ".csv" df.to_csv(export_csv, decimal=",", sep=";", encoding="latin-1", index=False) df = df[df["IsNumeric"] != False].groupby(ACCOUNT_INFO, as_index=False).aggregate("sum") # Infos ergänzen df["Decimals"] = 2 # df.sort_values(by=["Konto_Nr_SKR51"], inplace=True) logging.info(df.shape) main_sites = [self.sites[s] for s in sites_used if s in self.sites and self.sites[s] != "0000"] for i, main_site in enumerate(main_sites): filename = self.export_filename if i > 0: filename = f"{filename[:-4]}_{main_site}.xml" export_cfg = GchrExportConfig( main_site, year, month, makes_used, sites_used, self.first_month_of_financial_year, period_no, self.bookkeep_filter, filename, df.to_dict(orient="records"), ) export_cfg.header = self.header(export_cfg) GCHR.export_skr51_xml(export_cfg) # Join auf Übersetzung - nicht zugeordnet df_ignored = df_bookings.merge(self.df_translate, how="left", on="Konto_Nr_Händler") df_ignored = df_ignored[df_ignored["Konto_Nr_SKR51"].isna()] if not df_ignored.empty: df_ignored = df_ignored.pivot_table( index=["Konto_Nr_Händler"], columns=["period"], values="amount", aggfunc="sum", margins=True, margins_name="CumulatedYear", ) df_ignored.to_csv(self.account_ignored, decimal=",", sep=";", encoding="latin-1") return self.export_filename @property def df_translate(self) -> pd.DataFrame: if self._df_translate is None: df_translate_import = pd.read_csv( self.account_translation, decimal=",", sep=";", encoding="latin-1", converters={i: str for i in range(0, 200)}, ).reset_index() self.makes = GCHR.get_makes_from_translation(df_translate_import) self.sites = GCHR.get_sites_from_translation(df_translate_import) df_prepared = GCHR.prepare_translation(df_translate_import) self._df_translate = self.special_translation(df_prepared) self.df_translate2 = ( self._df_translate.copy() .drop(columns=["Konto_Nr_Händler"]) .drop_duplicates() .set_index("Konto_Nr_SKR51") ) return self._df_translate @staticmethod def get_makes_from_translation(df_translate_import: pd.DataFrame) -> dict[str, str]: df_makes = df_translate_import[["Marke", "Marke_HBV"]].copy().drop_duplicates() df_makes = df_makes[df_makes["Marke_HBV"] != "0000"] makes = dict([(e["Marke"], e["Marke_HBV"]) for e in df_makes.to_dict(orient="records")]) makes["99"] = "0000" return makes @staticmethod def get_sites_from_translation(df_translate_import: pd.DataFrame) -> dict[str, str]: df_sites = df_translate_import[["Marke", "Standort", "Standort_HBV"]].copy().drop_duplicates() df_sites["Standort_HBV"] = np.where(df_sites["Standort_HBV"].str.len() != 6, "0000", df_sites["Standort_HBV"]) sites = dict( [(e["Marke"] + "-" + e["Standort"], e["Standort_HBV"]) for e in df_sites.to_dict(orient="records")] ) return sites @staticmethod def prepare_translation(df_translate_import: pd.DataFrame) -> pd.DataFrame: df = df_translate_import[ [ "Konto_Nr_Händler", "Konto_Nr_SKR51", ] ].drop_duplicates() logging.info(df.shape) row = { "Konto_Nr_Händler": "01-01-0861-00-00-00", "Konto_Nr_SKR51": "01-01-0861-00-00-00", } df = pd.concat([df, pd.DataFrame.from_records([row])]) df.set_index("Konto_Nr_Händler") return df def special_translation(self, df: pd.DataFrame) -> pd.DataFrame: df["Konto_Nr_Händler"] = df["Konto_Nr_Händler"].str.upper() df["Konto_Nr_SKR51"] = df["Konto_Nr_SKR51"].str.upper() df = GCHR.extract_acct_info(df) df["Konto_Nr"] = df["Konto_Nr"].str.upper() logging.info(df.shape) logging.info(df.columns) logging.info(df.head()) logging.info("df: " + str(df.shape)) df["Bilanz"] = df["Konto_Nr"].str.match(r"^[013]") df["Kontoart"] = np.where(df["Bilanz"], "1", "2") df["Kontoart"] = np.where(df["Konto_Nr"].str.contains("_STK"), "3", df["Kontoart"]) df["Kontoart"] = np.where(df["Konto_Nr"].str.match(r"^[9]"), "3", df["Kontoart"]) df["Konto_1"] = df["Konto_Nr"].str.slice(0, 1) # fehlende Marken- und Standortzuordnung df["Marke"] = np.where(df["Marke"].isin(self.makes.keys()), df["Marke"], "99") df["Marke_Standort"] = df["Marke"] + "-" + df["Standort"] df["Standort"] = np.where(df["Marke_Standort"].isin(self.sites.keys()), df["Standort"], "01") df_debug = df.drop(columns=["Bilanz"]) logging.info(df_debug.groupby(["Kontoart"]).aggregate("sum")) logging.info(df_debug.groupby(["Kontoart", "Konto_1"]).aggregate("sum")) logging.info(df_debug.groupby(["Konto_Nr"]).aggregate("sum")) df_debug.groupby(["Konto_Nr"]).aggregate("sum").to_csv( self.debug_file, decimal=",", sep=";", encoding="latin-1" ) # Bereinigung GW-Kostenträger df["NW_Verkauf_1"] = (df["Konto_Nr"].str.match(r"^[78]0")) & (df["Kostenstelle"].str.match(r"^[^1]\d")) df["Kostenstelle"] = np.where(df["NW_Verkauf_1"] == True, "11", df["Kostenstelle"]) df["Konto_7010"] = df["Konto_Nr"].str.match(r"^[78]01[01]") df["Kostenstelle"] = np.where(df["Konto_7010"] == True, "14", df["Kostenstelle"]) df["GW_Verkauf_2"] = (df["Konto_Nr"].str.match(r"^[78]1")) & (df["Kostenstelle"].str.match(r"^[^2]\d")) df["Kostenstelle"] = np.where(df["GW_Verkauf_2"] == True, "21", df["Kostenstelle"]) df["GW_Verkauf_3"] = (df["Konto_Nr"].str.match(r"^[78]3")) & (df["Kostenstelle"].str.match(r"^[^3]\d")) df["Kostenstelle"] = np.where(df["GW_Verkauf_3"] == True, "31", df["Kostenstelle"]) df["GW_Verkauf_4"] = (df["Konto_Nr"].str.match(r"^[78]4")) & (df["Kostenstelle"].str.match(r"^[^4]\d")) df["Kostenstelle"] = np.where(df["GW_Verkauf_4"] == True, "41", df["Kostenstelle"]) df["GW_Verkauf_x420"] = df["Konto_Nr"].str.match(r"^[78]420") df["Kostenstelle"] = np.where(df["GW_Verkauf_x420"] == True, "42", df["Kostenstelle"]) df["GW_Verkauf_5"] = (df["Konto_Nr"].str.match(r"^[78]5")) & (df["Kostenstelle"].str.match(r"^[^5]\d")) df["Kostenstelle"] = np.where(df["GW_Verkauf_5"] == True, "51", df["Kostenstelle"]) df["GW_Verkauf_50"] = (df["Konto_Nr"].str.match(r"^[78]")) & (df["Kostenstelle"].str.match(r"^2")) df["Kostenträger"] = np.where(df["GW_Verkauf_50"] == True, "52", df["Kostenträger"]) df["Kostenträger"] = np.where( (df["GW_Verkauf_50"] == True) & (df["Marke"] == "01"), "50", df["Kostenträger"], ) df["NW_Verkauf_00"] = ( (df["Konto_Nr"].str.match(r"^[78]2")) & (df["Kostenstelle"].str.match(r"^1")) & (df["Kostenträger"].str.match(r"^[^01234]")) ) df["Kostenträger"] = np.where(df["NW_Verkauf_00"] == True, "00", df["Kostenträger"]) df["GW_Stk_50"] = (df["Konto_Nr"].str.match(r"^9130")) & (df["Kostenstelle"].str.match(r"^2")) df["Kostenträger"] = np.where(df["GW_Stk_50"] == True, "52", df["Kostenträger"]) df["Kostenträger"] = np.where((df["GW_Stk_50"] == True) & (df["Marke"] == "01"), "50", df["Kostenträger"]) df["Kostenträger"] = np.where(df["Bilanz"] == True, "00", df["Kostenträger"]) df["Konto_5er"] = (df["Konto_Nr"].str.match("^5")) | (df["Konto_Nr"].str.match("^9143")) df["Absatzkanal"] = np.where(df["Konto_5er"] == True, "99", df["Absatzkanal"]) df["Konto_5005"] = (df["Konto_Nr"].str.match("^5005")) & (df["Kostenstelle"].str.match(r"^[^12]")) df["Kostenstelle"] = np.where(df["Konto_5005"] == True, "20", df["Kostenstelle"]) df["Kostenträger"] = np.where(df["Konto_5005"] == True, "50", df["Kostenträger"]) df["Konto_5007"] = (df["Konto_Nr"].str.match("^5007")) & (df["Kostenstelle"].str.match(r"^([^4]|42)")) df["Kostenstelle"] = np.where(df["Konto_5007"] == True, "41", df["Kostenstelle"]) df["Kostenträger"] = np.where(df["Konto_5007"] == True, "70", df["Kostenträger"]) df["Konto_914er"] = (df["Konto_Nr"].str.match("^914[34]")) & (df["Kostenträger"].str.match(r"^[^7]")) df["Kostenträger"] = np.where(df["Konto_914er"] == True, "70", df["Kostenträger"]) df["Teile_30_60"] = ( (df["Konto_Nr"].str.match(r"^[578]")) & (df["Kostenstelle"].str.match(r"^[3]")) & (df["Kostenträger"].str.match(r"^[^6]")) ) df["Kostenträger"] = np.where(df["Teile_30_60"] == True, "60", df["Kostenträger"]) df["Service_40_70"] = ( (df["Konto_Nr"].str.match(r"^[578]")) & (df["Kostenstelle"].str.match(r"^[4]")) & (df["Kostenträger"].str.match(r"^[^7]")) ) df["Kostenträger"] = np.where(df["Service_40_70"] == True, "70", df["Kostenträger"]) df["KRM"] = df["Marke"] + df["Standort"] + df["Kostenstelle"] + df["Absatzkanal"] + df["Kostenträger"] df["Konto_Nr_SKR51"] = ( (df["Marke"] + "-" + df["Standort"] + "-" + df["Konto_Nr"]) + "-" + (df["Kostenstelle"] + "-" + df["Absatzkanal"] + "-" + df["Kostenträger"]) ) df["IsNumeric"] = ( (df["KRM"].str.isdigit()) & (df["Konto_Nr"].str.isdigit()) & (df["Konto_Nr"].str.len() == 4) # & (df["Konto_Nr_SKR51"].str.len() == 19) ) df_invalid = df[df["IsNumeric"] == False] df_invalid.to_csv(self.export_invalid_filename, decimal=",", sep=";", encoding="latin-1", index=False) return df[df["IsNumeric"] == True][TRANSLATE] def load_bookings_from_file(self) -> None: df_list: list[pd.DataFrame] = [] timestamps: list[float] = [] for csv_file in self.account_bookings: df_list.append( pd.read_csv( csv_file, decimal=",", sep=";", encoding="latin-1", converters={0: str, 1: str}, ) ) timestamps.append(Path(csv_file).stat().st_mtime) self.booking_date = datetime.fromtimestamp(max(timestamps)) df = pd.concat(df_list) df["amount"] = (df["Debit Amount"] + df["Credit Amount"]).round(2) return df @property def df_bookings(self) -> pd.DataFrame: if self._df_bookings is None: self._df_bookings = self.load_bookings_from_file() return self._df_bookings def filter_bookings(self) -> pd.DataFrame: # Kontensalden auf gegebenen Monat filtern filter_from = self.current_year + self.first_month_of_financial_year filter_prev = self.last_year + self.first_month_of_financial_year if self.first_month_of_financial_year > self.current_month: filter_from = self.last_year + self.first_month_of_financial_year filter_prev = self.last_year2 + self.first_month_of_financial_year filter_to = self.current_year + self.current_month filter_opening = self.current_year + "00" filter_prev_opening = self.last_year + "00" prev_year_closed = True df_opening_balance = self.df_bookings[(self.df_bookings["Bookkeep Period"] == filter_opening)] if df_opening_balance.shape[0] == 0: df_opening_balance = self.df_bookings[ (self.df_bookings["Bookkeep Period"] == filter_prev_opening) | ( (self.df_bookings["Bookkeep Period"] >= filter_prev) & (self.df_bookings["Bookkeep Period"] < filter_from) ) ].copy() df_opening_balance["Bookkeep Period"] = filter_opening prev_year_closed = False df_opening_balance = df_opening_balance[(df_opening_balance["Konto_Nr_Händler"].str.contains(r"-[013]\d\d+-"))] opening_balance = df_opening_balance["amount"].aggregate("sum").round(2) logging.info("Gewinn/Verlustvortrag") logging.info(opening_balance) if not prev_year_closed: row = { "Konto_Nr_Händler": "01-01-0861-00-00-00", "Bookkeep Period": filter_opening, "Debit Amount": opening_balance * -1, "Credit Amount": 0, "Debit Quantity": 0, "Credit Quantity": 0, "amount": opening_balance * -1, } df_opening_balance = pd.concat([df_opening_balance, pd.DataFrame.from_records([row])]) df_filtered = self.df_bookings[ (self.df_bookings["Bookkeep Period"] >= filter_from) & (self.df_bookings["Bookkeep Period"] <= filter_to) ] # Buchungen kopieren und als Statistikkonten anhängen df_stats = df_filtered.copy() # df_stats = df_stats[df_stats['Konto_Nr_Händler'].str.match(r'-[24578]\d\d\d-')] df_stats["Konto_Nr_Händler"] = df_stats["Konto_Nr_Händler"].str.replace(r"-(\d\d\d+)-", r"-\1_STK-", regex=True) df_stats["amount"] = (df_filtered["Debit Quantity"] + df_filtered["Credit Quantity"]).round(2) df_combined = pd.concat([df_opening_balance, df_filtered, df_stats]) # Spalten konvertieren df_combined["period"] = df_combined["Bookkeep Period"].apply(lambda x: self.bookkeep_filter[x]) return df_combined[df_combined["amount"] != 0.00] @property def export_filename(self) -> str: return self.export_filename_for_period(self.current_year, self.current_month) @property def export_info_dir(self) -> str: return f"{self.base_dir}/Export/{self.current_year}/info/" @property def export_invalid_filename(self) -> str: return f"{self.base_dir}/Export/ungueltig.csv" def export_filename_for_period(self, year: str, month: str) -> str: return f"{self.base_dir}/Export/{year}/export_{year}-{month}.xml" @staticmethod def export_skr51_xml(export_cfg: GchrExportConfig): record_elements = ( ACCOUNT_INFO + ["Decimals"] + list(export_cfg.bookkeep_filter.values())[: export_cfg.period_no] + ["CumulatedYear"] ) root = ET.Element("HbvData") h = ET.SubElement(root, "Header") for k, v in export_cfg.header.items(): ET.SubElement(h, k).text = str(v) make_list = ET.SubElement(root, "MakeList") for make, make_code in export_cfg.makes_used.items(): e = ET.SubElement(make_list, "MakeListEntry") ET.SubElement(e, "Make").text = make ET.SubElement(e, "MakeCode").text = make_code bm_code_list = ET.SubElement(root, "BmCodeList") for s, bmcode in export_cfg.sites_used.items(): make, site = s.split("-") e = ET.SubElement(bm_code_list, "BmCodeEntry") ET.SubElement(e, "Make").text = make ET.SubElement(e, "Site").text = site ET.SubElement(e, "BmCode").text = bmcode record_list = ET.SubElement(root, "RecordList") for row in export_cfg.bookkeep_records: record = ET.SubElement(record_list, "Record") for e in record_elements: child = ET.SubElement(record, e) field = row.get(e, 0.0) if str(field) == "nan": field = "0" elif type(field) is float: field = "{:.0f}".format(field * 100) child.text = str(field) with open(export_cfg.export_file, "w", encoding="utf-8") as fwh: fwh.write(minidom.parseString(ET.tostring(root)).toprettyxml(indent=" ")) @staticmethod def convert_to_row(node: list[ET.Element]) -> list[str]: return [child.text for child in node] @staticmethod def convert_xml_to_csv(xmlfile: str, csvfile: str) -> bool: with open(xmlfile) as frh: record_list = ET.parse(frh).getroot().find("RecordList") header = [child.tag for child in record_list.find("Record")] bookings = [GCHR.convert_to_row(node) for node in record_list.findall("Record")] with open(csvfile, "w") as fwh: cwh = csv.writer(fwh, delimiter=";") cwh.writerow(header) cwh.writerows(bookings) return True def convert_csv_to_xml(self, csvfile: str, xmlfile: str) -> None: self.makes = {"01": "1844"} self.sites = {"01-01": "1844"} with open(csvfile, "r", encoding="latin-1") as frh: csv_reader = csv.DictReader(frh, delimiter=";") GCHR.export_skr51_xml(csv_reader, self.bookkeep_filter(), 1, list(self.sites.values())[0], xmlfile) def gchr_local() -> None: base_dir = os.getcwd() + "/../GCHR2_Testdaten/Kunden" for path in Path(base_dir).glob("*"): if path.is_dir(): print(path.name) gchr_export(str(path)) def gchr_export(base_dir: str) -> None: gchr = GCHR(base_dir) # gchr.export_all_periods(overwrite=True, today="2022-08-01") gchr.export_all_periods()