diff --git a/dlt/destinations/impl/databricks/databricks.py b/dlt/destinations/impl/databricks/databricks.py
index 54d37f8c08..fbf552d3b1 100644
--- a/dlt/destinations/impl/databricks/databricks.py
+++ b/dlt/destinations/impl/databricks/databricks.py
@@ -33,6 +33,7 @@ from dlt.destinations.job_impl import ReferenceFollowupJobRequest
 
 
 AZURE_BLOB_STORAGE_PROTOCOLS = ["az", "abfss", "abfs"]
+SUPPORTED_BLOB_STORAGE_PROTOCOLS = AZURE_BLOB_STORAGE_PROTOCOLS + ["s3", "gs", "gcs"]
 
 
 class DatabricksLoadJob(RunnableLoadJob, HasFollowupJobs):
@@ -69,11 +70,12 @@ def run(self) -> None:
             bucket_url = urlparse(bucket_path)
             bucket_scheme = bucket_url.scheme
 
-            if bucket_scheme not in AZURE_BLOB_STORAGE_PROTOCOLS + ["s3"]:
+            if bucket_scheme not in SUPPORTED_BLOB_STORAGE_PROTOCOLS:
                 raise LoadJobTerminalException(
                     self._file_path,
-                    f"Databricks cannot load data from staging bucket {bucket_path}. Only s3 and"
-                    " azure buckets are supported",
+                    f"Databricks cannot load data from staging bucket {bucket_path}. Only s3, azure"
+                    " and gcs buckets are supported. Please note that gcs buckets are supported"
+                    " only via named credential",
                 )
 
             if self._job_client.config.is_staging_external_location:
@@ -106,6 +108,12 @@ def run(self) -> None:
                     bucket_path = self.ensure_databricks_abfss_url(
                         bucket_path, staging_credentials.azure_storage_account_name
                     )
+                else:
+                    raise LoadJobTerminalException(
+                        self._file_path,
+                        "You need to use Databricks named credential to use google storage."
+                        " Passing explicit Google credentials is not supported by Databricks.",
+                    )
 
             if bucket_scheme in AZURE_BLOB_STORAGE_PROTOCOLS:
                 assert isinstance(
@@ -125,7 +133,7 @@ def run(self) -> None:
             raise LoadJobTerminalException(
                 self._file_path,
                 "Cannot load from local file. Databricks does not support loading from local files."
-                " Configure staging with an s3 or azure storage bucket.",
+                " Configure staging with an s3, azure or google storage bucket.",
             )
 
         # decide on source format, stage_file_path will either be a local file or a bucket path
diff --git a/docs/website/docs/dlt-ecosystem/destinations/databricks.md b/docs/website/docs/dlt-ecosystem/destinations/databricks.md
index ddbf930306..f601f10240 100644
--- a/docs/website/docs/dlt-ecosystem/destinations/databricks.md
+++ b/docs/website/docs/dlt-ecosystem/destinations/databricks.md
@@ -141,7 +141,7 @@ The `jsonl` format has some limitations when used with Databricks:
 
 ## Staging support
 
-Databricks supports both Amazon S3 and Azure Blob Storage as staging locations. `dlt` will upload files in `parquet` format to the staging location and will instruct Databricks to load data from there.
+Databricks supports Amazon S3, Azure Blob Storage, and Google Cloud Storage (GCS) as staging locations. `dlt` will upload files in `parquet` format to the staging location and will instruct Databricks to load data from there.
 
 ### Databricks and Amazon S3
 
@@ -187,6 +187,11 @@ pipeline = dlt.pipeline(
 )
 ```
 
+### Databricks and Google Cloud Storage
+
+To load data from a GCS staging bucket, you must set up the credentials via a **named credential** (see below). Databricks does not allow passing
+Google credentials explicitly in SQL statements.
+
 ### Use external locations and stored credentials
 
 `dlt` forwards bucket credentials to the `COPY INTO` SQL command by default. You may prefer to use [external locations or stored credentials instead](https://docs.databricks.com/en/sql/language-manual/sql-ref-external-locations.html#external-location) that are stored on the Databricks side.
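For readers of the updated `databricks.md` section above, here is a minimal usage sketch of GCS staging with a named credential, assembled from the factories exercised in the test below. The bucket URL, pipeline name, dataset name, and credential name are placeholders; the named credential must already exist on the Databricks side and have read access to the bucket, and the GCS credentials used for writing the staged files are resolved from `dlt` secrets/config as usual.

```py
import dlt
from dlt.destinations import databricks, filesystem

# Placeholders: use your own bucket URL and the name of a credential that
# already exists in Databricks and is allowed to read from that bucket.
stage = filesystem("gs://my-staging-bucket")
bricks = databricks(staging_credentials_name="my_gcs_credential")

pipeline = dlt.pipeline(
    pipeline_name="databricks_gcs_staging",
    destination=bricks,
    staging=stage,
    dataset_name="staging_demo",
)

# dlt writes parquet files to the GCS stage; Databricks then loads them via
# COPY INTO, authenticating with the named credential rather than with
# explicit Google credentials.
info = pipeline.run([1, 2, 3], table_name="digits")
print(info)
```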
diff --git a/tests/load/pipeline/test_databricks_pipeline.py b/tests/load/pipeline/test_databricks_pipeline.py
index 2225d0001c..2d8588e53c 100644
--- a/tests/load/pipeline/test_databricks_pipeline.py
+++ b/tests/load/pipeline/test_databricks_pipeline.py
@@ -2,7 +2,12 @@
 import os
 
 from dlt.common.utils import uniq_id
-from tests.load.utils import DestinationTestConfiguration, destinations_configs, AZ_BUCKET
+from tests.load.utils import (
+    GCS_BUCKET,
+    DestinationTestConfiguration,
+    destinations_configs,
+    AZ_BUCKET,
+)
 from tests.pipeline.utils import assert_load_info
 
 
@@ -13,7 +18,7 @@
 @pytest.mark.parametrize(
     "destination_config",
     destinations_configs(
-        default_sql_configs=True, bucket_subset=(AZ_BUCKET), subset=("databricks",)
+        default_sql_configs=True, bucket_subset=(AZ_BUCKET,), subset=("databricks",)
     ),
     ids=lambda x: x.name,
 )
@@ -62,7 +67,7 @@ def test_databricks_external_location(destination_config: DestinationTestConfigu
         in pipeline.list_failed_jobs_in_package(info.loads_ids[0])[0].failed_message
     )
 
-    # # should fail on non existing stored credentials
+    # should fail on non existing stored credentials
     bricks = databricks(is_staging_external_location=False, staging_credentials_name="CREDENTIAL_X")
     pipeline = destination_config.setup_pipeline(
         "test_databricks_external_location",
@@ -90,3 +95,68 @@ def test_databricks_external_location(destination_config: DestinationTestConfigu
     assert (
         "credential_x" in pipeline.list_failed_jobs_in_package(info.loads_ids[0])[0].failed_message
     )
+
+
+@pytest.mark.parametrize(
+    "destination_config",
+    destinations_configs(
+        default_sql_configs=True, bucket_subset=(AZ_BUCKET,), subset=("databricks",)
+    ),
+    ids=lambda x: x.name,
+)
+def test_databricks_gcs_external_location(destination_config: DestinationTestConfiguration) -> None:
+    # do not interfere with state
+    os.environ["RESTORE_FROM_DESTINATION"] = "False"
+    # let the package complete even with failed jobs
+    os.environ["RAISE_ON_FAILED_JOBS"] = "false"
+
+    dataset_name = "test_databricks_gcs_external_location" + uniq_id()
+
+    # swap AZ bucket for GCS_BUCKET
+    from dlt.destinations import databricks, filesystem
+
+    stage = filesystem(GCS_BUCKET)
+
+    # explicit cred handover should fail
+    bricks = databricks()
+    pipeline = destination_config.setup_pipeline(
+        "test_databricks_gcs_external_location",
+        dataset_name=dataset_name,
+        destination=bricks,
+        staging=stage,
+    )
+    info = pipeline.run([1, 2, 3], table_name="digits", **destination_config.run_kwargs)
+    assert info.has_failed_jobs is True
+    assert (
+        "You need to use Databricks named credential"
+        in pipeline.list_failed_jobs_in_package(info.loads_ids[0])[0].failed_message
+    )
+
+    # should fail on internal config error as external location is not configured
+    # bricks = databricks(is_staging_external_location=True)
+    # pipeline = destination_config.setup_pipeline(
+    #     "test_databricks_gcs_external_location",
+    #     dataset_name=dataset_name,
+    #     destination=bricks,
+    #     staging=stage,
+    # )
+    # info = pipeline.run([1, 2, 3], table_name="digits", **destination_config.run_kwargs)
+    # assert info.has_failed_jobs is True
+    # assert (
+    #     "Invalid configuration value detected"
+    #     in pipeline.list_failed_jobs_in_package(info.loads_ids[0])[0].failed_message
+    # )
+
+    # should fail on non existing stored credentials
+    bricks = databricks(is_staging_external_location=False, staging_credentials_name="CREDENTIAL_X")
+    pipeline = destination_config.setup_pipeline(
+        "test_databricks_external_location",
+        dataset_name=dataset_name,
+        destination=bricks,
+        staging=stage,
+    )
+    info = pipeline.run([1, 2, 3], table_name="digits", **destination_config.run_kwargs)
+    assert info.has_failed_jobs is True
+    assert (
+        "credential_x" in pipeline.list_failed_jobs_in_package(info.loads_ids[0])[0].failed_message
+    )