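"""Export a database table to a Parquet file in batches.

The table's columns are reflected with SQLAlchemy, each column type is mapped
to a pandas/NumPy dtype, and rows are fetched and written in fixed-size
batches so the whole table never has to fit in memory at once.
"""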
from database import conn_string
import pandas as pd
import numpy as np
import fastparquet
from sqlalchemy import create_engine, inspect, text
# Copied from pandas with modifications.
# column is the dict returned by Inspector.get_columns().
def __get_dtype(column, sqltype):
    import sqlalchemy.dialects as sqld
    from sqlalchemy.types import Integer, Float, Boolean, DateTime, Date, TIMESTAMP

    if isinstance(sqltype, Float):
        return float
    elif isinstance(sqltype, Integer):
        # Since DataFrame cannot handle nullable int, convert nullable ints to floats
        if column["nullable"]:
            return float
        # TODO: Refine integer size.
        return np.dtype("int64")
    elif isinstance(sqltype, TIMESTAMP):
        # we have a timezone capable type
        if not sqltype.timezone:
            return np.dtype("datetime64[ns]")
        # astype needs a concrete dtype instance, not the bare pd.DatetimeTZDtype
        # class. Assumption: tz-aware timestamps are normalized to UTC; adjust if
        # the server stores another zone.
        return pd.DatetimeTZDtype(tz="UTC")
    elif isinstance(sqltype, DateTime):
        # Caution: np.datetime64 is also a subclass of np.number.
        return np.dtype("datetime64[ns]")
    elif isinstance(sqltype, Date):
        # np.date does not exist; store dates as datetime64 for Parquet.
        return np.dtype("datetime64[ns]")
    elif isinstance(sqltype, Boolean):
        return bool
    elif isinstance(sqltype, sqld.mssql.base.BIT):
        # Handling database provider specific types
        return np.dtype("u1")
    # Catch-all type - handle provider-specific types in another elif block
    return object
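# Example (using the reflected column dict shape assumed above): a nullable
# INT column maps to float so missing values can be represented as NaN:
#   __get_dtype({"name": "id", "nullable": True}, Integer())  ->  float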
def __write_parquet(output_path: str, batch_array, column_dict, write_index: bool, compression: str, append: bool):
    # Create the DataFrame to hold the batch array contents
    b_df = pd.DataFrame(batch_array, columns=list(column_dict))
    # Cast the DataFrame columns to the sqlalchemy column analogues
    b_df = b_df.astype(dtype=column_dict)
    # Write to the parquet file (first write needs append=False)
    fastparquet.write(output_path, b_df, write_index=write_index, compression=compression, append=append)
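# Note: fastparquet creates the file when append=False and adds a new row
# group per call when append=True; the schema must match across calls, which
# holds here because every batch is cast with the same column_dict.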
def table_to_parquet(
    output_path: str,
    table_name: str,
    con,
    batch_size: int = 10000,
    write_index: bool = True,
    compression: str = None,
    schema: str = "import",
):
    # Get the table's columns using sqlalchemy reflection
    db_engine = create_engine(con)
    db_inspect = inspect(db_engine)
    # Map each column to a dtype for the parquet file
    column_dict = dict()
    for column in db_inspect.get_columns(table_name, schema):
        dtype = __get_dtype(column, column["type"])
        column_dict[column["name"]] = dtype
    # Query the table and write it out batch by batch
    with db_engine.connect() as conn:
        result = conn.execute(text(f"SELECT * FROM {schema}.{table_name}"))
        row_batch = result.fetchmany(size=batch_size)
        append = False
        while len(row_batch) > 0:
            __write_parquet(output_path, row_batch, column_dict, write_index, compression, append)
            append = True
            row_batch = result.fetchmany(size=batch_size)
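# fetchmany keeps at most batch_size rows in Python memory at a time; whether
# rows are also streamed from the server incrementally depends on the driver.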
if __name__ == "__main__":
    dsn = {
        "user": "sa",
        "password": "Mffu3011#",
        "server": "localhost\\GLOBALCUBE",
        "database": "LOCOSOFT",
        "driver": "mssql",
        "schema": "import",
    }
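    # conn_string is assumed to be a local helper (from the database module
    # imported above) that turns this dsn dict into a SQLAlchemy URL such as
    # "mssql+pyodbc://user:password@server/database?...".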
    conn_str = conn_string(dsn)
    table_to_parquet("temp", "journal_accountings", conn_str)
    # print(timeit.timeit(s))
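    # To sanity-check the output, read the file back, e.g. with fastparquet:
    #   pf = fastparquet.ParquetFile("temp")
    #   df = pf.to_pandas()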