import logging

import numpy as np
import pandas as pd

TRANSLATE = [
    "Konto_Nr_Händler",
    "Konto_Nr_SKR51",
    "Marke",
    "Standort",
    "Konto_Nr",
    "Kostenstelle",
    "Absatzkanal",
    "Kostenträger",
    "Kontoart",
    "Konto_1",
    "KRM",
    "IsNumeric",
]


def load_translation(
    account_translation: str, debug_file: str, export_invalid_filename: str
) -> tuple[dict[str, str], dict[str, str], pd.DataFrame, pd.DataFrame]:
    df_translate_import = pd.read_csv(
        account_translation,
        decimal=",",
        sep=";",
        encoding="latin-1",
        converters={i: str for i in range(0, 200)},
    ).reset_index()

    makes = get_makes_from_translation(df_translate_import)
    sites = get_sites_from_translation(df_translate_import)

    df_prepared = prepare_translation(df_translate_import)
    df_translate = special_translation(df_prepared, makes, sites, debug_file, export_invalid_filename)
    df_translate2 = df_translate.copy().drop(columns=["Konto_Nr_Händler"]).drop_duplicates().set_index("Konto_Nr_SKR51")

    df_translate3 = (
        df_translate[["Kontoart", "Konto_Nr_SKR51", "Konto_Nr_Händler"]]
        .copy()
        .sort_values(by=["Kontoart", "Konto_Nr_SKR51"])
    )
    df_translate3.to_csv(account_translation[:-4] + "_GCHR.csv", decimal=",", sep=";", encoding="latin-1", index=False)
    return (makes, sites, df_translate, df_translate2)


def get_makes_from_translation(df_translate_import: pd.DataFrame) -> dict[str, str]:
    df_makes = df_translate_import[["Marke", "Marke_HBV"]].copy().drop_duplicates()
    df_makes = df_makes[df_makes["Marke_HBV"] != "0000"]
    makes = dict([(e["Marke"], e["Marke_HBV"]) for e in df_makes.to_dict(orient="records")])
    makes["99"] = "0000"
    return makes


def get_sites_from_translation(df_translate_import: pd.DataFrame) -> dict[str, str]:
    df_sites = df_translate_import[["Marke", "Standort", "Standort_HBV"]].copy().drop_duplicates()
    df_sites["Standort_HBV"] = np.where(df_sites["Standort_HBV"].str.len() != 6, "0000", df_sites["Standort_HBV"])
    sites = dict([(e["Marke"] + "-" + e["Standort"], e["Standort_HBV"]) for e in df_sites.to_dict(orient="records")])
    return sites


def prepare_translation(df_translate_import: pd.DataFrame) -> pd.DataFrame:
    df = df_translate_import[
        [
            "Konto_Nr_Händler",
            "Konto_Nr_SKR51",
        ]
    ].drop_duplicates()
    logging.info(df.shape)

    row = {
        "Konto_Nr_Händler": "01-01-0861-00-00-00",
        "Konto_Nr_SKR51": "01-01-0861-00-00-00",
    }
    df = pd.concat([df, pd.DataFrame.from_records([row])])
    df.set_index("Konto_Nr_Händler")
    return df


def special_translation(
    df: pd.DataFrame, makes: dict[str, str], sites: dict[str, str], debug_file: str, export_invalid_filename: str
) -> pd.DataFrame:
    df["Konto_Nr_Händler"] = df["Konto_Nr_Händler"].str.upper()
    df["Konto_Nr_SKR51"] = df["Konto_Nr_SKR51"].str.upper()
    df = extract_acct_info(df)
    df["Konto_Nr"] = df["Konto_Nr"].str.upper()
    logging.info(df.shape)
    logging.info(df.columns)
    logging.info(df.head())

    logging.info("df: " + str(df.shape))
    df["Bilanz"] = df["Konto_Nr"].str.match(r"^[013]")
    df["Kontoart"] = np.where(df["Bilanz"], "1", "2")
    df["Kontoart"] = np.where(df["Konto_Nr"].str.contains("_STK"), "3", df["Kontoart"])
    df["Kontoart"] = np.where(df["Konto_Nr"].str.match(r"^[9]"), "3", df["Kontoart"])
    df["Konto_1"] = df["Konto_Nr"].str.slice(0, 1)

    # fehlende Marken- und Standortzuordnung
    df["Marke"] = np.where(df["Marke"].isin(makes.keys()), df["Marke"], "99")
    df["Marke_Standort"] = df["Marke"] + "-" + df["Standort"]
    df["Standort"] = np.where(df["Marke_Standort"].isin(sites.keys()), df["Standort"], "01")

    df_debug = df.drop(columns=["Bilanz"])
    logging.info(df_debug.groupby(["Kontoart"]).aggregate("sum"))
    logging.info(df_debug.groupby(["Kontoart", "Konto_1"]).aggregate("sum"))
    logging.info(df_debug.groupby(["Konto_Nr"]).aggregate("sum"))
    df_debug.groupby(["Konto_Nr"]).aggregate("sum").to_csv(debug_file, decimal=",", sep=";", encoding="latin-1")

    # Bereinigung GW-Kostenträger
    df["NW_Verkauf_1"] = (df["Konto_Nr"].str.match(r"^[78]0")) & (df["Kostenstelle"].str.match(r"^[^1]\d"))
    df["Kostenstelle"] = np.where(df["NW_Verkauf_1"] == True, "11", df["Kostenstelle"])

    df["Konto_7010"] = df["Konto_Nr"].str.match(r"^[78]01[01]")
    df["Kostenstelle"] = np.where(df["Konto_7010"] == True, "14", df["Kostenstelle"])

    df["GW_Verkauf_2"] = (df["Konto_Nr"].str.match(r"^[78]1")) & (df["Kostenstelle"].str.match(r"^[^2]\d"))
    df["Kostenstelle"] = np.where(df["GW_Verkauf_2"] == True, "21", df["Kostenstelle"])

    df["GW_Verkauf_3"] = (df["Konto_Nr"].str.match(r"^[78]3")) & (df["Kostenstelle"].str.match(r"^[^3]\d"))
    df["Kostenstelle"] = np.where(df["GW_Verkauf_3"] == True, "31", df["Kostenstelle"])

    df["GW_Verkauf_4"] = (df["Konto_Nr"].str.match(r"^[78]4")) & (df["Kostenstelle"].str.match(r"^[^4]\d"))
    df["Kostenstelle"] = np.where(df["GW_Verkauf_4"] == True, "41", df["Kostenstelle"])

    df["GW_Verkauf_x420"] = df["Konto_Nr"].str.match(r"^[78]420")
    df["Kostenstelle"] = np.where(df["GW_Verkauf_x420"] == True, "42", df["Kostenstelle"])

    df["GW_Verkauf_5"] = (df["Konto_Nr"].str.match(r"^[78]5")) & (df["Kostenstelle"].str.match(r"^[^5]\d"))
    df["Kostenstelle"] = np.where(df["GW_Verkauf_5"] == True, "51", df["Kostenstelle"])

    df["GW_Verkauf_50"] = (df["Konto_Nr"].str.match(r"^[78]")) & (df["Kostenstelle"].str.match(r"^2"))
    df["Kostenträger"] = np.where(df["GW_Verkauf_50"] == True, "52", df["Kostenträger"])
    df["Kostenträger"] = np.where(
        (df["GW_Verkauf_50"] == True) & (df["Marke"] == "01"),
        "50",
        df["Kostenträger"],
    )

    df["NW_Verkauf_00"] = (
        (df["Konto_Nr"].str.match(r"^[78]2"))
        & (df["Kostenstelle"].str.match(r"^1"))
        & (df["Kostenträger"].str.match(r"^[^01234]"))
    )
    df["Kostenträger"] = np.where(df["NW_Verkauf_00"] == True, "00", df["Kostenträger"])

    df["GW_Stk_50"] = (df["Konto_Nr"].str.match(r"^9130")) & (df["Kostenstelle"].str.match(r"^2"))
    df["Kostenträger"] = np.where(df["GW_Stk_50"] == True, "52", df["Kostenträger"])
    df["Kostenträger"] = np.where((df["GW_Stk_50"] == True) & (df["Marke"] == "01"), "50", df["Kostenträger"])

    df["Kostenträger"] = np.where(df["Bilanz"] == True, "00", df["Kostenträger"])

    df["Konto_5er"] = (df["Konto_Nr"].str.match("^5")) | (df["Konto_Nr"].str.match("^9143"))
    df["Absatzkanal"] = np.where(df["Konto_5er"] == True, "99", df["Absatzkanal"])

    df["Konto_5005"] = (df["Konto_Nr"].str.match("^5005")) & (df["Kostenstelle"].str.match(r"^[^12]"))
    df["Kostenstelle"] = np.where(df["Konto_5005"] == True, "20", df["Kostenstelle"])
    df["Kostenträger"] = np.where(df["Konto_5005"] == True, "50", df["Kostenträger"])

    df["Konto_5007"] = (df["Konto_Nr"].str.match("^5007")) & (df["Kostenstelle"].str.match(r"^([^4]|42)"))
    df["Kostenstelle"] = np.where(df["Konto_5007"] == True, "41", df["Kostenstelle"])
    df["Kostenträger"] = np.where(df["Konto_5007"] == True, "70", df["Kostenträger"])

    df["Konto_914er"] = (df["Konto_Nr"].str.match("^914[34]")) & (df["Kostenträger"].str.match(r"^[^7]"))
    df["Kostenträger"] = np.where(df["Konto_914er"] == True, "70", df["Kostenträger"])

    df["Teile_30_60"] = (
        (df["Konto_Nr"].str.match(r"^[578]"))
        & (df["Kostenstelle"].str.match(r"^[3]"))
        & (df["Kostenträger"].str.match(r"^[^6]"))
    )
    df["Kostenträger"] = np.where(df["Teile_30_60"] == True, "60", df["Kostenträger"])

    df["Service_40_70"] = (
        (df["Konto_Nr"].str.match(r"^[578]"))
        & (df["Kostenstelle"].str.match(r"^[4]"))
        & (df["Kostenträger"].str.match(r"^[^7]"))
    )
    df["Kostenträger"] = np.where(df["Service_40_70"] == True, "70", df["Kostenträger"])

    df["KRM"] = df["Marke"] + df["Standort"] + df["Kostenstelle"] + df["Absatzkanal"] + df["Kostenträger"]
    df["Konto_Nr_SKR51"] = (
        (df["Marke"] + "-" + df["Standort"] + "-" + df["Konto_Nr"])
        + "-"
        + (df["Kostenstelle"] + "-" + df["Absatzkanal"] + "-" + df["Kostenträger"])
    )
    df["IsNumeric"] = (
        (df["KRM"].str.isdigit())
        & (df["Konto_Nr"].str.isdigit())
        & (df["Konto_Nr"].str.len() == 4)
        # & (df["Konto_Nr_SKR51"].str.len() == 19)
    )
    df_invalid = df[df["IsNumeric"] == False]
    df_invalid.to_csv(export_invalid_filename, decimal=",", sep=";", encoding="latin-1", index=False)
    return df[df["IsNumeric"] == True][TRANSLATE]


def extract_acct_info(df: pd.DataFrame) -> pd.DataFrame:
    acct_info = [
        "Marke",
        "Standort",
        "Konto_Nr",
        "Kostenstelle",
        "Absatzkanal",
        "Kostenträger",
    ]
    df["HasFiveDashes"] = df["Konto_Nr_SKR51"].str.count("-") == 5
    df["Invalid"] = "XX-XX-XXXX-XX-XX-XX"
    df["Konto_Nr_SKR51"] = np.where(
        df["HasFiveDashes"],
        df["Konto_Nr_SKR51"],
        df["Invalid"],
    )
    df[acct_info] = df["Konto_Nr_SKR51"].str.split(pat="-", n=6, expand=True)
    return df