The batch directory contains code for a Java JAR which transforms data from a FHIR server to either Apache Parquet files for analysis or another FHIR store for data integration.
There are two options for reading the source FHIR server (input):
- FHIR-Search: This mode uses FHIR Search APIs to select resources to copy, retrieves them as FHIR resources, and transfers the data via FHIR APIs or Parquet files. This mode should work with most FHIR servers and has been tested with HAPI FHIR server and GCP FHIR store.
- JDBC: This mode uses the Java Database Connectivity (JDBC) API to read FHIR resources directly from the database of a FHIR server. It has been tested with a HAPI FHIR server backed by PostgreSQL and with an OpenMRS instance backed by MySQL. Note: JDBC support beyond HAPI FHIR and OpenMRS is not currently planned. Our long-term approach for a generic high-throughput alternative is to use the FHIR Bulk Export API.
There are two options for transforming the data (output):
- Parquet: Outputs the FHIR resources as Parquet files, using the SQL-on-FHIR schema.
- FHIR: Copies the FHIR resources to another FHIR server using FHIR APIs.
- Clone the FHIR Data Pipes project to your machine.
- Set the utils directory to world-readable: chmod -R 755 ./utils
- Build binaries by running mvn clean install from the root directory of the repository.
Run the pipeline directly using the java command:
java -cp ./pipelines/batch/target/batch-bundled-0.1.0-SNAPSHOT.jar \
org.openmrs.analytics.FhirEtl \
--fhirServerUrl=http://example.org/fhir \
--outputParquetPath=/tmp/parquet/ \
--[see additional parameters below]
Add the necessary parameters depending on your use case. The methods used for reading the source FHIR server and outputting the data depend on the parameters used. You can output to both Parquet files and a FHIR server by including the required parameters for both.
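As a rough illustration of how the output is selected by the parameters you pass, the sketch below assembles the output flags for Parquet files, a FHIR sink, or both, and prints the resulting command instead of running it. The helper name build_output_args and the sample path and URLs are assumptions made for this example; only the flags themselves come from this README.

```shell
# Hypothetical helper (not part of the project): prints the output flags.
# $1 = Parquet output path (empty to skip Parquet output)
# $2 = FHIR sink base URL (empty to skip copying to a FHIR server)
# Note: this simple sketch assumes the values contain no spaces.
build_output_args() {
  out=""
  if [ -n "$1" ]; then
    out="--outputParquetPath=$1"
  fi
  if [ -n "$2" ]; then
    # Append with a separating space only if a flag is already present.
    out="${out:+$out }--fhirSinkPath=$2"
  fi
  printf '%s\n' "$out"
}

# Dry run: print the command that would write to both outputs.
OUTPUT_ARGS=$(build_output_args "/tmp/parquet/" "http://localhost:8099/fhir")
echo "java -cp ./pipelines/batch/target/batch-bundled-0.1.0-SNAPSHOT.jar" \
  "org.openmrs.analytics.FhirEtl" \
  "--fhirServerUrl=http://localhost:8091/fhir" \
  "$OUTPUT_ARGS"
```

Passing an empty string for either argument drops the corresponding flag, so the same helper covers the Parquet-only and FHIR-only cases.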
This section documents the parameters used by the various pipelines. For more information on parameters, see FhirEtlOptions or run the pipeline with the help option:
java -jar ./pipelines/batch/target/batch-bundled-0.1.0-SNAPSHOT.jar --help=FhirEtlOptions
These parameters are used regardless of other pipeline options.
- resourceList - A comma-separated list of FHIR resources to include in the pipeline. Default: Patient,Encounter,Observation
- runner - The Apache Beam Runner to use. The pipeline supports DirectRunner and FlinkRunner by default; other runners can be enabled by Maven profiles, e.g., DataflowRunner. See also A note about Beam runners. Default: DirectRunner
The pipeline will use FHIR-Search to fetch data as long as jdbcModeEnabled and jdbcModeHapi are unset or false.
- fhirServerUrl - The base URL of the source FHIR server. Required.
- fhirServerUserName - The HTTP Basic Auth username to access the FHIR server APIs. Default: admin
- fhirServerPassword - The HTTP Basic Auth password to access the FHIR server APIs. Default: Admin123
- batchSize - The number of resources to fetch in each API call. Default: 100
JDBC mode is used if either JDBC flag (jdbcModeEnabled or jdbcModeHapi) is true.
To use JDBC mode:
1: Create a copy of hapi-postgres-config.json and edit the values to match your database server.
2: Enable JDBC mode for your source server:
- OpenMRS: jdbcModeEnabled=true
- HAPI FHIR server: jdbcModeHapi=true
3: Specify the path to your config file: fhirDatabaseConfigPath=./path/to/config.json
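The flag selection in steps 2 and 3 can be sketched as a small shell helper. This is only an illustration: the function name jdbc_args and the server-type labels "openmrs" and "hapi" are assumptions invented for this example, and it follows step 2 as written by setting a single mode flag per source server.

```shell
# Hypothetical helper (not part of the project): prints the JDBC flags.
# $1 = source server type, "openmrs" or "hapi" (labels assumed for this sketch)
# $2 = path to your edited copy of the database config file
jdbc_args() {
  case "$1" in
    openmrs) mode_flag="--jdbcModeEnabled=true" ;;
    hapi)    mode_flag="--jdbcModeHapi=true" ;;
    *)
      echo "unknown source server type: $1" >&2
      return 1
      ;;
  esac
  printf '%s %s\n' "$mode_flag" "--fhirDatabaseConfigPath=$2"
}

# Print the flags to append to the java command for a HAPI FHIR source.
jdbc_args hapi ./path/to/config.json
```

The printed flags are meant to be appended to the java command shown earlier, alongside the output parameters for Parquet files or a FHIR sink.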
All JDBC parameters:
- jdbcModeHapi - If true, uses JDBC mode for HAPI FHIR server. Default: false
- jdbcModeEnabled - If true, uses JDBC mode for OpenMRS. Default: false
- fhirDatabaseConfigPath - Path to the FHIR database config for JDBC mode. Default: ../utils/hapi-postgres-config.json
- jdbcFetchSize - The fetch size of each JDBC database query. Default: 10000
- jdbcMaxPoolSize - The maximum number of database connections. Default: 50
Parquet files are output when outputParquetPath is set.
- outputParquetPath - The file path to write Parquet files to, e.g., ./tmp/parquet/. Default: empty string, which does not output Parquet files.
- secondsToFlushParquetFiles - The number of seconds to wait before flushing all Parquet writers with non-empty content to files. Use 0 to disable. Default: 3600.
- rowGroupSizeForParquetFiles - The approximate size in bytes of the row-groups in Parquet files. When this size is reached, the content is flushed to disk. This is not used if there are fewer than 100 records. Use 0 to use the default Parquet row-group size. Default: 0.
Resources will be copied to the FHIR server specified in fhirSinkPath if that field is set.
- fhirSinkPath - A base URL to a target FHIR server, or the relative path of a GCP FHIR store, e.g. http://localhost:8091/fhir for a FHIR server or projects/PROJECT/locations/LOCATION/datasets/DATASET/fhirStores/FHIR-STORE-NAME for a GCP FHIR store. If using a GCP FHIR store, see here for setup information. Default: none, resources are not copied.
- sinkUserName - The HTTP Basic Auth username to access the FHIR sink. Not used for GCP FHIR stores.
- sinkPassword - The HTTP Basic Auth password to access the FHIR sink. Not used for GCP FHIR stores.
If the pipeline is run on a single machine (i.e., not on a distributed cluster), consider using a production-grade runner such as Flink for large datasets. This can be done by adding the parameter --runner=FlinkRunner (use --maxParallelism and --parallelism to control parallelism). This may avoid some of the memory issues of DirectRunner.
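As a minimal sketch of the Flink option, the snippet below builds the runner flags and prints a full command without executing it. The parallelism values 4 and 8 are placeholders chosen for illustration, not recommendations; suitable values depend on your machine and dataset. The server URL and output path are the sample values used elsewhere in this README.

```shell
# Placeholder values, assumed for this sketch only.
PARALLELISM=4
MAX_PARALLELISM=8

# Runner flags named in this README.
FLINK_ARGS="--runner=FlinkRunner --parallelism=${PARALLELISM} --maxParallelism=${MAX_PARALLELISM}"

# Dry run: print the command instead of executing it.
echo "java -cp ./pipelines/batch/target/batch-bundled-0.1.0-SNAPSHOT.jar" \
  "org.openmrs.analytics.FhirEtl" \
  "--fhirServerUrl=http://localhost:8091/fhir" \
  "--outputParquetPath=/tmp/parquet/" \
  "$FLINK_ARGS"
```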
These examples are set up to work with local test servers.
Example run (FHIR-Search input, Parquet output):
java -cp ./pipelines/batch/target/batch-bundled-0.1.0-SNAPSHOT.jar \
org.openmrs.analytics.FhirEtl \
--fhirServerUrl=http://localhost:8091/fhir \
--outputParquetPath=/tmp/TEST/ \
--resourceList=Patient,Encounter,Observation
Example run (JDBC input from a HAPI FHIR server, FHIR server output):
java -cp ./pipelines/batch/target/batch-bundled-0.1.0-SNAPSHOT.jar \
org.openmrs.analytics.FhirEtl \
--fhirServerUrl=http://localhost:8091/fhir \
--resourceList=Patient,Encounter,Observation \
--fhirDatabaseConfigPath=./utils/hapi-postgres-config.json \
--jdbcModeEnabled=true --jdbcModeHapi=true \
--jdbcMaxPoolSize=50 --jdbcFetchSize=1000 \
--jdbcDriverClass=org.postgresql.Driver \
--fhirSinkPath=http://localhost:8099/fhir \
--sinkUserName=hapi --sinkPassword=hapi
To query Parquet files, load them into a compatible data engine such as Apache Spark. The single-machine Docker Compose configuration runs the pipeline and loads data into an Apache Spark Thrift server for you.