csv_column_types.py 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. import re
  2. import pandas as pd
  3. data_types = {
  4. "int64": "varchar(20)",
  5. "float64": "decimal(18,8)",
  6. }
  7. int_values = ["Menge", "Anzahl"]
  8. decimal_values = ["Betrag"]
  9. str_values = [
  10. "Zipcode",
  11. "PLZ_1_Stelle",
  12. "PLZ_2_Stelle",
  13. "PLZ_3_Stelle",
  14. "PLZ_4_Stelle",
  15. "PLZ",
  16. ]
  17. def get_column_types(csv_file, header=None):
  18. with open(csv_file, "r", encoding="latin-1") as frh:
  19. header1 = frh.readline()
  20. if header is not None:
  21. header1 = header
  22. columns = frh.readline().split(";")
  23. column_types = {key: '"' in val for key, val in zip(header1, columns)}
  24. skip = 1 if header else 0
  25. df = pd.read_csv(
  26. csv_file,
  27. sep=";",
  28. decimal=",",
  29. encoding="latin-1",
  30. nrows=2000,
  31. names=header,
  32. skiprows=skip,
  33. )
  34. col_types = dict([(c, get_type(df[c], column_types.get(c, False))) for c in df.columns])
  35. return col_types
  36. def is_datetime(value: str):
  37. return re.match(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}", value)
  38. def is_date(value: str):
  39. return re.match(r"\d{4}-\d{2}-\d{2}", value)
  40. def get_type(df_col: pd.Series, is_str: bool) -> str:
  41. if not is_str:
  42. if str(df_col.dtype) == "float64":
  43. if all(df_col.isna()):
  44. return "varchar(255)"
  45. for entry in str_values:
  46. if entry in df_col.name:
  47. return "varchar(20)"
  48. return "decimal(18,8)"
  49. if str(df_col.dtype) == "int64":
  50. for entry in decimal_values:
  51. if entry in df_col.name:
  52. return "decimal(18,8)"
  53. for entry in int_values:
  54. if entry in df_col.name:
  55. return "int"
  56. return "varchar(20)"
  57. if all([is_datetime(str(value)) for value in df_col]):
  58. return "datetime"
  59. if all([is_date(str(value)) for value in df_col]):
  60. return "date"
  61. max_len = max([len(str(value)) for value in df_col])
  62. if max_len < 15:
  63. return "varchar(20)"
  64. if max_len < 35:
  65. return "varchar(50)"
  66. if max_len < 85:
  67. return "varchar(100)"
  68. return "varchar(255)"
  69. if __name__ == "__main__":
  70. get_column_types("C:\\GlobalCube_Entwicklung\\Export\\belege_eds_stk_ohne_service_aw.csv")