cet.py 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. import plac
  2. import pandas as pd
  3. from sqlalchemy import create_engine
  4. import psycopg2
  5. from os import path
  6. @plac.pos("query", "SQL Query", type=str)
  7. @plac.pos("mode", "", choices=["in", "out", "queryout"])
  8. @plac.pos("csv_file", "", type=str)
  9. @plac.opt("Server", "Hostname or DSN", type=str)
  10. @plac.opt("database", "", type=str)
  11. @plac.opt("User", "", type=str)
  12. @plac.opt("Password", "", type=str)
  13. @plac.flg("charset", "")
  14. @plac.opt("Codepage", "", type=str)
  15. @plac.opt("errorlog", "", type=str)
  16. def run(
  17. query,
  18. mode,
  19. csv_file,
  20. Server="localhost\\GLOBALCUBE",
  21. database="master",
  22. User="sa",
  23. Password="Mffu3011#",
  24. charset=False,
  25. Codepage="65001",
  26. errorlog="error.log",
  27. ):
  28. # dsn = f"dsn={Server};uid={User};pwd={Password}"
  29. dsn = f"postgresql://{User}:{Password}@{Server}/{database}"
  30. if Codepage.isnumeric():
  31. Codepage = "cp" + Codepage
  32. if mode == "queryout":
  33. queryout(dsn, query, csv_file, Codepage, errorlog)
  34. return
  35. print("This is madness")
  36. def convert_data(element):
  37. txt = str(element)
  38. txt = txt.replace("None", "")
  39. txt = txt.replace("False", "0").replace("True", "1")
  40. txt = txt.replace("\t", "").replace("\r", "").replace("\n", "")
  41. txt = txt.replace("\x81", "").replace("\x90", "")
  42. txt = "" if txt in ["nan", "NaT"] else txt
  43. return txt
  44. def queryout(dsn, query, csv_file, codepage, errorlog):
  45. if path.exists(query):
  46. with open(query, "r", encoding=codepage) as frh:
  47. query = frh.read()
  48. try:
  49. conn = create_engine(dsn).connect().execution_options(stream_results=True)
  50. df = pd.read_sql(query, conn, chunksize=1000)
  51. except Exception as e:
  52. print(e.args[1])
  53. with open(csv_file, "w", encoding=codepage, errors="replace") as fwh:
  54. print("Kopiervorgang wird gestartet...")
  55. i = 0
  56. for chunk in df:
  57. chunk_dict = chunk.to_dict(orient="records")
  58. for row in chunk_dict:
  59. try:
  60. fwh.write(("\t".join(map(convert_data, row.values())) + "\n"))
  61. except Exception as e:
  62. print(e.args[1])
  63. i += len(chunk_dict)
  64. if (len(chunk_dict)) == 1000:
  65. print(f"1000 Zeilen zum SQL Server gesendet. Insgesamt gesendet: {i}")
  66. print("")
  67. print(f"{i} Zeilen kopiert.")
  68. if __name__ == "__main__":
  69. plac.call(run)