The Universal Pipeline is the core of what ReportStream is, and the two terms are sometimes used interchangeably. The Universal Pipeline is responsible for receiving public health data from a sender and sending it to one or more receivers. The pipeline was designed to be agnostic of:
- data formats - At present, the pipeline can receive and send HL7v2.5.1 and FHIR formats
- data types - While presently only handling the routing of ELR (Laboratory Data), The pipeline was designed to be able to process the six core data sources as identified by PHDS:
- Case data
- Laboratory data
- Emergency department data
- Vital statistics data
- Immunization data
- Healthcare capacity and utilization data
The rest of this document aims to explore the high level architecture of the Universal Pipeline and how exactly it can meet the promises outlined above!
In order to handle different data formats and types in a scalable and maintainable way, the pipeline has been broken up into the following independent "steps":
flowchart LR;
Receive-->Convert;
Convert-->Destination Filter;
Destination Filter-->Receiver Filter;
Receiver Filter-->Translate;
Translate-->Batch;
Batch-->Send;
The sections listed below will aim to describe each step of the pipeline in technical detail. Each step will refer to its part of the detailed architecture diagram found here. See Pipeline Steps Overview for a high level overview of how the different steps combine to form the core functionality of the Universal Pipeline.
Each pipeline step runs as its own Azure Function (equivalent to lambdas in AWS) in ReportStream's Microsoft Azure Account. This means ReportStream and its Universal Pipeline are architected for, and deployed to, the cloud. The Universal Pipeline is horizontally and vertically scalable. Azure manages the life cycle of each function and dynamically decides how many of each function to instantiate based on load - ensuring ReportStream can handle even peak COVID reporting numbers!
The data that is submitted and flows through ReportStream is referred to as a Report
. A ReportStream Report can contain one or more ReportStream Items. An Item
is a single HL7v2 message or FHIR bundle representing some real world data, like a laboratory test result (ELR data).
The Universal Pipeline is often referred to as the "FHIR" pipeline due to the Convert step translating received messages into the FHIR format. This allows most logic in the pipeline to run on only one data format instead of having to have separate logic for each format. FHIR was chosen to be the internal format of the Universal Pipeline due to it being the future format of healthcare data exchange and because of its modularity, flexibility, and mature libraries.
Each step in the pipeline is its own step precisely because it is in charge of a unique and specific task. For example, the Translate step translates the report from FHIR format to the format that a given receiver has specified via their settings. The Convert step, on the other hand, will convert the report that was received from a sender to the Universal Pipeline's internal format of FHIR so that subsequent steps can perform their function on a normalized format. While the pipeline steps perform different tasks, they generally all:
- Download the report outputted by the previous step from the step's specific folder in ReportStream's internal Azure Storage Container
- Modify the downloaded report or perform some other action. In the case of the Receive step, it will just upload what it received from the client POST request
- Upload the modified report to the next step's associated folder in ReportStream's internal Azure Storage Container
- Schedule the next step in the pipeline to run by placing a message on the AQS queue. This is how most steps are scheduled to be run. See Universal Pipeline AQS Usage for more information on how AQS is used in the Universal Pipeline.
- Update the ReportStream database with metadata concerning the report and its item(s) the step processed. This metadata is used by the history endpoint to communicate the progress and status of data as it flows through the pipeline
There are a few exceptions to the rules outlined above:
- The Receive step, since it is the first step in the pipeline, is not executed via AQS but by a HTTP POST request to
/api/reports
or/api/waters
. It also does not modify the report at all, rather just places the report it received on the queue - The Batch step runs on its own timer and looks for reports that have passed the Translate step successfully
- The Send step, since it is the last step in the pipeline, will not place anything on the AQS queue
The folder structure where reports are stored as they flow through the pipelines is shown below (screenshot from the Azure portal)
Each step in the Universal Pipeline, aside from Send, will upload the modified version of the report it received to the folder associated with the next step in the pipeline. This is done using the BlobAccess
object. For example, the Convert step uploads its output, the converted FHIR bundle, like so:
// upload to blobstore
val bodyBytes = FhirTranscoder.encode(bundle).toByteArray()
val blobInfo = BlobAccess.uploadBody(
MimeFormat.FHIR,
bodyBytes,
report.name,
message.blobSubFolderName,
nextEvent.eventAction
)
The resulting blobInfo.blobUrl
is returned as part of the uploadBody
response object and gets passed to the object that is sent to the AQS queue, allowing the next step in the pipeline to download the output of the previous step! See Universal Pipeline AQS Usage for more information.
Each pipeline step defined in FHIRFunctions.kt
, besides Receive, that is specific to the Universal Pipeline (Batch and Send are old steps shared with the legacy pipeline), use the QueueTrigger
decorator to allow them to be executed via the AQS mechanism in Azure. For example, the Convert step's decorator looks like so:
@QueueTrigger(name = "message", queueName = elrConvertQueueName)
elrConvertQueueName
is set to elr-fhir-convert, which can be found in the azure portal amongst other queues:
When a function has completed successfully, is will use the QueueAccess
object to send a message to AQS for the next step to run. For example, the Convert step will schedule the Destination Filter step for each new Report it created from the original received Report:
messagesToSend.forEach {
this.queue.sendMessage(elrRoutingQueueName, it.serialize())
}
In the Convert case above, messagesToSend
is a list of FHIRConvertMessage
objects which contain the id of the report and some other information that will get sent to the next step in the pipeline. This allows the next step in the pipeline to know which report to work on (report.id
) and how to get it (blobInfo.blobUrl
). In the case of Convert, this code looks like so:
messagesToSend.add(
FhirConvertMessage(
report.id,
blobInfo.blobUrl,
BlobUtils.digestToString(blobInfo.digest),
message.blobSubFolderName,
message.topic
)
)
The four queues specific to the Universal Pipeline are:
- elr-fhir-convert
- elr-fhir-destination-filter
- elr-fhir-receiver-filter
- elr-fhir-translate
The two queues shared with Legacy Pipeline are:
- batch
- send
Each step is responsible for validating its own actions. For example, if the Convert step runs into a problem translating the received report into the Universal Pipeline's internal FHIR format, it is responsible for updating the metadata in the database, specifically the action_log
table, with what error occurred. The action_log
table is one of the sources of information for the history endpoint, which means it is user facing.
While uncommon, a pipeline step may encounter an unexpected failure at some point. If this happens, the AQS message that initiated the step shall remain on the queue and the AQS message will run again at some point in the future. The message will be re-queued five times before the message is moved to the poison queue. The issue can then be analyzed and fixed before the message can then be re-added to the queue.
This retry behavior works differently for the batch and send.
In addition to the built-in AQS retry mechanism, the unexpected failure will also get logged to the Azure logs where it can be troubleshooted.
Before continuing with this section, make sure you understand the concept of a Report and Item in ReportStream!
Generally, each step takes in one Report, performs some operation, and outputs a different Report, referred to as a Child Report
. In some steps, like the Convert step, the received Report may be split up into multiple child reports! For the Convert step specifically, this is the case when a Report contains multiple items. The Convert step will split the Report into multiple Reports, so that each Report contains only one item - this is done to prepare for the Destination Filter, Receiver Filter, and Translate steps.
Because each step, in general, creates a new Report, it becomes necessary to track the descendants and ancestors of a particular Report to answer important questions such as:
- Where did the submitted Report get sent?
- Did the submitted Report get delivered?
In order to answer the questions above, each step will record information regarding the actions it took, commonly referred to in ReportStream as metadata
. The metadata is stored in ReportStream's internal Postgres database and powers API endpoints like /history
to help ReportStream clients (senders) answer questions related to the processing of a particular report.
The Universal Pipeline does NOT support the registering of callbacks and does NOT implement any type of ACK/NACK system, instead it answers the question of "What happened to my report" via the REST history endpoint (HTTP GET) located at /api/waters/report/{report-id}/history
. The information provided by the history endpoint is captured as the report(s) flow through the pipeline and is referred to as metadata
The client (sender in our case) is responsible for polling the history endpoint to ensure either success or failure of their message. ReportStream will not automatically alert senders of changes to their Report's status.
As a Report flows through the Universal Pipeline, metadata such as the following is captured and stored in the internal Postgres database:
- The name of the sender
- The name of the receiver(s)
- Whether the report was successfully sent
- Any errors or warnings associated with a report
- Was a report split up at any point in the pipeline? If so, what are the new reports?
When the history endpoint is called, it will return a JSON object representing the aforementioned metadata stored in the database. This is the same object that is returned by the reports or waters endpoints when submitting a report, only it may contain more information that is not known at the time a report is submitted, like the name of the receiver(s). To better understand the history endpoint, see the example below.
Make sure to check out the Swagger documentation to learn how to execute the various endpoints provided by ReportStream and view their documentation, including the history endpoint!
Step 1: User submits a report via the reports API and gets the following object back in the response. There are two very important things to note here:
- The
overallStatus
isReceived
, meaning RS accepted the file and it is awaiting further processing - The
id
is a generated UUID that uniquely identifies the submitted report. This ID is used in the history query
{
"id": "5036f90b-edd5-4a93-9edd-a71d1fa8fba1",
"submissionId": 980,
"overallStatus": "Received",
"timestamp": "2023-07-25T16:42:26.566Z",
"plannedCompletionAt": null,
"actualCompletionAt": null,
"sender": "CDC-ELIMS-HL7.CDC-ELIMS-SENDER",
"reportItemCount": 1,
"errorCount": 0,
"warningCount": 0,
"httpStatus": 201,
"destinations": [],
"actionName": "receive",
"externalName": null,
"reportId": "5036f90b-edd5-4a93-9edd-a71d1fa8fba1",
"topic": "full-elr",
"errors": [],
"warnings": [],
"destinationCount": 0
}
Step 2: Some time later, could be right away or could be days from the original submission, the user calls the history endpoint with the id returned from the submission query (5036f90b-edd5-4a93-9edd-a71d1fa8fba1
) and gets the following object back in the response. The important things to note here are:
overallStatus
changed fromReceived
toWaiting to Deliver
. This indicates the report made it all the way from the Receive step in the pipeline to the Translate step and is now waiting on the Batch function to run. Once the Batch and Send steps run, theoverallStatus
will be set toDelivered
.destinations
was populated with three receiver objects, indicating the Destination Filter step determined the report contained items that matched the initial topic and jurisdiction filters of those receivers. The filters for the CDC-ELIMS-HL7.CDC-ELIMS-RECEIVER organization allowed the item in the report through (itemCount
matchesitemCountBeforeQualityFiltering
) but the filters for flexion.simulated-lab-2 and flexion.simulated-lab filtered out the item (itemCount
is 0), and thefilteredReportRows
element indicates why. In this case, it was the processing mode filter that determined the item should not go through.destinationCount
is set to1
, meaning the item in the submitted report passed all the filters for one receiver, in this case CDC-ELIMS-HL7.CDC-ELIMS-RECEIVER.
{
"id": "5036f90b-edd5-4a93-9edd-a71d1fa8fba1",
"submissionId": 980,
"overallStatus": "Waiting to Deliver",
"timestamp": "2023-07-25T16:42:26.566Z",
"plannedCompletionAt": null,
"actualCompletionAt": null,
"sender": "CDC-ELIMS-HL7.CDC-ELIMS-SENDER",
"reportItemCount": 1,
"errorCount": 0,
"warningCount": 0,
"httpStatus": 201,
"destinations": [
{
"organization": "Centers for Disease Control ELIMS System",
"organization_id": "CDC-ELIMS-HL7",
"service": "CDC-ELIMS-RECEIVER",
"itemCount": 1,
"itemCountBeforeQualityFiltering": 1,
"filteredReportRows": [],
"filteredReportItems": [],
"sentReports": [],
"downloadedReports": []
},
{
"organization": "Flexion, Inc.",
"organization_id": "flexion",
"service": "simulated-lab-2",
"itemCount": 0,
"itemCountBeforeQualityFiltering": 1,
"filteredReportRows": [
"For flexion.simulated-lab-2, filter (default filter) [Bundle.entry.resource.ofType(MessageHeader).meta.tag.where(system = 'http://terminology.hl7.org/CodeSystem/v2-0103').code.exists() and Bundle.entry.resource.ofType(MessageHeader).meta.tag.where(system = 'http://terminology.hl7.org/CodeSystem/v2-0103').code = 'P'][] filtered out item 3003786103_4988249_33033"
],
"filteredReportItems": [
{
"filterType": "PROCESSING_MODE_FILTER",
"filterName": "(default filter) [Bundle.entry.resource.ofType(MessageHeader).meta.tag.where(system = 'http://terminology.hl7.org/CodeSystem/v2-0103').code.exists() and Bundle.entry.resource.ofType(MessageHeader).meta.tag.where(system = 'http://terminology.hl7.org/CodeSystem/v2-0103').code = 'P']",
"filteredTrackingElement": "3003786103_4988249_33033",
"filterArgs": [],
"message": "For flexion.simulated-lab-2, filter (default filter) [Bundle.entry.resource.ofType(MessageHeader).meta.tag.where(system = 'http://terminology.hl7.org/CodeSystem/v2-0103').code.exists() and Bundle.entry.resource.ofType(MessageHeader).meta.tag.where(system = 'http://terminology.hl7.org/CodeSystem/v2-0103').code = 'P'][] filtered out item 3003786103_4988249_33033"
}],
"sentReports": [],
"downloadedReports": []
},
{
"organization": "Flexion, Inc.",
"organization_id": "flexion",
"service": "simulated-lab",
"itemCount": 0,
"itemCountBeforeQualityFiltering": 1,
"filteredReportRows": [
"For flexion.simulated-lab, filter (default filter) [Bundle.entry.resource.ofType(MessageHeader).meta.tag.where(system = 'http://terminology.hl7.org/CodeSystem/v2-0103').code.exists() and Bundle.entry.resource.ofType(MessageHeader).meta.tag.where(system = 'http://terminology.hl7.org/CodeSystem/v2-0103').code = 'P'][] filtered out item 3003786103_4988249_33033"
],
"filteredReportItems": [
{
"filterType": "PROCESSING_MODE_FILTER",
"filterName": "(default filter) [Bundle.entry.resource.ofType(MessageHeader).meta.tag.where(system = 'http://terminology.hl7.org/CodeSystem/v2-0103').code.exists() and Bundle.entry.resource.ofType(MessageHeader).meta.tag.where(system = 'http://terminology.hl7.org/CodeSystem/v2-0103').code = 'P']",
"filteredTrackingElement": "3003786103_4988249_33033",
"filterArgs": [],
"message": "For flexion.simulated-lab, filter (default filter) [Bundle.entry.resource.ofType(MessageHeader).meta.tag.where(system = 'http://terminology.hl7.org/CodeSystem/v2-0103').code.exists() and Bundle.entry.resource.ofType(MessageHeader).meta.tag.where(system = 'http://terminology.hl7.org/CodeSystem/v2-0103').code = 'P'][] filtered out item 3003786103_4988249_33033"
}],
"sentReports": [],
"downloadedReports": []
}],
"actionName": "receive",
"externalName": null,
"reportId": "5036f90b-edd5-4a93-9edd-a71d1fa8fba1",
"topic": "full-elr",
"errors": [],
"warnings": [],
"destinationCount": 1
}