import hashlib import logging import os import re import shutil import pandas as pd import json from pathlib import Path from datetime import datetime, timedelta from sqlalchemy import create_engine from suds.client import Client from cryptography.fernet import Fernet logging.basicConfig(filename="logs/nasa.log", level=logging.DEBUG) logger = logging.getLogger() def get_config(): fernet_key = b"YBckeKYt-8g7LFvpG7XqAAcEbsYESnI-yl8by9rjeQQ=" fernet = Fernet(fernet_key) if Path("config/nasa_config.json").exists(): with open("config/nasa_config.json", "r") as f: config = json.load(f) with open("config/nasa_config.crypt", "wb") as f: f.write(fernet.encrypt(json.dumps(config).encode())) else: with open("config/nasa_config.crypt", "rb") as f: config = json.loads(fernet.decrypt(f.read()).decode()) return config def conn_string(dsn: dict[str, str]): return f"mssql+pyodbc://{dsn['user']}:{dsn['pass']}@{dsn['server']}/{dsn['database']}?driver=SQL+Server+Native+Client+11.0" def load_data(config: dict[str, str], source: str, period: str): year = period[:4] month = period[4:6] select_befehl_auftraege = f"SELECT * FROM [Auftraege_NASA_gruppiert] WHERE Periode = '{period}'" select_befehl_mitarbeiter = f"SELECT * FROM [Mitarbeiter_NASA] WHERE Periode = '{period}'" source_auftraege = "data/Auftraege_NASA_gruppiert.csv" source_mitarbeiter = "data/Mitarbeiter_NASA.csv" payload = { "HaendlerNr": config["client_id"], "Filiale": config["client_id"], "Jahr": year, "Monat": month, "Fabrikat": "Mazda", "AnzahlMitarbeiter": 0, "AnzahlProduktiv": 0.0, "WerkstattDurchlaeufe": 0, "Token": config["credentials"]["token"], } if source == "database": source_db = create_engine(conn_string(config["source_dsn"])) df = pd.read_sql(select_befehl_auftraege, con=source_db) rename_from = ["AuftragsArt", "AuftragsTyp"] rename_to = ["AuftragsArtId_Name", "AuftragsArt"] df = df.rename(columns=dict(zip(rename_from, rename_to))) else: df = pd.read_csv(source_auftraege, sep=";", encoding="latin-1", decimal=",") df = df[df["Periode"] == period] # AuftragsArt = ["Inspektion", "Karosseriearbeit", "Lackierung", "Verschleißteile", "Sonstiges"] # AuftragsArtId = {"1": "Extern", "2": "Garantie", "3": "Intern", "4": "Theke"] columns = [ "AuftragsArt", "AuftragsArtId", "TeileUmsatz", "LohnUmsatz", "SonstigeUmsatz", "GesamtUmsatz", "AnzahlAuftraege", ] df = df[columns] df.to_csv( f"{config['export_dir']}/csv/{period}_auftraege.csv", sep=";", encoding="latin-1", decimal=",", index=False, ) payload["WerkstattDurchlaeufe"] = int(df["AnzahlAuftraege"].sum()) payload["AfterSalesPositionen"] = df.to_dict("records") # Mitarbeiter gesamt und produktiv if source == "database": df = pd.read_sql(select_befehl_mitarbeiter, con=source_db) else: df = pd.read_csv(source_mitarbeiter, sep=";", encoding="latin-1", decimal=",") df.to_csv( f"{config['export_dir']}/csv/{period}_mitarbeiter.csv", sep=";", encoding="latin-1", decimal=",", index=False, ) payload["AnzahlMitarbeiter"] = df.shape[0] payload["AnzahlProduktiv"] = int(df["produktiv"].sum()) return payload def submit_data(config: dict[str, str], payload): client = Client( url=config["service_url"], username=config["credentials"]["username"], password=config["credentials"]["password"], ) try: return client.service.MeldeAfterSalesDaten(payload) except Exception as e: print(e) return -1 def print_result(period: str, result: str, len_pos: int): print("Periode: " + period) if len_pos == result: print(f"Erfolgreich {result} Datensätze übertragen") return print("Übertragung der Datensätze Fehlgeschlagen.") if result == -1: print("Fehler! Es waren keine Datensätze vorhanden.") else: print(f"{len_pos - result} von {len_pos} Datensätzen nicht verarbeitet!") def workflow(config: dict[str, str], year, month): period = f"{year}{month}" payload = load_data(config, "csv", period) result = submit_data(config, payload) len_pos = len(payload["AfterSalesPositionen"]) print_result(period, result, len_pos) def export_all_periods(config) -> None: dt = datetime.now() prev = str(dt.year - 1) periods = [f"{prev}{x:02}" for x in range(1, 13)] + [f"{dt.year}{x:02}" for x in range(1, dt.month)] for period in periods: payload = load_data(config, "database", period) json.dump( payload, open(f"export/NASA/temp/NASA_{config['client_id']}_{period}_{config['timestamp']}.json", "w"), indent=2, ) def file_get_hash(filename: str) -> str: with open(filename, "r") as frh: data = frh.read() return calculate_sha256(data) def calculate_sha256(data: str) -> str: return hashlib.sha256(data.encode()).hexdigest() def archive_files(export_dir: str): last_week = (datetime.now() - timedelta(days=6)).timestamp() for file in Path(export_dir).glob("*.json"): if file.stat().st_ctime < last_week: file.unlink() archive_path = Path(export_dir + "/Archiv") for file in Path(export_dir + "/temp").glob("*.json"): p = re.search(r"NASA_\d{5}_(20\d{4})_", file.name) if not p: continue period = p[1] year = period[:4] dest_folder = archive_path / year / period os.makedirs(dest_folder, exist_ok=True) file_hash = file_get_hash(file) if has_identical_file(dest_folder, file_hash): file.unlink() continue shutil.copy(file, archive_path.parent / file.name) file.rename(dest_folder / file.name) def has_identical_file(target: Path, file_hash: str) -> bool: for archived_file in Path(target).glob("*.json"): if file_get_hash(archived_file) == file_hash: return True return False def submit_changes(config): for file in Path(config["export_dir"] + "/temp").glob("NASA_*.json"): payload = json.load(file.open("r")) period = payload["Jahr"] + payload["Monat"] len_pos = len(payload["AfterSalesPositionen"]) result = submit_data(config, payload) print_result(period, result, len_pos) def main(): config = get_config() config["timestamp"] = datetime.now().strftime("%Y%m%d_%H%M%S") config["export_dir"] = str(Path(".").resolve() / "export" / "NASA") export_all_periods(config) archive_files(config["export_dir"]) submit_changes(config) if __name__ == "__main__": main()