From 032218fa1cf31c8ddb3c24874e1ecec3a4d70745 Mon Sep 17 00:00:00 2001
From: m-goggins
Date: Thu, 2 Nov 2023 07:59:56 -0400
Subject: [PATCH 1/4] add incremental MPI seeding & time restraints

---
 scripts/Synapse/convertParquetMPI.ipynb | 123 ++++++++++++++++++------
 1 file changed, 96 insertions(+), 27 deletions(-)

diff --git a/scripts/Synapse/convertParquetMPI.ipynb b/scripts/Synapse/convertParquetMPI.ipynb
index 6d18c9b9..16100b96 100644
--- a/scripts/Synapse/convertParquetMPI.ipynb
+++ b/scripts/Synapse/convertParquetMPI.ipynb
@@ -61,6 +61,16 @@
     "from phdi.linkage.seed import convert_to_patient_fhir_resources\n",
     "from datetime import date\n",
     "import json\n",
+    "from pyspark.sql import SparkSession\n",
+    "import os\n",
+    "from datetime import datetime, timezone, timedelta\n",
+    "import pytz \n",
+    "import time\n",
+    "\n",
+    "spark = SparkSession.builder.appName(\"ProcessRowsInChunks\").getOrCreate()\n",
+    "\n",
+    "# Set up number of rows to be processed at a time\n",
+    "n_rows = 3\n",
     "\n",
     "# Set up file client\n",
     "storage_account = \"$STORAGE_ACCOUNT\"\n",
@@ -87,42 +97,99 @@
     "    print(\"Already mounted\")\n"
    ]
   },
-  {
-   "attachments": {},
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "Read the MPI parquet data into a spark dataframe. Iterate over each row of patient data in the dataframe and convert to a FHIR bundle and associated iris_id. Create a POST request to the record linkage container with FHIR bundle and iris_id."
-   ]
-  },
   {
    "cell_type": "code",
    "execution_count": null,
    "metadata": {},
    "outputs": [],
    "source": [
-    "# Convert data and write to blob storage\n",
-    "def convert_write_data(mpi_incoming_filename):\n",
-    "    curr_date = date.today()\n",
-    "    df = spark.read.parquet(mpi_incoming_filename)\n",
+    "def get_row_start(filename,patient_data_bucket,storage_account):\n",
+    "    \"\"\"\n",
+    "    Checks where in the seed (or large) file to start processing.\n",
+    "    \"\"\"\n",
+    "    row_count_filename = f\"last_row_added_to_mpi_{filename.split('.')[0]}.json\"\n",
+    "    incoming_file_dir = f\"abfss://{patient_data_bucket}@{storage_account}.dfs.core.windows.net/\"\n",
+    "\n",
+    "    incoming_files = mssparkutils.fs.ls(incoming_file_dir)\n",
+    "    filenames = [file.name for file in incoming_files]\n",
+    "    if row_count_filename in filenames:\n",
+    "        l = mssparkutils.fs.head(incoming_file_dir + f\"{row_count_filename}\")\n",
+    "        row_start = int(l.split(':')[-1][:-1])\n",
+    "    else:\n",
+    "        row_start = 0\n",
+    "    \n",
+    "    return row_start, row_count_filename\n",
+    "\n",
+    "def is_valid_time_window():\n",
+    "    \"\"\"\n",
+    "    Checks that updating the MPI occurs outside the window in which eCR data is processed\n",
+    "    \"\"\"\n",
+    "    # Set the timezone to Pacific Time (PT)\n",
+    "    pt_timezone = pytz.timezone(\"US/Pacific\")\n",
+    "\n",
+    "    # Get the current time in the Pacific Time zone\n",
+    "    current_time = datetime.now().astimezone(pt_timezone)\n",
+    "    \n",
+    "    # Define the time window (9:30am to 11:30am PT)\n",
+    "    start_time = current_time.replace(hour=10, minute=8, second=0, microsecond=0)\n",
+    "    end_time = current_time.replace(hour=10, minute=15, second=0, microsecond=0)\n",
+    "    \n",
+    "    # Check if the current time is NOT within the specified window when eCR data is likely being processed\n",
+    "    valid_time = start_time <= current_time <= end_time\n",
+    "    return not valid_time\n",
+    "\n",
+    "\n",
+    "def process_rows_in_chunks(dataframe, last_processed_row, patient_data_bucket, storage_account, row_count_filename,chunk_size):\n",
+    "    \"\"\"\n",
+    "    Processes rows to seed data in `n_rows` chunks outside the time window when eCR data is being processed.\n",
+    "    \"\"\"\n",
     "    curr_date = date.today()\n",
-    "    file_idx = 0\n",
-    "    for row in df.collect():\n",
-    "        file_idx += 1\n",
+    "    total_rows = dataframe.count()\n",
+    "    start = last_processed_row\n",
+    "    idx = start\n",
+    "    total_rows = 10\n",
+    "\n",
+    "    while start < total_rows:\n",
+    "        if is_valid_time_window():\n",
+    "            \n",
+    "            # Process the chunk of data (here just printing the content)\n",
+    "            for row in dataframe.collect()[start:start+chunk_size]:\n",
+    "                idx +=1 \n",
+    "                iris_id, fhir_bundle = convert_to_patient_fhir_resources(row.asDict())\n",
+    "                fhir_bundle[\"meta\"] = {\"source\": \"uri:iris\"}\n",
     "\n",
-    "        iris_id, fhir_bundle = convert_to_patient_fhir_resources(row.asDict())\n",
-    "        fhir_bundle[\"meta\"] = {\"source\": \"uri:iris\"}\n",
+    "                data = {\n",
+    "                    'bundle': fhir_bundle,\n",
+    "                    'external_person_id': iris_id\n",
+    "                }\n",
     "\n",
-    "        data = {\n",
-    "            'bundle': fhir_bundle,\n",
-    "            'external_person_id': iris_id\n",
-    "        }\n",
+    "                pre_filename = f\"abfss://{source_data_bucket}@{storage_account}.dfs.core.windows.net/fhir/lac_extract_{str(curr_date)}_{str(idx)}.json\"\n",
+    "                mssparkutils.fs.put(pre_filename, json.dumps(data), True)\n",
     "\n",
-    "        pre_filename = f\"abfss://{source_data_bucket}@{storage_account}.dfs.core.windows.net/fhir/lac_extract_{str(curr_date)}_{str(file_idx)}.json\"\n",
-    "        mssparkutils.fs.put(pre_filename, json.dumps(data), True)\n",
+    "            start += chunk_size\n",
     "\n",
+    "            # Update the last processed row in the checkpoint file\n",
+    "            last_row_data = {\"last_row_added_to_mpi\":idx}\n",
+    "            mssparkutils.fs.put(f\"abfss://{patient_data_bucket}@{storage_account}.dfs.core.windows.net/{row_count_filename}\", json.dumps(last_row_data), True)\n",
+    "\n",
+    "        else:\n",
+    "            # Wait for a certain time before checking again\n",
+    "            # Assuming a delay of 15 minutes\n",
+    "            time.sleep(900) # Sleep for 15 minutes before rechecking\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Read in MPI seed data\n",
+    "df = spark.read.parquet(mpi_incoming_filename) \n",
     "\n",
-    "convert_write_data(mpi_incoming_filename)"
+    "# Process rows in chunks of n_rows\n",
+    "last_processed_row, row_count_filename = get_row_start(filename,patient_data_bucket,storage_account)\n",
+    "process_rows_in_chunks(df, last_processed_row, patient_data_bucket, storage_account, row_count_filename, chunk_size=n_rows)\n"
    ]
   },
   {
@@ -131,9 +198,11 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "# Move file that triggered the MPI update event into the archive folder\n",
-    "destination = f\"abfss://{patient_data_bucket}@{storage_account}.dfs.core.windows.net/archive/{filename}\"\n",
-    "mssparkutils.fs.mv(src=mpi_incoming_filename,dest=destination,create_path=True)"
+    "# Move file that triggered the MPI update event and the row_count_filename file into the archive folder \n",
+    "for f in filename, row_count_filename:\n",
+    "    source = f\"abfss://{patient_data_bucket}@{storage_account}.dfs.core.windows.net/{f}\"\n",
+    "    destination = f\"abfss://{patient_data_bucket}@{storage_account}.dfs.core.windows.net/archive/{f}\"\n",
+    "    mssparkutils.fs.mv(src=source,dest=destination,create_path=True)"
    ]
   }
  ],

From 4028e7e74c005dd95be24cbcd00d7df6dd010ef1 Mon Sep 17 00:00:00 2001
From: m-goggins
Date: Thu, 2 Nov 2023 08:01:43 -0400
Subject: [PATCH 2/4] clean up & fix time window

---
 scripts/Synapse/convertParquetMPI.ipynb | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/scripts/Synapse/convertParquetMPI.ipynb b/scripts/Synapse/convertParquetMPI.ipynb
index 16100b96..5234765e 100644
--- a/scripts/Synapse/convertParquetMPI.ipynb
+++ b/scripts/Synapse/convertParquetMPI.ipynb
@@ -131,8 +131,8 @@
     "    current_time = datetime.now().astimezone(pt_timezone)\n",
     "    \n",
     "    # Define the time window (9:30am to 11:30am PT)\n",
-    "    start_time = current_time.replace(hour=10, minute=8, second=0, microsecond=0)\n",
-    "    end_time = current_time.replace(hour=10, minute=15, second=0, microsecond=0)\n",
+    "    start_time = current_time.replace(hour=9, minute=30, second=0, microsecond=0)\n",
+    "    end_time = current_time.replace(hour=11, minute=30, second=0, microsecond=0)\n",
     "    \n",
     "    # Check if the current time is NOT within the specified window when eCR data is likely being processed\n",
     "    valid_time = start_time <= current_time <= end_time\n",
@@ -147,7 +147,6 @@
     "    total_rows = dataframe.count()\n",
     "    start = last_processed_row\n",
     "    idx = start\n",
-    "    total_rows = 10\n",
     "\n",
     "    while start < total_rows:\n",
     "        if is_valid_time_window():\n",

From 3b4c50d2e4447679a3c0b5e987bddbcfcd4df608 Mon Sep 17 00:00:00 2001
From: m-goggins
Date: Thu, 2 Nov 2023 08:06:20 -0400
Subject: [PATCH 3/4] set n_rows to 1000

---
 scripts/Synapse/convertParquetMPI.ipynb | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/scripts/Synapse/convertParquetMPI.ipynb b/scripts/Synapse/convertParquetMPI.ipynb
index 5234765e..4586ac15 100644
--- a/scripts/Synapse/convertParquetMPI.ipynb
+++ b/scripts/Synapse/convertParquetMPI.ipynb
@@ -70,7 +70,7 @@
     "spark = SparkSession.builder.appName(\"ProcessRowsInChunks\").getOrCreate()\n",
     "\n",
     "# Set up number of rows to be processed at a time\n",
-    "n_rows = 3\n",
+    "n_rows = 1000\n",
     "\n",
     "# Set up file client\n",
     "storage_account = \"$STORAGE_ACCOUNT\"\n",
@@ -139,7 +139,7 @@
     "    return not valid_time\n",
     "\n",
     "\n",
-    "def process_rows_in_chunks(dataframe, last_processed_row, patient_data_bucket, storage_account, row_count_filename,chunk_size):\n",
+    "def process_rows_in_chunks(dataframe, last_processed_row, patient_data_bucket, storage_account, row_count_filename, chunk_size):\n",
     "    \"\"\"\n",
     "    Processes rows to seed data in `n_rows` chunks outside the time window when eCR data is being processed.\n",
     "    \"\"\"\n",

From a0883b6cd47625c91c491c4b9a1387589e32fb76 Mon Sep 17 00:00:00 2001
From: m-goggins
Date: Thu, 2 Nov 2023 08:08:16 -0400
Subject: [PATCH 4/4] clean up pt2

---
 scripts/Synapse/convertParquetMPI.ipynb | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/scripts/Synapse/convertParquetMPI.ipynb b/scripts/Synapse/convertParquetMPI.ipynb
index 4586ac15..9cb0ea3d 100644
--- a/scripts/Synapse/convertParquetMPI.ipynb
+++ b/scripts/Synapse/convertParquetMPI.ipynb
@@ -112,9 +112,12 @@
     "\n",
     "    incoming_files = mssparkutils.fs.ls(incoming_file_dir)\n",
     "    filenames = [file.name for file in incoming_files]\n",
+    "\n",
     "    if row_count_filename in filenames:\n",
+    "\n",
     "        l = mssparkutils.fs.head(incoming_file_dir + f\"{row_count_filename}\")\n",
     "        row_start = int(l.split(':')[-1][:-1])\n",
+    "    \n",
     "    else:\n",
     "        row_start = 0\n",
     "    \n",
@@ -136,6 +139,7 @@
     "    \n",
     "    # Check if the current time is NOT within the specified window when eCR data is likely being processed\n",
     "    valid_time = start_time <= current_time <= end_time\n",
+    "\n",
     "    return not valid_time\n",
     "\n",
     "\n",
@@ -149,9 +153,10 @@
     "    idx = start\n",
     "\n",
     "    while start < total_rows:\n",
+    "\n",
     "        if is_valid_time_window():\n",
     "            \n",
-    "            # Process the chunk of data (here just printing the content)\n",
+    "            # Process the chunk of data\n",
     "            for row in dataframe.collect()[start:start+chunk_size]:\n",
     "                idx +=1 \n",
     "                iris_id, fhir_bundle = convert_to_patient_fhir_resources(row.asDict())\n",
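
Taken together, the four patches implement one pattern: resume from a checkpoint file, seed a fixed number of rows at a time, and only write to the MPI outside the window when eCR data is being processed. The sketch below is a hypothetical, simplified distillation of that pattern, not the notebook code: it swaps the Spark DataFrame and mssparkutils file APIs for plain Python lists and a local JSON checkpoint file, parses the checkpoint with json.loads instead of string splitting, checks local time rather than US/Pacific via pytz, and leaves the per-row FHIR conversion and upload as a placeholder.

    # Minimal standalone sketch of checkpointed, time-windowed chunk processing.
    # Assumptions: plain-Python stand-ins for Spark/mssparkutils, a local checkpoint
    # file, and local time instead of the notebook's US/Pacific handling.
    import json
    import time
    from datetime import datetime, time as dtime
    from pathlib import Path

    # Hypothetical local stand-in for the abfss checkpoint file used by the notebook.
    CHECKPOINT = Path("last_row_added_to_mpi_example.json")

    def get_row_start() -> int:
        """Return the index of the first unprocessed row; 0 if no checkpoint exists yet."""
        if CHECKPOINT.exists():
            return json.loads(CHECKPOINT.read_text())["last_row_added_to_mpi"]
        return 0

    def is_valid_time_window() -> bool:
        """True when the current time falls outside the 9:30-11:30 seeding blackout window."""
        now = datetime.now().time()
        return not (dtime(9, 30) <= now <= dtime(11, 30))

    def process_rows_in_chunks(rows, chunk_size=1000):
        """Seed `rows` in chunk_size batches, checkpointing progress after each batch."""
        start = get_row_start()
        while start < len(rows):
            if not is_valid_time_window():
                time.sleep(900)  # inside the blackout window: wait 15 minutes, then re-check
                continue
            for row in rows[start:start + chunk_size]:
                pass  # placeholder: convert the row to a FHIR bundle and write/POST it
            start = min(start + chunk_size, len(rows))
            CHECKPOINT.write_text(json.dumps({"last_row_added_to_mpi": start}))

    if __name__ == "__main__":
        process_rows_in_chunks([{"id": i} for i in range(2500)], chunk_size=1000)

Because the checkpoint is written only after a whole chunk has been seeded, a failure or restart part-way through a chunk re-processes at most chunk_size rows rather than the entire seed file, which is the design choice the patches rely on when they archive the row-count file only at the very end.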