diff --git a/pandas_to_postgres/__init__.py b/pandas_to_postgres/__init__.py
index 5ab7dca..cfc068d 100644
--- a/pandas_to_postgres/__init__.py
+++ b/pandas_to_postgres/__init__.py
@@ -2,9 +2,9 @@
 from .copy_hdf import HDFTableCopy, SmallHDFTableCopy, BigHDFTableCopy
 from .hdf_to_postgres import hdf_to_postgres, create_hdf_table_objects, copy_worker
 from .utilities import (
-    logger,
     hdf_metadata,
     create_file_object,
     df_generator,
     cast_pandas,
+    get_logger,
 )
diff --git a/pandas_to_postgres/_base_copy.py b/pandas_to_postgres/_base_copy.py
index b33604d..54307a3 100644
--- a/pandas_to_postgres/_base_copy.py
+++ b/pandas_to_postgres/_base_copy.py
@@ -1,4 +1,4 @@
-from .utilities import logger
+from .utilities import get_logger
 from sqlalchemy.schema import AddConstraint, DropConstraint
 from sqlalchemy.exc import SQLAlchemyError
 
@@ -36,14 +36,15 @@ def __init__(
         self.csv_chunksize = csv_chunksize
 
         if not defer_sql_objs:
-            self.instantiate_sql_objs(conn, table_obj)
+            self.instantiate_attrs(conn, table_obj)
         else:
             self.sql_table = sql_table
 
-    def instantiate_sql_objs(self, conn, table_obj):
+    def instantiate_attrs(self, conn, table_obj):
         """
-        When using multiprocessing, pickling of SQLAlchemy objects in __init__ causes
-        issues, so allow for deferring until after the pickling to fetch SQLAlchemy objs
+        When using multiprocessing, pickling of logger and SQLAlchemy objects in
+        __init__ causes issues, so allow for deferring until after the pickling to fetch
+        SQLAlchemy objs
 
         Parameters
         ----------
@@ -55,6 +56,7 @@ def instantiate_sql_objs(self, conn, table_obj):
         self.conn = conn
         self.table_obj = table_obj
         self.sql_table = table_obj.name
+        self.logger = get_logger(self.sql_table)
         self.primary_key = table_obj.primary_key
         self.foreign_keys = table_obj.foreign_key_constraints
 
@@ -63,45 +65,47 @@ def drop_pk(self):
         """
        Drop primary key constraints on PostgreSQL table as well as CASCADE any other constraints that may rely on the PK
         """
-        logger.info("Dropping {} primary key".format(self.sql_table))
+        self.logger.info("Dropping {} primary key".format(self.sql_table))
         try:
             with self.conn.begin_nested():
                 self.conn.execute(DropConstraint(self.primary_key, cascade=True))
         except SQLAlchemyError:
-            logger.info("{} primary key not found. Skipping".format(self.sql_table))
+            self.logger.info(
+                "{} primary key not found. Skipping".format(self.sql_table)
+            )
 
     def create_pk(self):
         """Create primary key constraints on PostgreSQL table"""
-        logger.info("Creating {} primary key".format(self.sql_table))
+        self.logger.info("Creating {} primary key".format(self.sql_table))
         self.conn.execute(AddConstraint(self.primary_key))
 
     def drop_fks(self):
         """Drop foreign key constraints on PostgreSQL table"""
         for fk in self.foreign_keys:
-            logger.info("Dropping foreign key {}".format(fk.name))
+            self.logger.info("Dropping foreign key {}".format(fk.name))
             try:
                 with self.conn.begin_nested():
                     self.conn.execute(DropConstraint(fk))
             except SQLAlchemyError:
-                logger.warn("Foreign key {} not found".format(fk.name))
+                self.logger.warn("Foreign key {} not found".format(fk.name))
 
     def create_fks(self):
         """Create foreign key constraints on PostgreSQL table"""
         for fk in self.foreign_keys:
             try:
-                logger.info("Creating foreign key {}".format(fk.name))
+                self.logger.info("Creating foreign key {}".format(fk.name))
                 self.conn.execute(AddConstraint(fk))
             except SQLAlchemyError:
-                logger.warn("Error creating foreign key {}".format(fk.name))
+                self.logger.warn("Error creating foreign key {}".format(fk.name))
 
     def truncate(self):
         """TRUNCATE PostgreSQL table"""
-        logger.info("Truncating {}".format(self.sql_table))
+        self.logger.info("Truncating {}".format(self.sql_table))
         self.conn.execute("TRUNCATE TABLE {};".format(self.sql_table))
 
     def analyze(self):
         """Run ANALYZE on PostgreSQL table"""
-        logger.info("Analyzing {}".format(self.sql_table))
+        self.logger.info("Analyzing {}".format(self.sql_table))
         self.conn.execute("ANALYZE {};".format(self.sql_table))
 
     def copy_from_file(self, file_object):
diff --git a/pandas_to_postgres/copy_df.py b/pandas_to_postgres/copy_df.py
index 9e51fd4..1554c9d 100644
--- a/pandas_to_postgres/copy_df.py
+++ b/pandas_to_postgres/copy_df.py
@@ -1,4 +1,4 @@
-from .utilities import create_file_object, df_generator, logger, cast_pandas
+from .utilities import create_file_object, df_generator, cast_pandas
 from ._base_copy import BaseCopy
 
 
@@ -38,17 +38,17 @@ def copy(self, functions=[cast_pandas]):
         with self.conn.begin():
             self.truncate()
 
-            logger.info("Creating generator for chunking dataframe")
+            self.logger.info("Creating generator for chunking dataframe")
             for chunk in df_generator(self.df, self.csv_chunksize):
 
-                logger.info("Creating CSV in memory")
+                self.logger.info("Creating CSV in memory")
                 fo = create_file_object(chunk)
 
-                logger.info("Copying chunk to database")
+                self.logger.info("Copying chunk to database")
                 self.copy_from_file(fo)
                 del fo
 
-            logger.info("All chunks copied ({} rows)".format(self.rows))
+            self.logger.info("All chunks copied ({} rows)".format(self.rows))
 
         self.create_pk()
         self.create_fks()
diff --git a/pandas_to_postgres/copy_hdf.py b/pandas_to_postgres/copy_hdf.py
index b4694a9..7332ae7 100644
--- a/pandas_to_postgres/copy_hdf.py
+++ b/pandas_to_postgres/copy_hdf.py
@@ -1,6 +1,6 @@
-from .utilities import create_file_object, df_generator, logger, cast_pandas
-from ._base_copy import BaseCopy
 import pandas as pd
+from .utilities import create_file_object, df_generator, cast_pandas
+from ._base_copy import BaseCopy
 
 
 class HDFTableCopy(BaseCopy):
@@ -90,33 +90,35 @@ def hdf_to_pg(self, data_formatters=[cast_pandas], data_formatter_kwargs={}):
         data_formatter_kwargs: list of kwargs to pass to data_formatters functions
         """
         if self.hdf_tables is None:
-            logger.warn("No HDF table found for SQL table {}".format(self.sql_table))
+            self.logger.warn(
+                "No HDF table found for SQL table {}".format(self.sql_table)
+            )
             return
 
         for hdf_table in self.hdf_tables:
-            logger.info("*** {} ***".format(hdf_table))
+            self.logger.info("*** {} ***".format(hdf_table))
 
-            logger.info("Reading HDF table")
+            self.logger.info("Reading HDF table")
             df = pd.read_hdf(self.file_name, key=hdf_table)
             self.rows += len(df)
             data_formatter_kwargs["hdf_table"] = hdf_table
-            logger.info("Formatting data")
+            self.logger.info("Formatting data")
             df = self.data_formatting(
                 df, functions=data_formatters, **data_formatter_kwargs
             )
 
-            logger.info("Creating generator for chunking dataframe")
-            for chunk in df_generator(df, self.csv_chunksize):
+            self.logger.info("Creating generator for chunking dataframe")
+            for chunk in df_generator(df, self.csv_chunksize, logger=self.logger):
 
-                logger.info("Creating CSV in memory")
+                self.logger.info("Creating CSV in memory")
                 fo = create_file_object(chunk)
 
-                logger.info("Copying chunk to database")
+                self.logger.info("Copying chunk to database")
                 self.copy_from_file(fo)
                 del fo
             del df
 
-        logger.info("All chunks copied ({} rows)".format(self.rows))
+        self.logger.info("All chunks copied ({} rows)".format(self.rows))
 
 
 class SmallHDFTableCopy(HDFTableCopy):
@@ -136,29 +138,29 @@ def hdf_to_pg(self, data_formatters=[cast_pandas], data_formatter_kwargs={}):
         data_formatter_kwargs: list of kwargs to pass to data_formatters functions
         """
         if self.hdf_tables is None:
-            logger.warn("No HDF table found for SQL table {self.sql_table}")
+            self.logger.warn("No HDF table found for SQL table {self.sql_table}")
             return
 
         for hdf_table in self.hdf_tables:
-            logger.info("*** {} ***".format(hdf_table))
-            logger.info("Reading HDF table")
+            self.logger.info("*** {} ***".format(hdf_table))
+            self.logger.info("Reading HDF table")
             df = pd.read_hdf(self.file_name, key=hdf_table)
             self.rows += len(df)
             data_formatter_kwargs["hdf_table"] = hdf_table
-            logger.info("Formatting data")
+            self.logger.info("Formatting data")
             df = self.data_formatting(
                 df, functions=data_formatters, **data_formatter_kwargs
             )
 
-            logger.info("Creating CSV in memory")
+            self.logger.info("Creating CSV in memory")
             fo = create_file_object(df)
 
-            logger.info("Copying table to database")
+            self.logger.info("Copying table to database")
             self.copy_from_file(fo)
             del df
             del fo
 
-        logger.info("All chunks copied ({} rows)".format(self.rows))
+        self.logger.info("All chunks copied ({} rows)".format(self.rows))
 
 
 class BigHDFTableCopy(HDFTableCopy):
@@ -181,11 +183,13 @@ def hdf_to_pg(self, data_formatters=[cast_pandas], data_formatter_kwargs={}):
         data_formatter_kwargs: list of kwargs to pass to data_formatters functions
         """
         if self.hdf_tables is None:
-            logger.warn("No HDF table found for SQL table {}".format(self.sql_table))
+            self.logger.warn(
+                "No HDF table found for SQL table {}".format(self.sql_table)
+            )
             return
 
         for hdf_table in self.hdf_tables:
-            logger.info("*** {} ***".format(hdf_table))
+            self.logger.info("*** {} ***".format(hdf_table))
 
             with pd.HDFStore(self.file_name) as store:
                 nrows = store.get_storer(hdf_table).nrows
@@ -199,26 +203,28 @@ def hdf_to_pg(self, data_formatters=[cast_pandas], data_formatter_kwargs={}):
             start = 0
 
             for i in range(n_chunks):
-                logger.info("*** HDF chunk {i} of {n} ***".format(i=i + 1, n=n_chunks))
-                logger.info("Reading HDF table")
+                self.logger.info(
+                    "*** HDF chunk {i} of {n} ***".format(i=i + 1, n=n_chunks)
+                )
+                self.logger.info("Reading HDF table")
                 stop = min(start + self.hdf_chunksize, nrows)
                 df = pd.read_hdf(self.file_name, key=hdf_table, start=start, stop=stop)
                 start += self.hdf_chunksize
 
                 data_formatter_kwargs["hdf_table"] = hdf_table
-                logger.info("Formatting data")
+                self.logger.info("Formatting data")
                 df = self.data_formatting(
                     df, functions=data_formatters, **data_formatter_kwargs
                 )
 
-                logger.info("Creating generator for chunking dataframe")
-                for chunk in df_generator(df, self.csv_chunksize):
-                    logger.info("Creating CSV in memory")
+                self.logger.info("Creating generator for chunking dataframe")
+                for chunk in df_generator(df, self.csv_chunksize, logger=self.logger):
+                    self.logger.info("Creating CSV in memory")
                     fo = create_file_object(chunk)
 
-                    logger.info("Copying chunk to database")
+                    self.logger.info("Copying chunk to database")
                     self.copy_from_file(fo)
                     del fo
             del df
 
-        logger.info("All chunks copied ({} rows)".format(self.rows))
+        self.logger.info("All chunks copied ({} rows)".format(self.rows))
diff --git a/pandas_to_postgres/hdf_to_postgres.py b/pandas_to_postgres/hdf_to_postgres.py
index 58155e7..29556da 100644
--- a/pandas_to_postgres/hdf_to_postgres.py
+++ b/pandas_to_postgres/hdf_to_postgres.py
@@ -1,7 +1,10 @@
 from multiprocessing import Pool
 from sqlalchemy import MetaData, create_engine
 from .copy_hdf import HDFTableCopy
-from .utilities import cast_pandas
+from .utilities import cast_pandas, get_logger
+
+
+logger = get_logger("hdf_to_postgres")
 
 
 def create_hdf_table_objects(
@@ -92,7 +95,7 @@ def copy_worker(
         if table_obj is None:
             raise ValueError("Table {} does not exist.".format(copy_obj.sql_table))
 
-        copy_obj.instantiate_sql_objs(conn, table_obj)
+        copy_obj.instantiate_attrs(conn, table_obj)
 
         # Run the task
         copy_obj.copy(
diff --git a/pandas_to_postgres/utilities.py b/pandas_to_postgres/utilities.py
index 7f1a959..a104b5c 100644
--- a/pandas_to_postgres/utilities.py
+++ b/pandas_to_postgres/utilities.py
@@ -1,16 +1,17 @@
 import logging
-from pandas import HDFStore, isna
 from collections import defaultdict
+from pandas import isna, HDFStore
 from io import StringIO
 
 
-logging.basicConfig(
-    level=logging.DEBUG,
-    format="%(levelname)s %(asctime)s.%(msecs)03d %(message)s",
-    datefmt="%Y-%m-%d,%H:%M:%S",
-)
+def get_logger(name):
+    logging.basicConfig(
+        level=logging.INFO,
+        format="%(asctime)s.%(msecs)03d - %(name)s - %(levelname)s %(message)s",
+        datefmt="%Y-%m-%d,%H:%M:%S",
+    )
 
-logger = logging.getLogger("pandas_to_postgres")
+    return logging.getLogger(name)
 
 
 def hdf_metadata(file_name, keys=None, metadata_attr=None, metadata_keys=[]):
@@ -42,6 +43,7 @@ def hdf_metadata(file_name, keys=None, metadata_attr=None, metadata_keys=[]):
 
     sql_to_hdf = defaultdict(set)
     metadata_vars = defaultdict(dict)
+    logger = get_logger("hdf_metadata")
 
     with HDFStore(file_name, mode="r") as store:
         keys = keys or store.keys()
@@ -90,7 +92,7 @@ def create_file_object(df):
     return file_object
 
 
-def df_generator(df, chunksize=10 ** 6):
+def df_generator(df, chunksize=10 ** 6, logger=None):
     """
     Create a generator to iterate over chunks of a dataframe
 
@@ -108,7 +110,8 @@ def df_generator(df, chunksize=10 ** 6, logger=None):
     n_chunks = (df.shape[0] // chunksize) + 1
 
     for i in range(n_chunks):
-        logger.info("Chunk {i}/{n}".format(i=i + 1, n=n_chunks))
+        if logger:
+            logger.info("Chunk {i}/{n}".format(i=i + 1, n=n_chunks))
         yield df.iloc[rows : rows + chunksize]
         rows += chunksize