from database import conn_string
import pandas as pd
import numpy as np
import fastparquet
from sqlalchemy import create_engine, inspect, text


# Copied from pandas with modifications
def __get_dtype(column, sqltype):
    from sqlalchemy.dialects.mssql import BIT
    from sqlalchemy.types import Integer, Float, Boolean, DateTime, Date, TIMESTAMP

    if isinstance(sqltype, Float):
        return float
    elif isinstance(sqltype, Integer):
        # Since DataFrame cannot handle nullable int, convert nullable ints to floats
        if column["nullable"]:
            return float
        # TODO: Refine integer size.
        return np.dtype("int64")
    elif isinstance(sqltype, TIMESTAMP):
        # We have a timezone-capable type
        if not sqltype.timezone:
            return np.dtype("datetime64[ns]")
        # Reflection only reports *that* the column is tz-aware, not which zone,
        # so assume UTC here
        return pd.DatetimeTZDtype(tz="UTC")
    elif isinstance(sqltype, DateTime):
        # Caution: np.datetime64 is also a subclass of np.number.
        return np.dtype("datetime64[ns]")
    elif isinstance(sqltype, Date):
        # Dates are stored as day-precision datetime64 values in the DataFrame
        return np.dtype("datetime64[ns]")
    elif isinstance(sqltype, Boolean):
        return bool
    elif isinstance(sqltype, BIT):
        # Handling database-provider-specific types
        return np.dtype("u1")
    # Catch-all type - handle provider-specific types in another elif block
    return object


def __write_parquet(output_path: str, batch_array, column_dict, write_index: bool,
                    compression: str, append: bool):
    # Create the DataFrame to hold the batch array contents
    b_df = pd.DataFrame(batch_array, columns=list(column_dict))
    # Cast the DataFrame columns to the sqlalchemy column analogues
    b_df = b_df.astype(dtype=column_dict)
    # Write to the parquet file (first write needs append=False)
    fastparquet.write(output_path, b_df, write_index=write_index,
                      compression=compression, append=append)


def table_to_parquet(output_path: str, table_name: str, con, batch_size: int = 10000,
                     write_index: bool = True, compression: str = None):
    # Get database schema using sqlalchemy reflection
    db_engine = create_engine(con)
    db_inspect = inspect(db_engine)

    # Get the columns for the parquet file; get_columns() returns a dict per column,
    # so map each column name to the pandas dtype matching its SQL type
    column_dict = dict()
    for column in db_inspect.get_columns(table_name, schema="import"):
        dtype = __get_dtype(column, column["type"])
        column_dict[column["name"]] = dtype

    # Query the table and stream it to parquet in batches.
    # NOTE: table_name is interpolated directly, so callers must pass a trusted name.
    with db_engine.connect() as conn:
        result = conn.execute(text(f"SELECT * FROM import.{table_name}"))

        row_batch = result.fetchmany(size=batch_size)
        append = False
        while len(row_batch) > 0:
            __write_parquet(output_path, row_batch, column_dict,
                            write_index, compression, append)
            append = True
            row_batch = result.fetchmany(size=batch_size)


if __name__ == "__main__":
    dsn = {
        "user": "sa",
        "password": "Mffu3011#",
        "server": "localhost\\GLOBALCUBE",
        "database": "LOCOSOFT",
        "driver": "mssql",
        "schema": "import",
    }
    conn_str = conn_string(dsn)
    table_to_parquet("temp", "journal_accountings", conn_str)