From 169dddb43a2119fda5abbc5787760d033d767b86 Mon Sep 17 00:00:00 2001
From: Fabio Buso
Date: Tue, 12 Nov 2024 21:15:42 -0800
Subject: [PATCH 1/2] [FSTORE-1604] Add option to avoid setting s3a global
 options

---
 python/hsfs/engine/spark.py       |  8 +++-
 python/tests/engine/test_spark.py | 79 +++++++++++++++++++++++++++++++
 2 files changed, 85 insertions(+), 2 deletions(-)

diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py
index 4ce48a5fc1..12a37859a8 100644
--- a/python/hsfs/engine/spark.py
+++ b/python/hsfs/engine/spark.py
@@ -1152,8 +1152,12 @@ def setup_storage_connector(self, storage_connector, path=None):
         return path
 
     def _setup_s3_hadoop_conf(self, storage_connector, path):
-        # For legacy behaviour set the S3 values at global level
-        self._set_s3_hadoop_conf(storage_connector, "fs.s3a")
+        FS_S3_GLOBAL_CONF = "fs.s3a.global-conf"
+
+        # The arguments arrive here as strings
+        if storage_connector.arguments.get(FS_S3_GLOBAL_CONF, "True").lower() == "true":
+            # For legacy behaviour set the S3 values at global level
+            self._set_s3_hadoop_conf(storage_connector, "fs.s3a")
 
         # Set credentials at bucket level as well to allow users to use multiple
         # storage connector in the same application.
diff --git a/python/tests/engine/test_spark.py b/python/tests/engine/test_spark.py
index e3f6948e95..e308c1948b 100644
--- a/python/tests/engine/test_spark.py
+++ b/python/tests/engine/test_spark.py
@@ -15,6 +15,8 @@
 #
 from __future__ import annotations
 
+from unittest.mock import call
+
 import numpy
 import pandas as pd
 import pytest
@@ -4162,6 +4164,83 @@ def test_setup_s3_hadoop_conf_legacy(self, mocker):
             "fs.s3a.endpoint", s3_connector.arguments.get("fs.s3a.endpoint")
         )
 
+    def test_setup_s3_hadoop_conf_disable_legacy(self, mocker):
+        # Arrange
+        mock_pyspark_getOrCreate = mocker.patch(
+            "pyspark.sql.session.SparkSession.builder.getOrCreate"
+        )
+
+        spark_engine = spark.Engine()
+
+        s3_connector = storage_connector.S3Connector(
+            id=1,
+            name="test_connector",
+            featurestore_id=99,
+            bucket="bucket-name",
+            access_key="1",
+            secret_key="2",
+            server_encryption_algorithm="3",
+            server_encryption_key="4",
+            session_token="5",
+            arguments=[
+                {"name": "fs.s3a.endpoint", "value": "testEndpoint"},
+                {"name": "fs.s3a.global-conf", "value": "False"},
+            ],
+        )
+
+        # Act
+        result = spark_engine._setup_s3_hadoop_conf(
+            storage_connector=s3_connector,
+            path="s3://_test_path",
+        )
+
+        # Assert
+        assert result == "s3a://_test_path"
+        assert (
+            mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.call_count
+            == 7  # Options should only be set at bucket level
+        )
+        assert (
+            call("fs.s3a.access.key", s3_connector.access_key)
+            not in mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.mock_calls
+        )
+        assert (
+            call("fs.s3a.secret.key", s3_connector.secret_key)
+            not in mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.mock_calls
+        )
+        assert (
+            call(
+                "fs.s3a.server-side-encryption-algorithm",
+                s3_connector.server_encryption_algorithm,
+            )
+            not in mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.mock_calls
+        )
+
+        assert (
+            call(
+                "fs.s3a.server-side-encryption-key", s3_connector.server_encryption_key
+            )
+            not in mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.mock_calls
+        )
+
+        assert (
+            call(
+                "fs.s3a.aws.credentials.provider",
"org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider", + ) + not in mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.mock_calls + ) + + assert ( + call("fs.s3a.session.token", s3_connector.session_token) + not in mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.mock_calls + ) + + assert ( + call("fs.s3a.endpoint", s3_connector.arguments.get("fs.s3a.endpoint")) + not in mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.mock_calls + ) + def test_setup_s3_hadoop_conf_bucket_scope(self, mocker): # Arrange mock_pyspark_getOrCreate = mocker.patch( From 64751b3b16523087a613996513cc159491f3e7d8 Mon Sep 17 00:00:00 2001 From: Fabio Buso Date: Wed, 13 Nov 2024 21:06:40 -0800 Subject: [PATCH 2/2] Add positive tests as well --- python/tests/engine/test_spark.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/python/tests/engine/test_spark.py b/python/tests/engine/test_spark.py index e308c1948b..b7d9d1d287 100644 --- a/python/tests/engine/test_spark.py +++ b/python/tests/engine/test_spark.py @@ -4241,6 +4241,32 @@ def test_setup_s3_hadoop_conf_disable_legacy(self, mocker): not in mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.mock_calls ) + mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.assert_any_call( + "fs.s3a.bucket.bucket-name.access.key", s3_connector.access_key + ) + mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.assert_any_call( + "fs.s3a.bucket.bucket-name.secret.key", s3_connector.secret_key + ) + mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.assert_any_call( + "fs.s3a.bucket.bucket-name.server-side-encryption-algorithm", + s3_connector.server_encryption_algorithm, + ) + mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.assert_any_call( + "fs.s3a.bucket.bucket-name.server-side-encryption-key", + s3_connector.server_encryption_key, + ) + mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.assert_any_call( + "fs.s3a.bucket.bucket-name.aws.credentials.provider", + "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider", + ) + mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.assert_any_call( + "fs.s3a.bucket.bucket-name.session.token", s3_connector.session_token + ) + mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.assert_any_call( + "fs.s3a.bucket.bucket-name.endpoint", + s3_connector.arguments.get("fs.s3a.endpoint"), + ) + def test_setup_s3_hadoop_conf_bucket_scope(self, mocker): # Arrange mock_pyspark_getOrCreate = mocker.patch(