Add incremental MPI seeding & time restraints (#324)
m-goggins authored Nov 2, 2023
1 parent e02dcb7 commit 5c8d527
Showing 1 changed file with 100 additions and 27 deletions: scripts/Synapse/convertParquetMPI.ipynb
@@ -61,6 +61,16 @@
"from phdi.linkage.seed import convert_to_patient_fhir_resources\n",
"from datetime import date\n",
"import json\n",
"from pyspark.sql import SparkSession\n",
"import os\n",
"from datetime import datetime, timezone, timedelta\n",
"import pytz \n",
"import time\n",
"\n",
"spark = SparkSession.builder.appName(\"ProcessRowsInChunks\").getOrCreate()\n",
"\n",
"# Set up number of rows to be processed at a time\n",
"n_rows = 1000\n",
"\n",
"# Set up file client\n",
"storage_account = \"$STORAGE_ACCOUNT\"\n",
@@ -87,42 +97,103 @@
" print(\"Already mounted\")\n"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"Read the MPI parquet data into a spark dataframe. Iterate over each row of patient data in the dataframe and convert to a FHIR bundle and associated iris_id. Create a POST request to the record linkage container with FHIR bundle and iris_id."
]
},
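The staged payload for each row pairs the FHIR bundle with its iris_id under the keys used later in the diff. Below is a minimal, standalone sketch of that shape with placeholder values; the real bundle comes from `convert_to_patient_fhir_resources`, and the POST to the record linkage container described above is not shown in this diff.

```python
# Minimal sketch of the per-row payload staged for the record linkage container.
# The bundle contents and the iris_id are placeholders; in the notebook the
# bundle is produced by phdi.linkage.seed.convert_to_patient_fhir_resources.
import json

fhir_bundle = {
    "resourceType": "Bundle",
    "meta": {"source": "uri:iris"},  # tag the notebook applies before writing
    "entry": [{"resource": {"resourceType": "Patient", "id": "example-patient"}}],
}

payload = {
    "bundle": fhir_bundle,
    "external_person_id": "iris-12345",  # placeholder iris_id
}

print(json.dumps(payload, indent=2))
```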
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Convert data and write to blob storage\n",
"def convert_write_data(mpi_incoming_filename):\n",
" curr_date = date.today()\n",
" df = spark.read.parquet(mpi_incoming_filename)\n",
"def get_row_start(filename,patient_data_bucket,storage_account):\n",
" \"\"\"\n",
" Checks where in the seed (or large) file to start processing.\n",
" \"\"\"\n",
" row_count_filename = f\"last_row_added_to_mpi_{filename.split('.')[0]}.json\"\n",
" incoming_file_dir = f\"abfss://{patient_data_bucket}@{storage_account}.dfs.core.windows.net/\"\n",
"\n",
" incoming_files = mssparkutils.fs.ls(incoming_file_dir)\n",
" filenames = [file.name for file in incoming_files]\n",
"\n",
" if row_count_filename in filenames:\n",
"\n",
" l = mssparkutils.fs.head(incoming_file_dir + f\"{row_count_filename}\")\n",
" row_start = int(l.split(':')[-1][:-1])\n",
" \n",
" else:\n",
" row_start = 0\n",
" \n",
" return row_start, row_count_filename\n",
"\n",
"def is_valid_time_window():\n",
" \"\"\"\n",
" Checks that updating the MPI occurs outside the window in which eCR data is processed\n",
" \"\"\"\n",
" # Set the timezone to Pacific Time (PT)\n",
" pt_timezone = pytz.timezone(\"US/Pacific\")\n",
"\n",
" # Get the current time in the Pacific Time zone\n",
" current_time = datetime.now().astimezone(pt_timezone)\n",
" \n",
" # Define the time window (9:30am to 11:30am PT)\n",
" start_time = current_time.replace(hour=9, minute=30, second=0, microsecond=0)\n",
" end_time = current_time.replace(hour=11, minute=30, second=0, microsecond=0)\n",
" \n",
" # Check if the current time is NOT within the specified window when eCR data is likely being processed\n",
" valid_time = start_time <= current_time <= end_time\n",
"\n",
" return not valid_time\n",
"\n",
"\n",
"def process_rows_in_chunks(dataframe, last_processed_row, patient_data_bucket, storage_account, row_count_filename, chunk_size):\n",
" \"\"\"\n",
" Processes rows to seed data in `n_rows` chunks outside the time window when eCR data is being processed.\n",
" \"\"\"\n",
" curr_date = date.today()\n",
" file_idx = 0\n",
" for row in df.collect():\n",
" file_idx += 1\n",
" total_rows = dataframe.count()\n",
" start = last_processed_row\n",
" idx = start\n",
"\n",
" while start < total_rows:\n",
"\n",
" iris_id, fhir_bundle = convert_to_patient_fhir_resources(row.asDict())\n",
" fhir_bundle[\"meta\"] = {\"source\": \"uri:iris\"}\n",
" if is_valid_time_window():\n",
" \n",
" # Process the chunk of data\n",
" for row in dataframe.collect()[start:start+chunk_size]:\n",
" idx +=1 \n",
" iris_id, fhir_bundle = convert_to_patient_fhir_resources(row.asDict())\n",
" fhir_bundle[\"meta\"] = {\"source\": \"uri:iris\"}\n",
"\n",
" data = {\n",
" 'bundle': fhir_bundle,\n",
" 'external_person_id': iris_id\n",
" }\n",
" data = {\n",
" 'bundle': fhir_bundle,\n",
" 'external_person_id': iris_id\n",
" }\n",
"\n",
" pre_filename = f\"abfss://{source_data_bucket}@{storage_account}.dfs.core.windows.net/fhir/lac_extract_{str(curr_date)}_{str(file_idx)}.json\"\n",
" mssparkutils.fs.put(pre_filename, json.dumps(data), True)\n",
" pre_filename = f\"abfss://{source_data_bucket}@{storage_account}.dfs.core.windows.net/fhir/lac_extract_{str(curr_date)}_{str(idx)}.json\"\n",
" mssparkutils.fs.put(pre_filename, json.dumps(data), True)\n",
"\n",
" start += chunk_size\n",
"\n",
" # Update the last processed row in the checkpoint file\n",
" last_row_data = {\"last_row_added_to_mpi\":idx}\n",
" mssparkutils.fs.put(f\"abfss://{patient_data_bucket}@{storage_account}.dfs.core.windows.net/{row_count_filename}\", json.dumps(last_row_data), True)\n",
"\n",
" else:\n",
" # Wait for a certain time before checking again\n",
" # Assuming a delay of 15 minutes\n",
" time.sleep(900) # Sleep for 15 minutes before rechecking\n"
]
},
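The checkpointing above round-trips a small JSON file: `process_rows_in_chunks` writes `{"last_row_added_to_mpi": idx}` after each chunk, and `get_row_start` recovers the index by splitting on the colon. Below is a standalone sketch of that round trip (no `mssparkutils` or blob storage involved); `json.loads` is an equivalent, slightly more robust way to read the same value.

```python
# Standalone sketch of the checkpoint round trip used by process_rows_in_chunks
# and get_row_start; the checkpoint text is built locally rather than read from
# blob storage with mssparkutils.fs.head.
import json

# What process_rows_in_chunks writes after finishing a chunk that ended at row 3000
checkpoint_text = json.dumps({"last_row_added_to_mpi": 3000})

# The notebook's parse: take the text after the colon and drop the closing brace
row_start = int(checkpoint_text.split(":")[-1][:-1])

# Equivalent parse with json.loads
row_start_alt = json.loads(checkpoint_text)["last_row_added_to_mpi"]

assert row_start == row_start_alt == 3000
print(f"Resume seeding at row {row_start}")
```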
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Read in MPI seed data\n",
"df = spark.read.parquet(mpi_incoming_filename) \n",
"\n",
"convert_write_data(mpi_incoming_filename)"
"# Process rows in chunks of n_rows\n",
"last_processed_row, row_count_filename = get_row_start(filename,patient_data_bucket,storage_account)\n",
"process_rows_in_chunks(df, last_processed_row, patient_data_bucket, storage_account, row_count_filename, chunk_size=n_rows)\n"
]
},
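The cell above will sleep in 15-minute intervals whenever it is run inside the 9:30-11:30am PT window. Below is a minimal sketch of that gate evaluated against fixed timestamps instead of `datetime.now()`, so it can be run anywhere; the helper name `outside_ecr_window` is illustrative and not part of the notebook.

```python
# Standalone sketch of the 9:30-11:30am PT gate used by is_valid_time_window,
# checked against fixed timestamps rather than the current time.
from datetime import datetime
import pytz

pt_timezone = pytz.timezone("US/Pacific")

def outside_ecr_window(current_time):
    """Mirrors the notebook's check: True when MPI seeding is allowed."""
    start_time = current_time.replace(hour=9, minute=30, second=0, microsecond=0)
    end_time = current_time.replace(hour=11, minute=30, second=0, microsecond=0)
    return not (start_time <= current_time <= end_time)

print(outside_ecr_window(pt_timezone.localize(datetime(2023, 11, 2, 10, 0))))  # False: inside the eCR window
print(outside_ecr_window(pt_timezone.localize(datetime(2023, 11, 2, 13, 0))))  # True: seeding can proceed
```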
{
@@ -131,9 +202,11 @@
"metadata": {},
"outputs": [],
"source": [
"# Move file that triggered the MPI update event into the archive folder\n",
"destination = f\"abfss://{patient_data_bucket}@{storage_account}.dfs.core.windows.net/archive/{filename}\"\n",
"mssparkutils.fs.mv(src=mpi_incoming_filename,dest=destination,create_path=True)"
"# Move file that triggered the MPI update event and the row_count_filename file into the archive folder \n",
"for f in filename, row_count_filename:\n",
" source = f\"abfss://{patient_data_bucket}@{storage_account}.dfs.core.windows.net/{f}\"\n",
" destination = f\"abfss://{patient_data_bucket}@{storage_account}.dfs.core.windows.net/archive/{f}\"\n",
" mssparkutils.fs.mv(src=source,dest=destination,create_path=True)"
]
}
],
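For reference, the archive step above produces source and destination paths of the following form. This is a standalone sketch with placeholder storage account, container, and file names; only the path template is taken from the cell above.

```python
# Standalone sketch of the archive paths built by the final cell, using
# placeholder values for the storage account, container, and trigger file.
storage_account = "examplestorageacct"   # placeholder
patient_data_bucket = "patient-data"     # placeholder
filename = "lac_seed_extract.parquet"    # placeholder trigger file
row_count_filename = f"last_row_added_to_mpi_{filename.split('.')[0]}.json"

for f in (filename, row_count_filename):
    source = f"abfss://{patient_data_bucket}@{storage_account}.dfs.core.windows.net/{f}"
    destination = f"abfss://{patient_data_bucket}@{storage_account}.dfs.core.windows.net/archive/{f}"
    print(f"{source}\n  -> {destination}")
```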