Separate the DAL and MPI functionality - Add new DDL for multi-table schema for MPI (#828)

* Initial commit - this is cleaning up some of the connection stuff as well as prepping for adding schema migrations

* WIP - still a LOT to do

* More progress - making parallel DAL/MPI to existing

* Create config.py

* Got the tests working - IT WORKS!

* Restored original MPI (postgres.py) but left it in the mpi folder - removed print statements

* restored tests for previous happy path

* additional clean-up - added new table schema into MPI to be able to test in parallel until we are ready to cut over

* Added TODO statements

* minor changes

* fixed issue with message parser outputting "" instead of null

* renamed to generic dal

* Removed message-parser changes - split out into a separate branch/PR

* restructure folders for linkage

* Adding tests

* Update test_mpi.py

* Update utils.py

* Update utils.py

* adding tests for the DAL

* fix format and added a few more tests

* cleaned up some docstrings in the google style and added argument types

* run black

* Added more tests and commented out some code that is currently not used

* added tests and commenting out code which will be updated in another ticket to get test coverage up

* Update postgres_mpi.py

* Fixed testing errors

* LOTS of changes - Finally got it all working - will need to clean up and add more tests

* Delete result.py

* flake8 fixes

* Commented out code that will be changed and added a test case

* Update postgres_mpi.py

* removed print statements that aren't needed

* general clean-up - added TODOs and removed commented out code

* moved select transaction functionality into the dal - added tests

* renamed dal to be DataAccessLayer

* Updated Schema with most recent changes - initialize all the new tables in the DAL - create mechanism to get all tables necessary for blocking

* adding and fixing tests with updated schema

* Fixed some DDL errors and added/updated tests to use new schema

* fixed all errors in tests and updated tests for DAL

* fixed some ddl bugs - created the base query for data for linkage purposes - started working on CTE queries in the generate_block_query function

* Creating functions to simplify generating the CTE queries and attaching them to base - mostly done - just need to handle for given name scenario

* cte queries added to base query - just need to write some tests to validate all this and we should be good to go

* Made some minor tweaks to fix some minor bugs - testing query generator in MPI indirectly

* tested mpi functionality in current state and added test cases

* Update test_mpi.py

* change city & state to earthly entities

* update patient bundle

* remove Zakera Ward and Citadel

* update for LoL blocks, not LoLoL

* change tests for LoL output from blocking, not LoLoL

* simplified the code base by a lot - added some tests and fixed some tests - still working on given_name issue

* Completed the given_name scenario, all handled by using the metadata from the tables and FKs to generate the query - added tests to verify

* Update postgres_mpi.py

* Update postgres_mpi.py

* lots of additional functions to simplify the overall workflow

* temp change

* add new MPI connector to linkage tests

* condense address to LoL and expand to include city, state, and zip

* Update postgres_mpi.py

* Made some modifications to the dal and updated tests cases - all are passing again

* Added more helper functions - updated MPI - just need to wrap up patient inserts and add tests

* temp update

* Created new core for updated functions and cleaned up functions - some updates/changes to help with implementing MPI in link.py

* Fixed all bugs and have all tests passing for DAL and MPI with additional changes - still need to finish multi part insert for patient

* Fixed all tests - still need to add a few more tests and finish the full insert

* more helper functions and inserts almost done - tests still required

* INSERTS are COMPLETE!!  Just need to add more tests and fix one test

* added additional tests and bolstering for utils and config functions

* Moved new DAL and MPI as main and deleted old code, fixed bugs, added tests

* Tests all working - code is working

* minor clean-up

* Update dal.py

* Update test.yaml

* changed name of test file - weird errors during pick-up testing

* Updating comments to make the functions more clear and a bit of clean up

* more doc comments updated

* more comments added.

* Some basic clean-up and reformatting

* Added/updated comments - removed some TODOs - other general clean-up

* changes based on testing with Brady

* Update phdi/linkage/core.py

Co-authored-by: Marcelle <[email protected]>

* Added some tests - updated some tests - updated comments based upon feedback

* Fixed comments and expanded names as suggested in PR

* expanding table names

* fixed tests

* Fix _get_mpi_values and rename to _extract_patient_data

* update to match

* temp

* refer to self

* add function for correctly extracting given_names into their own table

* add tests for extract_given_name

* add tests for _get_mpi_records and update for None values in given names

* pass patient_ids to appropriate tables

* update tests for passing in name_id

* Implement transformations and inserts in the new MPI using the new flattened schema (#840)

* updated migration name

* Removed unused Utils - fixed tests - added ordering to records in dict

* provide example in comments

* flake 8 fixes

* fixed bug in identifier section of fhir paths

* Update test_mpi.py

* Remove sorting

* added sort capability to dal to insert tables based upon requirement from DB and FKs - updated tests

* updated identifier.value to identifier.patient_identifier - fixed a bug for finding MRN in the organize block query

* Renamed postgres_mpi.py to just mpi.py - fixed some linting issues - fixed tests

* remove test junk

* fix None and name_id generation issue

* implement new MPI for testing

* tests for correct matching person and patient IDs

* removed dummy function

* update enhanced algo test

* fix null test to have a match

* add TODO

* update where connector client is imported from

* Fixed a couple of bugs in the MPI - still running into issue with PK issues (I think this is where we are inserting name with same PK value)

* Fixed another bug.  New name_id for every instance of name in patient record

* fix indent to allow multiple blocking passes

* simplify tests

* rip Brandon's easter eggs

* modify test_multi_element_blocking for new MPI schema

* remove obsolete function

* set the matched if a person_id is supplied or external person id is supplied and a person record is found with it

* Update test_mpi_utils.py

* Update test_mpi_utils.py

* removed test items

* remove old MPI connection env vars

* replace old client with new one

* remove old db table names

* point to feature branch

* clean up linkage function

* rename schema file

* update birthdate dob change

* Updated subquery for given_name to combine all the given_names into a single line separated by a space - ordering by given_name_index

* Update test_mpi.py

* Remove \u200b characters

* file rename

* rename and remove old schema

* update schema to allow 7 varchar for `sex` instead of 3

* added helper function to convert date -> str; updated test for functions

* removed for flake8

* delete unused func

* turn off formatting

* added string version of date to test data

* made datetime_to_str more flexible, new tests

* long str -> multi line

* add additional patient resources to test file for seeding

* add file to seed data in local db

* remove extra break

* Threshold log odds weights (#873)

* Threshold log odds weights

* Fix log-odds scoring

* Another odds update

* Conjoin given name

* clean up

* clean up pt 2

* remove _clean_up

* add additional patients

* make generic connector client name

* finished adding tests for multiple given names

* linebreak for docstring

* name change

* name change part II

* fix base connector client name

* fix base connector client name

* generalize connector client name

* remove copy

* testing

* change default external_source_id to IRIS

* add LAC as external source

* generalize description

* remove on conflict

* add IRIS external source data

* fix quoting

* add external_source_id

* fix index to refer to id, not name

* clean up testing

* add check for external_source_id existing

* Update containers/record-linkage/app/main.py

* add flake8 ignore

---------

Co-authored-by: robertmitchellv <[email protected]>
Co-authored-by: m-goggins <[email protected]>
Co-authored-by: Marcelle <[email protected]>
Co-authored-by: DanPaseltiner <[email protected]>
Co-authored-by: Nick Clyde <[email protected]>
Co-authored-by: bamader <[email protected]>
7 people authored Oct 25, 2023
1 parent ce38db4 commit 2144717
Showing 37 changed files with 237,516 additions and 235,896 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
@@ -50,7 +50,7 @@ jobs:
- name: Install dependencies
run: poetry install
- name: test_postgres_connection
-        run: poetry run pytest tests/linkage/test_postgres_mpi_connector.py
+        run: poetry run pytest tests/linkage/test_mpi.py
unit-test-phdi-building-blocks-lib:
runs-on: ubuntu-latest
services:
4 changes: 2 additions & 2 deletions containers/record-linkage/app/config.py
@@ -1,6 +1,8 @@
from functools import lru_cache
from pydantic import BaseSettings

+# from typing import Optional


class Settings(BaseSettings):
mpi_db_type: str
Expand All @@ -9,8 +11,6 @@ class Settings(BaseSettings):
mpi_user: str
mpi_password: str
mpi_port: str
-    mpi_patient_table: str
-    mpi_person_table: str


@lru_cache()
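The `config.py` change above keeps a cached, environment-driven settings object (pydantic `BaseSettings` plus `lru_cache`) while dropping the per-table settings. The caching pattern can be sketched in plain Python — a minimal stdlib stand-in for the pydantic version, with illustrative defaults, not the service's actual implementation:

```python
import os
from functools import lru_cache


@lru_cache()
def get_settings() -> dict:
    # Read MPI connection settings once from the environment and cache the
    # result; keys mirror the Settings fields shown in the diff above.
    # Defaults here are illustrative only.
    return {
        "mpi_db_type": os.environ.get("mpi_db_type", "postgres"),
        "mpi_dbname": os.environ.get("mpi_dbname", "testdb"),
        "mpi_host": os.environ.get("mpi_host", "localhost"),
        "mpi_user": os.environ.get("mpi_user", "postgres"),
        "mpi_password": os.environ.get("mpi_password", ""),
        "mpi_port": os.environ.get("mpi_port", "5432"),
    }


os.environ["mpi_db_type"] = "postgres"
print(get_settings()["mpi_db_type"])  # -> postgres
```

Because of `lru_cache`, repeated calls return the same object, which is why `seed_testdb.py` below calls `get_settings.cache_clear()` after changing environment variables.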
7 changes: 3 additions & 4 deletions containers/record-linkage/app/main.py
@@ -12,10 +12,10 @@
from pydantic import BaseModel, Field
from typing import Optional
from app.utils import (
-    connect_to_mpi_with_env_vars,
read_json_from_assets,
run_migrations,
)
+from phdi.linkage.mpi import DIBBsMPIConnectorClient

# Ensure MPI is configured as expected.
run_migrations()
@@ -105,7 +105,7 @@ async def health_check() -> HealthCheckResponse:
"""

try:
-        connect_to_mpi_with_env_vars()
+        mpi_client = DIBBsMPIConnectorClient()  # noqa: F841
except Exception as err:
return {"status": "OK", "mpi_connection_status": str(err)}
return {"status": "OK", "mpi_connection_status": "OK"}
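The revised health check above constructs the connector client purely for its side effect (a connection test) and reports the failure string instead of raising. That pattern can be sketched generically — the stub class below is a hypothetical stand-in for `DIBBsMPIConnectorClient`, not the real client:

```python
class MPIConnectorStub:
    # Hypothetical stand-in for DIBBsMPIConnectorClient: constructing it
    # tests connectivity and raises when the database is unreachable.
    def __init__(self, reachable: bool = True):
        if not reachable:
            raise ConnectionError("could not connect to MPI database")


def health_check(reachable: bool) -> dict:
    try:
        # Constructed only for the connection-test side effect, mirroring
        # the `# noqa: F841` in the diff above.
        MPIConnectorStub(reachable)
    except Exception as err:
        return {"status": "OK", "mpi_connection_status": str(err)}
    return {"status": "OK", "mpi_connection_status": "OK"}


print(health_check(True)["mpi_connection_status"])   # -> OK
print(health_check(False)["mpi_connection_status"])  # -> could not connect to MPI database
```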
@@ -176,9 +176,8 @@ async def link_record(
# Initialize a DB connection for use with the MPI
# Then, link away
try:
-        db_client = connect_to_mpi_with_env_vars()
(found_match, new_person_id) = link_record_against_mpi(
-            record_to_link, algo_config, db_client, external_id
+            record_to_link, algo_config, external_id
)
updated_bundle = add_person_resource(
new_person_id, record_to_link.get("id", ""), input_bundle
18 changes: 0 additions & 18 deletions containers/record-linkage/app/utils.py
@@ -5,24 +5,6 @@
from typing import Literal

from app.config import get_settings
-from phdi.linkage import DIBBsConnectorClient
-
-
-def connect_to_mpi_with_env_vars():
-    """
-    Helper function to load MPI Database settings from the relevant
-    environment variables, then spin up a connection to the MPI.
-    This also automatically tests that a connection can be made as
-    part of instantiating the DB Client.
-    """
-    dbname, user, password, host = load_mpi_env_vars_os()
-    port = get_settings().get("mpi_port")
-    patient_table = get_settings().get("mpi_patient_table")
-    person_table = get_settings().get("mpi_person_table")
-    db_client = DIBBsConnectorClient(
-        dbname, user, password, host, port, patient_table, person_table
-    )
-    return db_client


def load_mpi_env_vars_os():
100 changes: 100 additions & 0 deletions containers/record-linkage/migrations/V01_01__flat_schema.sql
@@ -0,0 +1,100 @@
BEGIN;

CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

CREATE TABLE IF NOT EXISTS person (
person_id UUID DEFAULT uuid_generate_v4 (),
PRIMARY KEY (person_id)
);

CREATE TABLE IF NOT EXISTS patient (
patient_id UUID DEFAULT uuid_generate_v4 (),
person_id UUID,
dob DATE,
sex VARCHAR(7),
race VARCHAR(100),
ethnicity VARCHAR(100),
PRIMARY KEY (patient_id),
CONSTRAINT fk_patient_to_person FOREIGN KEY(person_id) REFERENCES person(person_id)
);

CREATE TABLE IF NOT EXISTS name (
name_id UUID DEFAULT uuid_generate_v4 (),
patient_id UUID,
last_name VARCHAR(255),
type VARCHAR(100),
PRIMARY KEY (name_id),
CONSTRAINT fk_name_to_patient FOREIGN KEY(patient_id) REFERENCES patient(patient_id)
);

CREATE TABLE IF NOT EXISTS given_name (
given_name_id UUID DEFAULT uuid_generate_v4 (),
name_id UUID,
given_name VARCHAR(255),
given_name_index INTEGER,
PRIMARY KEY (given_name_id),
CONSTRAINT fk_given_to_name FOREIGN KEY(name_id) REFERENCES name(name_id)
);

CREATE TABLE IF NOT EXISTS identifier (
identifier_id UUID DEFAULT uuid_generate_v4 (),
patient_id UUID,
patient_identifier VARCHAR(255),
type_code VARCHAR(255),
type_display VARCHAR(255),
type_system VARCHAR(255),
PRIMARY KEY (identifier_id),
CONSTRAINT fk_ident_to_patient FOREIGN KEY(patient_id) REFERENCES patient(patient_id)
);

CREATE TABLE IF NOT EXISTS phone_number (
phone_id UUID DEFAULT uuid_generate_v4 (),
patient_id UUID,
phone_number VARCHAR(20),
type VARCHAR(100),
start_date TIMESTAMP,
end_date TIMESTAMP,
PRIMARY KEY (phone_id),
CONSTRAINT fk_phone_to_patient FOREIGN KEY(patient_id) REFERENCES patient(patient_id)
);

CREATE TABLE IF NOT EXISTS address (
address_id UUID DEFAULT uuid_generate_v4 (),
patient_id UUID,
type VARCHAR(100),
line_1 VARCHAR(100),
line_2 VARCHAR(100),
city VARCHAR(255),
zip_code VARCHAR(10),
state VARCHAR(100),
country VARCHAR(255),
latitude DECIMAL,
longitude DECIMAL,
start_date TIMESTAMP,
end_date TIMESTAMP,
PRIMARY KEY (address_id),
CONSTRAINT fk_addr_to_patient FOREIGN KEY(patient_id) REFERENCES patient(patient_id)
);

CREATE TABLE IF NOT EXISTS external_source (
external_source_id UUID DEFAULT uuid_generate_v4 (),
external_source_name VARCHAR(255),
external_source_description VARCHAR(255),
PRIMARY KEY (external_source_id)
);

CREATE TABLE IF NOT EXISTS external_person (
external_id UUID DEFAULT uuid_generate_v4 (),
person_id UUID,
external_person_id VARCHAR(255),
external_source_id UUID,
PRIMARY KEY (external_id),
CONSTRAINT fk_ext_person_to_person FOREIGN KEY(person_id) REFERENCES person(person_id),
CONSTRAINT fk_ext_person_to_source FOREIGN KEY(external_source_id) REFERENCES external_source(external_source_id)
);

COMMIT;

INSERT INTO external_source (external_source_id, external_source_name, external_source_description)
VALUES ('a0eebc99-9c0b-4ef8-bb6d-6bb9bd380b79','IRIS','LACDPH Surveillance System');
COMMIT;
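The flattened schema above splits each patient name across two tables: `name` holds the family name, and `given_name` holds one row per given-name part with a `given_name_index` that preserves ordering (the commit log notes the blocking subquery recombines these, ordered by `given_name_index`). A minimal sketch of how a FHIR `HumanName` entry might be flattened into `given_name` rows — illustrative only, not the library's `_extract_given_names` implementation:

```python
import uuid


def extract_given_names(name_entry: dict, name_id: str) -> list:
    # Flatten a FHIR HumanName's "given" list into given_name rows,
    # one row per name part, preserving order via given_name_index.
    rows = []
    for index, part in enumerate(name_entry.get("given") or []):
        rows.append(
            {
                "given_name_id": str(uuid.uuid4()),
                "name_id": name_id,
                "given_name": part,
                "given_name_index": index,
            }
        )
    return rows


rows = extract_given_names({"family": "Shepard", "given": ["John", "Quincy"]}, "n1")
print([(r["given_name"], r["given_name_index"]) for r in rows])
# -> [('John', 0), ('Quincy', 1)]
```

Storing the index rather than a concatenated string lets the query layer rebuild "John Quincy" with an `ORDER BY given_name_index` aggregation while still matching on individual parts.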
18 changes: 0 additions & 18 deletions containers/record-linkage/migrations/V01_01__initial_schema.sql

This file was deleted.

2 changes: 1 addition & 1 deletion containers/record-linkage/requirements.txt
@@ -2,7 +2,7 @@ typing_extensions==4.5.0
uvicorn
pydantic==1.10.9
fastapi==0.96.0
-phdi @ git+https://github.com/CDCgov/phdi@main
+phdi @ git+https://github.com/CDCgov/phdi@brady/update-mpi-schema
httpx
pathlib
pyway
104 changes: 104 additions & 0 deletions containers/record-linkage/seed_testdb.py
@@ -0,0 +1,104 @@
# flake8: noqa
# fmt: off
import os
from app.config import get_settings

def set_mpi_env_vars():
os.environ["mpi_db_type"] = "postgres"
os.environ["mpi_dbname"] = "testdb"
os.environ["mpi_user"] = "postgres"
os.environ["mpi_password"] = "pw"
os.environ["mpi_host"] = "localhost"
os.environ["mpi_port"] = "5432"
get_settings.cache_clear()


set_mpi_env_vars()

from fastapi import status
from fastapi.testclient import TestClient
from app.main import app, run_migrations
from sqlalchemy import text
import json
import pathlib
from phdi.linkage.mpi import DIBBsMPIConnectorClient
# fmt: on
client = TestClient(app)


def load_test_bundle():
test_bundle = json.load(
open(
pathlib.Path(__file__).parent
/ "tests"
/ "assets"
/ "patient_bundle_to_link_with_mpi.json"
)
)
return test_bundle


def pop_mpi_env_vars():
os.environ.pop("mpi_db_type", None)
os.environ.pop("mpi_dbname", None)
os.environ.pop("mpi_user", None)
os.environ.pop("mpi_password", None)
os.environ.pop("mpi_host", None)
os.environ.pop("mpi_port", None)


def _clean_up():
MPI = DIBBsMPIConnectorClient()

with MPI.dal.engine.connect() as pg_connection:
pg_connection.execute(text("""DROP TABLE IF EXISTS external_person CASCADE;"""))
pg_connection.execute(text("""DROP TABLE IF EXISTS external_source CASCADE;"""))
pg_connection.execute(text("""DROP TABLE IF EXISTS address CASCADE;"""))
pg_connection.execute(text("""DROP TABLE IF EXISTS phone_number CASCADE;"""))
pg_connection.execute(text("""DROP TABLE IF EXISTS identifier CASCADE;"""))
pg_connection.execute(text("""DROP TABLE IF EXISTS give_name CASCADE;"""))
pg_connection.execute(text("""DROP TABLE IF EXISTS given_name CASCADE;"""))
pg_connection.execute(text("""DROP TABLE IF EXISTS name CASCADE;"""))
pg_connection.execute(text("""DROP TABLE IF EXISTS patient CASCADE;"""))
pg_connection.execute(text("""DROP TABLE IF EXISTS person CASCADE;"""))
pg_connection.execute(text("""DROP TABLE IF EXISTS public.pyway;"""))
pg_connection.commit()
pg_connection.close()


def seed_testdb():
# Start with fresh tables
_clean_up()
set_mpi_env_vars()
run_migrations()
test_bundle = load_test_bundle()

for r in range(len(test_bundle["entry"])):
print(r)
if "resource" in test_bundle["entry"][r].keys():
if r == 1:
external_person_id = "KNOWN IRIS ID"
elif r == 3:
external_person_id = "ANOTHER KNOWN IRIS ID"
else:
external_person_id = None
bundle = {
"resourceType": "Bundle",
"identifier": {"value": "a very contrived FHIR bundle"},
"entry": [test_bundle["entry"][r]],
}
resp = client.post(
"/link-record",
json={
"bundle": bundle,
"use_enhanced": False,
"external_person_id": external_person_id,
},
)
print(resp.json())

pop_mpi_env_vars()


if __name__ == "__main__":
seed_testdb()
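Several commits above mention a `datetime_to_str` helper for converting dates to strings before insertion (e.g. the `dob DATE` column), later made "more flexible" to pass through strings and `None`. A minimal sketch of such a converter under those assumed behaviors — not the library's exact implementation:

```python
from datetime import date, datetime


def datetime_to_str(value, include_time: bool = False):
    # Accept date/datetime objects or pre-formatted strings; pass through
    # None and strings unchanged so callers can supply either form.
    if value is None or isinstance(value, str):
        return value
    fmt = "%Y-%m-%d %H:%M:%S" if include_time else "%Y-%m-%d"
    return value.strftime(fmt)


print(datetime_to_str(date(1983, 2, 1)))  # -> 1983-02-01
```

Passing strings through unchanged matters for seeding: test bundles (like the one loaded above) carry dates as FHIR strings, while records read back from Postgres arrive as `date` objects.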
