Better logging for multiprocessing (#6)
bleonard33 authored and makmanalp committed Jul 18, 2018
1 parent ad12707 commit b8c8439
Showing 6 changed files with 75 additions and 59 deletions.
2 changes: 1 addition & 1 deletion pandas_to_postgres/__init__.py
@@ -2,9 +2,9 @@
from .copy_hdf import HDFTableCopy, SmallHDFTableCopy, BigHDFTableCopy
from .hdf_to_postgres import hdf_to_postgres, create_hdf_table_objects, copy_worker
from .utilities import (
logger,
hdf_metadata,
create_file_object,
df_generator,
cast_pandas,
get_logger,
)
32 changes: 18 additions & 14 deletions pandas_to_postgres/_base_copy.py
@@ -1,4 +1,4 @@
from .utilities import logger
from .utilities import get_logger
from sqlalchemy.schema import AddConstraint, DropConstraint
from sqlalchemy.exc import SQLAlchemyError

@@ -36,14 +36,15 @@ def __init__(
self.csv_chunksize = csv_chunksize

if not defer_sql_objs:
self.instantiate_sql_objs(conn, table_obj)
self.instantiate_attrs(conn, table_obj)
else:
self.sql_table = sql_table

def instantiate_sql_objs(self, conn, table_obj):
def instantiate_attrs(self, conn, table_obj):
"""
When using multiprocessing, pickling of SQLAlchemy objects in __init__ causes
issues, so allow for deferring until after the pickling to fetch SQLAlchemy objs
When using multiprocessing, pickling of logger and SQLAlchemy objects in
__init__ causes issues, so allow for deferring until after the pickling to fetch
SQLAlchemy objs
Parameters
----------
@@ -55,6 +56,7 @@ def instantiate_sql_objs(self, conn, table_obj):
self.conn = conn
self.table_obj = table_obj
self.sql_table = table_obj.name
self.logger = get_logger(self.sql_table)
self.primary_key = table_obj.primary_key
self.foreign_keys = table_obj.foreign_key_constraints

@@ -63,45 +65,47 @@ def drop_pk(self):
Drop primary key constraints on PostgreSQL table as well as CASCADE any other
constraints that may rely on the PK
"""
logger.info("Dropping {} primary key".format(self.sql_table))
self.logger.info("Dropping {} primary key".format(self.sql_table))
try:
with self.conn.begin_nested():
self.conn.execute(DropConstraint(self.primary_key, cascade=True))
except SQLAlchemyError:
logger.info("{} primary key not found. Skipping".format(self.sql_table))
self.logger.info(
"{} primary key not found. Skipping".format(self.sql_table)
)

def create_pk(self):
"""Create primary key constraints on PostgreSQL table"""
logger.info("Creating {} primary key".format(self.sql_table))
self.logger.info("Creating {} primary key".format(self.sql_table))
self.conn.execute(AddConstraint(self.primary_key))

def drop_fks(self):
"""Drop foreign key constraints on PostgreSQL table"""
for fk in self.foreign_keys:
logger.info("Dropping foreign key {}".format(fk.name))
self.logger.info("Dropping foreign key {}".format(fk.name))
try:
with self.conn.begin_nested():
self.conn.execute(DropConstraint(fk))
except SQLAlchemyError:
logger.warn("Foreign key {} not found".format(fk.name))
self.logger.warn("Foreign key {} not found".format(fk.name))

def create_fks(self):
"""Create foreign key constraints on PostgreSQL table"""
for fk in self.foreign_keys:
try:
logger.info("Creating foreign key {}".format(fk.name))
self.logger.info("Creating foreign key {}".format(fk.name))
self.conn.execute(AddConstraint(fk))
except SQLAlchemyError:
logger.warn("Error creating foreign key {}".format(fk.name))
self.logger.warn("Error creating foreign key {}".format(fk.name))

def truncate(self):
"""TRUNCATE PostgreSQL table"""
logger.info("Truncating {}".format(self.sql_table))
self.logger.info("Truncating {}".format(self.sql_table))
self.conn.execute("TRUNCATE TABLE {};".format(self.sql_table))

def analyze(self):
"""Run ANALYZE on PostgreSQL table"""
logger.info("Analyzing {}".format(self.sql_table))
self.logger.info("Analyzing {}".format(self.sql_table))
self.conn.execute("ANALYZE {};".format(self.sql_table))

def copy_from_file(self, file_object):
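Note: drop_pk() and drop_fks() wrap each DROP in conn.begin_nested(), so a missing constraint only rolls back a SAVEPOINT rather than the surrounding load transaction. A standalone sketch of that pattern follows; the DSN and table name are placeholders, and the outer engine.begin() stands in for the transaction the copy methods open themselves.

from sqlalchemy import MetaData, create_engine
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.schema import AddConstraint, DropConstraint

engine = create_engine("postgresql://localhost/mydb")  # placeholder DSN
meta = MetaData()
meta.reflect(bind=engine, only=["cities"])              # placeholder table name
table = meta.tables["cities"]

with engine.begin() as conn:                            # outer transaction
    try:
        with conn.begin_nested():                       # SAVEPOINT
            conn.execute(DropConstraint(table.primary_key, cascade=True))
    except SQLAlchemyError:
        # The constraint was absent: only the SAVEPOINT rolls back, so later
        # statements on `conn` still run inside the outer transaction.
        print("{} primary key not found. Skipping".format(table.name))
    # ...bulk load here, then AddConstraint(table.primary_key) to restore the key...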
10 changes: 5 additions & 5 deletions pandas_to_postgres/copy_df.py
@@ -1,4 +1,4 @@
from .utilities import create_file_object, df_generator, logger, cast_pandas
from .utilities import create_file_object, df_generator, cast_pandas
from ._base_copy import BaseCopy


@@ -38,17 +38,17 @@ def copy(self, functions=[cast_pandas]):
with self.conn.begin():
self.truncate()

logger.info("Creating generator for chunking dataframe")
self.logger.info("Creating generator for chunking dataframe")
for chunk in df_generator(self.df, self.csv_chunksize):

logger.info("Creating CSV in memory")
self.logger.info("Creating CSV in memory")
fo = create_file_object(chunk)

logger.info("Copying chunk to database")
self.logger.info("Copying chunk to database")
self.copy_from_file(fo)
del fo

logger.info("All chunks copied ({} rows)".format(self.rows))
self.logger.info("All chunks copied ({} rows)".format(self.rows))

self.create_pk()
self.create_fks()
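As the hunk above shows, copy() truncates the target, walks the dataframe with df_generator, renders each chunk to an in-memory CSV via create_file_object, and hands it to copy_from_file. Below is a rough sketch of that chunk-to-StringIO-to-COPY loop using plain psycopg2; the DSN, target table, and the exact COPY options used by copy_from_file are assumptions, not taken from this repository.

import io

import pandas as pd
import psycopg2

df = pd.DataFrame({"id": range(10), "name": list("abcdefghij")})  # toy data

conn = psycopg2.connect("dbname=mydb")                    # placeholder DSN
with conn:
    with conn.cursor() as cur:
        for start in range(0, len(df), 4):                # 4-row chunks stand in for csv_chunksize
            chunk = df.iloc[start:start + 4]
            buf = io.StringIO()
            chunk.to_csv(buf, index=False, header=False)  # CSV built in memory, never on disk
            buf.seek(0)
            cur.copy_expert("COPY my_table FROM STDIN WITH CSV", buf)
conn.close()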
62 changes: 34 additions & 28 deletions pandas_to_postgres/copy_hdf.py
@@ -1,6 +1,6 @@
from .utilities import create_file_object, df_generator, logger, cast_pandas
from ._base_copy import BaseCopy
import pandas as pd
from .utilities import create_file_object, df_generator, cast_pandas
from ._base_copy import BaseCopy


class HDFTableCopy(BaseCopy):
@@ -90,33 +90,35 @@ def hdf_to_pg(self, data_formatters=[cast_pandas], data_formatter_kwargs={}):
data_formatter_kwargs: list of kwargs to pass to data_formatters functions
"""
if self.hdf_tables is None:
logger.warn("No HDF table found for SQL table {}".format(self.sql_table))
self.logger.warn(
"No HDF table found for SQL table {}".format(self.sql_table)
)
return

for hdf_table in self.hdf_tables:
logger.info("*** {} ***".format(hdf_table))
self.logger.info("*** {} ***".format(hdf_table))

logger.info("Reading HDF table")
self.logger.info("Reading HDF table")
df = pd.read_hdf(self.file_name, key=hdf_table)
self.rows += len(df)

data_formatter_kwargs["hdf_table"] = hdf_table
logger.info("Formatting data")
self.logger.info("Formatting data")
df = self.data_formatting(
df, functions=data_formatters, **data_formatter_kwargs
)

logger.info("Creating generator for chunking dataframe")
for chunk in df_generator(df, self.csv_chunksize):
self.logger.info("Creating generator for chunking dataframe")
for chunk in df_generator(df, self.csv_chunksize, logger=self.logger):

logger.info("Creating CSV in memory")
self.logger.info("Creating CSV in memory")
fo = create_file_object(chunk)

logger.info("Copying chunk to database")
self.logger.info("Copying chunk to database")
self.copy_from_file(fo)
del fo
del df
logger.info("All chunks copied ({} rows)".format(self.rows))
self.logger.info("All chunks copied ({} rows)".format(self.rows))


class SmallHDFTableCopy(HDFTableCopy):
@@ -136,29 +138,29 @@ def hdf_to_pg(self, data_formatters=[cast_pandas], data_formatter_kwargs={}):
data_formatter_kwargs: list of kwargs to pass to data_formatters functions
"""
if self.hdf_tables is None:
logger.warn("No HDF table found for SQL table {self.sql_table}")
self.logger.warn("No HDF table found for SQL table {self.sql_table}")
return

for hdf_table in self.hdf_tables:
logger.info("*** {} ***".format(hdf_table))
logger.info("Reading HDF table")
self.logger.info("*** {} ***".format(hdf_table))
self.logger.info("Reading HDF table")
df = pd.read_hdf(self.file_name, key=hdf_table)
self.rows += len(df)

data_formatter_kwargs["hdf_table"] = hdf_table
logger.info("Formatting data")
self.logger.info("Formatting data")
df = self.data_formatting(
df, functions=data_formatters, **data_formatter_kwargs
)

logger.info("Creating CSV in memory")
self.logger.info("Creating CSV in memory")
fo = create_file_object(df)

logger.info("Copying table to database")
self.logger.info("Copying table to database")
self.copy_from_file(fo)
del df
del fo
logger.info("All chunks copied ({} rows)".format(self.rows))
self.logger.info("All chunks copied ({} rows)".format(self.rows))


class BigHDFTableCopy(HDFTableCopy):
@@ -181,11 +183,13 @@ def hdf_to_pg(self, data_formatters=[cast_pandas], data_formatter_kwargs={}):
data_formatter_kwargs: list of kwargs to pass to data_formatters functions
"""
if self.hdf_tables is None:
logger.warn("No HDF table found for SQL table {}".format(self.sql_table))
self.logger.warn(
"No HDF table found for SQL table {}".format(self.sql_table)
)
return

for hdf_table in self.hdf_tables:
logger.info("*** {} ***".format(hdf_table))
self.logger.info("*** {} ***".format(hdf_table))

with pd.HDFStore(self.file_name) as store:
nrows = store.get_storer(hdf_table).nrows
@@ -199,26 +203,28 @@ def hdf_to_pg(self, data_formatters=[cast_pandas], data_formatter_kwargs={}):
start = 0

for i in range(n_chunks):
logger.info("*** HDF chunk {i} of {n} ***".format(i=i + 1, n=n_chunks))
logger.info("Reading HDF table")
self.logger.info(
"*** HDF chunk {i} of {n} ***".format(i=i + 1, n=n_chunks)
)
self.logger.info("Reading HDF table")
stop = min(start + self.hdf_chunksize, nrows)
df = pd.read_hdf(self.file_name, key=hdf_table, start=start, stop=stop)

start += self.hdf_chunksize

data_formatter_kwargs["hdf_table"] = hdf_table
logger.info("Formatting data")
self.logger.info("Formatting data")
df = self.data_formatting(
df, functions=data_formatters, **data_formatter_kwargs
)

logger.info("Creating generator for chunking dataframe")
for chunk in df_generator(df, self.csv_chunksize):
logger.info("Creating CSV in memory")
self.logger.info("Creating generator for chunking dataframe")
for chunk in df_generator(df, self.csv_chunksize, logger=self.logger):
self.logger.info("Creating CSV in memory")
fo = create_file_object(chunk)

logger.info("Copying chunk to database")
self.logger.info("Copying chunk to database")
self.copy_from_file(fo)
del fo
del df
logger.info("All chunks copied ({} rows)".format(self.rows))
self.logger.info("All chunks copied ({} rows)".format(self.rows))
7 changes: 5 additions & 2 deletions pandas_to_postgres/hdf_to_postgres.py
@@ -1,7 +1,10 @@
from multiprocessing import Pool
from sqlalchemy import MetaData, create_engine
from .copy_hdf import HDFTableCopy
from .utilities import cast_pandas
from .utilities import cast_pandas, get_logger


logger = get_logger("hdf_to_postgres")


def create_hdf_table_objects(
@@ -92,7 +95,7 @@ def copy_worker(
if table_obj is None:
raise ValueError("Table {} does not exist.".format(copy_obj.sql_table))

copy_obj.instantiate_sql_objs(conn, table_obj)
copy_obj.instantiate_attrs(conn, table_obj)

# Run the task
copy_obj.copy(
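The module keeps one get_logger("hdf_to_postgres") logger for the parent process, while each pooled worker calls instantiate_attrs so its copy object builds a connection and a per-table logger only after being unpickled. A minimal, self-contained sketch of that shape; TableCopy, worker, and the table names are stand-ins for the real HDFTableCopy/copy_worker plumbing.

from multiprocessing import Pool

from pandas_to_postgres import get_logger

logger = get_logger("hdf_to_postgres")  # parent-process logger, as in the module above


class TableCopy:
    """Stand-in for HDFTableCopy: nothing unpicklable is created in __init__."""

    def __init__(self, sql_table):
        self.sql_table = sql_table
        self.logger = None  # deferred so the object pickles cleanly

    def instantiate_attrs(self):
        self.logger = get_logger(self.sql_table)


def worker(copy_obj):
    copy_obj.instantiate_attrs()  # build the logger inside the worker process
    copy_obj.logger.info("copying %s", copy_obj.sql_table)


if __name__ == "__main__":
    tables = [TableCopy("cities"), TableCopy("countries")]
    logger.info("dispatching %d tables", len(tables))
    with Pool(processes=2) as pool:
        pool.map(worker, tables)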
21 changes: 12 additions & 9 deletions pandas_to_postgres/utilities.py
@@ -1,16 +1,17 @@
import logging
from pandas import HDFStore, isna
from collections import defaultdict
from pandas import isna, HDFStore
from io import StringIO


logging.basicConfig(
level=logging.DEBUG,
format="%(levelname)s %(asctime)s.%(msecs)03d %(message)s",
datefmt="%Y-%m-%d,%H:%M:%S",
)
def get_logger(name):
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s.%(msecs)03d - %(name)s - %(levelname)s %(message)s",
datefmt="%Y-%m-%d,%H:%M:%S",
)

logger = logging.getLogger("pandas_to_postgres")
return logging.getLogger(name)


def hdf_metadata(file_name, keys=None, metadata_attr=None, metadata_keys=[]):
@@ -42,6 +43,7 @@ def hdf_metadata(file_name, keys=None, metadata_attr=None, metadata_keys=[]):

sql_to_hdf = defaultdict(set)
metadata_vars = defaultdict(dict)
logger = get_logger("hdf_metadata")

with HDFStore(file_name, mode="r") as store:
keys = keys or store.keys()
@@ -90,7 +92,7 @@ def create_file_object(df):
return file_object


def df_generator(df, chunksize=10 ** 6):
def df_generator(df, chunksize=10 ** 6, logger=None):
"""
Create a generator to iterate over chunks of a dataframe
@@ -108,7 +110,8 @@ def df_generator(df, chunksize=10 ** 6):
n_chunks = (df.shape[0] // chunksize) + 1

for i in range(n_chunks):
logger.info("Chunk {i}/{n}".format(i=i + 1, n=n_chunks))
if logger:
logger.info("Chunk {i}/{n}".format(i=i + 1, n=n_chunks))
yield df.iloc[rows : rows + chunksize]
rows += chunksize

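Usage after this change: callers create named loggers with get_logger, and df_generator only reports chunk progress when it is handed one. A short sketch; the logger name and toy dataframe are arbitrary.

import pandas as pd

from pandas_to_postgres import df_generator, get_logger

logger = get_logger("example")        # any name; workers use the SQL table name
df = pd.DataFrame({"x": range(5)})    # toy dataframe

# With a logger, each chunk is reported as "Chunk i/n"; omit it and the generator stays quiet.
for chunk in df_generator(df, chunksize=2, logger=logger):
    logger.info("got %d rows", len(chunk))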
