| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294 |
- import logging
- from pathlib import Path
- import numpy as np
- import pandas as pd
- TRANSLATE = [
- "Konto_Nr_Händler",
- "Konto_Nr_SKR51",
- "Marke",
- "Standort",
- "Konto_Nr",
- "Kostenstelle",
- "Absatzkanal",
- "Kostenträger",
- "Kontoart",
- "Konto_1",
- "KRM",
- "IsNumeric",
- ]
- def load_translation(
- account_translation: str, debug_file: str, export_invalid_filename: str
- ) -> tuple[dict[str, str], dict[str, str], pd.DataFrame, pd.DataFrame]:
- account_translation_gchr = account_translation[:-4] + "_GCHR.csv"
- if (
- Path(account_translation_gchr).exists()
- and Path(account_translation_gchr).stat().st_mtime >= Path(account_translation).stat().st_mtime
- and False
- ):
- df_translate = pd.read_csv(
- account_translation_gchr,
- decimal=",",
- sep=";",
- encoding="latin-1",
- converters={i: str for i in range(0, 200)},
- )
- makes = get_makes_from_translation(df_translate)
- sites = get_sites_from_translation(df_translate)
- df_translate2 = (
- df_translate.copy().drop(columns=["Konto_Nr_Händler"]).drop_duplicates().set_index("Konto_Nr_SKR51")
- )
- return (makes, sites, df_translate, df_translate2)
- df_translate_import = pd.read_csv(
- account_translation,
- decimal=",",
- sep=";",
- encoding="latin-1",
- converters={i: str for i in range(0, 200)},
- usecols=["Konto_Nr_Händler", "Konto_Nr_SKR51", "Marke", "Standort", "Marke_HBV", "Standort_HBV"],
- ) # .reset_index()
- makes = get_makes_from_translation(df_translate_import)
- sites = get_sites_from_translation(df_translate_import)
- df_prepared = prepare_translation(df_translate_import)
- df_translate = special_translation(df_prepared, makes, sites, debug_file, export_invalid_filename)
- df_translate2 = df_translate.copy().drop(columns=["Konto_Nr_Händler"]).drop_duplicates().set_index("Konto_Nr_SKR51")
- df_translate3 = (
- df_translate[["Kontoart", "Konto_Nr_SKR51", "Konto_Nr_Händler"]]
- .copy()
- .sort_values(by=["Kontoart", "Konto_Nr_SKR51"])
- )
- df_translate3.to_csv(account_translation_gchr, decimal=",", sep=";", encoding="latin-1", index=False)
- return (makes, sites, df_translate, df_translate2)
- def get_makes_from_translation(df_translate_import: pd.DataFrame) -> dict[str, str]:
- df_makes = df_translate_import[["Marke", "Marke_HBV"]].copy().drop_duplicates()
- df_makes = df_makes[df_makes["Marke_HBV"] != "0000"]
- makes = dict([(e["Marke"], e["Marke_HBV"]) for e in df_makes.to_dict(orient="records")])
- makes["99"] = "0000"
- return makes
- def get_sites_from_translation(df_translate_import: pd.DataFrame) -> dict[str, str]:
- df_sites = df_translate_import[["Marke", "Standort", "Standort_HBV"]].copy().drop_duplicates()
- df_sites["Standort_HBV"] = np.where(df_sites["Standort_HBV"].str.len() < 4, "0000", df_sites["Standort_HBV"])
- sites = dict([(e["Marke"] + "-" + e["Standort"], e["Standort_HBV"]) for e in df_sites.to_dict(orient="records")])
- return sites
- def prepare_translation(df_translate_import: pd.DataFrame, dest_account: str = "01-01-0860-10-00-00") -> pd.DataFrame:
- df = df_translate_import[
- [
- "Konto_Nr_Händler",
- "Konto_Nr_SKR51",
- ]
- ].drop_duplicates()
- logging.info(df.shape)
- row = {
- "Konto_Nr_Händler": "01-01-0861-00-00-00",
- "Konto_Nr_SKR51": dest_account,
- }
- df = pd.concat([df, pd.DataFrame.from_records([row])])
- df.set_index("Konto_Nr_Händler")
- return df
- def special_translation(
- df: pd.DataFrame, makes: dict[str, str], sites: dict[str, str], debug_file: str, export_invalid_filename: str
- ) -> pd.DataFrame:
- df["Konto_Nr_Händler"] = df["Konto_Nr_Händler"].str.upper()
- df["Konto_Nr_SKR51"] = df["Konto_Nr_SKR51"].str.upper()
- df = extract_acct_info(df)
- df["Konto_Nr"] = df["Konto_Nr"].str.upper()
- logging.info(df.shape)
- logging.info(df.columns)
- logging.info(df.head())
- logging.info("df: " + str(df.shape))
- df["Bilanz"] = df["Konto_Nr"].str.match(r"^[013]")
- df["Kontoart"] = np.where(df["Bilanz"], "1", "2")
- df["Kontoart"] = np.where(df["Konto_Nr"].str.contains("_STK"), "3", df["Kontoart"])
- df["Kontoart"] = np.where(df["Konto_Nr"].str.match(r"^[9]"), "3", df["Kontoart"])
- df["Konto_1"] = df["Konto_Nr"].str.slice(0, 1)
- # fehlende Marken- und Standortzuordnung
- df["Marke"] = np.where(df["Marke"].isin(makes.keys()), df["Marke"], "99")
- df["Marke_Standort"] = df["Marke"] + "-" + df["Standort"]
- df["Standort"] = np.where(df["Marke_Standort"].isin(sites.keys()), df["Standort"], "01")
- if False:
- df_debug = df.drop(columns=["Bilanz"])
- logging.info(df_debug.groupby(["Kontoart"]).aggregate("sum"))
- logging.info(df_debug.groupby(["Kontoart", "Konto_1"]).aggregate("sum"))
- logging.info(df_debug.groupby(["Konto_Nr"]).aggregate("sum"))
- df_debug.groupby(["Konto_Nr"]).aggregate("sum").to_csv(debug_file, decimal=",", sep=";", encoding="latin-1")
- # Bereinigung GW-Kostenträger
- kst_nw_verkauf_1 = (df["Konto_Nr"].str.match(r"^[78]0")) & (df["Kostenstelle"].str.match(r"^[^1]\d"))
- df["Kostenstelle"] = np.where(kst_nw_verkauf_1 == True, "11", df["Kostenstelle"])
- kst_konto_7010 = df["Konto_Nr"].str.match(r"^[78]01[01]")
- df["Kostenstelle"] = np.where(kst_konto_7010 == True, "14", df["Kostenstelle"])
- kst_gw_verkauf_2 = (df["Konto_Nr"].str.match(r"^[78]1")) & (df["Kostenstelle"].str.match(r"^[^2]\d"))
- df["Kostenstelle"] = np.where(kst_gw_verkauf_2 == True, "21", df["Kostenstelle"])
- kst_gw_verkauf_3 = (df["Konto_Nr"].str.match(r"^[78]3")) & (df["Kostenstelle"].str.match(r"^[^3]\d"))
- df["Kostenstelle"] = np.where(kst_gw_verkauf_3 == True, "31", df["Kostenstelle"])
- kst_gw_verkauf_4 = (df["Konto_Nr"].str.match(r"^[78]4")) & (df["Kostenstelle"].str.match(r"^[^4]\d"))
- df["Kostenstelle"] = np.where(kst_gw_verkauf_4 == True, "41", df["Kostenstelle"])
- kst_gw_verkauf_x420 = df["Konto_Nr"].str.match(r"^[78]420")
- df["Kostenstelle"] = np.where(kst_gw_verkauf_x420 == True, "42", df["Kostenstelle"])
- kst_gw_verkauf_5 = (df["Konto_Nr"].str.match(r"^[78]50")) & (df["Kostenstelle"].str.match(r"^[^5]\d"))
- df["Kostenstelle"] = np.where(kst_gw_verkauf_5 == True, "51", df["Kostenstelle"])
- kst_gw_verkauf_50 = (df["Konto_Nr"].str.match(r"^[78]")) & (df["Kostenstelle"].str.match(r"^2"))
- df["Kostenträger"] = np.where(kst_gw_verkauf_50 == True, "52", df["Kostenträger"])
- df["Kostenträger"] = np.where(
- (kst_gw_verkauf_50 == True) & (df["Marke"] == "01"),
- "50",
- df["Kostenträger"],
- )
- ktr_nw_verkauf_00 = (
- (df["Konto_Nr"].str.match(r"^[78]2"))
- & (df["Kostenstelle"].str.match(r"^1"))
- & (df["Kostenträger"].str.match(r"^[^01234]"))
- )
- df["Kostenträger"] = np.where(ktr_nw_verkauf_00 == True, "00", df["Kostenträger"])
- ktr_gw_stk_50 = (df["Konto_Nr"].str.match(r"^9130")) & (df["Kostenstelle"].str.match(r"^2"))
- df["Kostenträger"] = np.where(ktr_gw_stk_50 == True, "52", df["Kostenträger"])
- df["Kostenträger"] = np.where((ktr_gw_stk_50 == True) & (df["Marke"] == "01"), "50", df["Kostenträger"])
- df["Kostenträger"] = np.where(df["Bilanz"] == True, "00", df["Kostenträger"])
- abs_konto_5 = (df["Konto_Nr"].str.match("^5")) | (df["Konto_Nr"].str.match("^9143"))
- df["Absatzkanal"] = np.where(abs_konto_5 == True, "99", df["Absatzkanal"])
- kst_ktr_konto_5005 = (df["Konto_Nr"].str.match("^5005")) & (df["Kostenstelle"].str.match(r"^[^12]"))
- df["Kostenstelle"] = np.where(kst_ktr_konto_5005 == True, "20", df["Kostenstelle"])
- df["Kostenträger"] = np.where(kst_ktr_konto_5005 == True, "50", df["Kostenträger"])
- kst_ktr_konto_5007 = (df["Konto_Nr"].str.match("^5007")) & (df["Kostenstelle"].str.match(r"^([^4]|42)"))
- df["Kostenstelle"] = np.where(kst_ktr_konto_5007 == True, "41", df["Kostenstelle"])
- df["Kostenträger"] = np.where(kst_ktr_konto_5007 == True, "70", df["Kostenträger"])
- ktr_konto_914x = (df["Konto_Nr"].str.match("^914[34]")) & (df["Kostenträger"].str.match(r"^[^7]"))
- df["Kostenträger"] = np.where(ktr_konto_914x == True, "70", df["Kostenträger"])
- ktr_teile_30_60 = (
- (df["Konto_Nr"].str.match(r"^[578]"))
- & (df["Kostenstelle"].str.match(r"^[3]"))
- & (df["Kostenträger"].str.match(r"^[^6]"))
- )
- df["Kostenträger"] = np.where(ktr_teile_30_60 == True, "60", df["Kostenträger"])
- ktr_service_40_70 = (
- (df["Konto_Nr"].str.match(r"^[578]"))
- & (df["Kostenstelle"].str.match(r"^[4]"))
- & (df["Kostenträger"].str.match(r"^[^7]"))
- )
- df["Kostenträger"] = np.where(ktr_service_40_70 == True, "70", df["Kostenträger"])
- ktr_mw_70 = (df["Konto_Nr"].str.match(r"^[78]50")) & (df["Kostenträger"].str.match(r"^70"))
- df["Kostenträger"] = np.where(ktr_mw_70 == True, "00", df["Kostenträger"])
- ktr_mw_70_vz = (
- (df["Konto_Nr"].str.match(r"^5"))
- & (df["Kostenstelle"].str.match(r"^5[156]"))
- & (df["Kostenträger"].str.match(r"^70"))
- )
- df["Kostenträger"] = np.where(ktr_mw_70_vz == True, "00", df["Kostenträger"])
- # Volkswagen Specials
- kto_1250 = (df["Konto_Nr"].str.match(r"^1250")) & ~(df["Kostenstelle"].str.match(r"^1"))
- df["Kostenstelle"] = np.where(kto_1250, "11", df["Kostenstelle"])
- kto_1729 = (df["Konto_Nr"].str.match(r"^1729")) & (df["Kostenstelle"].str.match(r"^9"))
- df["Kostenstelle"] = np.where(kto_1729, "11", df["Kostenstelle"])
- kto_1795 = (df["Konto_Nr"].str.match(r"^1795")) & ~(df["Kostenstelle"].str.match(r"^3"))
- df["Kostenstelle"] = np.where(kto_1795, "31", df["Kostenstelle"])
- kto_2 = (df["Konto_Nr"].str.match(r"^2")) & (df["Kostenträger"].str.match(r"^70"))
- df["Kostenträger"] = np.where(kto_2, "00", df["Kostenträger"])
- kto_4560 = (df["Konto_Nr"].isin(["4560", "4570", "5004"])) & (df["Kostenträger"].str.match(r"^70"))
- df["Kostenträger"] = np.where(kto_4560, "00", df["Kostenträger"])
- kto_8520 = (df["Konto_Nr"].isin(["8520", "8900"])) & (df["Kostenstelle"].str.match(r"^51"))
- df["Kostenträger"] = np.where(kto_8520, "00", df["Kostenträger"])
- kto_8999 = (df["Konto_Nr"].isin(["8999"])) & (df["Kostenstelle"].str.match(r"^30"))
- df["Kostenträger"] = np.where(kto_8999, "00", df["Kostenträger"])
- kto_8860 = df["Konto_Nr"].isin(["8860"])
- df["Kostenträger"] = np.where(kto_8860, "00", df["Kostenträger"])
- kto_3000 = (df["Konto_Nr"].isin(["3000", "3001"])) & ~(df["Kostenstelle"].isin(["10", "11", "13"]))
- df["Kostenstelle"] = np.where(kto_3000, "11", df["Kostenstelle"])
- kto_3010 = df["Konto_Nr"].isin(["3010", "3011"])
- df["Kostenstelle"] = np.where(kto_3010, "14", df["Kostenstelle"])
- kto_3110 = (df["Konto_Nr"].isin(["3110"])) & (df["Kostenstelle"].str.match(r"^1"))
- df["Kostenstelle"] = np.where(kto_3110, "20", df["Kostenstelle"])
- kto_3300 = (df["Konto_Nr"].isin(["3300"])) & ~(df["Kostenstelle"].str.match(r"^3"))
- df["Kostenstelle"] = np.where(kto_3300, "31", df["Kostenstelle"])
- kto_0200 = (df["Konto_Nr"].isin(["0200"])) & (df["Kostenstelle"].isin(["90"]))
- df["Kostenstelle"] = np.where(kto_0200, "40", df["Kostenstelle"])
- kto_3510 = (df["Konto_Nr"].isin(["3510"])) & (df["Kostenstelle"].isin(["00"]))
- df["Kostenstelle"] = np.where(kto_3510, "51", df["Kostenstelle"])
- df["Kostenträger"] = np.where(kto_3510, "01", df["Kostenträger"])
- df["KRM"] = df["Marke"] + df["Standort"] + df["Kostenstelle"] + df["Absatzkanal"] + df["Kostenträger"]
- df["Konto_Nr_SKR51"] = (
- (df["Marke"] + "-" + df["Standort"] + "-" + df["Konto_Nr"])
- + "-"
- + (df["Kostenstelle"] + "-" + df["Absatzkanal"] + "-" + df["Kostenträger"])
- )
- df["IsNumeric"] = (
- (df["KRM"].str.isdigit())
- & (df["Konto_Nr"].str.isdigit())
- & (df["Konto_Nr"].str.len() == 4)
- # & (df["Konto_Nr_SKR51"].str.len() == 19)
- )
- df_invalid = df[df["IsNumeric"] == False]
- df_invalid.to_csv(export_invalid_filename, decimal=",", sep=";", encoding="latin-1", index=False)
- return df[df["IsNumeric"] == True][TRANSLATE]
- def extract_acct_info(df: pd.DataFrame) -> pd.DataFrame:
- acct_info = [
- "Marke",
- "Standort",
- "Konto_Nr",
- "Kostenstelle",
- "Absatzkanal",
- "Kostenträger",
- ]
- df["HasFiveDashes"] = df["Konto_Nr_SKR51"].str.count("-") == 5
- df["Invalid"] = "XX-XX-XXXX-XX-XX-XX"
- df["Konto_Nr_SKR51"] = np.where(
- df["HasFiveDashes"],
- df["Konto_Nr_SKR51"],
- df["Invalid"],
- )
- df[acct_info] = df["Konto_Nr_SKR51"].str.split(pat="-", n=6, expand=True)
- return df
|