123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225 |
import hashlib
import json
import logging
import os
import re
import shutil
from datetime import datetime, timedelta
from pathlib import Path
from urllib.parse import quote_plus

import pandas as pd
from cryptography.fernet import Fernet
from sqlalchemy import create_engine
from suds.client import Client
# Make sure the log directory exists: basicConfig raises FileNotFoundError
# when it cannot open the log file in a missing "logs/" directory.
Path("logs").mkdir(exist_ok=True)
logging.basicConfig(filename="logs/nasa.log", level=logging.DEBUG)
logger = logging.getLogger()
def get_config() -> dict:
    """Load the NASA service configuration.

    If the plaintext config exists, it is read and an encrypted copy is
    (re)written to nasa_config.crypt; otherwise the encrypted copy is
    decrypted and used.

    Returns:
        The configuration as a dict.
    """
    # SECURITY: the Fernet key is hardcoded in source, so the encrypted
    # config is merely obfuscated. Move the key to an environment variable
    # or an OS key store.
    fernet = Fernet(b"YBckeKYt-8g7LFvpG7XqAAcEbsYESnI-yl8by9rjeQQ=")
    plain_path = Path("config/nasa_config.json")
    crypt_path = Path("config/nasa_config.crypt")
    if plain_path.exists():
        with plain_path.open("r") as f:
            config = json.load(f)
        with crypt_path.open("wb") as f:
            f.write(fernet.encrypt(json.dumps(config).encode()))
    else:
        with crypt_path.open("rb") as f:
            config = json.loads(fernet.decrypt(f.read()).decode())
    return config
def conn_string(dsn: dict[str, str]) -> str:
    """Build a SQLAlchemy mssql+pyodbc connection URL from a DSN mapping.

    Args:
        dsn: Mapping with keys "user", "pass", "server", "database".

    Returns:
        The connection URL. User and password are URL-escaped so that
        credentials containing reserved characters (@, :, /) no longer
        produce an unparsable URL.
    """
    user = quote_plus(dsn["user"])
    password = quote_plus(dsn["pass"])
    return (
        f"mssql+pyodbc://{user}:{password}@{dsn['server']}/{dsn['database']}"
        "?driver=SQL+Server+Native+Client+11.0"
    )
def load_data(config: dict[str, str], source: str, period: str) -> dict:
    """Build the NASA submission payload for one reporting period.

    Args:
        config: Loaded configuration (client id, DSN, export dir, token).
        source: "database" reads from SQL Server; any other value reads
            the CSV exports under data/.
        period: Reporting period as a six-digit YYYYMM string.

    Returns:
        The payload dict expected by the MeldeAfterSalesDaten service call.

    Raises:
        ValueError: If *period* is not a six-digit string (guards the SQL
            below against injection from a malformed config/caller value).
    """
    if not re.fullmatch(r"\d{6}", period):
        raise ValueError(f"invalid period: {period!r}")
    year = period[:4]
    month = period[4:6]
    select_befehl_auftraege = f"SELECT * FROM [Auftraege_NASA_gruppiert] WHERE Periode = '{period}'"
    select_befehl_mitarbeiter = f"SELECT * FROM [Mitarbeiter_NASA] WHERE Periode = '{period}'"
    payload = {
        "HaendlerNr": config["client_id"],
        "Filiale": config["client_id"],
        "Jahr": year,
        "Monat": month,
        "Fabrikat": "Mazda",
        "AnzahlMitarbeiter": 0,
        "AnzahlProduktiv": 0.0,
        "WerkstattDurchlaeufe": 0,
        "Token": config["credentials"]["token"],
    }
    engine = create_engine(conn_string(config["source_dsn"])) if source == "database" else None

    # --- Orders (Auftraege) -------------------------------------------------
    if engine is not None:
        df = pd.read_sql(select_befehl_auftraege, con=engine)
        # Align database column names with the CSV/export naming scheme.
        df = df.rename(columns={"AuftragsArt": "AuftragsArtId_Name", "AuftragsTyp": "AuftragsArt"})
    else:
        df = pd.read_csv("data/Auftraege_NASA_gruppiert.csv", sep=";", encoding="latin-1", decimal=",")
        df = df[df["Periode"] == period]
    # AuftragsArt e.g. "Inspektion", "Karosseriearbeit", "Lackierung",
    # "Verschleißteile", "Sonstiges";
    # AuftragsArtId: 1=Extern, 2=Garantie, 3=Intern, 4=Theke
    columns = [
        "AuftragsArt",
        "AuftragsArtId",
        "TeileUmsatz",
        "LohnUmsatz",
        "SonstigeUmsatz",
        "GesamtUmsatz",
        "AnzahlAuftraege",
    ]
    df = df[columns]
    df.to_csv(
        f"{config['export_dir']}/csv/{period}_auftraege.csv",
        sep=";",
        encoding="latin-1",
        decimal=",",
        index=False,
    )
    payload["WerkstattDurchlaeufe"] = int(df["AnzahlAuftraege"].sum())
    payload["AfterSalesPositionen"] = df.to_dict("records")

    # --- Staff totals (Mitarbeiter gesamt / produktiv) ----------------------
    if engine is not None:
        df = pd.read_sql(select_befehl_mitarbeiter, con=engine)
        # Release the connection pool; the original leaked the engine.
        engine.dispose()
    else:
        # NOTE(review): unlike the orders CSV, this one is not filtered by
        # Periode — confirm whether the file always holds a single period.
        df = pd.read_csv("data/Mitarbeiter_NASA.csv", sep=";", encoding="latin-1", decimal=",")
    df.to_csv(
        f"{config['export_dir']}/csv/{period}_mitarbeiter.csv",
        sep=";",
        encoding="latin-1",
        decimal=",",
        index=False,
    )
    payload["AnzahlMitarbeiter"] = df.shape[0]
    payload["AnzahlProduktiv"] = int(df["produktiv"].sum())
    return payload
def submit_data(config: dict[str, str], payload):
    """Send one payload to the NASA SOAP service.

    Args:
        config: Configuration with "service_url" and "credentials".
        payload: The payload dict built by load_data.

    Returns:
        The service result (per print_result: the count of accepted
        records), or -1 when the call raised.
    """
    client = Client(
        url=config["service_url"],
        username=config["credentials"]["username"],
        password=config["credentials"]["password"],
    )
    try:
        return client.service.MeldeAfterSalesDaten(payload)
    except Exception:
        # Log the full traceback instead of print(e); keep the -1 error
        # sentinel that callers (print_result) rely on.
        logger.exception("MeldeAfterSalesDaten call failed")
        return -1
def print_result(period: str, result: int, len_pos: int) -> None:
    """Print a human-readable transfer summary for one period.

    Args:
        period: Reporting period in YYYYMM form.
        result: Count of records the service accepted, or -1 on error.
            (The original annotated this as str, but it is compared to
            ints throughout.)
        len_pos: Number of records that were submitted.
    """
    print("Periode: " + period)
    if len_pos == result:
        print(f"Erfolgreich {result} Datensätze übertragen")
        return
    print("Übertragung der Datensätze Fehlgeschlagen.")
    if result == -1:
        # NOTE(review): -1 is also what submit_data returns on an exception;
        # the "no records present" wording may be misleading — confirm.
        print("Fehler! Es waren keine Datensätze vorhanden.")
    else:
        print(f"{len_pos - result} von {len_pos} Datensätzen nicht verarbeitet!")
def workflow(config: dict[str, str], year, month) -> None:
    """Load one period's data from the CSV exports, submit it, and print
    a transfer summary."""
    period = f"{year}{month}"
    payload = load_data(config, "csv", period)
    positions = payload["AfterSalesPositionen"]
    outcome = submit_data(config, payload)
    print_result(period, outcome, len(positions))
def export_all_periods(config) -> None:
    """Export DB payloads for all 12 months of the previous year plus the
    elapsed months of the current year as JSON into export/NASA/temp.

    Args:
        config: Configuration; must contain "client_id" and "timestamp".
    """
    now = datetime.now()
    prev_year = now.year - 1
    periods = [f"{prev_year}{m:02}" for m in range(1, 13)]
    periods += [f"{now.year}{m:02}" for m in range(1, now.month)]
    for period in periods:
        payload = load_data(config, "database", period)
        out_name = f"export/NASA/temp/NASA_{config['client_id']}_{period}_{config['timestamp']}.json"
        # Context manager closes the file deterministically; the original
        # passed an open() handle to json.dump and leaked it.
        with open(out_name, "w") as f:
            json.dump(payload, f, indent=2)
def file_get_hash(filename: str) -> str:
    """Return the SHA-256 hex digest of a file's text content.

    The file is read in text mode (locale default encoding, universal
    newlines) and hashed via calculate_sha256.
    """
    content = Path(filename).read_text()
    return calculate_sha256(content)
def calculate_sha256(data: str) -> str:
    """Return the hex-encoded SHA-256 digest of *data* (UTF-8 encoded)."""
    digest = hashlib.sha256()
    digest.update(data.encode())
    return digest.hexdigest()
def archive_files(export_dir: str) -> None:
    """Archive exported JSON payloads and prune stale ones.

    JSON files directly in *export_dir* older than six days (by ctime)
    are deleted.  Files in *export_dir*/temp are moved into
    Archiv/<year>/<period>/ unless an identical copy (same SHA-256) is
    already archived; newly archived files are also copied next to the
    Archiv folder (i.e. into *export_dir*) for the submit step.
    """
    export_path = Path(export_dir)
    # NOTE(review): named "last week" in spirit, but the cutoff is 6 days.
    cutoff = (datetime.now() - timedelta(days=6)).timestamp()
    for stale in export_path.glob("*.json"):
        if stale.stat().st_ctime < cutoff:
            stale.unlink()
    archive_path = export_path / "Archiv"
    name_pattern = re.compile(r"NASA_\d{5}_(20\d{4})_")
    for pending in (export_path / "temp").glob("*.json"):
        match = name_pattern.search(pending.name)
        if not match:
            continue
        period = match[1]
        dest_folder = archive_path / period[:4] / period
        dest_folder.mkdir(parents=True, exist_ok=True)
        digest = file_get_hash(pending)
        if has_identical_file(dest_folder, digest):
            # Identical payload already archived: discard the duplicate.
            pending.unlink()
            continue
        # archive_path.parent == export_path: leave a copy for submission.
        shutil.copy(pending, archive_path.parent / pending.name)
        pending.rename(dest_folder / pending.name)
def has_identical_file(target: Path, file_hash: str) -> bool:
    """Report whether any *.json file in *target* hashes to *file_hash*."""
    return any(
        file_get_hash(candidate) == file_hash
        for candidate in Path(target).glob("*.json")
    )
def submit_changes(config) -> None:
    """Submit every pending payload in <export_dir>/temp and print the
    per-period transfer summary.

    Args:
        config: Configuration; must contain "export_dir".
    """
    for file in Path(config["export_dir"] + "/temp").glob("NASA_*.json"):
        # Context manager closes the handle; the original left
        # json.load(file.open("r")) unclosed.
        with file.open("r") as fh:
            payload = json.load(fh)
        # Jahr/Monat are stored as strings by load_data, so this is
        # string concatenation back into YYYYMM.
        period = payload["Jahr"] + payload["Monat"]
        len_pos = len(payload["AfterSalesPositionen"])
        result = submit_data(config, payload)
        print_result(period, result, len_pos)
def main() -> None:
    """Entry point: export all periods, archive the files, then submit."""
    config = get_config()
    config["timestamp"] = datetime.now().strftime("%Y%m%d_%H%M%S")
    export_root = Path(".").resolve() / "export" / "NASA"
    config["export_dir"] = str(export_root)
    export_all_periods(config)
    archive_files(config["export_dir"])
    submit_changes(config)
# Run the full export → archive → submit workflow when executed as a script.
if __name__ == "__main__":
    main()
|