"""Infer SQL column types for a semicolon-separated, latin-1 encoded CSV file."""

import re
import pandas as pd

# pandas dtype name -> SQL type. Not referenced by the functions in this
# module; kept because other modules may import it.
data_types = {
    "int64": "varchar(20)",
    "float64": "decimal(18,8)",
}

# Column-name fragments that steer the type chosen for numeric columns
# (see get_type): int64 columns matching ``int_values`` become "int",
# int64 columns matching ``decimal_values`` become "decimal(18,8)", and
# float64 columns matching ``str_values`` become "varchar(20)".
int_values = ["Menge", "Anzahl"]
decimal_values = ["Betrag"]
str_values = [
    "Zipcode",
    "PLZ_1_Stelle",
    "PLZ_2_Stelle",
    "PLZ_3_Stelle",
    "PLZ_4_Stelle",
    "PLZ",
]

# Compiled once; both patterns are anchored at the start of the value only
# (re.match semantics), trailing characters are allowed.
_DATETIME_RE = re.compile(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}")
_DATE_RE = re.compile(r"\d{4}-\d{2}-\d{2}")


def get_column_types(csv_file, header=None):
    """Return a ``{column name: SQL type}`` mapping for *csv_file*.

    The first two lines of the file are inspected by hand: a column whose
    value in the second line contains a double quote is treated as a string
    column, even if pandas would parse it as a number. The first 2000 rows
    are then loaded with pandas and each column is classified by
    :func:`get_type`.

    Args:
        csv_file: Path of the CSV file (``;`` separated, ``,`` as decimal
            mark, latin-1 encoded).
        header: Optional list of column names. When given (and non-empty),
            the first line of the file is skipped and these names are used
            instead.

    Returns:
        dict mapping every column name of the loaded frame to a SQL type
        string.
    """
    with open(csv_file, "r", encoding="latin-1") as frh:
        header_line = frh.readline()
        first_row = frh.readline().split(";")

    if header is not None:
        names = header
    else:
        # Split the header line into column names. Zipping the raw line
        # would pair single characters with the row values, so the lookup
        # by column name below could never match. Surrounding quotes are
        # removed because pandas strips them from the column labels too.
        names = [name.strip().strip('"') for name in header_line.split(";")]

    quoted = {key: '"' in val for key, val in zip(names, first_row)}

    skip = 1 if header else 0
    df = pd.read_csv(
        csv_file,
        sep=";",
        decimal=",",
        encoding="latin-1",
        nrows=2000,
        names=header,
        skiprows=skip,
    )
    return {c: get_type(df[c], quoted.get(c, False)) for c in df.columns}


def is_datetime(value: str):
    """Return a match object if *value* starts with ``YYYY-MM-DD HH:MM``, else ``None``."""
    return _DATETIME_RE.match(value)


def is_date(value: str):
    """Return a match object if *value* starts with ``YYYY-MM-DD``, else ``None``."""
    return _DATE_RE.match(value)


def get_type(df_col: pd.Series, is_str: bool) -> str:
    """Choose a SQL type for one column.

    Args:
        df_col: The column values; ``df_col.name`` is matched against the
            module-level name-fragment lists.
        is_str: ``True`` if the column must be treated as text regardless
            of its pandas dtype (skips the numeric rules).

    Returns:
        One of ``"int"``, ``"decimal(18,8)"``, ``"datetime"``, ``"date"``,
        ``"varchar(20)"``, ``"varchar(50)"``, ``"varchar(100)"`` or
        ``"varchar(255)"``.
    """
    if not is_str:
        dtype_name = str(df_col.dtype)
        # str() so that non-string column labels do not break the
        # substring checks below.
        col_name = str(df_col.name)

        if dtype_name == "float64":
            if df_col.isna().all():
                # Nothing but missing values: no information, widest text type.
                return "varchar(255)"
            if any(entry in col_name for entry in str_values):
                return "varchar(20)"
            return "decimal(18,8)"

        if dtype_name == "int64":
            # decimal_values takes precedence over int_values.
            if any(entry in col_name for entry in decimal_values):
                return "decimal(18,8)"
            if any(entry in col_name for entry in int_values):
                return "int"
            return "varchar(20)"

    if len(df_col) == 0:
        # Without this guard all() over no values would be True and the
        # column would be reported as "datetime"; treat it like an
        # all-missing column instead.
        return "varchar(255)"

    texts = [str(value) for value in df_col]
    if all(is_datetime(text) for text in texts):
        return "datetime"
    if all(is_date(text) for text in texts):
        return "date"

    max_len = max(len(text) for text in texts)
    if max_len < 15:
        return "varchar(20)"
    if max_len < 35:
        return "varchar(50)"
    if max_len < 85:
        return "varchar(100)"
    return "varchar(255)"


if __name__ == "__main__":
    get_column_types("C:\\GlobalCube_Entwicklung\\Export\\belege_eds_stk_ohne_service_aw.csv")