Refactor HDFMetadata from class to function (#5)
* Refactor HDF metadata from class to function to pass more args directly to hdf_to_postgres rather than through an HDFMetadata object, plus a few small bug fixes

* Updated docstrings

* _copy_worker --> copy_worker

* Add the data_formatters back into copy_worker function

* Missed format key
bleonard33 authored and makmanalp committed Jul 18, 2018
1 parent edbb7f7 commit ad12707
Showing 5 changed files with 177 additions and 184 deletions.
7 changes: 2 additions & 5 deletions pandas_to_postgres/__init__.py
@@ -1,12 +1,9 @@
from .copy_df import DataFrameCopy
from .copy_hdf import HDFTableCopy, SmallHDFTableCopy, BigHDFTableCopy
from .hdf_to_postgres import (
hdf_to_postgres,
create_hdf_table_objects,
)
from .hdf_to_postgres import hdf_to_postgres, create_hdf_table_objects, copy_worker
from .utilities import (
logger,
HDFMetadata,
hdf_metadata,
create_file_object,
df_generator,
cast_pandas,
11 changes: 8 additions & 3 deletions pandas_to_postgres/_base_copy.py
@@ -89,10 +89,10 @@ def create_fks(self):
"""Create foreign key constraints on PostgreSQL table"""
for fk in self.foreign_keys:
try:
logger.info("Creating foreign key {fk.name}".format(fk.name))
logger.info("Creating foreign key {}".format(fk.name))
self.conn.execute(AddConstraint(fk))
except SQLAlchemyError:
logger.warn("Error creating foreign key {fk.name}".format(fk.name))
logger.warn("Error creating foreign key {}".format(fk.name))

def truncate(self):
"""TRUNCATE PostgreSQL table"""
@@ -128,12 +128,17 @@ def data_formatting(self, df, functions=[], **kwargs):
Parameters
----------
df: pandas DataFrame
dataframe to format
DataFrame to format
functions: list of functions
Functions to apply to df. each gets passed df, self as copy_obj, and all
kwargs passed to data_formatting
**kwargs
kwargs to pass on to each function
Returns
-------
df: pandas DataFrame
formatted DataFrame
"""
for f in functions:
df = f(df, copy_obj=self, **kwargs)
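
As a point of reference (not part of this diff), data_formatting applies each function in order as f(df, copy_obj=self, **kwargs) and expects the DataFrame back. A minimal hypothetical formatter following that contract could look like the sketch below; the strip_whitespace name and its column handling are illustrative, not part of the library:

import pandas as pd

def strip_whitespace(df, copy_obj=None, **kwargs):
    """Hypothetical formatter: trim whitespace in object (string) columns."""
    for col in df.select_dtypes(include="object").columns:
        df[col] = df[col].str.strip()
    return df

# It would then be passed alongside the built-in cast_pandas, e.g.:
# copy_obj.copy(data_formatters=[cast_pandas, strip_whitespace])
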
108 changes: 16 additions & 92 deletions pandas_to_postgres/copy_hdf.py
@@ -11,42 +11,46 @@ class HDFTableCopy(BaseCopy):

def __init__(
self,
file_name,
hdf_tables,
hdf_meta,
defer_sql_objs=False,
conn=None,
table_obj=None,
sql_table=None,
csv_chunksize=10 ** 6,
hdf_chunksize=10 ** 7,
hdf_metadata=None,
):
"""
Parameters
----------
file_name
hdf_tables: list of strings
HDF keys with data corresponding to destination SQL table
(assumption being that HDF tables:SQL tables is many:one)
hdf_meta: HDFMetadata object
Information from the HDF file for use in building copy objects
defer_sql_objs: bool
multiprocessing has issue with passing SQLALchemy objects, so if
True, defer attributing these to the object until after pickled by Pool
conn: SQLAlchemy connection
conn: SQLAlchemy connection or None
Managed outside of the object
table_obj: SQLAlchemy model object
table_obj: SQLAlchemy model object or None
Destination SQL Table
sql_table: string
sql_table: string or None
SQL table name
csv_chunksize: int
Max rows to keep in memory when generating CSV for COPY
hdf_chunksize: int
Max rows to keep in memory when reading HDF file
hdf_metadata: dict or None
Dict of HDF table keys to dict of constant:value pairs. Not actively used by
any pre-defined function, but available to data_formatting method
"""
super().__init__(defer_sql_objs, conn, table_obj, sql_table, csv_chunksize)

self.hdf_tables = hdf_tables

# Info from the HDFMetadata object
self.hdf_metadata = hdf_meta.metadata_vars
self.file_name = hdf_meta.file_name
self.hdf_chunksize = hdf_meta.chunksize
self.hdf_metadata = hdf_metadata
self.file_name = file_name
self.hdf_chunksize = hdf_chunksize

def copy(self, data_formatters=[cast_pandas], data_formatter_kwargs={}):
"""
@@ -121,46 +125,6 @@ class SmallHDFTableCopy(HDFTableCopy):
in-memory for both reading from the HDF as well as COPYing using StringIO.
"""

def __init__(
self,
hdf_tables,
hdf_meta,
defer_sql_objs=False,
conn=None,
table_obj=None,
sql_table=None,
csv_chunksize=10 ** 6,
):
"""
Parameters
----------
hdf_tables: list of strings
HDF keys with data corresponding to destination SQL table
(assumption being that HDF tables:SQL tables is many:one)
hdf_meta: HDFMetadata object
Information from the HDF file for use in building copy objects
defer_sql_objs: bool
multiprocessing has issue with passing SQLALchemy objects, so if
True, defer attributing these to the object until after pickled by Pool
conn: SQLAlchemy connection
Managed outside of the object
table_obj: SQLAlchemy model object
Destination SQL Table
sql_table: string
SQL table name
csv_chunksize: int
Max rows to keep in memory when generating CSV for COPY
"""
super().__init__(
hdf_tables,
hdf_meta,
defer_sql_objs,
conn,
table_obj,
sql_table,
csv_chunksize,
)

def hdf_to_pg(self, data_formatters=[cast_pandas], data_formatter_kwargs={}):
"""
Copy each HDF table that relates to SQL table to database
@@ -206,46 +170,6 @@ class BigHDFTableCopy(HDFTableCopy):
pd.read_hdf(..., iterator=True) because we found the performance was much better.
"""

def __init__(
self,
hdf_tables,
hdf_meta,
defer_sql_objs=False,
conn=None,
table_obj=None,
sql_table=None,
csv_chunksize=10 ** 6,
):
"""
Parameters
----------
hdf_tables: list of strings
HDF keys with data corresponding to destination SQL table
(assumption being that HDF tables:SQL tables is many:one)
hdf_meta: HDFMetadata object
Information from the HDF file for use in building copy objects
defer_sql_objs: bool
multiprocessing has issue with passing SQLALchemy objects, so if
True, defer attributing these to the object until after pickled by Pool
conn: SQLAlchemy connection
Managed outside of the object
table_obj: SQLAlchemy model object
Destination SQL Table
sql_table: string
SQL table name
csv_chunksize: int
Max rows to keep in memory when generating CSV for COPY
"""
super().__init__(
hdf_tables,
hdf_meta,
defer_sql_objs,
conn,
table_obj,
sql_table,
csv_chunksize,
)

def hdf_to_pg(self, data_formatters=[cast_pandas], data_formatter_kwargs={}):
"""
Copy each HDF table that relates to SQL table to database
@@ -275,7 +199,7 @@ def hdf_to_pg(self, data_formatters=[cast_pandas], data_formatter_kwargs={}):
start = 0

for i in range(n_chunks):
logger.info("*** HDF chunk {i + 1} of {} ***".format(n_chunks))
logger.info("*** HDF chunk {i} of {n} ***".format(i=i + 1, n=n_chunks))
logger.info("Reading HDF table")
stop = min(start + self.hdf_chunksize, nrows)
df = pd.read_hdf(self.file_name, key=hdf_table, start=start, stop=stop)
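
Taken together, the refactored constructor means callers now pass the HDF file name, chunk sizes, and optional metadata dict directly instead of supplying an HDFMetadata object. A rough sketch of what construction might look like after this change; the file path, HDF keys, and table name are illustrative, only the keyword names come from the new signature:

from pandas_to_postgres import HDFTableCopy

table_copy = HDFTableCopy(
    file_name="./data.h5",            # hypothetical HDF5 file
    hdf_tables=["key_a", "key_b"],    # HDF keys mapping to one SQL table
    defer_sql_objs=True,              # attach conn/table_obj later, e.g. in a worker
    sql_table="destination_table",    # hypothetical destination table name
    csv_chunksize=10 ** 6,
    hdf_chunksize=10 ** 7,
    hdf_metadata=None,                # optional dict of HDF key -> {constant: value}
)
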
