Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Insert non-null MRN & SSN identifiers and non-null phone numbers #343

Merged
merged 1 commit into from
Nov 27, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 160 additions & 3 deletions scripts/Synapse/convertParquetMPI.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"metadata": {},
"outputs": [],
"source": [
"pip install --upgrade pip"
"# pip install --upgrade pip"
]
},
{
Expand All @@ -25,7 +25,7 @@
"metadata": {},
"outputs": [],
"source": [
"pip install git+https://github.com/CDCgov/phdi@main"
"# pip install git+https://github.com/CDCgov/phdi@main"
]
},
{
Expand All @@ -50,6 +50,163 @@
"filename=\"\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# This script converts patient data from parquet to patient FHIR resources.\n",
"from typing import Dict, Tuple\n",
"import uuid\n",
"from datetime import datetime\n",
"\n",
"\n",
"def extract_given_name(data: Dict):\n",
" first_name = data.get(\"first_name\", None)\n",
" middle_name = data.get(\"middle_name\", None)\n",
"\n",
" given_names = []\n",
"\n",
" for name in [first_name, middle_name]:\n",
" if name is not None:\n",
" for n in name.split():\n",
" given_names.append(n)\n",
"\n",
" if len(given_names) > 0:\n",
" return given_names\n",
" else:\n",
" return None\n",
"\n",
"\n",
"def adjust_birthdate(data: Dict):\n",
" # TODO: remove this function and pass in the `format` parameter to dob\n",
" # standardization in ReadSourceData for LAC\n",
" format = \"%d%b%Y:00:00:00.000\"\n",
" dob = data.get(\"birthdate\", None)\n",
" if dob is not None and \":\" in dob:\n",
" datetime_str = datetime.strptime(dob, format)\n",
" dob = datetime_str.strftime(\"%Y-%m-%d\")\n",
" return dob\n",
"\n",
"def convert_to_patient_fhir_resources(data: Dict) -> Tuple:\n",
" \"\"\"\n",
" Converts and returns a row of patient data into patient resource in a FHIR-formatted\n",
" patient resouce with a newly generated patient id as well as the\n",
" `external_person_id`.\n",
"\n",
" :param data: Dictionary of patient data that optionionally includes the following\n",
" fields: mrn, ssn, first_name, middle_name, last_name, home_phone, cell-phone, sex,\n",
" birthdate, address, city, state, zip.\n",
" :return: Tuple of the `external_person_id` and FHIR-formatted patient resource.\n",
" \"\"\"\n",
"\n",
" patient_id = str(uuid.uuid4())\n",
"\n",
" optional_data = {\n",
" \"mrn\": data.get(\"mrn\", None),\n",
" \"ssn\": data.get(\"ssn\", None),\n",
" \"home_phone\": data.get(\"home_phone\", None),\n",
" \"cell_phone\": data.get(\"cell_phone\", None),\n",
" \"email\": data.get(\"email\", None),\n",
" }\n",
" identifiers = []\n",
" telecom = []\n",
"\n",
" # Iterate through each patient and convert patient data to FHIR resource\n",
" patient_resource = {\n",
" \"resourceType\": \"Patient\",\n",
" \"id\": f\"{patient_id}\",\n",
" \"name\": [\n",
" {\n",
" \"family\": f\"{data.get('last_name',None)}\",\n",
" \"given\": extract_given_name(data),\n",
" }\n",
" ],\n",
" \"gender\": f\"{data.get('sex',None)}\",\n",
" \"birthDate\": adjust_birthdate(data),\n",
" \"address\": [\n",
" {\n",
" \"use\": \"home\",\n",
" \"line\": [f\"{data.get('address',None)}\"],\n",
" \"city\": f\"{data.get('city',None)}\",\n",
" \"state\": f\"{data.get('state',None)}\",\n",
" \"postalCode\": f\"{data.get('zip',None)}\",\n",
" }\n",
" ],\n",
" }\n",
"\n",
" for col, value in optional_data.items():\n",
" if value is not None:\n",
" if col == \"mrn\":\n",
" mrn = {\n",
" \"type\": {\n",
" \"coding\": [\n",
" {\n",
" \"system\": \"http://terminology.hl7.org/CodeSystem/v2-0203\",\n",
" \"code\": \"MR\",\n",
" }\n",
" ]\n",
" },\n",
" \"value\": value,\n",
" }\n",
" identifiers.append(mrn)\n",
" elif col == \"ssn\":\n",
" ssn = {\n",
" \"type\": {\n",
" \"coding\": [\n",
" {\n",
" \"system\": \"http://terminology.hl7.org/CodeSystem/v2-0203\",\n",
" \"code\": \"SS\",\n",
" }\n",
" ]\n",
" },\n",
" \"value\": value,\n",
" }\n",
" identifiers.append(ssn)\n",
" elif col == \"home_phone\":\n",
" home_phone = (\n",
" {\n",
" \"system\": \"phone\",\n",
" \"value\": value,\n",
" \"use\": \"home\",\n",
" },\n",
" )\n",
" telecom.append(home_phone)\n",
" elif col == \"cell_phone\":\n",
" cell_phone = {\n",
" \"system\": \"phone\",\n",
" \"value\": value,\n",
" \"use\": \"cell\",\n",
" }\n",
" telecom.append(cell_phone)\n",
"\n",
" elif col == \"email\":\n",
" email = {\"value\": value, \"system\": \"email\"}\n",
" telecom.append(email)\n",
"\n",
" if len(identifiers) > 0:\n",
" patient_resource[\"identifier\"] = identifiers\n",
" if len(telecom) > 0:\n",
" patient_resource[\"telecom\"] = telecom\n",
"\n",
" fhir_bundle = {\n",
" \"resourceType\": \"Bundle\",\n",
" \"type\": \"batch\",\n",
" \"id\": str(uuid.uuid4()),\n",
" \"entry\": [\n",
" {\n",
" \"fullUrl\": f\"urn:uuid:{patient_id}\",\n",
" \"resource\": patient_resource,\n",
" \"request\": {\"method\": \"PUT\", \"url\": f\"Patient/{patient_id}\"},\n",
" },\n",
" ],\n",
" }\n",
"\n",
" external_person_id = data.get(\"person_id\", None)\n",
" return (external_person_id, fhir_bundle)"
]
},
{
"cell_type": "code",
"execution_count": null,
Expand All @@ -67,7 +224,7 @@
"outputs": [],
"source": [
"from notebookutils import mssparkutils\n",
"from phdi.linkage.seed import convert_to_patient_fhir_resources\n",
"# from phdi.linkage.seed import convert_to_patient_fhir_resources\n",
"from datetime import date\n",
"import json\n",
"from pyspark.sql import SparkSession\n",
Expand Down
Loading