
Split the NASA workflow across multiple files

gc-server3, 5 months ago
parent
commit f0119f44ce
5 files changed, 204 insertions(+), 186 deletions(-)
  1. .vscode/settings.json (+5, -0)
  2. nasa_archive.py (+50, -0)
  3. nasa_export.py (+84, -0)
  4. nasa_upload.py (+7, -186)
  5. nasa_workflow.py (+58, -0)

+ 5 - 0
.vscode/settings.json

@@ -2,9 +2,14 @@
     "[python]": {
         "editor.defaultFormatter": "ms-python.black-formatter",
         "editor.formatOnSave": true,
+        "editor.codeActionsOnSave": {
+            "source.organizeImports": "explicit",
+        },
     },
     "python.testing.pytestEnabled": false,
     "python.testing.unittestEnabled": true,
+    "isort.args":["--profile", "black"],
+
     "black-formatter.args": ["--line-length", "120"],
     "files.associations": {
         "*.mac": "vbs"

+ 50 - 0
nasa_archive.py

@@ -0,0 +1,50 @@
+import hashlib
+import logging
+import os
+import re
+import shutil
+from datetime import datetime, timedelta
+from pathlib import Path
+
+logger = logging.getLogger("nasa")
+
+
+def file_get_hash(filename: str | Path) -> str:
+    # Text mode normalizes newlines, so the hash is stable across platforms.
+    with open(filename, "r") as frh:
+        data = frh.read()
+        return calculate_sha256(data)
+
+
+def calculate_sha256(data: str) -> str:
+    return hashlib.sha256(data.encode()).hexdigest()
+
+
+def archive_files(export_dir: str):
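+    # Prune exports older than ~a week (st_ctime is creation time on Windows, metadata-change time on POSIX).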
+    last_week = (datetime.now() - timedelta(days=6)).timestamp()
+    for file in Path(export_dir).glob("*.json"):
+        if file.stat().st_ctime < last_week:
+            file.unlink()
+
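+    # Move temp exports into Archiv/<year>/<period>/, dropping content-identical duplicates
+    # (same SHA-256); a copy of each new file is also left in the export root.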
+    archive_path = Path(export_dir + "/Archiv")
+    for file in Path(export_dir + "/temp").glob("*.json"):
+        p = re.search(r"NASA_\d{5}_(20\d{4})_", file.name)
+        if not p:
+            continue
+        period = p[1]
+        year = period[:4]
+        dest_folder = archive_path / year / period
+        os.makedirs(dest_folder, exist_ok=True)
+        file_hash = file_get_hash(file)
+
+        if has_identical_file(dest_folder, file_hash):
+            file.unlink()
+            continue
+        shutil.copy(file, archive_path.parent / file.name)
+        file.rename(dest_folder / file.name)
+
+
+def has_identical_file(target: Path, file_hash: str) -> bool:
+    for archived_file in Path(target).glob("*.json"):
+        if file_get_hash(archived_file) == file_hash:
+            return True
+    return False
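
A minimal usage sketch for this module (the directory layout below is what the code assumes, not something the commit documents): JSON files under export_dir/temp named NASA_[client]_[period]_[timestamp].json are moved into export_dir/Archiv/[year]/[period]/, unless a file with an identical SHA-256 hash is already archived there.

    from nasa_archive import archive_files

    # "export/NASA" matches the export_dir that nasa_workflow.main() sets up
    archive_files("export/NASA")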

+ 84 - 0
nasa_export.py

@@ -0,0 +1,84 @@
+import pandas as pd
+from sqlalchemy import create_engine
+
+
+def conn_string(dsn: dict[str, str]):
+    return f"mssql+pyodbc://{dsn['user']}:{dsn['pass']}@{dsn['server']}/{dsn['database']}?driver=SQL+Server+Native+Client+11.0"
+
+
+def load_data(config: dict[str, str], source: str, period: str):
+    year = period[:4]
+    month = period[4:6]
+
+    select_befehl_auftraege = f"SELECT * FROM [Auftraege_NASA_gruppiert] WHERE Periode = '{period}'"
+    select_befehl_mitarbeiter = f"SELECT * FROM [Mitarbeiter_NASA] WHERE Periode = '{period}'"
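+    # Note: period is interpolated straight into the SQL; fine for trusted config,
+    # a parameterized query would be safer.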
+
+    source_auftraege = "data/Auftraege_NASA_gruppiert.csv"
+    source_mitarbeiter = "data/Mitarbeiter_NASA.csv"
+
+    payload = {
+        "HaendlerNr": config["client_id"],
+        "Filiale": config["client_id"],
+        "Jahr": year,
+        "Monat": month,
+        "Fabrikat": "Mazda",
+        "AnzahlMitarbeiter": 0,
+        "AnzahlProduktiv": 0.0,
+        "WerkstattDurchlaeufe": 0,
+        "Token": config["credentials"]["token"],
+    }
+
+    if source == "database":
+        source_db = create_engine(conn_string(config["source_dsn"]))
+        df = pd.read_sql(select_befehl_auftraege, con=source_db)
+        rename_from = ["AuftragsArt", "AuftragsTyp"]
+        rename_to = ["AuftragsArtId_Name", "AuftragsArt"]
+        df = df.rename(columns=dict(zip(rename_from, rename_to)))
+
+    else:
+        df = pd.read_csv(source_auftraege, sep=";", encoding="latin-1", decimal=",")
+        df = df[df["Periode"] == period]
+
+    # AuftragsArt = ["Inspektion", "Karosseriearbeit", "Lackierung", "Verschleißteile", "Sonstiges"]
+    # AuftragsArtId = {"1": "Extern", "2": "Garantie", "3": "Intern", "4": "Theke"}
+
+    columns = [
+        "AuftragsArt",
+        "AuftragsArtId",
+        "TeileUmsatz",
+        "LohnUmsatz",
+        "SonstigeUmsatz",
+        "GesamtUmsatz",
+        "AnzahlAuftraege",
+    ]
+
+    df = df[columns]
+
+    df.to_csv(
+        f"{config['export_dir']}/csv/{period}_auftraege.csv",
+        sep=";",
+        encoding="latin-1",
+        decimal=",",
+        index=False,
+    )
+
+    payload["WerkstattDurchlaeufe"] = int(df["AnzahlAuftraege"].sum())
+    payload["AfterSalesPositionen"] = df.to_dict("records")
+
+    # Employees: total and productive head count
+    if source == "database":
+        df = pd.read_sql(select_befehl_mitarbeiter, con=source_db)
+    else:
+        df = pd.read_csv(source_mitarbeiter, sep=";", encoding="latin-1", decimal=",")
+
+    df.to_csv(
+        f"{config['export_dir']}/csv/{period}_mitarbeiter.csv",
+        sep=";",
+        encoding="latin-1",
+        decimal=",",
+        index=False,
+    )
+
+    payload["AnzahlMitarbeiter"] = df.shape[0]
+    payload["AnzahlProduktiv"] = int(df["produktiv"].sum())
+    return payload
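
A hedged sketch of calling load_data (the config keys follow from the code above; the period value is only an example):

    from nasa_export import load_data

    # source="database" reads MSSQL via SQLAlchemy; any other value falls
    # back to the CSV files under data/.
    payload = load_data(config, source="csv", period="202404")
    print(payload["WerkstattDurchlaeufe"], len(payload["AfterSalesPositionen"]))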

+ 7 - 186
nasa_upload.py

@@ -1,116 +1,10 @@
-import hashlib
-import logging
-import os
-import re
-import shutil
-import pandas as pd
 import json
+import logging
 from pathlib import Path
-from datetime import datetime, timedelta
-from sqlalchemy import create_engine
-from suds.client import Client
-from cryptography.fernet import Fernet
-
-
-logging.basicConfig(filename="logs/nasa.log", level=logging.DEBUG)
-logger = logging.getLogger()
-
-
-def get_config():
-    fernet_key = b"YBckeKYt-8g7LFvpG7XqAAcEbsYESnI-yl8by9rjeQQ="
-    fernet = Fernet(fernet_key)
-
-    if Path("config/nasa_config.json").exists():
-        with open("config/nasa_config.json", "r") as f:
-            config = json.load(f)
-        with open("config/nasa_config.crypt", "wb") as f:
-            f.write(fernet.encrypt(json.dumps(config).encode()))
-    else:
-        with open("config/nasa_config.crypt", "rb") as f:
-            config = json.loads(fernet.decrypt(f.read()).decode())
-    return config
-
-
-def conn_string(dsn: dict[str, str]):
-    return f"mssql+pyodbc://{dsn['user']}:{dsn['pass']}@{dsn['server']}/{dsn['database']}?driver=SQL+Server+Native+Client+11.0"
-
-
-def load_data(config: dict[str, str], source: str, period: str):
-    year = period[:4]
-    month = period[4:6]
-
-    select_befehl_auftraege = f"SELECT * FROM [Auftraege_NASA_gruppiert] WHERE Periode = '{period}'"
-    select_befehl_mitarbeiter = f"SELECT * FROM [Mitarbeiter_NASA] WHERE Periode = '{period}'"
-
-    source_auftraege = "data/Auftraege_NASA_gruppiert.csv"
-    source_mitarbeiter = "data/Mitarbeiter_NASA.csv"
-
-    payload = {
-        "HaendlerNr": config["client_id"],
-        "Filiale": config["client_id"],
-        "Jahr": year,
-        "Monat": month,
-        "Fabrikat": "Mazda",
-        "AnzahlMitarbeiter": 0,
-        "AnzahlProduktiv": 0.0,
-        "WerkstattDurchlaeufe": 0,
-        "Token": config["credentials"]["token"],
-    }
-
-    if source == "database":
-        source_db = create_engine(conn_string(config["source_dsn"]))
-        df = pd.read_sql(select_befehl_auftraege, con=source_db)
-        rename_from = ["AuftragsArt", "AuftragsTyp"]
-        rename_to = ["AuftragsArtId_Name", "AuftragsArt"]
-        df = df.rename(columns=dict(zip(rename_from, rename_to)))
-
-    else:
-        df = pd.read_csv(source_auftraege, sep=";", encoding="latin-1", decimal=",")
-        df = df[df["Periode"] == period]
-
-    # AuftragsArt = ["Inspektion", "Karosseriearbeit", "Lackierung", "Verschleißteile", "Sonstiges"]
-    # AuftragsArtId = {"1": "Extern", "2": "Garantie", "3": "Intern", "4": "Theke"]
-
-    columns = [
-        "AuftragsArt",
-        "AuftragsArtId",
-        "TeileUmsatz",
-        "LohnUmsatz",
-        "SonstigeUmsatz",
-        "GesamtUmsatz",
-        "AnzahlAuftraege",
-    ]
-
-    df = df[columns]
-
-    df.to_csv(
-        f"{config['export_dir']}/csv/{period}_auftraege.csv",
-        sep=";",
-        encoding="latin-1",
-        decimal=",",
-        index=False,
-    )
 
-    payload["WerkstattDurchlaeufe"] = int(df["AnzahlAuftraege"].sum())
-    payload["AfterSalesPositionen"] = df.to_dict("records")
-
-    # Mitarbeiter gesamt und produktiv
-    if source == "database":
-        df = pd.read_sql(select_befehl_mitarbeiter, con=source_db)
-    else:
-        df = pd.read_csv(source_mitarbeiter, sep=";", encoding="latin-1", decimal=",")
-
-    df.to_csv(
-        f"{config['export_dir']}/csv/{period}_mitarbeiter.csv",
-        sep=";",
-        encoding="latin-1",
-        decimal=",",
-        index=False,
-    )
+from suds.client import Client
 
-    payload["AnzahlMitarbeiter"] = df.shape[0]
-    payload["AnzahlProduktiv"] = int(df["produktiv"].sum())
-    return payload
+logger = logging.getLogger("nasa")
 
 
 def submit_data(config: dict[str, str], payload):
@@ -123,7 +17,7 @@ def submit_data(config: dict[str, str], payload):
     try:
         return client.service.MeldeAfterSalesDaten(payload)
     except Exception as e:
-        print(e)
+        logger.error(e)
     return -1
 
 
@@ -131,6 +25,7 @@ def print_result(period: str, result: str, len_pos: int):
     print("Periode: " + period)
     if len_pos == result:
         print(f"Erfolgreich {result} Datensätze übertragen")
+        logger.info(f"Erfolgreich {result} Datensätze übertragen")
         return
 
     print("Übertragung der Datensätze Fehlgeschlagen.")
@@ -138,88 +33,14 @@ def print_result(period: str, result: str, len_pos: int):
         print("Fehler! Es waren keine Datensätze vorhanden.")
     else:
         print(f"{len_pos - result} von {len_pos} Datensätzen nicht verarbeitet!")
-
-
-def workflow(config: dict[str, str], year, month):
-    period = f"{year}{month}"
-    payload = load_data(config, "csv", period)
-    result = submit_data(config, payload)
-    len_pos = len(payload["AfterSalesPositionen"])
-    print_result(period, result, len_pos)
-
-
-def export_all_periods(config) -> None:
-    dt = datetime.now()
-    prev = str(dt.year - 1)
-    periods = [f"{prev}{x:02}" for x in range(1, 13)] + [f"{dt.year}{x:02}" for x in range(1, dt.month)]
-
-    for period in periods:
-        payload = load_data(config, "database", period)
-        json.dump(
-            payload,
-            open(f"export/NASA/temp/NASA_{config['client_id']}_{period}_{config['timestamp']}.json", "w"),
-            indent=2,
-        )
-
-
-def file_get_hash(filename: str) -> str:
-    with open(filename, "r") as frh:
-        data = frh.read()
-        return calculate_sha256(data)
-
-
-def calculate_sha256(data: str) -> str:
-    return hashlib.sha256(data.encode()).hexdigest()
-
-
-def archive_files(export_dir: str):
-    last_week = (datetime.now() - timedelta(days=6)).timestamp()
-    for file in Path(export_dir).glob("*.json"):
-        if file.stat().st_ctime < last_week:
-            file.unlink()
-
-    archive_path = Path(export_dir + "/Archiv")
-    for file in Path(export_dir + "/temp").glob("*.json"):
-        p = re.search(r"NASA_\d{5}_(20\d{4})_", file.name)
-        if not p:
-            continue
-        period = p[1]
-        year = period[:4]
-        dest_folder = archive_path / year / period
-        os.makedirs(dest_folder, exist_ok=True)
-        file_hash = file_get_hash(file)
-
-        if has_identical_file(dest_folder, file_hash):
-            file.unlink()
-            continue
-        shutil.copy(file, archive_path.parent / file.name)
-        file.rename(dest_folder / file.name)
-
-
-def has_identical_file(target: Path, file_hash: str) -> bool:
-    for archived_file in Path(target).glob("*.json"):
-        if file_get_hash(archived_file) == file_hash:
-            return True
-    return False
+        logger.error(f"{len_pos - result} von {len_pos} Datensätzen nicht verarbeitet!")
 
 
 def submit_changes(config):
     for file in Path(config["export_dir"] + "/temp").glob("NASA_*.json"):
         payload = json.load(file.open("r"))
         period = payload["Jahr"] + payload["Monat"]
+        logger.info("Periode: " + period)
         len_pos = len(payload["AfterSalesPositionen"])
         result = submit_data(config, payload)
         print_result(period, result, len_pos)
-
-
-def main():
-    config = get_config()
-    config["timestamp"] = datetime.now().strftime("%Y%m%d_%H%M%S")
-    config["export_dir"] = str(Path(".").resolve() / "export" / "NASA")
-    export_all_periods(config)
-    archive_files(config["export_dir"])
-    submit_changes(config)
-
-
-if __name__ == "__main__":
-    main()
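
For context, a sketch of the SOAP call that submit_data wraps. The suds Client construction sits outside the shown hunks, so the WSDL location is an assumption:

    from suds.client import Client

    wsdl_url = config["credentials"]["wsdl"]  # hypothetical key, not visible in this diff
    client = Client(wsdl_url)
    # MeldeAfterSalesDaten is the operation invoked in submit_data above
    result = client.service.MeldeAfterSalesDaten(payload)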

+ 58 - 0
nasa_workflow.py

@@ -0,0 +1,58 @@
+import json
+import logging
+from datetime import datetime
+from pathlib import Path
+
+from cryptography.fernet import Fernet
+
+from nasa_archive import archive_files
+from nasa_export import load_data
+from nasa_upload import submit_changes
+
+logging.basicConfig(filename="logs/nasa.log", level=logging.INFO)
+logger = logging.getLogger("nasa")
+
+
+def get_config():
+    fernet_key = b"YBckeKYt-8g7LFvpG7XqAAcEbsYESnI-yl8by9rjeQQ="
+    fernet = Fernet(fernet_key)
+
+    if Path("config/nasa_config.json").exists():
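+    # If a plain-text config exists, (re-)encrypt it to nasa_config.crypt;
+    # otherwise decrypt the stored encrypted copy.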
+        with open("config/nasa_config.json", "r") as f:
+            config = json.load(f)
+        with open("config/nasa_config.crypt", "wb") as f:
+            f.write(fernet.encrypt(json.dumps(config).encode()))
+    else:
+        with open("config/nasa_config.crypt", "rb") as f:
+            config = json.loads(fernet.decrypt(f.read()).decode())
+        if Path("config/nasa_config.bak").exists():
+            Path("config/nasa_config.bak").unlink()
+            with open("config/nasa_config.json", "w") as f:
+                json.dump(config, f, indent=2)
+    return config
+
+
+def export_all_periods(config) -> None:
+    dt = datetime.now()
+    prev = str(dt.year - 1)
+    periods = [f"{prev}{x:02}" for x in range(1, 13)] + [f"{dt.year}{x:02}" for x in range(1, dt.month)]
+
+    for period in periods:
+        payload = load_data(config, "database", period)
+        out = f"export/NASA/temp/NASA_{config['client_id']}_{period}_{config['timestamp']}.json"
+        # use a context manager so the file handle is closed after writing
+        with open(out, "w") as f:
+            json.dump(payload, f, indent=2)
+
+
+def main():
+    config = get_config()
+    config["timestamp"] = datetime.now().strftime("%Y%m%d_%H%M%S")
+    config["export_dir"] = str(Path.cwd() / "export" / "NASA")
+    export_all_periods(config)
+    archive_files(config["export_dir"])
+    submit_changes(config)
+
+
+if __name__ == "__main__":
+    main()
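
One caveat on get_config: the Fernet key is hard-coded in the source, so the encrypted config only guards against casual reading. A minimal sketch of pulling the key from the environment instead (NASA_FERNET_KEY is a hypothetical variable name, not part of this commit):

    import os
    from cryptography.fernet import Fernet

    # Fernet accepts the base64-encoded key as str or bytes
    fernet = Fernet(os.environ["NASA_FERNET_KEY"])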