enables gcs staging for Databricks via named credential
rudolfix committed Oct 7, 2024
1 parent b5b3ab1 commit 62bf0b7
Showing 3 changed files with 91 additions and 8 deletions.
16 changes: 12 additions & 4 deletions dlt/destinations/impl/databricks/databricks.py
@@ -33,6 +33,7 @@
from dlt.destinations.job_impl import ReferenceFollowupJobRequest

AZURE_BLOB_STORAGE_PROTOCOLS = ["az", "abfss", "abfs"]
SUPPORTED_BLOB_STORAGE_PROTOCOLS = AZURE_BLOB_STORAGE_PROTOCOLS + ["s3", "gs", "gcs"]


class DatabricksLoadJob(RunnableLoadJob, HasFollowupJobs):
@@ -69,11 +70,12 @@ def run(self) -> None:
bucket_url = urlparse(bucket_path)
bucket_scheme = bucket_url.scheme

if bucket_scheme not in AZURE_BLOB_STORAGE_PROTOCOLS + ["s3"]:
if bucket_scheme not in SUPPORTED_BLOB_STORAGE_PROTOCOLS:
raise LoadJobTerminalException(
self._file_path,
f"Databricks cannot load data from staging bucket {bucket_path}. Only s3 and"
" azure buckets are supported",
f"Databricks cannot load data from staging bucket {bucket_path}. Only s3, azure"
" and gcs buckets are supported. Please note that gcs buckets are supported"
" only via named credential",
)

if self._job_client.config.is_staging_external_location:
@@ -106,6 +108,12 @@ def run(self) -> None:
bucket_path = self.ensure_databricks_abfss_url(
bucket_path, staging_credentials.azure_storage_account_name
)
else:
raise LoadJobTerminalException(
self._file_path,
"You need to use Databricks named credential to use google storage."
" Passing explicit Google credentials is not supported by Databricks.",
)

if bucket_scheme in AZURE_BLOB_STORAGE_PROTOCOLS:
assert isinstance(
@@ -125,7 +133,7 @@ def run(self) -> None:
raise LoadJobTerminalException(
self._file_path,
"Cannot load from local file. Databricks does not support loading from local files."
" Configure staging with an s3 or azure storage bucket.",
" Configure staging with an s3, azure or google storage bucket.",
)

# decide on source format, stage_file_path will either be a local file or a bucket path
7 changes: 6 additions & 1 deletion docs/website/docs/dlt-ecosystem/destinations/databricks.md
@@ -141,7 +141,7 @@ The `jsonl` format has some limitations when used with Databricks:

## Staging support

Databricks supports both Amazon S3 and Azure Blob Storage as staging locations. `dlt` will upload files in `parquet` format to the staging location and will instruct Databricks to load data from there.
Databricks supports Amazon S3, Azure Blob Storage, and Google Cloud Storage (GCS) as staging locations. `dlt` will upload files in `parquet` format to the staging location and will instruct Databricks to load data from there.

### Databricks and Amazon S3

@@ -187,6 +187,11 @@ pipeline = dlt.pipeline(

```

### Databricks and Google Cloud Storage

To load data from a GCS staging bucket, you must set up the credentials via a Databricks **named credential** (see below). Databricks does not allow passing Google credentials explicitly in SQL statements.
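
Below is a minimal, illustrative sketch of a pipeline that stages Databricks loads on GCS through a named credential. The bucket URL, credential name, and dataset name are placeholders, the named credential itself must already exist in the Databricks workspace, and the remaining Databricks connection credentials are assumed to be configured in `secrets.toml`:

```py
import dlt
from dlt.destinations import databricks, filesystem

# reference a named credential that already exists in the Databricks workspace
bricks = databricks(staging_credentials_name="my_gcs_credential")  # placeholder name

# GCS bucket used as the staging location; `dlt` uploads parquet files here
gcs_stage = filesystem("gs://my_staging_bucket")  # placeholder bucket

pipeline = dlt.pipeline(
    pipeline_name="databricks_gcs_pipeline",
    destination=bricks,
    staging=gcs_stage,
    dataset_name="my_dataset",
)

info = pipeline.run([1, 2, 3], table_name="digits")
print(info)
```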

### Use external locations and stored credentials
`dlt` forwards bucket credentials to the `COPY INTO` SQL command by default. You may prefer to use [external locations or stored credentials instead](https://docs.databricks.com/en/sql/language-manual/sql-ref-external-locations.html#external-location) that are stored on the Databricks side.
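
For illustration only, both options can also be selected through the `databricks` destination factory arguments that the tests in this commit exercise (the credential name below is a placeholder):

```py
from dlt.destinations import databricks

# option 1: use an external location that is already configured for the staging bucket,
# so no bucket credentials are forwarded to COPY INTO
bricks = databricks(is_staging_external_location=True)

# option 2: reference a credential stored on the Databricks side by name
bricks = databricks(staging_credentials_name="credential_x")
```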

76 changes: 73 additions & 3 deletions tests/load/pipeline/test_databricks_pipeline.py
@@ -2,7 +2,12 @@
import os

from dlt.common.utils import uniq_id
from tests.load.utils import DestinationTestConfiguration, destinations_configs, AZ_BUCKET
from tests.load.utils import (
GCS_BUCKET,
DestinationTestConfiguration,
destinations_configs,
AZ_BUCKET,
)
from tests.pipeline.utils import assert_load_info


@@ -13,7 +18,7 @@
@pytest.mark.parametrize(
"destination_config",
destinations_configs(
default_sql_configs=True, bucket_subset=(AZ_BUCKET), subset=("databricks",)
default_sql_configs=True, bucket_subset=(AZ_BUCKET,), subset=("databricks",)
),
ids=lambda x: x.name,
)
@@ -62,7 +67,7 @@ def test_databricks_external_location(destination_config: DestinationTestConfiguration) -> None:
in pipeline.list_failed_jobs_in_package(info.loads_ids[0])[0].failed_message
)

# # should fail on non existing stored credentials
# should fail on non existing stored credentials
bricks = databricks(is_staging_external_location=False, staging_credentials_name="CREDENTIAL_X")
pipeline = destination_config.setup_pipeline(
"test_databricks_external_location",
@@ -90,3 +95,68 @@ def test_databricks_external_location(destination_config: DestinationTestConfiguration) -> None:
assert (
"credential_x" in pipeline.list_failed_jobs_in_package(info.loads_ids[0])[0].failed_message
)


@pytest.mark.parametrize(
"destination_config",
destinations_configs(
default_sql_configs=True, bucket_subset=(AZ_BUCKET,), subset=("databricks",)
),
ids=lambda x: x.name,
)
def test_databricks_gcs_external_location(destination_config: DestinationTestConfiguration) -> None:
# do not interfere with state
os.environ["RESTORE_FROM_DESTINATION"] = "False"
# let the package complete even with failed jobs
os.environ["RAISE_ON_FAILED_JOBS"] = "false"

dataset_name = "test_databricks_gcs_external_location" + uniq_id()

# swap AZ bucket for GCS_BUCKET
from dlt.destinations import databricks, filesystem

stage = filesystem(GCS_BUCKET)

# explicit cred handover should fail
bricks = databricks()
pipeline = destination_config.setup_pipeline(
"test_databricks_gcs_external_location",
dataset_name=dataset_name,
destination=bricks,
staging=stage,
)
info = pipeline.run([1, 2, 3], table_name="digits", **destination_config.run_kwargs)
assert info.has_failed_jobs is True
assert (
"You need to use Databricks named credential or external location"
in pipeline.list_failed_jobs_in_package(info.loads_ids[0])[0].failed_message
)

# should fail on internal config error as external location is not configured
# bricks = databricks(is_staging_external_location=True)
# pipeline = destination_config.setup_pipeline(
# "test_databricks_gcs_external_location",
# dataset_name=dataset_name,
# destination=bricks,
# staging=stage,
# )
# info = pipeline.run([1, 2, 3], table_name="digits", **destination_config.run_kwargs)
# assert info.has_failed_jobs is True
# assert (
# "Invalid configuration value detected"
# in pipeline.list_failed_jobs_in_package(info.loads_ids[0])[0].failed_message
# )

# should fail on non existing stored credentials
bricks = databricks(is_staging_external_location=False, staging_credentials_name="CREDENTIAL_X")
pipeline = destination_config.setup_pipeline(
"test_databricks_external_location",
dataset_name=dataset_name,
destination=bricks,
staging=stage,
)
info = pipeline.run([1, 2, 3], table_name="digits", **destination_config.run_kwargs)
assert info.has_failed_jobs is True
assert (
"credential_x" in pipeline.list_failed_jobs_in_package(info.loads_ids[0])[0].failed_message
)
