Add incremental MPI seeding & time restraints #324

Merged
merged 4 commits on Nov 2, 2023
127 changes: 100 additions & 27 deletions scripts/Synapse/convertParquetMPI.ipynb
@@ -61,6 +61,16 @@
"from phdi.linkage.seed import convert_to_patient_fhir_resources\n",
"from datetime import date\n",
"import json\n",
"from pyspark.sql import SparkSession\n",
"import os\n",
"from datetime import datetime, timezone, timedelta\n",
"import pytz \n",
"import time\n",
"\n",
"spark = SparkSession.builder.appName(\"ProcessRowsInChunks\").getOrCreate()\n",
"\n",
"# Set up number of rows to be processed at a time\n",
"n_rows = 1000\n",
"\n",
"# Set up file client\n",
"storage_account = \"$STORAGE_ACCOUNT\"\n",
@@ -87,42 +97,103 @@
" print(\"Already mounted\")\n"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"Read the MPI parquet data into a spark dataframe. Iterate over each row of patient data in the dataframe and convert to a FHIR bundle and associated iris_id. Create a POST request to the record linkage container with FHIR bundle and iris_id."
]
},
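The cells below stage each converted bundle and iris_id in blob storage rather than calling the record linkage container directly from this notebook. As a rough illustration of the POST described above, here is a minimal sketch, assuming a hypothetical `RECORD_LINKAGE_URL` endpoint and the `bundle`/`external_person_id` payload shape used later in this notebook:

```python
import requests

# Hypothetical endpoint; the real record linkage container URL and route are not shown in this diff.
RECORD_LINKAGE_URL = "https://record-linkage.example.org/link-record"

def post_record(fhir_bundle: dict, iris_id: str) -> dict:
    """POST one FHIR bundle and its external person id to the record linkage service."""
    payload = {"bundle": fhir_bundle, "external_person_id": iris_id}
    response = requests.post(RECORD_LINKAGE_URL, json=payload, timeout=30)
    response.raise_for_status()
    return response.json()
```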
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Convert data and write to blob storage\n",
"def convert_write_data(mpi_incoming_filename):\n",
" curr_date = date.today()\n",
" df = spark.read.parquet(mpi_incoming_filename)\n",
"def get_row_start(filename,patient_data_bucket,storage_account):\n",
" \"\"\"\n",
" Checks where in the seed (or large) file to start processing.\n",
" \"\"\"\n",
" row_count_filename = f\"last_row_added_to_mpi_{filename.split('.')[0]}.json\"\n",
" incoming_file_dir = f\"abfss://{patient_data_bucket}@{storage_account}.dfs.core.windows.net/\"\n",
"\n",
" incoming_files = mssparkutils.fs.ls(incoming_file_dir)\n",
" filenames = [file.name for file in incoming_files]\n",
"\n",
" if row_count_filename in filenames:\n",
"\n",
" l = mssparkutils.fs.head(incoming_file_dir + f\"{row_count_filename}\")\n",
" row_start = int(l.split(':')[-1][:-1])\n",
" \n",
" else:\n",
" row_start = 0\n",
" \n",
" return row_start, row_count_filename\n",
"\n",
"def is_valid_time_window():\n",
" \"\"\"\n",
" Checks that updating the MPI occurs outside the window in which eCR data is processed\n",
" \"\"\"\n",
" # Set the timezone to Pacific Time (PT)\n",
" pt_timezone = pytz.timezone(\"US/Pacific\")\n",
"\n",
" # Get the current time in the Pacific Time zone\n",
" current_time = datetime.now().astimezone(pt_timezone)\n",
" \n",
" # Define the time window (9:30am to 11:30am PT)\n",
" start_time = current_time.replace(hour=9, minute=30, second=0, microsecond=0)\n",
" end_time = current_time.replace(hour=11, minute=30, second=0, microsecond=0)\n",
" \n",
" # Check if the current time is NOT within the specified window when eCR data is likely being processed\n",
" valid_time = start_time <= current_time <= end_time\n",
"\n",
" return not valid_time\n",
"\n",
"\n",
"def process_rows_in_chunks(dataframe, last_processed_row, patient_data_bucket, storage_account, row_count_filename, chunk_size):\n",
" \"\"\"\n",
" Processes rows to seed data in `n_rows` chunks outside the time window when eCR data is being processed.\n",
" \"\"\"\n",
" curr_date = date.today()\n",
" file_idx = 0\n",
" for row in df.collect():\n",
" file_idx += 1\n",
" total_rows = dataframe.count()\n",
" start = last_processed_row\n",
" idx = start\n",
"\n",
" while start < total_rows:\n",
"\n",
" iris_id, fhir_bundle = convert_to_patient_fhir_resources(row.asDict())\n",
" fhir_bundle[\"meta\"] = {\"source\": \"uri:iris\"}\n",
" if is_valid_time_window():\n",
" \n",
" # Process the chunk of data\n",
" for row in dataframe.collect()[start:start+chunk_size]:\n",
" idx +=1 \n",
" iris_id, fhir_bundle = convert_to_patient_fhir_resources(row.asDict())\n",
" fhir_bundle[\"meta\"] = {\"source\": \"uri:iris\"}\n",
"\n",
" data = {\n",
" 'bundle': fhir_bundle,\n",
" 'external_person_id': iris_id\n",
" }\n",
" data = {\n",
" 'bundle': fhir_bundle,\n",
" 'external_person_id': iris_id\n",
" }\n",
"\n",
" pre_filename = f\"abfss://{source_data_bucket}@{storage_account}.dfs.core.windows.net/fhir/lac_extract_{str(curr_date)}_{str(file_idx)}.json\"\n",
" mssparkutils.fs.put(pre_filename, json.dumps(data), True)\n",
" pre_filename = f\"abfss://{source_data_bucket}@{storage_account}.dfs.core.windows.net/fhir/lac_extract_{str(curr_date)}_{str(idx)}.json\"\n",
" mssparkutils.fs.put(pre_filename, json.dumps(data), True)\n",
"\n",
" start += chunk_size\n",
"\n",
" # Update the last processed row in the checkpoint file\n",
" last_row_data = {\"last_row_added_to_mpi\":idx}\n",
" mssparkutils.fs.put(f\"abfss://{patient_data_bucket}@{storage_account}.dfs.core.windows.net/{row_count_filename}\", json.dumps(last_row_data), True)\n",
"\n",
" else:\n",
" # Wait for a certain time before checking again\n",
" # Assuming a delay of 15 minutes\n",
" time.sleep(900) # Sleep for 15 minutes before rechecking\n"
]
},
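One note on the checkpoint handling in `get_row_start` above: the last processed row is recovered by splitting the raw checkpoint text on `:`. A minimal sketch of an equivalent parse with `json.loads`, assuming the `{"last_row_added_to_mpi": ...}` payload that `process_rows_in_chunks` writes:

```python
import json

def parse_last_processed_row(checkpoint_text: str) -> int:
    """Parse the checkpoint JSON written by process_rows_in_chunks and return the row index."""
    return int(json.loads(checkpoint_text)["last_row_added_to_mpi"])

# mssparkutils.fs.head(...) returns the start of the file as a string, e.g.:
assert parse_last_processed_row('{"last_row_added_to_mpi": 1000}') == 1000
```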
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Read in MPI seed data\n",
"df = spark.read.parquet(mpi_incoming_filename) \n",
"\n",
"convert_write_data(mpi_incoming_filename)"
"# Process rows in chunks of n_rows\n",
"last_processed_row, row_count_filename = get_row_start(filename,patient_data_bucket,storage_account)\n",
"process_rows_in_chunks(df, last_processed_row, patient_data_bucket, storage_account, row_count_filename, chunk_size=n_rows)\n"
]
},
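To make the resume semantics of the cell above concrete: the checkpoint stores the 1-based index of the last row already seeded, which doubles as the 0-based offset of the next row to process. A toy sketch of that bookkeeping on a plain list (purely illustrative; it stands in for the Spark dataframe, blob-storage checkpoint, and time-window gate):

```python
def process_in_chunks(rows, last_processed_row, chunk_size):
    """Simulate the chunked loop and return the checkpoint value after all rows are handled."""
    checkpoint = last_processed_row
    start = last_processed_row
    while start < len(rows):
        # enumerate(..., start=start + 1) mirrors the notebook's idx += 1 before each row
        for idx, _row in enumerate(rows[start:start + chunk_size], start=start + 1):
            checkpoint = idx  # 1-based count of rows seeded so far
        start += chunk_size
    return checkpoint

# Resuming at row 10 of 25 with chunks of 7 accounts for all 25 rows
assert process_in_chunks(list(range(25)), last_processed_row=10, chunk_size=7) == 25
```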
{
@@ -131,9 +202,11 @@
"metadata": {},
"outputs": [],
"source": [
"# Move file that triggered the MPI update event into the archive folder\n",
"destination = f\"abfss://{patient_data_bucket}@{storage_account}.dfs.core.windows.net/archive/{filename}\"\n",
"mssparkutils.fs.mv(src=mpi_incoming_filename,dest=destination,create_path=True)"
"# Move file that triggered the MPI update event and the row_count_filename file into the archive folder \n",
"for f in filename, row_count_filename:\n",
" source = f\"abfss://{patient_data_bucket}@{storage_account}.dfs.core.windows.net/{f}\"\n",
" destination = f\"abfss://{patient_data_bucket}@{storage_account}.dfs.core.windows.net/archive/{f}\"\n",
" mssparkutils.fs.mv(src=source,dest=destination,create_path=True)"
]
}
],