From 2d1449e13f1148294e275e1970d9fdaaf2ee39a3 Mon Sep 17 00:00:00 2001
From: Brend Braeckmans <72923643+BrendBraeckmans@users.noreply.github.com>
Date: Mon, 17 Jun 2024 08:52:52 +0200
Subject: [PATCH] Add streaming option to FileLoader. (#51)

## Description
Add a `streaming` option to `FileLoader`. The default behaviour is batch.

## Related Issue
[Issue 50](https://github.com/Nike-Inc/koheesio/issues/50)

## Motivation and Context
Having a `streaming` option for the `FileLoader` would be beneficial during unit tests (UTs), and for people who are not on Databricks and can't make use of the proprietary Databricks `Autoloader`.

## How Has This Been Tested?
- Unit tests
- On Databricks

## Types of changes
- [ ] Bug fix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Checklist:
- [x] My code follows the code style of this project.
- [ ] My change requires a change to the documentation.
- [ ] I have updated the documentation accordingly.
- [x] I have read the **CONTRIBUTING** document.
- [x] I have added tests to cover my changes.
- [ ] All new and existing tests passed.
---
 src/koheesio/spark/readers/file_loader.py |  6 ++++--
 tests/spark/readers/test_file_loader.py   | 11 +++++++++++
 2 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/src/koheesio/spark/readers/file_loader.py b/src/koheesio/spark/readers/file_loader.py
index a497a0a..9d33806 100644
--- a/src/koheesio/spark/readers/file_loader.py
+++ b/src/koheesio/spark/readers/file_loader.py
@@ -97,6 +97,7 @@ class FileLoader(Reader, ExtraParamsMixin):
     schema_: Optional[Union[StructType, str]] = Field(
         default=None, description="Schema to use when reading the file", validate_default=False, alias="schema"
     )
+    streaming: Optional[bool] = Field(default=False, description="Whether to read the files as a Stream or not")
 
     @field_validator("path")
     def ensure_path_is_str(cls, v):
@@ -106,8 +107,9 @@ def ensure_path_is_str(cls, v):
         return v
 
     def execute(self):
-        """Reads the file using the specified format, schema, while applying any extra parameters."""
-        reader = self.spark.read.format(self.format)
+        """Reads the file, in batch or as a stream, using the specified format and schema, while applying any extra parameters."""
+        reader = self.spark.readStream if self.streaming else self.spark.read
+        reader = reader.format(self.format)
         if self.schema_:
             reader.schema(self.schema_)
 
diff --git a/tests/spark/readers/test_file_loader.py b/tests/spark/readers/test_file_loader.py
index d6c3d13..9605e56 100644
--- a/tests/spark/readers/test_file_loader.py
+++ b/tests/spark/readers/test_file_loader.py
@@ -1,5 +1,7 @@
 import pytest
 
+import pyspark.sql.types as T
+
 from koheesio.spark import AnalysisException
 from koheesio.spark.readers.file_loader import (
     AvroReader,
@@ -106,6 +108,15 @@ def test_json_reader(json_file):
     assert actual_data == expected_data
 
 
+def test_json_stream_reader(json_file):
+    schema = "string STRING, int INT, float FLOAT"
+    reader = JsonReader(path=json_file, schema=schema, streaming=True)
+    assert reader.path == json_file
+    df = reader.read()
+    assert df.isStreaming
+    assert df.schema == T._parse_datatype_string(schema)
+
+
 def test_parquet_reader(parquet_file):
     expected_data = [
         {"id": 0},
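---

**Usage sketch (reviewer note, not part of the commit):** a minimal example of the new flag, following the `JsonReader` call shown in the test diff above. The input paths and the console sink are illustrative assumptions, not part of this PR.

```python
from koheesio.spark.readers.file_loader import JsonReader

# streaming=False (the default) keeps the existing batch behaviour (spark.read)
batch_df = JsonReader(path="data/input.json").read()

# streaming=True switches the underlying reader to spark.readStream;
# Structured Streaming file sources need an explicit schema up front
stream_df = JsonReader(
    path="data/incoming/",  # hypothetical directory watched for new files
    schema="string STRING, int INT, float FLOAT",
    streaming=True,
).read()
assert stream_df.isStreaming

# a streaming DataFrame is consumed via writeStream rather than collected
query = stream_df.writeStream.format("console").start()
query.awaitTermination()
```

Because `streaming` defaults to `False`, existing callers see no behavioural change, which matches the "non-breaking" classification above.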