Skip to content

Commit

Permalink
Correct person_id handling (#915)
Browse files Browse the repository at this point in the history
* release v1.0.4

* Select person_id from patient table instead of person table.

* Add connection pool env vars

* progress

* Add environment variable documentation

* fix index DDL

* make mpi_client a global

* modify requirements.txt

* install phdi from main

* remove unused dependency

* Correct patient match indicator and insertion of internal and external person ids

* remove link version of generate_block_query

* remove _get_person_id

* formatting & remove unused imports

* clean up

* add coverage for person_id handling

* clean up tests

* update docstrings

* Install phdi from correct-person-id-handling

---------

Co-authored-by: m-goggins <[email protected]>
  • Loading branch information
DanPaseltiner and m-goggins authored Nov 8, 2023
1 parent b501f1d commit 26165bc
Show file tree
Hide file tree
Showing 10 changed files with 157 additions and 193 deletions.
36 changes: 27 additions & 9 deletions containers/record-linkage/app/config.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,34 @@
from functools import lru_cache
from pydantic import BaseSettings

# from typing import Optional
from pydantic import BaseSettings, Field
from typing import Optional


class Settings(BaseSettings):
mpi_db_type: str
mpi_dbname: str
mpi_host: str
mpi_user: str
mpi_password: str
mpi_port: str
mpi_db_type: str = Field(
description="The type of database used by the MPI",
)
mpi_dbname: str = Field(
description="The name of the database used by the MPI",
)
mpi_host: str = Field(
description="The host name of the MPI database",
)
mpi_user: str = Field(
description="The name of the user used to connect to the MPI database",
)
mpi_password: str = Field(
description="The password used to connect to the MPI database",
)
mpi_port: str = Field(description="The port used to connect to the MPI database")
connection_pool_size: Optional[int] = Field(
description="The number of MPI database connections in the connection pool",
default=5,
)
connection_pool_max_overflow: Optional[int] = Field(
description="The maximum number of MPI database connections that can be opened "
"above the connection pool size",
default=10,
)


@lru_cache()
Expand Down
20 changes: 11 additions & 9 deletions containers/record-linkage/app/main.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from app.config import get_settings
from typing import Annotated
from fastapi import Response, status, Body
from pathlib import Path
Expand All @@ -11,15 +10,16 @@
)
from pydantic import BaseModel, Field
from typing import Optional
from app.utils import (
read_json_from_assets,
run_migrations,
)
from app.utils import read_json_from_assets, run_migrations, get_settings
from phdi.linkage.mpi import DIBBsMPIConnectorClient

# Ensure MPI is configured as expected.
run_migrations()

settings = get_settings()
MPI_CLIENT = DIBBsMPIConnectorClient(
pool_size=settings["connection_pool_size"],
max_overflow=settings["connection_pool_max_overflow"],
)
# Instantiate FastAPI via PHDI's BaseService class
app = BaseService(
service_name="DIBBs Record Linkage Service",
Expand Down Expand Up @@ -173,11 +173,13 @@ async def link_record(
"message": "Supplied bundle contains no Patient resource to link on.",
}

# Initialize a DB connection for use with the MPI
# Then, link away
# Now link the record
try:
(found_match, new_person_id) = link_record_against_mpi(
record_to_link, algo_config, external_id
record=record_to_link,
algo_config=algo_config,
external_person_id=external_id,
mpi_client=MPI_CLIENT,
)
updated_bundle = add_person_resource(
new_person_id, record_to_link.get("id", ""), input_bundle
Expand Down
8 changes: 4 additions & 4 deletions containers/record-linkage/migrations/V01_02__indexes.sql
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ Indexing by the filter conditions for each block improves performance.
*/

-- Block 1 - First 4 characters of address and last 4 characters of MRN
CREATE INDEX IF NOT EXISTS address_line_1_index ON address (left(line_1, 4))
CREATE INDEX IF NOT EXISTS address_line_1_index ON address (left(line_1, 4));

CREATE INDEX IF NOT EXISTS identifier_value_and_type_code_index ON identifier (right(patient_identifier, 4), type_code)
CREATE INDEX IF NOT EXISTS identifier_value_and_type_code_index ON identifier (right(patient_identifier, 4), type_code);

-- Block 2 - First 4 characters of last name and first 4 characters of first name
CREATE INDEX IF NOT EXISTS name_last_name_index ON name (left(last_name, 4))
CREATE INDEX IF NOT EXISTS name_last_name_index ON name (left(last_name, 4));

CREATE INDEX IF NOT EXISTS given_name_given_name_index ON given_name (left(given_name, 4))
CREATE INDEX IF NOT EXISTS given_name_given_name_index ON given_name (left(given_name, 4));

COMMIT;
2 changes: 1 addition & 1 deletion containers/record-linkage/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ typing_extensions==4.5.0
uvicorn
pydantic==1.10.9
fastapi==0.96.0
phdi @ git+https://github.com/CDCgov/phdi@main
phdi @ git+https://github.com/CDCgov/phdi@correct-person-id-handling
httpx
pathlib
pyway
Expand Down
1 change: 1 addition & 0 deletions containers/record-linkage/tests/test_record_linkage.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ def test_linkage_invalid_db_type():


def test_linkage_success():
_clean_up()
set_mpi_env_vars()
run_migrations()
test_bundle = load_test_bundle()
Expand Down
2 changes: 0 additions & 2 deletions phdi/linkage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
feature_match_four_char,
perform_linkage_pass,
score_linkage_vs_truth,
_generate_block_query,
calculate_m_probs,
calculate_u_probs,
load_json_probs,
Expand Down Expand Up @@ -46,7 +45,6 @@
"feature_match_four_char",
"perform_linkage_pass",
"score_linkage_vs_truth",
"_generate_block_query",
"calculate_m_probs",
"calculate_u_probs",
"load_json_probs",
Expand Down
23 changes: 0 additions & 23 deletions phdi/linkage/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,26 +45,3 @@ def _generate_block_query(self, block_critieria: dict) -> Select:
criteria.
"""
pass # pragma: no cover

# NOTE(review): the sibling `_generate_block_query(self, ...)` takes `self`, but
# this signature declares no `self`/`cls` parameter -- confirm that is intended.
@abstractmethod
def _get_person_id() -> str:
"""
Resolve the person id for a record, creating person and external person
records where needed. This is an abstract stub; the steps below describe
the contract implementations are expected to follow.

If person id is not supplied then generate a new person record
with a new person id.
If an external person id is not supplied then just return the new
person id.
If an external person id is supplied then check if there is an
external person record created for this external person id.
If the external person record exists then verify that the person id,
either supplied or newly created, is linked to the external person record.
If the person id supplied, or newly created, is not linked in the found
external person record then create a new external person record using
the supplied external person id and the person id (either supplied
or newly created).
If the external person record does exist and is linked to the person id,
either supplied or newly created, then just return the person id.
If an external person record does not exist with the supplied external
person id then create a new external person record and link it to the
person id, either supplied or newly created. Then return the person id.
"""
pass  # pragma: no cover
49 changes: 13 additions & 36 deletions phdi/linkage/link.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from itertools import combinations
from math import log
from random import sample
from typing import List, Callable, Dict, Union
from typing import List, Callable, Union

from phdi.harmonization.utils import compare_strings
from phdi.fhir.utils import extract_value_with_resource_path
Expand Down Expand Up @@ -629,20 +629,19 @@ def link_record_against_mpi(
)
else:
linkage_scores[person] = belongingness_ratio
# Didn't match any person in our database
if len(linkage_scores) == 0:
(matched, new_person_id, results) = mpi_client.insert_matched_patient(
record, person_id=None, external_person_id=external_person_id
)
return (matched, new_person_id)
person_id = None
matched = False

# Determine strongest match, upsert, then let the caller know
else:
best_person = _find_strongest_link(linkage_scores)
mpi_client.insert_matched_patient(
record, person_id=best_person, external_person_id=external_person_id
)
return (True, best_person)
# If we found any matches, find the strongest one
if len(linkage_scores) != 0:
person_id = _find_strongest_link(linkage_scores)
matched = True

person_id = mpi_client.insert_matched_patient(
record, person_id=person_id, external_person_id=external_person_id
)

return (matched, person_id)


def load_json_probs(path: pathlib.Path):
Expand Down Expand Up @@ -1410,28 +1409,6 @@ def _match_within_block_cluster_ratio(
return clusters


def _generate_block_query(table_name: str, block_data: Dict) -> str:
"""
Generates a query for selecting a block of data from `table_name` per the block_data
parameters.
:param table_name: Table name.
:param block_data: Dictionary containing key value pairs for the column name you for
blocking and the data for the incoming record, e.g., ["ZIP"]: "90210".
:return: Query to select block of data base on `block_data` parameters.
"""
query_stub = f"SELECT * FROM {table_name} WHERE "
block_query = " AND ".join(
[
key + f" = '{value}'" if type(value) is str else (key + f" = {value}")
for key, value in block_data.items()
]
)
query = query_stub + block_query
return query


def _is_empty_extraction_field(block_vals: dict, field: str):
"""
Helper method that determines when a field extracted from an incoming
Expand Down
Loading

0 comments on commit 26165bc

Please sign in to comment.