nasa_upload.py 7.0 KB


  1. import hashlib
  2. import logging
  3. import os
  4. import re
  5. import shutil
  6. import pandas as pd
  7. import json
  8. from pathlib import Path
  9. from datetime import datetime, timedelta
  10. from sqlalchemy import create_engine
  11. from suds.client import Client
  12. from cryptography.fernet import Fernet
  13. logging.basicConfig(filename="logs/nasa.log", level=logging.DEBUG)
  14. logger = logging.getLogger()
  15. def get_config():
  16. fernet_key = b"YBckeKYt-8g7LFvpG7XqAAcEbsYESnI-yl8by9rjeQQ="
  17. fernet = Fernet(fernet_key)
  18. if Path("config/nasa_config.json").exists():
  19. with open("config/nasa_config.json", "r") as f:
  20. config = json.load(f)
  21. with open("config/nasa_config.crypt", "wb") as f:
  22. f.write(fernet.encrypt(json.dumps(config).encode()))
  23. else:
  24. with open("config/nasa_config.crypt", "rb") as f:
  25. config = json.loads(fernet.decrypt(f.read()).decode())
  26. return config
  27. def conn_string(dsn: dict[str, str]):
  28. return f"mssql+pyodbc://{dsn['user']}:{dsn['pass']}@{dsn['server']}/{dsn['database']}?driver=SQL+Server+Native+Client+11.0"
  29. def load_data(config: dict[str, str], source: str, period: str):
  30. year = period[:4]
  31. month = period[4:6]
  32. select_befehl_auftraege = f"SELECT * FROM [Auftraege_NASA_gruppiert] WHERE Periode = '{period}'"
  33. select_befehl_mitarbeiter = f"SELECT * FROM [Mitarbeiter_NASA] WHERE Periode = '{period}'"
  34. source_auftraege = "data/Auftraege_NASA_gruppiert.csv"
  35. source_mitarbeiter = "data/Mitarbeiter_NASA.csv"
  36. payload = {
  37. "HaendlerNr": config["client_id"],
  38. "Filiale": config["client_id"],
  39. "Jahr": year,
  40. "Monat": month,
  41. "Fabrikat": "Mazda",
  42. "AnzahlMitarbeiter": 0,
  43. "AnzahlProduktiv": 0.0,
  44. "WerkstattDurchlaeufe": 0,
  45. "Token": config["credentials"]["token"],
  46. }
  47. if source == "database":
  48. source_db = create_engine(conn_string(config["source_dsn"]))
  49. df = pd.read_sql(select_befehl_auftraege, con=source_db)
  50. rename_from = ["AuftragsArt", "AuftragsTyp"]
  51. rename_to = ["AuftragsArtId_Name", "AuftragsArt"]
  52. df = df.rename(columns=dict(zip(rename_from, rename_to)))
  53. else:
  54. df = pd.read_csv(source_auftraege, sep=";", encoding="latin-1", decimal=",")
  55. df = df[df["Periode"] == period]
  56. # AuftragsArt = ["Inspektion", "Karosseriearbeit", "Lackierung", "Verschleißteile", "Sonstiges"]
  57. # AuftragsArtId = {"1": "Extern", "2": "Garantie", "3": "Intern", "4": "Theke"]
  58. columns = [
  59. "AuftragsArt",
  60. "AuftragsArtId",
  61. "TeileUmsatz",
  62. "LohnUmsatz",
  63. "SonstigeUmsatz",
  64. "GesamtUmsatz",
  65. "AnzahlAuftraege",
  66. ]
  67. df = df[columns]
  68. df.to_csv(
  69. f"{config['export_dir']}/csv/{period}_auftraege.csv",
  70. sep=";",
  71. encoding="latin-1",
  72. decimal=",",
  73. index=False,
  74. )
  75. payload["WerkstattDurchlaeufe"] = int(df["AnzahlAuftraege"].sum())
  76. payload["AfterSalesPositionen"] = df.to_dict("records")
  77. # Mitarbeiter gesamt und produktiv
  78. if source == "database":
  79. df = pd.read_sql(select_befehl_mitarbeiter, con=source_db)
  80. else:
  81. df = pd.read_csv(source_mitarbeiter, sep=";", encoding="latin-1", decimal=",")
  82. df.to_csv(
  83. f"{config['export_dir']}/csv/{period}_mitarbeiter.csv",
  84. sep=";",
  85. encoding="latin-1",
  86. decimal=",",
  87. index=False,
  88. )
  89. payload["AnzahlMitarbeiter"] = df.shape[0]
  90. payload["AnzahlProduktiv"] = int(df["produktiv"].sum())
  91. return payload
  92. def submit_data(config: dict[str, str], payload):
  93. client = Client(
  94. url=config["service_url"],
  95. username=config["credentials"]["username"],
  96. password=config["credentials"]["password"],
  97. )
  98. try:
  99. return client.service.MeldeAfterSalesDaten(payload)
  100. except Exception as e:
  101. print(e)
  102. return -1
  103. def print_result(period: str, result: str, len_pos: int):
  104. print("Periode: " + period)
  105. if len_pos == result:
  106. print(f"Erfolgreich {result} Datensätze übertragen")
  107. return
  108. print("Übertragung der Datensätze Fehlgeschlagen.")
  109. if result == -1:
  110. print("Fehler! Es waren keine Datensätze vorhanden.")
  111. else:
  112. print(f"{len_pos - result} von {len_pos} Datensätzen nicht verarbeitet!")
  113. def workflow(config: dict[str, str], year, month):
  114. period = f"{year}{month}"
  115. payload = load_data(config, "csv", period)
  116. result = submit_data(config, payload)
  117. len_pos = len(payload["AfterSalesPositionen"])
  118. print_result(period, result, len_pos)
  119. def export_all_periods(config) -> None:
  120. dt = datetime.now()
  121. prev = str(dt.year - 1)
  122. periods = [f"{prev}{x:02}" for x in range(1, 13)] + [f"{dt.year}{x:02}" for x in range(1, dt.month)]
  123. for period in periods:
  124. payload = load_data(config, "database", period)
  125. json.dump(
  126. payload,
  127. open(f"export/NASA/temp/NASA_{config['client_id']}_{period}_{config['timestamp']}.json", "w"),
  128. indent=2,
  129. )
  130. def file_get_hash(filename: str) -> str:
  131. with open(filename, "r") as frh:
  132. data = frh.read()
  133. return calculate_sha256(data)
  134. def calculate_sha256(data: str) -> str:
  135. return hashlib.sha256(data.encode()).hexdigest()
  136. def archive_files(export_dir: str):
  137. last_week = (datetime.now() - timedelta(days=6)).timestamp()
  138. for file in Path(export_dir).glob("*.json"):
  139. if file.stat().st_ctime < last_week:
  140. file.unlink()
  141. archive_path = Path(export_dir + "/Archiv")
  142. for file in Path(export_dir + "/temp").glob("*.json"):
  143. p = re.search(r"NASA_\d{5}_(20\d{4})_", file.name)
  144. if not p:
  145. continue
  146. period = p[1]
  147. year = period[:4]
  148. dest_folder = archive_path / year / period
  149. os.makedirs(dest_folder, exist_ok=True)
  150. file_hash = file_get_hash(file)
  151. if has_identical_file(dest_folder, file_hash):
  152. file.unlink()
  153. continue
  154. shutil.copy(file, archive_path.parent / file.name)
  155. file.rename(dest_folder / file.name)
  156. def has_identical_file(target: Path, file_hash: str) -> bool:
  157. for archived_file in Path(target).glob("*.json"):
  158. if file_get_hash(archived_file) == file_hash:
  159. return True
  160. return False
  161. def submit_changes(config):
  162. for file in Path(config["export_dir"] + "/temp").glob("NASA_*.json"):
  163. payload = json.load(file.open("r"))
  164. period = payload["Jahr"] + payload["Monat"]
  165. len_pos = len(payload["AfterSalesPositionen"])
  166. result = submit_data(config, payload)
  167. print_result(period, result, len_pos)
  168. def main():
  169. config = get_config()
  170. config["timestamp"] = datetime.now().strftime("%Y%m%d_%H%M%S")
  171. config["export_dir"] = str(Path(".").resolve() / "export" / "NASA")
  172. export_all_periods(config)
  173. archive_files(config["export_dir"])
  174. submit_changes(config)
  175. if __name__ == "__main__":
  176. main()