import pandas as pd
import numpy as np
import xml.etree.ElementTree as ET
import csv
from xml.dom import minidom
from datetime import datetime
import logging
from pathlib import Path
import os

ACCOUNT_INFO = [
    "Account",
    "Make",
    "Site",
    "Origin",
    "SalesChannel",
    "CostCarrier",
    "CostAccountingString",
]


class GCHR:
    booking_date: datetime
    df_bookings: pd.DataFrame = None

    def __init__(self, base_dir) -> None:
        self.base_dir = base_dir
        self.account_translation = f"{self.base_dir}/data/Kontenrahmen_uebersetzt.csv"
        self.account_bookings = list(Path(self.base_dir).joinpath("data").glob("GuV_Bilanz_Salden*.csv"))
        self.first_month_of_financial_year = "01"

        pd.set_option("display.max_rows", 500)
        pd.set_option("display.float_format", lambda x: "%.2f" % x)

    def set_bookkeep_period(self, year, month):
        self.current_year = year
        self.current_month = month
        period = f"{year}-{month}"
        prot_file = f"{self.export_info_dir}/protokoll_{period}.log"
        logging.basicConfig(
            filename=prot_file,
            filemode="w",
            encoding="utf-8",
            level=logging.DEBUG,
            force=True,
        )
        self.debug_file = f"{self.export_info_dir}/debug_{period}.csv"
        self.account_ignored = f"{self.export_info_dir}/ignoriert_{period}.csv"
        self.account_invalid = f"{self.export_info_dir}/ungueltig_{period}.csv"

        self.last_year = str(int(self.current_year) - 1)
        self.last_year2 = str(int(self.current_year) - 2)
        self.next_year = str(int(self.current_year) + 1)

    def header(self, makes, sites, main_site):
        return {
            "Country": "DE",
            "MainBmCode": main_site,
            "Month": self.current_month,
            "Year": self.current_year,
            "Currency": "EUR",
            "NumberOfMakes": len(makes),
            "NumberOfSites": len(sites),
            "ExtractionDate": self.booking_date.strftime("%d.%m.%Y"),
            "ExtractionTime": self.booking_date.strftime("%H:%M:%S"),
            "BeginFiscalYear": self.first_month_of_financial_year,
        }

    @property
    def bookkeep_filter(self):
        # Map raw bookkeeping periods (YYYYMM) to the export labels
        # "OpeningBalance", "Period01" .. "Period12", shifting the window
        # if the financial year does not start in January.
        period = [self.current_year + str(i).zfill(2) for i in range(1, 13)]
        if self.first_month_of_financial_year != "01":
            if self.first_month_of_financial_year > self.current_month:
                period = [self.last_year + str(i).zfill(2) for i in range(1, 13)] + period
            else:
                period = period + [self.next_year + str(i).zfill(2) for i in range(1, 13)]
            fm = int(self.first_month_of_financial_year)
            period = period[fm - 1 : fm + 12]
        period = [self.current_year + "00"] + period
        rename_to = ["OpeningBalance"] + ["Period" + str(i).zfill(2) for i in range(1, 13)]
        return dict(zip(period, rename_to))

    def extract_acct_info(self, df: pd.DataFrame):
        acct_info = [
            "Marke",
            "Standort",
            "Konto_Nr",
            "Kostenstelle",
            "Absatzkanal",
            "Kostenträger",
        ]
        df["Konto_Nr_SKR51"] = df.index
        df[acct_info] = df["Konto_Nr_SKR51"].str.split(pat="-", n=6, expand=True)
        return df

    def export_all_periods(self, overwrite=False, today=None):
        dt = datetime.now()
        if today is not None:
            dt = datetime.fromisoformat(today)
        prev = str(dt.year - 1)
        # The previous twelve months, up to (but not including) the current month
        periods = [(prev, str(x).zfill(2)) for x in range(dt.month, 13)] + [
            (str(dt.year), str(x).zfill(2)) for x in range(1, dt.month)
        ]
        for year, month in periods:
            filename = self.export_filename_for_period(year, month)
            if overwrite or not Path(filename).exists():
                os.makedirs(Path(filename).parent.joinpath("info"), exist_ok=True)
                self.export_period(year, month)

    def export_period(self, year, month):
        self.set_bookkeep_period(year, month)
        # Load translation table
        df_translate_import = pd.read_csv(
            self.account_translation,
            decimal=",",
            sep=";",
            encoding="latin-1",
            converters={i: str for i in range(0, 200)},
        )
        df_translate = self.prepare_translation(df_translate_import)

        # Load account balances
        df_bookings = self.filter_bookings()
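        # Abort if there are no bookings, only a single period is present, or the
        # booking files are older than the requested bookkeeping period.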
        all_periods = set(df_bookings["Bookkeep Period"].to_list())
        bookkeep_period_date = datetime(int(year), int(month), 28)
        if df_bookings.shape[0] == 0 or len(all_periods) <= 1 or self.booking_date < bookkeep_period_date:
            logging.error("ABBRUCH!!! Keine Daten vorhanden!")
            return False

        filter_to = self.current_year + self.current_month
        period_no = list(self.bookkeep_filter.keys()).index(filter_to) + 1

        logging.info("df_bookings: " + str(df_bookings.shape))
        # Join against the translation table
        df_combined = df_bookings.merge(df_translate, how="inner", on="Konto_Nr_Händler")
        logging.info(f"df_combined: {df_combined.shape}")

        # Workaround for missing make assignment
        df_combined["Fremdmarke"] = df_combined["Marke_HBV"].str.match(r"^0000")
        df_combined["Marke"] = np.where(df_combined["Fremdmarke"], "99", df_combined["Marke"])

        df_combined["Standort_egal"] = df_combined["Standort_HBV"].str.match(r"^\d\d_")
        df_combined["Standort_HBV"] = np.where(
            df_combined["Fremdmarke"] | df_combined["Standort_egal"],
            "0000",
            df_combined["Standort_HBV"],
        )

        makes = df_combined[["Marke", "Marke_HBV"]].drop_duplicates().sort_values(by=["Marke"])
        sites = (
            df_combined[["Marke", "Standort", "Standort_HBV"]].drop_duplicates().sort_values(by=["Marke", "Standort"])
        )

        # df_combined.to_csv(account_invalid, decimal=',', sep=';', encoding='latin-1', index=False)
        # Grouping
        # df_grouped = df_combined.groupby(['Konto_Nr_SKR51', 'period']).sum()
        df_pivot = df_combined.pivot_table(
            index=["Konto_Nr_SKR51"],
            columns=["period"],
            values="amount",
            aggfunc="sum",
            margins=True,
            margins_name="CumulatedYear",
        )
        df_pivot.drop(index="CumulatedYear", inplace=True)
        logging.info("df_pivot: " + str(df_pivot.shape))

        df = self.special_translation(df_pivot, makes)

        from_label = ["Marke", "Standort", "Konto_Nr", "Kostenstelle", "Absatzkanal", "Kostenträger", "KRM"]
        to_label = ["Make", "Site", "Account", "Origin", "SalesChannel", "CostCarrier", "CostAccountingString"]
        col_dict = dict(zip(from_label, to_label))
        df = df.rename(columns=col_dict)
        makes = makes.rename(columns=col_dict).to_dict(orient="records")
        sites = sites.rename(columns=col_dict).to_dict(orient="records")

        df_invalid = df[df["IsNumeric"] == False]
        df_invalid.to_csv(self.account_invalid, decimal=",", sep=";", encoding="latin-1", index=False)

        export_csv = self.export_filename[:-4] + ".csv"
        df.to_csv(export_csv, decimal=",", sep=";", encoding="latin-1", index=False)
        df = df[df["IsNumeric"] != False].groupby(ACCOUNT_INFO, as_index=False).aggregate("sum")

        # Add remaining info fields
        df["Decimals"] = 2
        # df['OpeningBalance'] = 0.0
        logging.info(df.shape)

        self.export_xml(
            df.to_dict(orient="records"), self.bookkeep_filter, period_no, makes, sites, sites[0]["Standort_HBV"]
        )

        # Join against the translation table - accounts without an assignment
        df_ignored = df_bookings.merge(df_translate, how="left", on="Konto_Nr_Händler")
        df_ignored = df_ignored[
            df_ignored["Konto_Nr_SKR51"].isna()
        ]  # [['Konto_Nr_Händler', 'Bookkeep Period', 'amount', 'quantity']]
        if not df_ignored.empty:
            df_ignored = df_ignored.pivot_table(
                index=["Konto_Nr_Händler"],
                columns=["period"],
                values="amount",
                aggfunc=np.sum,
                margins=True,
                margins_name="CumulatedYear",
            )
            df_ignored.to_csv(self.account_ignored, decimal=",", sep=";", encoding="latin-1")
        return self.export_filename

    def prepare_translation(self, df_translate: pd.DataFrame):
        logging.info(df_translate.shape)
        df_translate["duplicated"] = df_translate.duplicated()
        logging.info(df_translate[df_translate["duplicated"]])
        df_translate = df_translate[
            [
                "Konto_Nr_Händler",
                "Konto_Nr_SKR51",
                "Marke",
                "Marke_HBV",
"Standort", "Standort_HBV", ] ] row = ( df_translate[["Marke", "Marke_HBV", "Standort", "Standort_HBV"]] .drop_duplicates() .sort_values(by=["Marke", "Standort"]) .iloc[:1] .to_dict(orient="records")[0] ) row["Konto_Nr_Händler"] = "01-01-0861-00-00-00" row["Konto_Nr_SKR51"] = "01-01-0861-00-00-00" df_translate = pd.concat([df_translate, pd.DataFrame.from_records([row])]) # print(df_translate.tail()) # df_translate.drop(columns=['duplicated'], inplace=True) df_translate.drop_duplicates(inplace=True) df_translate.set_index("Konto_Nr_Händler") return df_translate def special_translation(self, df: pd.DataFrame, makes: pd.DataFrame): df = self.extract_acct_info(df) # df = df_translate.reset_index(drop=True).drop(columns=['Kostenträger_Ebene']).drop_duplicates() logging.info(df.shape) logging.info(df.columns) logging.info(df.head()) # df = df.merge(df_translate, how='inner', on='Konto_Nr_SKR51') logging.info("df: " + str(df.shape)) df["Bilanz"] = df["Konto_Nr"].str.match(r"^[013]") df["Kontoart"] = np.where(df["Bilanz"], "1", "2") df["Kontoart"] = np.where(df["Konto_Nr"].str.contains("_STK"), "3", df["Kontoart"]) df["Kontoart"] = np.where(df["Konto_Nr"].str.match(r"^[9]"), "3", df["Kontoart"]) df["Konto_1"] = df["Konto_Nr"].str.slice(0, 1) # Hack für fehlende Markenzuordnung df = df.merge(makes, how="left", on="Marke") df["Marke"] = np.where(df["Marke_HBV"].isna(), "99", df["Marke"]) df_debug = df.drop(columns=["Bilanz"]) logging.info(df_debug.groupby(["Kontoart"]).aggregate("sum")) logging.info(df_debug.groupby(["Kontoart", "Konto_1"]).aggregate("sum")) logging.info(df_debug.groupby(["Konto_Nr"]).aggregate("sum")) df_debug.groupby(["Konto_Nr"]).aggregate("sum").to_csv( self.debug_file, decimal=",", sep=";", encoding="latin-1" ) # Bereinigung GW-Kostenträger df["NW_Verkauf_1"] = (df["Konto_Nr"].str.match(r"^[78]0")) & (df["Kostenstelle"].str.match(r"^[^1]\d")) df["Kostenstelle"] = np.where(df["NW_Verkauf_1"] == True, "11", df["Kostenstelle"]) df["Konto_7010"] = df["Konto_Nr"].str.match(r"^[78]01[01]") df["Kostenstelle"] = np.where(df["Konto_7010"] == True, "14", df["Kostenstelle"]) df["GW_Verkauf_2"] = (df["Konto_Nr"].str.match(r"^[78]1")) & (df["Kostenstelle"].str.match(r"^[^2]\d")) df["Kostenstelle"] = np.where(df["GW_Verkauf_2"] == True, "21", df["Kostenstelle"]) df["GW_Verkauf_3"] = (df["Konto_Nr"].str.match(r"^[78]3")) & (df["Kostenstelle"].str.match(r"^[^3]\d")) df["Kostenstelle"] = np.where(df["GW_Verkauf_3"] == True, "31", df["Kostenstelle"]) df["GW_Verkauf_4"] = (df["Konto_Nr"].str.match(r"^[78]4")) & (df["Kostenstelle"].str.match(r"^[^4]\d")) df["Kostenstelle"] = np.where(df["GW_Verkauf_4"] == True, "41", df["Kostenstelle"]) df["GW_Verkauf_x420"] = df["Konto_Nr"].str.match(r"^[78]420") df["Kostenstelle"] = np.where(df["GW_Verkauf_x420"] == True, "42", df["Kostenstelle"]) df["GW_Verkauf_5"] = (df["Konto_Nr"].str.match(r"^[78]5")) & (df["Kostenstelle"].str.match(r"^[^5]\d")) df["Kostenstelle"] = np.where(df["GW_Verkauf_5"] == True, "51", df["Kostenstelle"]) df["GW_Verkauf_50"] = (df["Konto_Nr"].str.match(r"^[78]")) & (df["Kostenstelle"].str.match(r"^2")) df["Kostenträger"] = np.where(df["GW_Verkauf_50"] == True, "52", df["Kostenträger"]) df["Kostenträger"] = np.where( (df["GW_Verkauf_50"] == True) & (df["Marke"] == "01"), "50", df["Kostenträger"], ) df["NW_Verkauf_00"] = ( (df["Konto_Nr"].str.match(r"^[78]2")) & (df["Kostenstelle"].str.match(r"^1")) & (df["Kostenträger"].str.match(r"^[^01234]")) ) df["Kostenträger"] = np.where(df["NW_Verkauf_00"] == True, "00", df["Kostenträger"]) 
df["GW_Stk_50"] = (df["Konto_Nr"].str.match(r"^9130")) & (df["Kostenstelle"].str.match(r"^2")) df["Kostenträger"] = np.where(df["GW_Stk_50"] == True, "52", df["Kostenträger"]) df["Kostenträger"] = np.where((df["GW_Stk_50"] == True) & (df["Marke"] == "01"), "50", df["Kostenträger"]) df["Kostenträger"] = np.where(df["Bilanz"] == True, "00", df["Kostenträger"]) df["Konto_5er"] = (df["Konto_Nr"].str.match("^5")) | (df["Konto_Nr"].str.match("^9143")) df["Absatzkanal"] = np.where(df["Konto_5er"] == True, "99", df["Absatzkanal"]) df["Konto_5005"] = (df["Konto_Nr"].str.match("^5005")) & (df["Kostenstelle"].str.match(r"^[^12]")) df["Kostenstelle"] = np.where(df["Konto_5005"] == True, "20", df["Kostenstelle"]) df["Kostenträger"] = np.where(df["Konto_5005"] == True, "50", df["Kostenträger"]) df["Konto_5007"] = (df["Konto_Nr"].str.match("^5007")) & (df["Kostenstelle"].str.match(r"^([^4]|42)")) df["Kostenstelle"] = np.where(df["Konto_5007"] == True, "41", df["Kostenstelle"]) df["Kostenträger"] = np.where(df["Konto_5007"] == True, "70", df["Kostenträger"]) df["Konto_914er"] = (df["Konto_Nr"].str.match("^914[34]")) & (df["Kostenträger"].str.match(r"^[^7]")) df["Kostenträger"] = np.where(df["Konto_914er"] == True, "70", df["Kostenträger"]) df["Teile_30_60"] = ( (df["Konto_Nr"].str.match(r"^[578]")) & (df["Kostenstelle"].str.match(r"^[3]")) & (df["Kostenträger"].str.match(r"^[^6]")) ) df["Kostenträger"] = np.where(df["Teile_30_60"] == True, "60", df["Kostenträger"]) df["Service_40_70"] = ( (df["Konto_Nr"].str.match(r"^[578]")) & (df["Kostenstelle"].str.match(r"^[4]")) & (df["Kostenträger"].str.match(r"^[^7]")) ) df["Kostenträger"] = np.where(df["Service_40_70"] == True, "70", df["Kostenträger"]) df["KRM"] = df["Marke"] + df["Standort"] + df["Kostenstelle"] + df["Absatzkanal"] + df["Kostenträger"] df["IsNumeric"] = (df["KRM"].str.isdigit()) & (df["Konto_Nr"].str.isdigit()) & (df["Konto_Nr"].str.len() == 4) return df def load_bookings_from_file(self): df2 = [] timestamps = [] for csv_file in self.account_bookings: df2.append( pd.read_csv( csv_file, decimal=",", sep=";", encoding="latin-1", converters={0: str, 1: str}, ) ) timestamps.append(Path(csv_file).stat().st_mtime) self.booking_date = datetime.fromtimestamp(max(timestamps)) self.df_bookings = pd.concat(df2) self.df_bookings["amount"] = (self.df_bookings["Debit Amount"] + self.df_bookings["Credit Amount"]).round(2) def filter_bookings(self): if self.df_bookings is None: self.load_bookings_from_file() # Kontensalden auf gegebenen Monat filtern filter_from = self.current_year + self.first_month_of_financial_year filter_prev = self.last_year + self.first_month_of_financial_year if self.first_month_of_financial_year > self.current_month: filter_from = self.last_year + self.first_month_of_financial_year filter_prev = self.last_year2 + self.first_month_of_financial_year filter_to = self.current_year + self.current_month filter_opening = self.current_year + "00" filter_prev_opening = self.last_year + "00" prev_year_closed = True df_opening_balance = self.df_bookings[(self.df_bookings["Bookkeep Period"] == filter_opening)] if df_opening_balance.shape[0] == 0: df_opening_balance = self.df_bookings[ (self.df_bookings["Bookkeep Period"] == filter_prev_opening) | ( (self.df_bookings["Bookkeep Period"] >= filter_prev) & (self.df_bookings["Bookkeep Period"] < filter_from) ) ].copy() df_opening_balance["Bookkeep Period"] = filter_opening prev_year_closed = False df_opening_balance = 
        opening_balance = df_opening_balance["amount"].aggregate("sum").round(2)
        logging.info("Gewinn/Verlustvortrag")
        logging.info(opening_balance)

        if not prev_year_closed:
            row = {
                "Konto_Nr_Händler": "01-01-0861-00-00-00",
                "Bookkeep Period": filter_opening,
                "Debit Amount": opening_balance * -1,
                "Credit Amount": 0,
                "Debit Quantity": 0,
                "Credit Quantity": 0,
                "amount": opening_balance * -1,
            }
            df_opening_balance = pd.concat([df_opening_balance, pd.DataFrame.from_records([row])])

        df_filtered = self.df_bookings[
            (self.df_bookings["Bookkeep Period"] >= filter_from) & (self.df_bookings["Bookkeep Period"] <= filter_to)
        ]

        # Copy bookings and append them as statistics accounts
        df_stats = df_filtered.copy()
        # df_stats = df_stats[df_stats['Konto_Nr_Händler'].str.match(r'-[24578]\d\d\d-')]
        df_stats["Konto_Nr_Händler"] = df_stats["Konto_Nr_Händler"].str.replace(r"-(\d\d\d+)-", r"-\1_STK-", regex=True)
        df_stats["amount"] = (df_filtered["Debit Quantity"] + df_filtered["Credit Quantity"]).round(2)

        df_combined = pd.concat([df_opening_balance, df_filtered, df_stats])

        # Convert columns
        df_combined["period"] = df_combined["Bookkeep Period"].apply(lambda x: self.bookkeep_filter[x])
        return df_combined[df_combined["amount"] != 0.00]

    @property
    def export_filename(self):
        return self.export_filename_for_period(self.current_year, self.current_month)

    @property
    def export_info_dir(self):
        return f"{self.base_dir}/Export/{self.current_year}/info/"

    def export_filename_for_period(self, year, month):
        return f"{self.base_dir}/Export/{year}/export_{year}-{month}.xml"

    def export_xml(self, records, bk_filter, period_no, makes, sites, main_site):
        record_elements = ACCOUNT_INFO + ["Decimals"] + list(bk_filter.values())[:period_no] + ["CumulatedYear"]
        root = ET.Element("HbvData")
        h = ET.SubElement(root, "Header")
        for k, v in self.header(makes, sites, main_site).items():
            ET.SubElement(h, k).text = str(v)

        make_list = ET.SubElement(root, "MakeList")
        for m in makes:
            e = ET.SubElement(make_list, "MakeListEntry")
            ET.SubElement(e, "Make").text = m["Make"]
            ET.SubElement(e, "MakeCode").text = m["Marke_HBV"]

        bm_code_list = ET.SubElement(root, "BmCodeList")
        for s in sites:
            e = ET.SubElement(bm_code_list, "BmCodeEntry")
            ET.SubElement(e, "Make").text = s["Make"]
            ET.SubElement(e, "Site").text = s["Site"]
            ET.SubElement(e, "BmCode").text = s["Standort_HBV"]

        record_list = ET.SubElement(root, "RecordList")
        for row in records:
            record = ET.SubElement(record_list, "Record")
            for e in record_elements:
                child = ET.SubElement(record, e)
                field = row.get(e, 0.0)
                if str(field) == "nan":
                    field = "0"
                elif type(field) is float:
                    # Amounts are exported as integers scaled by 100 (two decimals)
                    field = "{:.0f}".format(field * 100)
                child.text = str(field)

        with open(self.export_filename, "w", encoding="utf-8") as fwh:
            fwh.write(minidom.parseString(ET.tostring(root)).toprettyxml(indent="  "))

    def convert_to_row(self, node):
        return [child.text for child in node]

    def convert_xml_to_csv(self, xmlfile, csvfile):
        record_list = ET.parse(xmlfile).getroot().find("RecordList")
        header = [child.tag for child in record_list.find("Record")]
        bookings = [self.convert_to_row(node) for node in record_list.findall("Record")]
        with open(csvfile, "w") as fwh:
            cwh = csv.writer(fwh, delimiter=";")
            cwh.writerow(header)
            cwh.writerows(bookings)
        return True

    def convert_csv_to_xml(self, csvfile, xmlfile):
        makes = [{"Make": "01", "Marke_HBV": "1844"}]
        # export_xml() reads the "Standort_HBV" key from each site entry
        sites = [{"Make": "01", "Site": "01", "Standort_HBV": "1844"}]
        with open(csvfile, "r", encoding="latin-1") as frh:
            csv_reader = csv.DictReader(frh, delimiter=";")
            # bookkeep_filter is a property; export_xml() writes to self.export_filename,
            # so the xmlfile argument is currently unused here.
            self.export_xml(csv_reader, self.bookkeep_filter, 1, makes, sites, sites[0]["Standort_HBV"])


def gchr_local():
    base_dir = os.getcwd() + "/../GCHR2_Testdaten/Kunden"
    for path in Path(base_dir).glob("*"):
        if path.is_dir():
            print(path.name)
            gchr_export(str(path))


def gchr_export(base_dir):
    gchr = GCHR(base_dir)
    gchr.export_all_periods(overwrite=True, today="2022-08-01")


if __name__ == "__main__":
    gchr_local()

    # import cProfile
    # cProfile.run(
    #     "gchr_local()",
    #     "gchr_local.prof",
    # )
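
# Minimal usage sketch for a single dealer directory (the path below is a
# hypothetical example; export_all_periods() creates the required
# Export/<year>/info directories before each export):
#
# gchr = GCHR(os.getcwd() + "/../GCHR2_Testdaten/Kunden/Beispiel-Haendler")
# gchr.export_all_periods(overwrite=False, today="2022-08-01")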