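"""Export of dealer bookkeeping balances to the HbvData XML format.

Reads the account translation table (data/Kontenrahmen_uebersetzt.csv) and the
balance exports (data/GuV_Bilanz_Salden*.csv) below a customer base directory,
maps the dealer account numbers to the SKR51 scheme and writes one XML file per
bookkeeping period to Export/<year>/.
"""
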
import pandas as pd
import numpy as np
import xml.etree.ElementTree as ET
import csv
from xml.dom import minidom
from datetime import datetime
import logging
from pathlib import Path
import os
from enum import Enum, auto

ACCOUNT_INFO = [
    "Account",
    "Make",
    "Site",
    "Origin",
    "SalesChannel",
    "CostCarrier",
]


class GCHR:
    def __init__(self, base_dir) -> None:
        self.base_dir = base_dir
        self.account_translation = f"{self.base_dir}/data/Kontenrahmen_uebersetzt.csv"
        self.account_bookings = list(Path(self.base_dir).joinpath("data").glob("GuV_Bilanz_Salden*.csv"))
        self.first_month_of_financial_year = "09"
        pd.set_option("display.max_rows", 500)
        pd.set_option("display.float_format", lambda x: "%.2f" % x)

    def set_bookkeep_period(self, year, month):
        self.current_year = year
        self.current_month = month
        period = f"{year}-{month}"
        prot_file = f"{self.export_info_dir}/protokoll_{period}.log"
        logging.basicConfig(
            filename=prot_file,
            filemode="w",
            encoding="utf-8",
            level=logging.DEBUG,
            force=True,
        )
        self.debug_file = f"{self.export_info_dir}/debug_{period}.csv"
        self.account_ignored = f"{self.export_info_dir}/ignoriert_{period}.csv"
        self.account_invalid = f"{self.export_info_dir}/ungueltig_{period}.csv"
        self.last_year = str(int(self.current_year) - 1)
        self.last_year2 = str(int(self.current_year) - 2)
        self.next_year = str(int(self.current_year) + 1)

    def header(self, makes, sites):
        return {
            "Country": "DE",
            "MainBmCode": sites[0]["Standort_HBV"],
            "Month": self.current_month,
            "Year": self.current_year,
            "Currency": "EUR",
            "NumberOfMakes": len(makes),
            "NumberOfSites": len(sites),
            "ExtractionDate": datetime.now().strftime("%d.%m.%Y"),
            "ExtractionTime": datetime.now().strftime("%H:%M:%S"),
            "BeginFiscalYear": self.first_month_of_financial_year,
        }
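
    # Maps bookkeeping periods ("YYYYMM") of the current fiscal year to the export
    # column names: "<current year>00" becomes "OpeningBalance", the twelve fiscal
    # months become "Period01".."Period12". The fiscal year starts in
    # self.first_month_of_financial_year and may span two calendar years.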
    def bookkeep_filter(self):
        period = [self.current_year + str(i).zfill(2) for i in range(1, 13)]
        if self.first_month_of_financial_year != "01":
            if self.first_month_of_financial_year > self.current_month:
                period = [self.last_year + str(i).zfill(2) for i in range(1, 13)] + period
            else:
                period = period + [self.next_year + str(i).zfill(2) for i in range(1, 13)]
            fm = int(self.first_month_of_financial_year)
            period = period[fm - 1 : fm + 12]
        period = [self.current_year + "00"] + period
        rename_to = ["OpeningBalance"] + ["Period" + str(i).zfill(2) for i in range(1, 13)]
        return dict(zip(period, rename_to))
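
    # Splits the SKR51 account number (the index of the pivoted frame) into its six
    # components: Marke, Standort, Konto_Nr, Kostenstelle, Absatzkanal, Kostenträger.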
    def extract_acct_info(self, df: pd.DataFrame):
        acct_info = [
            "Marke",
            "Standort",
            "Konto_Nr",
            "Kostenstelle",
            "Absatzkanal",
            "Kostenträger",
        ]
        df["Konto_Nr_SKR51"] = df.index
        df[acct_info] = df["Konto_Nr_SKR51"].str.split(pat="-", n=6, expand=True)
        return df
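
    # Exports the last twelve bookkeeping periods, skipping periods for which an
    # export file already exists.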
    def export_all_periods(self):
        dt = datetime.now()
        prev = str(dt.year - 1)
        periods = [(prev, str(x).zfill(2)) for x in range(dt.month, 13)] + [
            (str(dt.year), str(x).zfill(2)) for x in range(1, dt.month)
        ]
        for year, month in periods:
            filename = self.export_filename_for_period(year, month)
            if not Path(filename).exists():
                os.makedirs(Path(filename).parent.joinpath("info"), exist_ok=True)
                self.export_period(year, month)

    def export_period(self, year, month):
        self.set_bookkeep_period(year, month)
        # Load the account translation table
        df_translate = pd.read_csv(
            self.account_translation,
            decimal=",",
            sep=";",
            encoding="latin-1",
            converters={i: str for i in range(0, 200)},
        )
        logging.info(df_translate.shape)
        df_translate["duplicated"] = df_translate.duplicated()
        logging.info(df_translate[df_translate["duplicated"]])
        df_translate = df_translate[
            [
                "Konto_Nr_Händler",
                "Konto_Nr_SKR51",
                "Marke",
                "Marke_HBV",
                "Standort",
                "Standort_HBV",
            ]
        ]
        # Add a translation entry for the profit/loss carry-forward account 0861
        row = (
            df_translate[["Marke", "Marke_HBV", "Standort", "Standort_HBV"]]
            .drop_duplicates()
            .sort_values(by=["Marke", "Standort"])
            .iloc[:1]
            .to_dict(orient="records")[0]
        )
        row["Konto_Nr_Händler"] = "01-01-0861-00-00-00"
        row["Konto_Nr_SKR51"] = "01-01-0861-00-00-00"
        df_translate = pd.concat([df_translate, pd.DataFrame.from_records([row])])
        # print(df_translate.tail())
        # df_translate.drop(columns=['duplicated'], inplace=True)
        df_translate.drop_duplicates(inplace=True)
        df_translate.set_index("Konto_Nr_Händler")

        # Load the account balances
        df2 = []
        for csv_file in self.account_bookings:
            df2.append(
                pd.read_csv(
                    csv_file,
                    decimal=",",
                    sep=";",
                    encoding="latin-1",
                    converters={0: str, 1: str},
                )
            )
        df_bookings = pd.concat(df2)

        # Filter account balances down to the given month
        filter_from = self.current_year + self.first_month_of_financial_year
        filter_prev = self.last_year + self.first_month_of_financial_year
        if self.first_month_of_financial_year > self.current_month:
            filter_from = self.last_year + self.first_month_of_financial_year
            filter_prev = self.last_year2 + self.first_month_of_financial_year
        filter_to = self.current_year + self.current_month
        filter_opening = self.current_year + "00"
        filter_prev_opening = self.last_year + "00"
        prev_year_closed = True

        df_opening_balance = df_bookings[(df_bookings["Bookkeep Period"] == filter_opening)]
        if df_opening_balance.shape[0] == 0:
            # No opening balance booked for the current year yet: derive it from the
            # previous fiscal year's postings.
            df_opening_balance = df_bookings[
                (df_bookings["Bookkeep Period"] == filter_prev_opening)
                | ((df_bookings["Bookkeep Period"] >= filter_prev) & (df_bookings["Bookkeep Period"] < filter_from))
            ].copy()
            df_opening_balance["Bookkeep Period"] = filter_opening
            prev_year_closed = False
        # df_opening_balance = df_opening_balance.merge(df_translate, how='inner', on='Konto_Nr_Händler')
        df_opening_balance = df_opening_balance[(df_opening_balance["Konto_Nr_Händler"].str.contains(r"-[013]\d\d+-"))]
        df_opening_balance["amount"] = (df_opening_balance["Debit Amount"] + df_opening_balance["Credit Amount"]).round(2)
        # df_opening_balance.drop(columns=['Debit Amount', 'Credit Amount', 'Debit Quantity', 'Credit Quantity'], inplace=True)
        # df_opening_balance = df_opening_balance.groupby(['Marke', 'Standort']).sum()
        opening_balance = df_opening_balance["amount"].aggregate("sum").round(2)
        logging.info("Gewinn/Verlustvortrag")
        logging.info(opening_balance)

        if not prev_year_closed:
            # Book the profit/loss carried forward onto account 0861 as long as the
            # previous year has not been closed.
            row = {
                "Konto_Nr_Händler": "01-01-0861-00-00-00",
                "Bookkeep Period": filter_opening,
                "Debit Amount": opening_balance * -1,
                "Credit Amount": 0,
                "Debit Quantity": 0,
                "Credit Quantity": 0,
                "amount": opening_balance * -1,
            }
            df_opening_balance = pd.concat([df_opening_balance, pd.DataFrame.from_records([row])])

        df_bookings = df_bookings[
            (df_bookings["Bookkeep Period"] >= filter_from) & (df_bookings["Bookkeep Period"] <= filter_to)
        ]
        df_bookings["amount"] = (df_bookings["Debit Amount"] + df_bookings["Credit Amount"]).round(2)

        # Carry quantities as separate statistics rows: the account number gets an
        # "_STK" suffix and the amount column holds the quantities.
        df_stats = df_bookings.copy()
        # df_stats = df_stats[df_stats['Konto_Nr_Händler'].str.match(r'-[24578]\d\d\d-')]
        df_stats["Konto_Nr_Händler"] = df_stats["Konto_Nr_Händler"].str.replace(r"-(\d\d\d+)-", r"-\1_STK-", regex=True)
        df_stats["amount"] = (df_bookings["Debit Quantity"] + df_bookings["Credit Quantity"]).round(2)

        df_bookings = pd.concat([df_opening_balance, df_bookings, df_stats])
        df_bookings = df_bookings[df_bookings["amount"] != 0.00]
        if df_bookings.shape[0] == 0:
            logging.error("ABBRUCH!!! Keine Daten vorhanden!")
            return False

        bk_filter = self.bookkeep_filter()
        period_no = list(bk_filter.keys()).index(filter_to) + 1

        # Map bookkeeping periods to export column names
        df_bookings["period"] = df_bookings["Bookkeep Period"].apply(lambda x: bk_filter[x])
        logging.info("df_bookings: " + str(df_bookings.shape))

        # Join with the translation table
        df_combined = df_bookings.merge(df_translate, how="inner", on="Konto_Nr_Händler")
        logging.info(f"df_combined: {df_combined.shape}")

        # Workaround for missing make assignments
        df_combined["Fremdmarke"] = df_combined["Marke_HBV"].str.match(r"^0000")
        df_combined["Marke"] = np.where(df_combined["Fremdmarke"], "99", df_combined["Marke"])
        df_combined["Standort_egal"] = df_combined["Standort_HBV"].str.match(r"^\d\d_")
        df_combined["Standort_HBV"] = np.where(
            df_combined["Fremdmarke"] | df_combined["Standort_egal"],
            "0000",
            df_combined["Standort_HBV"],
        )

        makes = df_combined[["Marke", "Marke_HBV"]].drop_duplicates().sort_values(by=["Marke"])
        sites = (
            df_combined[["Marke", "Standort", "Standort_HBV"]].drop_duplicates().sort_values(by=["Marke", "Standort"])
        )
        # df_combined.to_csv(account_invalid, decimal=',', sep=';', encoding='latin-1', index=False)

        # Aggregate amounts by SKR51 account and period
        # df_grouped = df_combined.groupby(['Konto_Nr_SKR51', 'period']).sum()
        df = df_combined.pivot_table(
            index=["Konto_Nr_SKR51"],
            columns=["period"],
            values="amount",
            aggfunc=np.sum,
            margins=True,
            margins_name="CumulatedYear",
        )
        logging.info("df_pivot: " + str(df.shape))
        df = self.extract_acct_info(df)
        # df = df_translate.reset_index(drop=True).drop(columns=['Kostenträger_Ebene']).drop_duplicates()
        logging.info(df.shape)
        logging.info(df.columns)
        logging.info(df.head())
        # df = df.merge(df_translate, how='inner', on='Konto_Nr_SKR51')
        logging.info("df: " + str(df.shape))
- df["Bilanz"] = df["Konto_Nr"].str.match(r"^[013]")
- df["Kontoart"] = np.where(df["Bilanz"], "1", "2")
- df["Kontoart"] = np.where(df["Konto_Nr"].str.contains("_STK"), "3", df["Kontoart"])
- df["Kontoart"] = np.where(df["Konto_Nr"].str.match(r"^[9]"), "3", df["Kontoart"])
- df["Konto_1"] = df["Konto_Nr"].str.slice(0, 1)
- # Hack für fehlende Markenzuordnung
- df = df.merge(makes, how="left", on="Marke")
- df["Marke"] = np.where(df["Marke_HBV"].isna(), "99", df["Marke"])
- df_debug = df.drop(columns=["Bilanz"])
- logging.info(df_debug.groupby(["Kontoart"]).aggregate("sum"))
- logging.info(df_debug.groupby(["Kontoart", "Konto_1"]).aggregate("sum"))
- logging.info(df_debug.groupby(["Konto_Nr"]).aggregate("sum"))
- df_debug.groupby(["Konto_Nr"]).aggregate("sum").to_csv(
- self.debug_file, decimal=",", sep=";", encoding="latin-1"
- )

        # Clean-up of GW cost carriers
        df["NW_Verkauf_1"] = (df["Konto_Nr"].str.match(r"^[78]0")) & (df["Kostenstelle"].str.match(r"^[^1]\d"))
        df["Kostenstelle"] = np.where(df["NW_Verkauf_1"] == True, "11", df["Kostenstelle"])
        df["Konto_7010"] = df["Konto_Nr"].str.match(r"^[78]01[01]")
        df["Kostenstelle"] = np.where(df["Konto_7010"] == True, "14", df["Kostenstelle"])
        df["GW_Verkauf_2"] = (df["Konto_Nr"].str.match(r"^[78]1")) & (df["Kostenstelle"].str.match(r"^[^2]\d"))
        df["Kostenstelle"] = np.where(df["GW_Verkauf_2"] == True, "21", df["Kostenstelle"])
        df["GW_Verkauf_3"] = (df["Konto_Nr"].str.match(r"^[78]3")) & (df["Kostenstelle"].str.match(r"^[^3]\d"))
        df["Kostenstelle"] = np.where(df["GW_Verkauf_3"] == True, "31", df["Kostenstelle"])
        df["GW_Verkauf_4"] = (df["Konto_Nr"].str.match(r"^[78]4")) & (df["Kostenstelle"].str.match(r"^[^4]\d"))
        df["Kostenstelle"] = np.where(df["GW_Verkauf_4"] == True, "41", df["Kostenstelle"])
        df["GW_Verkauf_x420"] = df["Konto_Nr"].str.match(r"^[78]420")
        df["Kostenstelle"] = np.where(df["GW_Verkauf_x420"] == True, "42", df["Kostenstelle"])
        df["GW_Verkauf_5"] = (df["Konto_Nr"].str.match(r"^[78]5")) & (df["Kostenstelle"].str.match(r"^[^5]\d"))
        df["Kostenstelle"] = np.where(df["GW_Verkauf_5"] == True, "51", df["Kostenstelle"])
        df["GW_Verkauf_50"] = (df["Konto_Nr"].str.match(r"^[78]")) & (df["Kostenstelle"].str.match(r"^2"))
        df["Kostenträger"] = np.where(df["GW_Verkauf_50"] == True, "52", df["Kostenträger"])
        df["Kostenträger"] = np.where(
            (df["GW_Verkauf_50"] == True) & (df["Marke"] == "01"),
            "50",
            df["Kostenträger"],
        )
        df["NW_Verkauf_00"] = (
            (df["Konto_Nr"].str.match(r"^[78]2"))
            & (df["Kostenstelle"].str.match(r"^1"))
            & (df["Kostenträger"].str.match(r"^[^01234]"))
        )
        df["Kostenträger"] = np.where(df["NW_Verkauf_00"] == True, "00", df["Kostenträger"])
        df["GW_Stk_50"] = (df["Konto_Nr"].str.match(r"^9130")) & (df["Kostenstelle"].str.match(r"^2"))
        df["Kostenträger"] = np.where(df["GW_Stk_50"] == True, "52", df["Kostenträger"])
        df["Kostenträger"] = np.where((df["GW_Stk_50"] == True) & (df["Marke"] == "01"), "50", df["Kostenträger"])
        df["Kostenträger"] = np.where(df["Bilanz"] == True, "00", df["Kostenträger"])
        df["Konto_5er"] = (df["Konto_Nr"].str.match("^5")) | (df["Konto_Nr"].str.match("^9143"))
        df["Absatzkanal"] = np.where(df["Konto_5er"] == True, "99", df["Absatzkanal"])
        df["Konto_5005"] = (df["Konto_Nr"].str.match("^5005")) & (df["Kostenstelle"].str.match(r"^[^12]"))
        df["Kostenstelle"] = np.where(df["Konto_5005"] == True, "20", df["Kostenstelle"])
        df["Kostenträger"] = np.where(df["Konto_5005"] == True, "50", df["Kostenträger"])
        df["Konto_5007"] = (df["Konto_Nr"].str.match("^5007")) & (df["Kostenstelle"].str.match(r"^([^4]|42)"))
        df["Kostenstelle"] = np.where(df["Konto_5007"] == True, "41", df["Kostenstelle"])
        df["Kostenträger"] = np.where(df["Konto_5007"] == True, "70", df["Kostenträger"])
        df["Konto_914er"] = (df["Konto_Nr"].str.match("^914[34]")) & (df["Kostenträger"].str.match(r"^[^7]"))
        df["Kostenträger"] = np.where(df["Konto_914er"] == True, "70", df["Kostenträger"])
        df["Teile_30_60"] = (
            (df["Konto_Nr"].str.match(r"^[578]"))
            & (df["Kostenstelle"].str.match(r"^[3]"))
            & (df["Kostenträger"].str.match(r"^[^6]"))
        )
        df["Kostenträger"] = np.where(df["Teile_30_60"] == True, "60", df["Kostenträger"])
        df["Service_40_70"] = (
            (df["Konto_Nr"].str.match(r"^[578]"))
            & (df["Kostenstelle"].str.match(r"^[4]"))
            & (df["Kostenträger"].str.match(r"^[^7]"))
        )
        df["Kostenträger"] = np.where(df["Service_40_70"] == True, "70", df["Kostenträger"])

        from_label = [
            "Marke",
            "Standort",
            "Konto_Nr",
            "Kostenstelle",
            "Absatzkanal",
            "Kostenträger",
        ]
        to_label = ["Make", "Site", "Account", "Origin", "SalesChannel", "CostCarrier"]
        df = df.rename(columns=dict(zip(from_label, to_label)))
        makes = makes.rename(columns=dict(zip(from_label, to_label))).to_dict(orient="records")
        sites = sites.rename(columns=dict(zip(from_label, to_label))).to_dict(orient="records")

        df["CostAccountingString"] = df["Make"] + df["Site"] + df["Origin"] + df["SalesChannel"] + df["CostCarrier"]
        df["IsNumeric"] = (
            (df["CostAccountingString"].str.isdigit()) & (df["Account"].str.isdigit()) & (df["Account"].str.len() == 4)
        )
        df_invalid = df[df["IsNumeric"] == False]
        df_invalid.to_csv(self.account_invalid, decimal=",", sep=";", encoding="latin-1", index=False)

        export_csv = self.export_filename[:-4] + ".csv"
        df.to_csv(export_csv, decimal=",", sep=";", encoding="latin-1", index=False)
        df = df[df["IsNumeric"] != False].groupby(ACCOUNT_INFO, as_index=False).aggregate("sum")
        # Add supplementary info
        df["Decimals"] = 2
        # df['OpeningBalance'] = 0.0
        logging.info(df.shape)
        self.export_xml(df.to_dict(orient="records"), bk_filter, period_no, makes, sites)

        # Join with the translation table - accounts without an SKR51 mapping
        df_ignored = df_bookings.merge(df_translate, how="left", on="Konto_Nr_Händler")
        df_ignored = df_ignored[
            df_ignored["Konto_Nr_SKR51"].isna()
        ]  # [['Konto_Nr_Händler', 'Bookkeep Period', 'amount', 'quantity']]
        if not df_ignored.empty:
            df_ignored = df_ignored.pivot_table(
                index=["Konto_Nr_Händler"],
                columns=["period"],
                values="amount",
                aggfunc=np.sum,
                margins=True,
                margins_name="CumulatedYear",
            )
            df_ignored.to_csv(self.account_ignored, decimal=",", sep=";", encoding="latin-1")
        return self.export_filename

    @property
    def export_filename(self):
        return self.export_filename_for_period(self.current_year, self.current_month)

    @property
    def export_info_dir(self):
        return f"{self.base_dir}/Export/{self.current_year}/info/"

    def export_filename_for_period(self, year, month):
        return f"{self.base_dir}/Export/{year}/export_{year}-{month}.xml"
    def export_xml(self, records, bk_filter, period_no, makes, sites, export_file=None):
        # export_file lets convert_csv_to_xml write to an arbitrary path; by default
        # the export file for the current bookkeeping period is used.
        if export_file is None:
            export_file = self.export_filename
        record_elements = ACCOUNT_INFO + ["Decimals"] + list(bk_filter.values())[:period_no] + ["CumulatedYear"]
        root = ET.Element("HbvData")
        h = ET.SubElement(root, "Header")
        for k, v in self.header(makes, sites).items():
            ET.SubElement(h, k).text = str(v)

        make_list = ET.SubElement(root, "MakeList")
        for m in makes:
            e = ET.SubElement(make_list, "MakeListEntry")
            ET.SubElement(e, "Make").text = m["Make"]
            ET.SubElement(e, "MakeCode").text = m["Marke_HBV"]

        bm_code_list = ET.SubElement(root, "BmCodeList")
        for s in sites:
            e = ET.SubElement(bm_code_list, "BmCodeEntry")
            ET.SubElement(e, "Make").text = s["Make"]
            ET.SubElement(e, "Site").text = s["Site"]
            ET.SubElement(e, "BmCode").text = s["Standort_HBV"]

        record_list = ET.SubElement(root, "RecordList")
        for row in records:
            record = ET.SubElement(record_list, "Record")
            for e in record_elements:
                child = ET.SubElement(record, e)
                field = row.get(e, 0.0)
                if str(field) == "nan":
                    field = "0"
                elif type(field) is float:
                    field = "{:.0f}".format(field * 100)
                child.text = str(field)

        with open(export_file, "w", encoding="utf-8") as fwh:
            fwh.write(minidom.parseString(ET.tostring(root)).toprettyxml(indent=" "))

    def convert_to_row(self, node):
        return [child.text for child in node]

    def convert_xml_to_csv(self, xmlfile, csvfile):
        record_list = ET.parse(xmlfile).getroot().find("RecordList")
        header = [child.tag for child in record_list.find("Record")]
        bookings = [self.convert_to_row(node) for node in record_list.findall("Record")]
        with open(csvfile, "w", newline="") as fwh:
            cwh = csv.writer(fwh, delimiter=";")
            cwh.writerow(header)
            cwh.writerows(bookings)
        return True

    def convert_csv_to_xml(self, csvfile, xmlfile):
        makes = [{"Make": "01", "Marke_HBV": "1844"}]
        sites = [{"Make": "01", "Site": "01", "Standort_HBV": "1844"}]
        with open(csvfile, "r", encoding="latin-1") as frh:
            csv_reader = csv.DictReader(frh, delimiter=";")
            self.export_xml(csv_reader, self.bookkeep_filter(), 1, makes, sites, xmlfile)
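

# Minimal usage sketch (the customer directory below is hypothetical; it has to
# contain data/Kontenrahmen_uebersetzt.csv and data/GuV_Bilanz_Salden*.csv):
#
#   gchr = GCHR("gcstruct/Kunden/Siebrecht")
#   gchr.export_all_periods()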


class Kunden(Enum):
    Altermann = auto()
    Barth_und_Frey = auto()
    Hannuschka = auto()
    Koenig_und_Partner = auto()
    Luchtenberg = auto()
    Russig_Neustadt_deop01 = auto()
    Russig_Neustadt_deop02 = auto()
    Siebrecht = auto()


def gchr_local(base_dir):
    for path in Path(base_dir).glob("*"):
        if path.is_dir():
            print(path.name)
            gchr_export(str(path))


def gchr_export(base_dir):
    gchr = GCHR(base_dir)
    gchr.export_all_periods()
- if __name__ == "__main__":
- base_dir = os.getcwd() + "/gcstruct/Kunden"
- if Path(base_dir).exists():
- gchr_local(base_dir)
- else:
- gchr_export(os.getcwd())