diff --git a/scripts/Synapse/convertParquetMPI.ipynb b/scripts/Synapse/convertParquetMPI.ipynb index 6d18c9b9..9cb0ea3d 100644 --- a/scripts/Synapse/convertParquetMPI.ipynb +++ b/scripts/Synapse/convertParquetMPI.ipynb @@ -61,6 +61,16 @@ "from phdi.linkage.seed import convert_to_patient_fhir_resources\n", "from datetime import date\n", "import json\n", + "from pyspark.sql import SparkSession\n", + "import os\n", + "from datetime import datetime, timezone, timedelta\n", + "import pytz \n", + "import time\n", + "\n", + "spark = SparkSession.builder.appName(\"ProcessRowsInChunks\").getOrCreate()\n", + "\n", + "# Set up number of rows to be processed at a time\n", + "n_rows = 1000\n", "\n", "# Set up file client\n", "storage_account = \"$STORAGE_ACCOUNT\"\n", @@ -87,42 +97,103 @@ " print(\"Already mounted\")\n" ] }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Read the MPI parquet data into a spark dataframe. Iterate over each row of patient data in the dataframe and convert to a FHIR bundle and associated iris_id. Create a POST request to the record linkage container with FHIR bundle and iris_id." - ] - }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ - "# Convert data and write to blob storage\n", - "def convert_write_data(mpi_incoming_filename):\n", - " curr_date = date.today()\n", - " df = spark.read.parquet(mpi_incoming_filename)\n", + "def get_row_start(filename,patient_data_bucket,storage_account):\n", + " \"\"\"\n", + " Checks where in the seed (or large) file to start processing.\n", + " \"\"\"\n", + " row_count_filename = f\"last_row_added_to_mpi_{filename.split('.')[0]}.json\"\n", + " incoming_file_dir = f\"abfss://{patient_data_bucket}@{storage_account}.dfs.core.windows.net/\"\n", + "\n", + " incoming_files = mssparkutils.fs.ls(incoming_file_dir)\n", + " filenames = [file.name for file in incoming_files]\n", + "\n", + " if row_count_filename in filenames:\n", + "\n", + " l = mssparkutils.fs.head(incoming_file_dir + f\"{row_count_filename}\")\n", + " row_start = int(l.split(':')[-1][:-1])\n", + " \n", + " else:\n", + " row_start = 0\n", + " \n", + " return row_start, row_count_filename\n", + "\n", + "def is_valid_time_window():\n", + " \"\"\"\n", + " Checks that updating the MPI occurs outside the window in which eCR data is processed\n", + " \"\"\"\n", + " # Set the timezone to Pacific Time (PT)\n", + " pt_timezone = pytz.timezone(\"US/Pacific\")\n", + "\n", + " # Get the current time in the Pacific Time zone\n", + " current_time = datetime.now().astimezone(pt_timezone)\n", + " \n", + " # Define the time window (9:30am to 11:30am PT)\n", + " start_time = current_time.replace(hour=9, minute=30, second=0, microsecond=0)\n", + " end_time = current_time.replace(hour=11, minute=30, second=0, microsecond=0)\n", + " \n", + " # Check if the current time is NOT within the specified window when eCR data is likely being processed\n", + " valid_time = start_time <= current_time <= end_time\n", + "\n", + " return not valid_time\n", + "\n", + "\n", + "def process_rows_in_chunks(dataframe, last_processed_row, patient_data_bucket, storage_account, row_count_filename, chunk_size):\n", + " \"\"\"\n", + " Processes rows to seed data in `n_rows` chunks outside the time window when eCR data is being processed.\n", + " \"\"\"\n", " curr_date = date.today()\n", - " file_idx = 0\n", - " for row in df.collect():\n", - " file_idx += 1\n", + " total_rows = dataframe.count()\n", + " start = last_processed_row\n", + " idx = start\n", + "\n", + " while start < total_rows:\n", "\n", - " iris_id, fhir_bundle = convert_to_patient_fhir_resources(row.asDict())\n", - " fhir_bundle[\"meta\"] = {\"source\": \"uri:iris\"}\n", + " if is_valid_time_window():\n", + " \n", + " # Process the chunk of data\n", + " for row in dataframe.collect()[start:start+chunk_size]:\n", + " idx +=1 \n", + " iris_id, fhir_bundle = convert_to_patient_fhir_resources(row.asDict())\n", + " fhir_bundle[\"meta\"] = {\"source\": \"uri:iris\"}\n", "\n", - " data = {\n", - " 'bundle': fhir_bundle,\n", - " 'external_person_id': iris_id\n", - " }\n", + " data = {\n", + " 'bundle': fhir_bundle,\n", + " 'external_person_id': iris_id\n", + " }\n", "\n", - " pre_filename = f\"abfss://{source_data_bucket}@{storage_account}.dfs.core.windows.net/fhir/lac_extract_{str(curr_date)}_{str(file_idx)}.json\"\n", - " mssparkutils.fs.put(pre_filename, json.dumps(data), True)\n", + " pre_filename = f\"abfss://{source_data_bucket}@{storage_account}.dfs.core.windows.net/fhir/lac_extract_{str(curr_date)}_{str(idx)}.json\"\n", + " mssparkutils.fs.put(pre_filename, json.dumps(data), True)\n", "\n", + " start += chunk_size\n", + "\n", + " # Update the last processed row in the checkpoint file\n", + " last_row_data = {\"last_row_added_to_mpi\":idx}\n", + " mssparkutils.fs.put(f\"abfss://{patient_data_bucket}@{storage_account}.dfs.core.windows.net/{row_count_filename}\", json.dumps(last_row_data), True)\n", + "\n", + " else:\n", + " # Wait for a certain time before checking again\n", + " # Assuming a delay of 15 minutes\n", + " time.sleep(900) # Sleep for 15 minutes before rechecking\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Read in MPI seed data\n", + "df = spark.read.parquet(mpi_incoming_filename) \n", "\n", - "convert_write_data(mpi_incoming_filename)" + "# Process rows in chunks of n_rows\n", + "last_processed_row, row_count_filename = get_row_start(filename,patient_data_bucket,storage_account)\n", + "process_rows_in_chunks(df, last_processed_row, patient_data_bucket, storage_account, row_count_filename, chunk_size=n_rows)\n" ] }, { @@ -131,9 +202,11 @@ "metadata": {}, "outputs": [], "source": [ - "# Move file that triggered the MPI update event into the archive folder\n", - "destination = f\"abfss://{patient_data_bucket}@{storage_account}.dfs.core.windows.net/archive/{filename}\"\n", - "mssparkutils.fs.mv(src=mpi_incoming_filename,dest=destination,create_path=True)" + "# Move file that triggered the MPI update event and the row_count_filename file into the archive folder \n", + "for f in filename, row_count_filename:\n", + " source = f\"abfss://{patient_data_bucket}@{storage_account}.dfs.core.windows.net/{f}\"\n", + " destination = f\"abfss://{patient_data_bucket}@{storage_account}.dfs.core.windows.net/archive/{f}\"\n", + " mssparkutils.fs.mv(src=source,dest=destination,create_path=True)" ] } ],