Skip to content

Commit

Permalink
Implement connection pooling (#908)
Browse files Browse the repository at this point in the history
* release v1.0.4

* Add connection pool to DAL.

* Add connection pool to MPI client

* remove schema init from MPI

* move schema init to dal

* add mpi_client param to link_record_against_mpi

* Make get_connection and public DAL method and call in mpi.__init__

* update DAL tests

* Resolve remaining test issues

* black

---------

Co-authored-by: m-goggins <[email protected]>
  • Loading branch information
DanPaseltiner and m-goggins authored Nov 2, 2023
1 parent ed4eeae commit c4b5712
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 28 deletions.
13 changes: 12 additions & 1 deletion phdi/linkage/dal.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,13 @@ def __init__(self) -> None:
self.EXTERNAL_SOURCE_TABLE = None
self.TABLE_LIST = []

def get_connection(self, engine_url: str, engine_echo: bool = False) -> None:
def get_connection(
self,
engine_url: str,
engine_echo: bool = False,
pool_size: int = 5,
max_overflow: int = 10,
) -> None:
"""
Establish a connection to the database
Expand All @@ -42,6 +48,9 @@ def get_connection(self, engine_url: str, engine_echo: bool = False) -> None:
:param engine_url: The URL of the database engine
:param engine_echo: If True, print SQL statements to stdout
:param pool_size: The number of connections to keep open in the connection pool
:param max_overflow: The number of connections to allow in the connection pool
“overflow”
:return: None
"""

Expand All @@ -50,6 +59,8 @@ def get_connection(self, engine_url: str, engine_echo: bool = False) -> None:
engine_url,
client_encoding="utf8",
echo=engine_echo,
pool_size=pool_size,
max_overflow=max_overflow,
)

self.session = scoped_session(
Expand Down
7 changes: 4 additions & 3 deletions phdi/linkage/link.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from phdi.harmonization.utils import compare_strings
from phdi.fhir.utils import extract_value_with_resource_path

from phdi.linkage.mpi import DIBBsMPIConnectorClient
from phdi.linkage.mpi import DIBBsMPIConnectorClient, BaseMPIConnectorClient
from phdi.linkage.utils import datetime_to_str

LINKING_FIELDS_TO_FHIRPATHS = {
Expand Down Expand Up @@ -541,6 +541,7 @@ def link_record_against_mpi(
record: dict,
algo_config: List[dict],
external_person_id: str = None,
mpi_client: BaseMPIConnectorClient = None,
) -> tuple[bool, str]:
"""
Runs record linkage on a single incoming record (extracted from a FHIR
Expand All @@ -563,8 +564,8 @@ def link_record_against_mpi(
new Person ID or the ID of an existing matched Person).
"""
# Initialize MPI client
mpi_client = DIBBsMPIConnectorClient()
mpi_client._initialize_schema()
if mpi_client is None:
mpi_client = DIBBsMPIConnectorClient()

# Need to bind function names back to their symbolic invocations
# in context of the module--i.e. turn the string of a function
Expand Down
16 changes: 10 additions & 6 deletions phdi/linkage/mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ class DIBBsMPIConnectorClient(BaseMPIConnectorClient):

matched: bool = False

def __init__(self):
def __init__(self, pool_size: int = 5, max_overflow: int = 10):
"""
Initialize the MPI connector client with the MPI database.
:param pool_size: The number of connections to keep open to the database.
:param max_overflow: The number of connections to allow in connection pool.
"""
dbsettings = load_mpi_env_vars_os()
dbuser = dbsettings.get("user")
dbname = dbsettings.get("dbname")
Expand All @@ -30,9 +35,11 @@ def __init__(self):
self.dal = DataAccessLayer()
self.dal.get_connection(
engine_url=f"postgresql+psycopg2://{dbuser}:"
+ f"{dbpwd}@{dbhost}:{dbport}/{dbname}"
+ f"{dbpwd}@{dbhost}:{dbport}/{dbname}",
pool_size=pool_size,
max_overflow=max_overflow,
)

self.dal.initialize_schema()
self.column_to_fhirpaths = {
"patient": {
"root_path": "Patient",
Expand Down Expand Up @@ -100,9 +107,6 @@ def __init__(self):
},
}

def _initialize_schema(self):
self.dal.initialize_schema()

def get_block_data(self, block_criteria: Dict) -> List[list]:
"""
Returns a list of lists containing records from the MPI database that
Expand Down
4 changes: 1 addition & 3 deletions tests/linkage/test_dal.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def test_get_connection():
assert isinstance(dal.engine, Engine)
assert dal.session is not None
assert isinstance(dal.session, scoped_session)

assert dal.PATIENT_TABLE is None
assert dal.PERSON_TABLE is None
assert dal.NAME_TABLE is None
Expand All @@ -82,7 +83,6 @@ def test_get_connection():

def test_initialize_schema():
dal = _init_db()
dal.initialize_schema()

assert dal.engine is not None
assert isinstance(dal.engine, Engine)
Expand Down Expand Up @@ -185,7 +185,6 @@ def test_get_table_by_name():
_clean_up(dal)

dal2 = _init_db()
dal2.initialize_schema()

table = dal2.get_table_by_name("address")
assert isinstance(table, Table)
Expand Down Expand Up @@ -273,7 +272,6 @@ def test_select_results():
records_to_add = [pt1, pt2]
pks = dal.bulk_insert_list(dal.PATIENT_TABLE, records_to_add, True)
mpi = DIBBsMPIConnectorClient()
mpi._initialize_schema()
blocked_data_query = mpi._generate_block_query(
block_data, select(dal.PATIENT_TABLE)
)
Expand Down
16 changes: 9 additions & 7 deletions tests/linkage/test_linkage.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,12 @@ def _init_db() -> DataAccessLayer:
"mpi_port": "5432",
"mpi_db_type": "postgres",
}
MPI = DIBBsMPIConnectorClient()
MPI.dal.get_connection(

dal = DataAccessLayer()
dal.get_connection(
engine_url="postgresql+psycopg2://postgres:pw@localhost:5432/testdb"
)
_clean_up(MPI.dal)
_clean_up(dal)

# load ddl
schema_ddl = open(
Expand All @@ -76,15 +77,16 @@ def _init_db() -> DataAccessLayer:
).read()

try:
with MPI.dal.engine.connect() as db_conn:
with dal.engine.connect() as db_conn:
db_conn.execute(text(schema_ddl))
db_conn.commit()
except Exception as e:
print(e)
with MPI.dal.engine.connect() as db_conn:
with dal.engine.connect() as db_conn:
db_conn.rollback()
MPI._initialize_schema()
return MPI
dal.initialize_schema()

return DIBBsMPIConnectorClient()


def _clean_up(dal):
Expand Down
18 changes: 10 additions & 8 deletions tests/linkage/test_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ def _init_db() -> DataAccessLayer:
"mpi_port": "5432",
"mpi_db_type": "postgres",
}
MPI = DIBBsMPIConnectorClient()
MPI.dal.get_connection(

dal = DataAccessLayer()
dal.get_connection(
engine_url="postgresql+psycopg2://postgres:pw@localhost:5432/testdb"
)
_clean_up(MPI.dal)
_clean_up(dal)

# load ddl
schema_ddl = open(
Expand All @@ -45,15 +46,16 @@ def _init_db() -> DataAccessLayer:
).read()

try:
with MPI.dal.engine.connect() as db_conn:
with dal.engine.connect() as db_conn:
db_conn.execute(text(schema_ddl))
db_conn.commit()
except Exception as e:
print(e)
with MPI.dal.engine.connect() as db_conn:
with dal.engine.connect() as db_conn:
db_conn.rollback()
MPI._initialize_schema()
return MPI
dal.initialize_schema()

return DIBBsMPIConnectorClient()


def _clean_up(dal):
Expand Down Expand Up @@ -331,7 +333,7 @@ def test_init():
"mpi_port": "5432",
"mpi_db_type": "postgres",
}

_init_db()
MPI = DIBBsMPIConnectorClient()

assert MPI is not None
Expand Down

0 comments on commit c4b5712

Please sign in to comment.