From de107f8c5cf675849b56e140ed8a95b743012138 Mon Sep 17 00:00:00 2001
From: Nok Chan
Date: Tue, 13 Jun 2023 13:43:38 +0100
Subject: [PATCH 1/8] fix malform docstring in SparkStreamingDataSet

Signed-off-by: Nok Chan
---
 .../spark/spark_streaming_dataset.py          | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)

diff --git a/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py b/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py
index 2f7743e65..c7327c864 100644
--- a/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py
+++ b/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py
@@ -20,15 +20,16 @@ class SparkStreamingDataSet(AbstractDataSet):
     `YAML API `_:
 
     .. code-block:: yaml
+
         raw.new_inventory:
-            type: streaming.extras.datasets.spark_streaming_dataset.SparkStreamingDataSet
-            filepath: data/01_raw/stream/inventory/
-            file_format: json
-            save_args:
-            output_mode: append
-            checkpoint: data/04_checkpoint/raw_new_inventory
-            header: True
-            load_args:
+            type: spark.SparkStreamingDataSet
+            filepath: data/01_raw/stream/inventory/
+            file_format: json
+            save_args:
+            output_mode: append
+            checkpoint: data/04_checkpoint/raw_new_inventory
+            header: True
+            load_args:
                 schema:
                     filepath: data/01_raw/schema/inventory_schema.json
     """

From c419202859452b6a6e420f99a9fbe3a34931bf48 Mon Sep 17 00:00:00 2001
From: Nok Chan
Date: Tue, 13 Jun 2023 15:18:23 +0100
Subject: [PATCH 2/8] change indent

Signed-off-by: Nok Chan
---
 .../kedro_datasets/spark/spark_streaming_dataset.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py b/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py
index c7327c864..1f90326a5 100644
--- a/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py
+++ b/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py
@@ -26,9 +26,9 @@ class SparkStreamingDataSet(AbstractDataSet):
             filepath: data/01_raw/stream/inventory/
             file_format: json
             save_args:
-            output_mode: append
-            checkpoint: data/04_checkpoint/raw_new_inventory
-            header: True
+                output_mode: append
+                checkpoint: data/04_checkpoint/raw_new_inventory
+                header: True
             load_args:
                 schema:
                     filepath: data/01_raw/schema/inventory_schema.json

From 6079395b7c962b15871cf090436c8d428afc5d3a Mon Sep 17 00:00:00 2001
From: Ahdra Merali
Date: Tue, 13 Jun 2023 17:19:25 +0100
Subject: [PATCH 3/8] Test without docstring

Signed-off-by: Ahdra Merali
---
 .../spark/spark_streaming_dataset.py          | 19 -------------------
 1 file changed, 19 deletions(-)

diff --git a/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py b/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py
index 1f90326a5..8cef0209f 100644
--- a/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py
+++ b/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py
@@ -15,25 +15,6 @@
 
 
 class SparkStreamingDataSet(AbstractDataSet):
-    """``SparkStreamingDataSet`` loads data into Spark Streaming Dataframe objects.
-    Example usage for the
-    `YAML API `_:
-
-    .. code-block:: yaml
-
-        raw.new_inventory:
-            type: spark.SparkStreamingDataSet
-            filepath: data/01_raw/stream/inventory/
-            file_format: json
-            save_args:
-                output_mode: append
-                checkpoint: data/04_checkpoint/raw_new_inventory
-                header: True
-            load_args:
-                schema:
-                    filepath: data/01_raw/schema/inventory_schema.json
-    """
-
     DEFAULT_LOAD_ARGS = {} # type: Dict[str, Any]
     DEFAULT_SAVE_ARGS = {} # type: Dict[str, Any]
 

From 7a6042b36feb104777570f4caa2a63f3c9443411 Mon Sep 17 00:00:00 2001
From: Ahdra Merali
Date: Tue, 13 Jun 2023 17:42:37 +0100
Subject: [PATCH 4/8] Add back docstring

Signed-off-by: Ahdra Merali
---
 .../spark/spark_streaming_dataset.py          | 20 ++++++++++++++++++++
 1 file changed, 20 insertions(+)

diff --git a/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py b/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py
index 8cef0209f..5f78d4abd 100644
--- a/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py
+++ b/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py
@@ -15,6 +15,26 @@
 
 
 class SparkStreamingDataSet(AbstractDataSet):
+    """``SparkStreamingDataSet`` loads data to Spark Streaming Dataframe objects.
+
+    Example usage for the
+    `YAML API `_:
+
+    .. code-block:: yaml
+
+        raw.new_inventory:
+            type: spark.SparkStreamingDataSet
+            filepath: data/01_raw/stream/inventory/
+            file_format: json
+            save_args:
+                output_mode: append
+                checkpoint: data/04_checkpoint/raw_new_inventory
+                header: True
+            load_args:
+                schema:
+                    filepath: data/01_raw/schema/inventory_schema.json
+    """
     DEFAULT_LOAD_ARGS = {} # type: Dict[str, Any]
     DEFAULT_SAVE_ARGS = {} # type: Dict[str, Any]
 

From e1edb09a22347e2d61fd17768a1d854533c859ff Mon Sep 17 00:00:00 2001
From: Ahdra Merali
Date: Tue, 13 Jun 2023 18:17:18 +0100
Subject: [PATCH 5/8] Format docstring

Signed-off-by: Ahdra Merali
---
 .../spark/spark_streaming_dataset.py          | 31 +++++++++----------
 1 file changed, 15 insertions(+), 16 deletions(-)

diff --git a/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py b/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py
index 5f78d4abd..b234b9dae 100644
--- a/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py
+++ b/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py
@@ -46,28 +46,27 @@ def __init__(
         load_args: Dict[str, Any] = None,
     ) -> None:
         """Creates a new instance of SparkStreamingDataSet.
+
         Args:
             filepath: Filepath in POSIX format to a Spark dataframe. When using Databricks
-                specify ``filepath``s starting with ``/dbfs/``. For message brokers such as
+                specify ``filepath``s starting with ``/dbfs/``. For messag ebrokers such as
                 Kafka and all filepath is not required.
-            file_format: File format used during load and save
-                operations. These are formats supported by the running
-                SparkContext include parquet, csv, delta. For a list of supported
-                formats please refer to Apache Spark documentation at
+            file_format: File format used during load and save operations.
+                These are formats supported by the running SparkContext including parquet,
+                csv, and delta. For a list of supported formats please refer to the Apache
+                Spark documentation at
                 https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
             load_args: Load args passed to Spark DataFrameReader load method.
-                It is dependent on the selected file format. You can find
-                a list of read options for each supported format
-                in Spark DataFrame read documentation:
-                https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html,
-                Please note that a schema is mandatory for a streaming DataFrame
+                It is dependent on the selected file format. You can find a list of read options
+                for each selected format in Spark DataFrame read documentation, see
+                https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html.
+                Please note that a schema is mandatory for a streaming DataFrame
                 if ``schemaInference`` is not True.
-            save_args: Save args passed to Spark DataFrame write options.
-                Similar to load_args this is dependent on the selected file
-                format. You can pass ``mode`` and ``partitionBy`` to specify
-                your overwrite mode and partitioning respectively. You can find
-                a list of options for each format in Spark DataFrame
-                write documentation:
+            save_args: Save args passed to Spark DataFrameReader write options.
+                Similar to load_args, this is dependent on the selected file format. You can pass
+                ``mode`` and ``partitionBy`` to specify your overwrite mode and partitioning
+                respectively. You can find a list of options for each selected format in
+                Spark DataFrame write documentation, see
                 https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
         """
         self._file_format = file_format

From 35d994d13c03dd899c53c84e25c7112392ebd240 Mon Sep 17 00:00:00 2001
From: Ahdra Merali
Date: Tue, 13 Jun 2023 18:35:29 +0100
Subject: [PATCH 6/8] Fix typo

Signed-off-by: Ahdra Merali
---
 kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py b/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py
index b234b9dae..84571a8c4 100644
--- a/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py
+++ b/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py
@@ -49,7 +49,7 @@ def __init__(
 
         Args:
             filepath: Filepath in POSIX format to a Spark dataframe. When using Databricks
-                specify ``filepath``s starting with ``/dbfs/``. For messag ebrokers such as
+                specify ``filepath``s starting with ``/dbfs/``. For message brokers such as
                 Kafka and all filepath is not required.
             file_format: File format used during load and save operations.

From e6164851291ed431cc4cbed6993770a45303d136 Mon Sep 17 00:00:00 2001
From: Ahdra Merali <90615669+AhdraMeraliQB@users.noreply.github.com>
Date: Tue, 13 Jun 2023 18:37:09 +0100
Subject: [PATCH 7/8] Fix typo

---
 kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py b/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py
index b234b9dae..84571a8c4 100644
--- a/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py
+++ b/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py
@@ -49,7 +49,7 @@ def __init__(
 
         Args:
             filepath: Filepath in POSIX format to a Spark dataframe. When using Databricks
-                specify ``filepath``s starting with ``/dbfs/``. For messag ebrokers such as
+                specify ``filepath``s starting with ``/dbfs/``. For message brokers such as
                 Kafka and all filepath is not required.
             file_format: File format used during load and save operations.
                 These are formats supported by the running SparkContext including parquet,

From 998d7bc1df198a9afd322a695701c41e2bcbb0da Mon Sep 17 00:00:00 2001
From: Ahdra Merali
Date: Tue, 13 Jun 2023 18:47:05 +0100
Subject: [PATCH 8/8] Lint

Signed-off-by: Ahdra Merali
---
 .../kedro_datasets/spark/spark_streaming_dataset.py  | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py b/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py
index 84571a8c4..2026419c9 100644
--- a/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py
+++ b/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py
@@ -35,6 +35,7 @@ class SparkStreamingDataSet(AbstractDataSet):
             schema:
                 filepath: data/01_raw/schema/inventory_schema.json
     """
+
     DEFAULT_LOAD_ARGS = {} # type: Dict[str, Any]
     DEFAULT_SAVE_ARGS = {} # type: Dict[str, Any]
 
@@ -51,21 +52,21 @@ def __init__(
             filepath: Filepath in POSIX format to a Spark dataframe. When using Databricks
                 specify ``filepath``s starting with ``/dbfs/``. For message brokers such as
                 Kafka and all filepath is not required.
-            file_format: File format used during load and save operations. 
+            file_format: File format used during load and save operations.
                 These are formats supported by the running SparkContext including parquet,
                 csv, and delta. For a list of supported formats please refer to the Apache
-                Spark documentation at 
+                Spark documentation at
                 https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
             load_args: Load args passed to Spark DataFrameReader load method.
                 It is dependent on the selected file format. You can find a list of read options
                 for each selected format in Spark DataFrame read documentation, see
                 https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html.
-                Please note that a schema is mandatory for a streaming DataFrame 
+                Please note that a schema is mandatory for a streaming DataFrame
                 if ``schemaInference`` is not True.
             save_args: Save args passed to Spark DataFrameReader write options.
                 Similar to load_args, this is dependent on the selected file format. You can pass
                 ``mode`` and ``partitionBy`` to specify your overwrite mode and partitioning
-                respectively. You can find a list of options for each selected format in 
+                respectively. You can find a list of options for each selected format in
                 Spark DataFrame write documentation, see
                 https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
         """
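
As a sanity check on the documented options, the YAML example in the final docstring maps one-to-one onto the dataset's constructor arguments, so the class can be exercised outside the catalog. The snippet below is a minimal sketch, not part of the patch series: it assumes ``SparkStreamingDataSet`` is importable from ``kedro_datasets.spark`` as for the other Spark datasets in this package, and it reuses the docstring's illustrative paths, which are placeholders rather than real project files.

    # Sketch only: the Python equivalent of the `raw.new_inventory` catalog
    # entry documented in the class docstring above.
    from kedro_datasets.spark import SparkStreamingDataSet

    dataset = SparkStreamingDataSet(
        filepath="data/01_raw/stream/inventory/",
        file_format="json",
        load_args={
            # A schema is mandatory for a streaming DataFrame unless
            # schemaInference is enabled, per the load_args docs above.
            "schema": {"filepath": "data/01_raw/schema/inventory_schema.json"},
        },
        save_args={
            "output_mode": "append",
            "checkpoint": "data/04_checkpoint/raw_new_inventory",
            "header": True,
        },
    )

    streaming_df = dataset.load()  # a Spark DataFrame with isStreaming == True
    dataset.save(streaming_df)     # starts a writeStream using save_args

Because saving starts a long-running streaming query, the checkpoint location in ``save_args`` is what lets the query recover after a restart, which is why the docstring documents it alongside ``output_mode``.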