diff --git a/containers/record-linkage/app/config.py b/containers/record-linkage/app/config.py index 4a888bde3c..7df8a60747 100644 --- a/containers/record-linkage/app/config.py +++ b/containers/record-linkage/app/config.py @@ -1,16 +1,34 @@ from functools import lru_cache -from pydantic import BaseSettings - -# from typing import Optional +from pydantic import BaseSettings, Field +from typing import Optional class Settings(BaseSettings): - mpi_db_type: str - mpi_dbname: str - mpi_host: str - mpi_user: str - mpi_password: str - mpi_port: str + mpi_db_type: str = Field( + description="The type of database used by the MPI", + ) + mpi_dbname: str = Field( + description="The name of the database used by the MPI", + ) + mpi_host: str = Field( + description="The host name of the MPI database", + ) + mpi_user: str = Field( + description="The name of the user used to connect to the MPI database", + ) + mpi_password: str = Field( + description="The password used to connect to the MPI database", + ) + mpi_port: str = Field(description="The port used to connect to the MPI database") + connection_pool_size: Optional[int] = Field( + description="The number of MPI database connections in the connection pool", + default=5, + ) + connection_pool_max_overflow: Optional[int] = Field( + description="The maximum number of MPI database connections that can be opened " + "above the connection pool size", + default=10, + ) @lru_cache() diff --git a/containers/record-linkage/app/main.py b/containers/record-linkage/app/main.py index ee1f37791c..e9c9172bce 100644 --- a/containers/record-linkage/app/main.py +++ b/containers/record-linkage/app/main.py @@ -1,4 +1,3 @@ -from app.config import get_settings from typing import Annotated from fastapi import Response, status, Body from pathlib import Path @@ -11,15 +10,16 @@ ) from pydantic import BaseModel, Field from typing import Optional -from app.utils import ( - read_json_from_assets, - run_migrations, -) +from app.utils import read_json_from_assets, run_migrations, get_settings from phdi.linkage.mpi import DIBBsMPIConnectorClient # Ensure MPI is configured as expected. run_migrations() - +settings = get_settings() +MPI_CLIENT = DIBBsMPIConnectorClient( + pool_size=settings["connection_pool_size"], + max_overflow=settings["connection_pool_max_overflow"], +) # Instantiate FastAPI via PHDI's BaseService class app = BaseService( service_name="DIBBs Record Linkage Service", @@ -173,11 +173,13 @@ async def link_record( "message": "Supplied bundle contains no Patient resource to link on.", } - # Initialize a DB connection for use with the MPI - # Then, link away + # Now link the record try: (found_match, new_person_id) = link_record_against_mpi( - record_to_link, algo_config, external_id + record=record_to_link, + algo_config=algo_config, + external_person_id=external_id, + mpi_client=MPI_CLIENT, ) updated_bundle = add_person_resource( new_person_id, record_to_link.get("id", ""), input_bundle diff --git a/containers/record-linkage/migrations/V01_02__indexes.sql b/containers/record-linkage/migrations/V01_02__indexes.sql index e7b83202ee..b909cdbcc2 100644 --- a/containers/record-linkage/migrations/V01_02__indexes.sql +++ b/containers/record-linkage/migrations/V01_02__indexes.sql @@ -33,13 +33,13 @@ Indexing by the filter conditions for each block improves performance. */ -- Block 1 - First 4 characters of address and last 4 characters of MRN -CREATE INDEX IF NOT EXISTS address_line_1_index ON address (left(line_1, 4)) +CREATE INDEX IF NOT EXISTS address_line_1_index ON address (left(line_1, 4)); -CREATE INDEX IF NOT EXISTS identifier_value_and_type_code_index ON identifier (right(patient_identifier, 4), type_code) +CREATE INDEX IF NOT EXISTS identifier_value_and_type_code_index ON identifier (right(patient_identifier, 4), type_code); -- Block 2 - First 4 characters of last name and first 4 characters of first name -CREATE INDEX IF NOT EXISTS name_last_name_index ON name (left(last_name, 4)) +CREATE INDEX IF NOT EXISTS name_last_name_index ON name (left(last_name, 4)); -CREATE INDEX IF NOT EXISTS given_name_given_name_index ON given_name (left(given_name, 4)) +CREATE INDEX IF NOT EXISTS given_name_given_name_index ON given_name (left(given_name, 4)); COMMIT; diff --git a/containers/record-linkage/requirements.txt b/containers/record-linkage/requirements.txt index 0a08ef9d14..a6eaa90438 100644 --- a/containers/record-linkage/requirements.txt +++ b/containers/record-linkage/requirements.txt @@ -2,7 +2,7 @@ typing_extensions==4.5.0 uvicorn pydantic==1.10.9 fastapi==0.96.0 -phdi @ git+https://github.com/CDCgov/phdi@main +phdi @ git+https://github.com/CDCgov/phdi@correct-person-id-handling httpx pathlib pyway diff --git a/containers/record-linkage/tests/test_record_linkage.py b/containers/record-linkage/tests/test_record_linkage.py index 62efe92a14..9e8e7768bb 100644 --- a/containers/record-linkage/tests/test_record_linkage.py +++ b/containers/record-linkage/tests/test_record_linkage.py @@ -118,6 +118,7 @@ def test_linkage_invalid_db_type(): def test_linkage_success(): + _clean_up() set_mpi_env_vars() run_migrations() test_bundle = load_test_bundle() diff --git a/phdi/linkage/__init__.py b/phdi/linkage/__init__.py index 089b3ab33f..6343d60b1e 100644 --- a/phdi/linkage/__init__.py +++ b/phdi/linkage/__init__.py @@ -11,7 +11,6 @@ feature_match_four_char, perform_linkage_pass, score_linkage_vs_truth, - _generate_block_query, calculate_m_probs, calculate_u_probs, load_json_probs, @@ -46,7 +45,6 @@ "feature_match_four_char", "perform_linkage_pass", "score_linkage_vs_truth", - "_generate_block_query", "calculate_m_probs", "calculate_u_probs", "load_json_probs", diff --git a/phdi/linkage/core.py b/phdi/linkage/core.py index d7275a8207..873eb26b17 100644 --- a/phdi/linkage/core.py +++ b/phdi/linkage/core.py @@ -45,26 +45,3 @@ def _generate_block_query(self, block_critieria: dict) -> Select: criteria. """ pass # pragma: no cover - - @abstractmethod - def _get_person_id() -> str: - """ - If person id is not supplied then generate a new person record - with a new person id. - If an external person id is not supplied then just return the new - person id. - If an external person id is supplied then check if there is an - external person record created for this external person id. - If the external person record exists then verify that the person id, - either supplied or newly created, is linked to the external person record. - If the person id supplied, or newly created, is not linked in the found - external person record then create a new external person record using - the supplied external person id and the person id (either supplied - or newly created). - If the external person record does exist and is linked to the person id, - either supplied or newly created, then just return the person id. - If an external person record does not exist with the supplied external - person id then create a new external person record and link it to the - the person id, either supplied or newly created. Then return the person id. - """ - pass # pragma: no cover diff --git a/phdi/linkage/link.py b/phdi/linkage/link.py index 79392f532b..1c38ddcacf 100644 --- a/phdi/linkage/link.py +++ b/phdi/linkage/link.py @@ -8,7 +8,7 @@ from itertools import combinations from math import log from random import sample -from typing import List, Callable, Dict, Union +from typing import List, Callable, Union from phdi.harmonization.utils import compare_strings from phdi.fhir.utils import extract_value_with_resource_path @@ -629,20 +629,19 @@ def link_record_against_mpi( ) else: linkage_scores[person] = belongingness_ratio - # Didn't match any person in our database - if len(linkage_scores) == 0: - (matched, new_person_id, results) = mpi_client.insert_matched_patient( - record, person_id=None, external_person_id=external_person_id - ) - return (matched, new_person_id) + person_id = None + matched = False - # Determine strongest match, upsert, then let the caller know - else: - best_person = _find_strongest_link(linkage_scores) - mpi_client.insert_matched_patient( - record, person_id=best_person, external_person_id=external_person_id - ) - return (True, best_person) + # If we found any matches, find the strongest one + if len(linkage_scores) != 0: + person_id = _find_strongest_link(linkage_scores) + matched = True + + person_id = mpi_client.insert_matched_patient( + record, person_id=person_id, external_person_id=external_person_id + ) + + return (matched, person_id) def load_json_probs(path: pathlib.Path): @@ -1410,28 +1409,6 @@ def _match_within_block_cluster_ratio( return clusters -def _generate_block_query(table_name: str, block_data: Dict) -> str: - """ - Generates a query for selecting a block of data from `table_name` per the block_data - parameters. - - :param table_name: Table name. - :param block_data: Dictionary containing key value pairs for the column name you for - blocking and the data for the incoming record, e.g., ["ZIP"]: "90210". - :return: Query to select block of data base on `block_data` parameters. - - """ - query_stub = f"SELECT * FROM {table_name} WHERE " - block_query = " AND ".join( - [ - key + f" = '{value}'" if type(value) is str else (key + f" = {value}") - for key, value in block_data.items() - ] - ) - query = query_stub + block_query - return query - - def _is_empty_extraction_field(block_vals: dict, field: str): """ Helper method that determines when a field extracted from an incoming diff --git a/phdi/linkage/mpi.py b/phdi/linkage/mpi.py index 7241bcf897..5ac637129d 100644 --- a/phdi/linkage/mpi.py +++ b/phdi/linkage/mpi.py @@ -1,4 +1,4 @@ -from typing import List, Dict, Union +from typing import List, Dict from sqlalchemy import Select, and_, func, literal_column, select, text from sqlalchemy.dialects.postgresql import aggregate_order_by from phdi.linkage.core import BaseMPIConnectorClient @@ -19,8 +19,6 @@ class DIBBsMPIConnectorClient(BaseMPIConnectorClient): """ - matched: bool = False - def __init__(self, pool_size: int = 5, max_overflow: int = 10): """ Initialize the MPI connector client with the MPI database. @@ -153,7 +151,7 @@ def insert_matched_patient( patient_resource: Dict, person_id=None, external_person_id=None, - ) -> Union[None, tuple]: + ) -> str: """ If a matching person ID has been found in the MPI, inserts a new patient into the patient table and all other subsequent MPI tables, including the @@ -167,25 +165,27 @@ def insert_matched_patient( found in the MPI, defaults to None. :param external_person_id: The external person id for the person that matches the patient record if a match has been found in the MPI, defaults to None. - :return: the person id """ try: - correct_person_id = self._get_person_id( - person_id=person_id, external_person_id=external_person_id - ) - patient_resource["person"] = correct_person_id + if person_id is None: + person_id = self._insert_person() + + patient_resource["person"] = person_id mpi_records = self._get_mpi_records(patient_resource) - return_results = self.dal.bulk_insert_dict( - records_with_table=mpi_records, return_primary_keys=True + self.dal.bulk_insert_dict( + records_with_table=mpi_records, return_primary_keys=False ) + if external_person_id is not None: + self._insert_external_person_id(person_id, external_person_id) + except Exception as error: # pragma: no cover raise ValueError(f"{error}") - return (self.matched, correct_person_id, return_results) + return person_id def _generate_where_criteria(self, block_criteria: dict, table_name: str) -> list: """ @@ -515,77 +515,56 @@ def _extract_given_names(self, given_names: list, name_id: uuid) -> dict: table_records.append(record) return table_records - def _get_person_id( + def _insert_external_person_id( self, person_id: str, external_person_id: str, - ) -> str: - """ - If person id is not supplied then generate a new person record - with a new person id. - If an external person id is not supplied then just return the new - person id. - If an external person id is supplied then check if there is an - external person record created for this external person id. - If the external person record exists then verify that the person id, - either supplied or newly created, is linked to the external person record. - If the person id supplied, or newly created, is not linked in the found - external person record then create a new external person record using - the supplied external person id and the person id (either supplied - or newly created). - If the external person record does exist and is linked to the person id, - either supplied or newly created, then just return the person id. - If an external person record does not exist with the supplied external - person id then create a new external person record and link it to the - the person id, either supplied or newly created. Then return the person id. + ): """ - if person_id is None: - new_person_id = self._insert_person() - else: - self.matched = True - new_person_id = person_id + Inserts a new external person id record into the MPI if the external_person_id + does not already exist in the MPI. - if external_person_id is None: - return new_person_id + :param person_id: The person_id matching the patient record if a match has been + found in the MPI. + :param external_person_id: The external_person_id for the patient record if it + exists. + + """ + if person_id is None or external_person_id is None: # pragma: no cover + raise ValueError("person_id and external_person_id must be provided.") - query = select(self.dal.EXTERNAL_PERSON_TABLE).where( + external_source_id_query = select(self.dal.EXTERNAL_SOURCE_TABLE).where( text( - f"{self.dal.EXTERNAL_PERSON_TABLE.name}.external_person_id" - + f" = '{external_person_id}'" + f"{self.dal.EXTERNAL_SOURCE_TABLE.name}.external_source_name" + + " = 'IRIS'" ) ) - external_person_record = self.dal.select_results(query, False) - - if len(external_person_record) > 0: - self.matched = True - found_person_id = external_person_record[0][1] - else: - found_person_id = None + external_source_record = self.dal.select_results( + external_source_id_query, False + ) + if len(external_source_record) > 0: + external_source_id = external_source_record[0][0] - if found_person_id is None or found_person_id != new_person_id: - # Retrive external_source_id - external_source_id_query = select(self.dal.EXTERNAL_SOURCE_TABLE).where( + query = select(self.dal.EXTERNAL_PERSON_TABLE).where( text( - f"{self.dal.EXTERNAL_SOURCE_TABLE.name}.external_source_name" - + " = 'IRIS'" + f"{self.dal.EXTERNAL_PERSON_TABLE.name}.external_person_id" + + f" = '{external_person_id}' AND " + + f"{self.dal.EXTERNAL_PERSON_TABLE.name}.person_id = '{person_id}'" + + f" AND {self.dal.EXTERNAL_PERSON_TABLE.name}.external_source_id " + + f" = '{external_source_id}'" ) ) - external_source_record = self.dal.select_results( - external_source_id_query, False - ) - if len(external_source_record) > 0: - external_source_id = external_source_record[0][0] - else: - external_source_id = None - new_external_person_record = { - "person_id": new_person_id, - "external_person_id": external_person_id, - "external_source_id": external_source_id, - } - self.dal.bulk_insert_list( - self.dal.EXTERNAL_PERSON_TABLE, [new_external_person_record], False - ) - return new_person_id + external_person_record = self.dal.select_results(query, False) + + if len(external_person_record) == 0: + new_external_person_record = { + "person_id": person_id, + "external_person_id": external_person_id, + "external_source_id": external_source_id, + } + self.dal.bulk_insert_list( + self.dal.EXTERNAL_PERSON_TABLE, [new_external_person_record], False + ) def _generate_dict_record_from_results( self, results_list: List[list] diff --git a/tests/linkage/test_mpi.py b/tests/linkage/test_mpi.py index e2c4d88111..e854cbb1e4 100644 --- a/tests/linkage/test_mpi.py +++ b/tests/linkage/test_mpi.py @@ -5,7 +5,7 @@ import re import pytest import uuid -from sqlalchemy import Select, select, text +from sqlalchemy import Select, select, text, insert from phdi.linkage.mpi import DIBBsMPIConnectorClient from phdi.linkage.dal import DataAccessLayer @@ -339,8 +339,7 @@ def test_insert_matched_patient(): result = MPI.insert_matched_patient(patient_resource) assert result is not None - assert not result[0] - assert result[1] is not None + person_rec = MPI.dal.select_results(select(MPI.dal.PERSON_TABLE)) patient_rec = MPI.dal.select_results(select(MPI.dal.PATIENT_TABLE)) name_rec = MPI.dal.select_results(select(MPI.dal.NAME_TABLE)) @@ -360,15 +359,25 @@ def test_insert_matched_patient(): _clean_up(MPI.dal) MPI = _init_db() + # Insert fake IRIS external source id for testing + external_source_id = uuid.uuid4() + stmt = insert(MPI.dal.EXTERNAL_SOURCE_TABLE).values( + external_source_id=external_source_id, + external_source_name="IRIS", + external_source_description="Fake surveillance system", + ) + with MPI.dal.engine.connect() as conn: + result = conn.execute(stmt) + conn.commit() + external_person_id = "EXT-1233456" result = MPI.insert_matched_patient( patient_resource=patient_resource, person_id=None, - external_person_id="EXT-1233456", + external_person_id=external_person_id, ) assert result is not None - assert not result[0] - assert result[1] is not None + EXTERNAL_PERSON_rec = MPI.dal.select_results(select(MPI.dal.EXTERNAL_PERSON_TABLE)) person_rec = MPI.dal.select_results(select(MPI.dal.PERSON_TABLE)) patient_rec = MPI.dal.select_results(select(MPI.dal.PATIENT_TABLE)) @@ -380,6 +389,8 @@ def test_insert_matched_patient(): assert len(person_rec) == 2 assert len(EXTERNAL_PERSON_rec) == 2 + assert EXTERNAL_PERSON_rec[1][2] == external_person_id + assert EXTERNAL_PERSON_rec[1][3] == external_source_id assert len(patient_rec) == 2 assert len(name_rec) == 2 assert len(given_name_rec) == 3 @@ -389,42 +400,43 @@ def test_insert_matched_patient(): _clean_up(MPI.dal) - -def test_get_person_id(): + # Test for missing external_person_id MPI = _init_db() - result = MPI._get_person_id(person_id=None, external_person_id=None) - assert result is not None + # Insert fake IRIS external source id for testing + external_source_id = uuid.uuid4() + stmt = insert(MPI.dal.EXTERNAL_SOURCE_TABLE).values( + external_source_id=external_source_id, + external_source_name="IRIS", + external_source_description="Fake surveillance system", + ) + with MPI.dal.engine.connect() as conn: + result = conn.execute(stmt) + conn.commit() - results = MPI.dal.select_results(select(MPI.dal.PERSON_TABLE)) - assert results[1][0] == result - - result2 = MPI._get_person_id(person_id=result, external_person_id=None) - assert result2 == result - results2 = MPI.dal.select_results(select(MPI.dal.EXTERNAL_PERSON_TABLE)) - assert len(results2) == 1 - assert results2[0][0] == "external_id" - - result3 = MPI._get_person_id(person_id=result, external_person_id="MYEXTID-123") - assert result3 == result - results3 = MPI.dal.select_results(select(MPI.dal.EXTERNAL_PERSON_TABLE)) - assert len(results3) == 2 - assert results3[0][0] == "external_id" - assert results3[1][0] is not None - assert results3[1][1] == result3 - assert results3[1][2] == "MYEXTID-123" - - result4 = MPI._get_person_id(person_id=None, external_person_id="MYEXTID-789") - assert result4 is not None - assert result4 != result - query = select(MPI.dal.EXTERNAL_PERSON_TABLE).where( - text(f"{MPI.dal.EXTERNAL_PERSON_TABLE.name}.external_person_id = 'MYEXTID-789'") + result = MPI.insert_matched_patient( + patient_resource=patient_resource, + person_id=None, + external_person_id=None, ) - results4 = MPI.dal.select_results(query) - assert len(results4) == 2 - assert results4[0][0] == "external_id" - assert results4[1][0] is not None - assert results4[1][1] == result4 - assert results4[1][2] == "MYEXTID-789" + assert result is not None + + EXTERNAL_PERSON_rec = MPI.dal.select_results(select(MPI.dal.EXTERNAL_PERSON_TABLE)) + person_rec = MPI.dal.select_results(select(MPI.dal.PERSON_TABLE)) + patient_rec = MPI.dal.select_results(select(MPI.dal.PATIENT_TABLE)) + name_rec = MPI.dal.select_results(select(MPI.dal.NAME_TABLE)) + given_name_rec = MPI.dal.select_results(select(MPI.dal.GIVEN_NAME_TABLE)) + address_rec = MPI.dal.select_results(select(MPI.dal.ADDRESS_TABLE)) + phone_rec = MPI.dal.select_results(select(MPI.dal.PHONE_TABLE)) + id_rec = MPI.dal.select_results(select(MPI.dal.ID_TABLE)) + + assert len(person_rec) == 2 + assert len(EXTERNAL_PERSON_rec) == 1 + assert len(patient_rec) == 2 + assert len(name_rec) == 2 + assert len(given_name_rec) == 3 + assert len(address_rec) == 2 + assert len(phone_rec) == 2 + assert len(id_rec) == 2 _clean_up(MPI.dal)