From 56b1f62e98c7d4e55c40c7621d8c8f06586a3567 Mon Sep 17 00:00:00 2001 From: Michael Terry Date: Tue, 22 Nov 2022 09:25:17 -0500 Subject: [PATCH] feat: add Delta Lake support Pass --output-format=deltalake to enable. And make sure your AWS glue is set up to support it. This is not enabled by default (yet) and this commit does not yet do anything clever with incremental bulk exports. But this is a beginning to build upon. --- .gitignore | 5 +- cumulus/etl.py | 6 +- cumulus/formats/__init__.py | 1 + cumulus/formats/athena.py | 54 ++++++++++------- cumulus/formats/deltalake.py | 111 +++++++++++++++++++++++++++++++++++ cumulus/formats/ndjson.py | 4 +- cumulus/formats/parquet.py | 4 +- pyproject.toml | 1 + tests/test_deltalake.py | 77 ++++++++++++++++++++++++ 9 files changed, 234 insertions(+), 29 deletions(-) create mode 100644 cumulus/formats/deltalake.py create mode 100644 tests/test_deltalake.py diff --git a/.gitignore b/.gitignore index 50959b43..c5352919 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,8 @@ # Project specific -example-output/ -example-phi-build/ +/.idea/ +/example-output/ +/example-phi-build/ # Python gitignore https://github.com/github/gitignore/blob/main/Python.gitignore # Byte-compiled / optimized / DLL files diff --git a/cumulus/etl.py b/cumulus/etl.py index 88a0a95c..d2273ab5 100644 --- a/cumulus/etl.py +++ b/cumulus/etl.py @@ -364,7 +364,7 @@ def main(args: List[str]): parser.add_argument('dir_phi', metavar='/path/to/phi') parser.add_argument('--input-format', default='ndjson', choices=['i2b2', 'ndjson'], help='input format (default is ndjson)') - parser.add_argument('--output-format', default='parquet', choices=['json', 'ndjson', 'parquet'], + parser.add_argument('--output-format', default='parquet', choices=['deltalake', 'json', 'ndjson', 'parquet'], help='output format (default is parquet)') parser.add_argument('--batch-size', type=int, metavar='SIZE', default=10000000, help='how many entries to process at once and thus ' @@ -401,7 
+401,9 @@ def main(args: List[str]): else: config_loader = loaders.FhirNdjsonLoader(root_input, client_id=args.smart_client_id, jwks=args.smart_jwks) - if args.output_format == 'json': + if args.output_format == 'deltalake': + config_store = formats.DeltaLakeFormat(root_output) + elif args.output_format == 'json': config_store = formats.JsonTreeFormat(root_output) elif args.output_format == 'parquet': config_store = formats.ParquetFormat(root_output) diff --git a/cumulus/formats/__init__.py b/cumulus/formats/__init__.py index 6d2ec483..298610d2 100644 --- a/cumulus/formats/__init__.py +++ b/cumulus/formats/__init__.py @@ -1,5 +1,6 @@ """Classes that know _how_ to write out results to the target folder""" +from .deltalake import DeltaLakeFormat from .json_tree import JsonTreeFormat from .ndjson import NdjsonFormat from .parquet import ParquetFormat diff --git a/cumulus/formats/athena.py b/cumulus/formats/athena.py index 0a151818..2c50ec84 100644 --- a/cumulus/formats/athena.py +++ b/cumulus/formats/athena.py @@ -18,6 +18,36 @@ class AthenaFormat(store.Format): (i.e. 
one folder per data type, broken into large files) """ + @abc.abstractmethod + def write_records(self, job, df: pandas.DataFrame, dbname: str, batch: int) -> None: + """Writes the whole dataframe to the output database folder""" + + def store_patients(self, job, patients: pandas.DataFrame, batch: int) -> None: + self.write_records(job, patients, 'patient', batch) + + def store_encounters(self, job, encounters: pandas.DataFrame, batch: int) -> None: + self.write_records(job, encounters, 'encounter', batch) + + def store_labs(self, job, labs: pandas.DataFrame, batch: int) -> None: + self.write_records(job, labs, 'observation', batch) + + def store_conditions(self, job, conditions: pandas.DataFrame, batch: int) -> None: + self.write_records(job, conditions, 'condition', batch) + + def store_docrefs(self, job, docrefs: pandas.DataFrame, batch: int) -> None: + self.write_records(job, docrefs, 'documentreference', batch) + + def store_symptoms(self, job, observations: pandas.DataFrame, batch: int) -> None: + self.write_records(job, observations, 'symptom', batch) + + +class AthenaBatchedFileFormat(AthenaFormat): + """ + Stores output files as batched individual files. + + i.e. a few ndjson files that hold all the rows + """ + @property @abc.abstractmethod def suffix(self) -> str: @@ -39,7 +69,7 @@ def write_format(self, df: pandas.DataFrame, path: str) -> None: # ########################################################################################## - def _write_records(self, job, df: pandas.DataFrame, path: str, batch: int) -> None: + def write_records(self, job, df: pandas.DataFrame, dbname: str, batch: int) -> None: """Writes the whole dataframe to a single file""" job.attempt += len(df) @@ -49,14 +79,14 @@ def _write_records(self, job, df: pandas.DataFrame, path: str, batch: int) -> No # our files out. What we really want is some sort of blue/green deploy of data. There's no # satisfying fix while we are writing to the same folder. 
(Unless we do incremental/delta # writes and keep all old data around still.) - parent_dir = self.root.joinpath(os.path.dirname(path)) + parent_dir = self.root.joinpath(dbname) try: self.root.rm(parent_dir, recursive=True) except FileNotFoundError: pass try: - full_path = self.root.joinpath(f'{path}.{batch:03}.{self.suffix}') + full_path = self.root.joinpath(f'{dbname}/{dbname}.{batch:03}.{self.suffix}') self.root.makedirs(os.path.dirname(full_path)) self.write_format(df, full_path) @@ -64,21 +94,3 @@ def _write_records(self, job, df: pandas.DataFrame, path: str, batch: int) -> No job.success_rate(1) except Exception: # pylint: disable=broad-except logging.exception('Could not process data records') - - def store_patients(self, job, patients: pandas.DataFrame, batch: int) -> None: - self._write_records(job, patients, 'patient/fhir_patients', batch) - - def store_encounters(self, job, encounters: pandas.DataFrame, batch: int) -> None: - self._write_records(job, encounters, 'encounter/fhir_encounters', batch) - - def store_labs(self, job, labs: pandas.DataFrame, batch: int) -> None: - self._write_records(job, labs, 'observation/fhir_observations', batch) - - def store_conditions(self, job, conditions: pandas.DataFrame, batch: int) -> None: - self._write_records(job, conditions, 'condition/fhir_conditions', batch) - - def store_docrefs(self, job, docrefs: pandas.DataFrame, batch: int) -> None: - self._write_records(job, docrefs, 'documentreference/fhir_documentreferences', batch) - - def store_symptoms(self, job, observations: pandas.DataFrame, batch: int) -> None: - self._write_records(job, observations, 'symptom/fhir_symptoms', batch) diff --git a/cumulus/formats/deltalake.py b/cumulus/formats/deltalake.py new file mode 100644 index 00000000..63227dd5 --- /dev/null +++ b/cumulus/formats/deltalake.py @@ -0,0 +1,111 @@ +""" +An implementation of Format that writes to a Delta Lake. 
@contextlib.contextmanager
def _suppress_output():
    """
    Hides everything written to the process-level stdout and stderr while the context is active.

    File descriptors 1 and 2 are pointed at os.devnull for the duration of the `with` body and are pointed back at
    their original targets afterwards, even if the body raises. The discarded output is not kept anywhere, so
    nothing is replayed when the body fails; only the exception itself propagates.

    Because this works on the file descriptors rather than on sys.stdout / sys.stderr, it is a more powerful
    version of contextlib.redirect_stdout that also silences subprocesses / threads writing to those descriptors.
    """
    with contextlib.ExitStack() as cleanup:
        # Register each descriptor for closing as soon as it exists. Without this, every call would leak the two
        # duplicated descriptors plus the devnull handle.
        saved_stdout = os.dup(1)
        cleanup.callback(os.close, saved_stdout)
        saved_stderr = os.dup(2)
        cleanup.callback(os.close, saved_stderr)
        devnull = os.open(os.devnull, os.O_WRONLY)
        cleanup.callback(os.close, devnull)

        os.dup2(devnull, 1)
        os.dup2(devnull, 2)
        try:
            yield
        finally:
            # Restore the real targets first; the ExitStack then closes our private copies.
            os.dup2(saved_stdout, 1)
            os.dup2(saved_stderr, 2)
class DeltaLakeFormat(AthenaFormat):
    """
    Stores data in a delta lake.

    Each call to write_records() upserts one batch of rows into the Delta table found at <root>/<dbname>,
    creating that table on first use. Rows are matched on their `id` column.
    """
    def __init__(self, root: store.Root):
        """
        Builds the Spark session used for all later writes.

        :param root: the output location; its fsspec options are also used to configure S3 access
        """
        super().__init__(root)

        # This _suppress_output call is because pyspark is SO NOISY during session creation. Like 40 lines of trivial
        # output. Progress reports of downloading the jars. Comments about default logging level and the hostname.
        # I could not find a way to set the log level before the session is created. So here we just suppress
        # stdout/stderr entirely.
        with _suppress_output():
            # Prep the builder with various config options
            builder = pyspark.sql.SparkSession.builder \
                .appName('cumulus-etl') \
                .config('spark.driver.memory', '2g') \
                .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog') \
                .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')

            # Now add delta's packages and actually build the session
            self.spark = delta.configure_spark_with_delta_pip(builder, extra_packages=[
                'org.apache.hadoop:hadoop-aws:3.3.4',
            ]).getOrCreate()

        self.spark.sparkContext.setLogLevel('ERROR')
        self._configure_fs()

    def write_records(self, job, df: pandas.DataFrame, dbname: str, batch: int) -> None:
        """
        Writes the whole dataframe to a delta lake.

        :param job: job summary whose attempt / success counters are updated
        :param df: the rows to upsert; matched against existing rows on the `id` column
        :param dbname: name of the table folder underneath the output root
        :param batch: batch number; batch 0 additionally vacuums an existing table before merging
        """
        job.attempt += len(df)
        full_path = self.root.joinpath(dbname).replace('s3://', 's3a://')  # hadoop uses the s3a: scheme instead of s3:

        try:
            updates = self.spark.createDataFrame(df)

            # Only the lookup is allowed to signal "no table yet". Keeping vacuum/merge outside of this handler
            # means an AnalysisException raised by the merge itself (e.g. a schema problem) is reported as such
            # instead of being mistaken for a missing table and followed by a doomed save() over existing data.
            try:
                table = delta.DeltaTable.forPath(self.spark, full_path)
            except AnalysisException:
                table = None

            if table is None:
                # table does not exist yet, let's make an initial version
                updates.write.save(path=full_path, format='delta')
            else:
                if batch == 0:
                    table.vacuum()  # Clean up unused data files older than retention policy (default 7 days)
                table.alias('table') \
                    .merge(source=updates.alias('updates'), condition='table.id = updates.id') \
                    .whenMatchedUpdateAll() \
                    .whenNotMatchedInsertAll() \
                    .execute()

            job.success += len(df)
            job.success_rate(1)
        except Exception:  # pylint: disable=broad-except
            # Best effort: a failed batch is logged and simply not counted as a success.
            logging.exception('Could not process data records')

    def _configure_fs(self):
        """Tell spark/hadoop how to talk to S3 for us"""
        fsspec_options = self.root.fsspec_options()
        self.spark.conf.set('fs.s3a.sse.enabled', 'true')
        self.spark.conf.set('fs.s3a.server-side-encryption-algorithm', 'SSE-KMS')
        # The KMS key and region are optional: they are only forwarded when the root's fsspec options carry them.
        kms_key = fsspec_options.get('s3_additional_kwargs', {}).get('SSEKMSKeyId')
        if kms_key:
            self.spark.conf.set('fs.s3a.sse.kms.keyId', kms_key)
        region_name = fsspec_options.get('client_kwargs', {}).get('region_name')
        if region_name:
            self.spark.conf.set('fs.s3a.endpoint.region', region_name)
self.spark.conf.set('fs.s3a.sse.kms.keyId', kms_key) + region_name = fsspec_options.get('client_kwargs', {}).get('region_name') + if region_name: + self.spark.conf.set('fs.s3a.endpoint.region', region_name) diff --git a/cumulus/formats/ndjson.py b/cumulus/formats/ndjson.py index 21e3f7e6..77700834 100644 --- a/cumulus/formats/ndjson.py +++ b/cumulus/formats/ndjson.py @@ -2,10 +2,10 @@ import pandas -from .athena import AthenaFormat +from .athena import AthenaBatchedFileFormat -class NdjsonFormat(AthenaFormat): +class NdjsonFormat(AthenaBatchedFileFormat): """Stores output files in a few flat ndjson files""" @property diff --git a/cumulus/formats/parquet.py b/cumulus/formats/parquet.py index b11133de..018e1c7a 100644 --- a/cumulus/formats/parquet.py +++ b/cumulus/formats/parquet.py @@ -2,10 +2,10 @@ import pandas -from .athena import AthenaFormat +from .athena import AthenaBatchedFileFormat -class ParquetFormat(AthenaFormat): +class ParquetFormat(AthenaBatchedFileFormat): """Stores output files in a few flat parquet files""" @property diff --git a/pyproject.toml b/pyproject.toml index 5a138bf2..a823b03f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,6 +3,7 @@ name = "cumulus" requires-python = ">= 3.7" dependencies = [ "ctakesclient >= 1.3", + "delta-spark >= 2.1", # This branch includes some jwt fixes we need (and will hopefully be in mainline in future) "fhirclient @ git+https://github.com/mikix/client-py.git@mikix/oauth2-jwt", "jwcrypto", diff --git a/tests/test_deltalake.py b/tests/test_deltalake.py new file mode 100644 index 00000000..27136998 --- /dev/null +++ b/tests/test_deltalake.py @@ -0,0 +1,77 @@ +"""Tests for Delta Lake support""" + +import os +import shutil +import tempfile +import unittest + +import pandas +from pyspark.sql.utils import AnalysisException +from cumulus import config, formats, store + + +class TestDeltaLake(unittest.TestCase): + """ + Test case for the Delta Lake format writer. + + i.e. 
class TestDeltaLake(unittest.TestCase):
    """
    Test case for the Delta Lake format writer.

    i.e. tests for deltalake.py
    """

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        output_tempdir = tempfile.TemporaryDirectory()  # pylint: disable=consider-using-with
        cls.output_tempdir = output_tempdir
        cls.output_dir = output_tempdir.name

        # It is expensive to create a DeltaLakeFormat instance because of all the pyspark jar downloading etc.
        # So we only do it once per class suite. (And erase all folder contents per-test)
        cls.deltalake = formats.DeltaLakeFormat(store.Root(output_tempdir.name))

    def setUp(self):
        super().setUp()
        # Start every test from an empty lake; the folder may already be gone, hence ignore_errors.
        shutil.rmtree(self.output_dir, ignore_errors=True)
        self.job = config.JobSummary()

    @staticmethod
    def df(**kwargs) -> pandas.DataFrame:
        """
        Creates a dummy DataFrame with ids & values equal to each kwarg provided.
        """
        rows = [{'id': k, 'value': v} for k, v in kwargs.items()]
        return pandas.DataFrame(rows)

    def store(self, df: pandas.DataFrame, batch: int = 10) -> None:
        """
        Writes a single batch of data to the data lake.

        :param df: the data to insert
        :param batch: which batch number this is, defaulting to 10 to avoid triggering any first/last batch logic
        """
        self.deltalake.store_patients(self.job, df, batch)

    def assert_lake_equal(self, df: pandas.DataFrame, when: int = None) -> None:
        """
        Asserts that the 'patient' table in the lake holds exactly the rows of the given DataFrame.

        :param df: the expected contents, compared after sorting the table by id
        :param when: if given, passed to the reader as its 'versionAsOf' option
        """
        table_path = os.path.join(self.output_dir, 'patient')

        reader = self.deltalake.spark.read
        if when is not None:
            reader = reader.option('versionAsOf', when)

        table_df = reader.format('delta').load(table_path).sort('id').toPandas()
        self.assertDictEqual(df.to_dict(), table_df.to_dict())

    def test_creates_if_empty(self):
        """Verify that the lake is created when empty"""
        # sanity check that it doesn't exist yet
        with self.assertRaises(AnalysisException):
            self.assert_lake_equal(self.df())

        self.store(self.df(a=1))
        self.assert_lake_equal(self.df(a=1))

    def test_upsert(self):
        """Verify that we can update and insert data"""
        self.store(self.df(a=1, b=2))
        self.store(self.df(b=20, c=3))
        self.assert_lake_equal(self.df(a=1, b=20, c=3))