✨ Source Azure Blob Storage: add client_credentials auth #50398

Open: wants to merge 12 commits into base: master

@@ -424,6 +424,43 @@
        }
      },
      "required": ["azure_blob_storage_account_key", "auth_type"]
    },
    {
      "title": "Authenticate via Client Credentials",
      "type": "object",
      "properties": {
        "auth_type": {
          "title": "Auth Type",
          "default": "client_credentials",
          "const": "client_credentials",
          "enum": ["client_credentials"],
          "type": "string"
        },
        "app_tenant_id": {
          "title": "Tenant ID",
          "description": "Tenant ID of the Microsoft Azure Application",
          "airbyte_secret": true,
          "type": "string"
        },
        "app_client_id": {
          "title": "Client ID",
          "description": "Client ID of your Microsoft developer application",
          "airbyte_secret": true,
          "type": "string"
        },
        "app_client_secret": {
          "title": "Client Secret",
          "description": "Client Secret of your Microsoft developer application",
          "airbyte_secret": true,
          "type": "string"
        }
      },
      "required": [
        "app_tenant_id",
        "app_client_id",
        "app_client_secret",
        "auth_type"
      ]
    }
  ]
},
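
For orientation, a `credentials` object that selects this new option could look like the sketch below. Only the key names and the required list come from the schema above; the placeholder values and the `missing_fields` helper are hypothetical and not part of the connector.

```python
# Hypothetical example values; only the key names come from the spec above.
credentials = {
    "auth_type": "client_credentials",
    "app_tenant_id": "<directory-tenant-id>",
    "app_client_id": "<application-client-id>",
    "app_client_secret": "<client-secret-value>",
}

# Mirrors the "required" list of the "Authenticate via Client Credentials" option.
REQUIRED = ["app_tenant_id", "app_client_id", "app_client_secret", "auth_type"]


def missing_fields(creds: dict) -> list:
    """Return the required keys that are absent or empty (illustrative helper)."""
    return [key for key in REQUIRED if not creds.get(key)]


print(missing_fields(credentials))  # prints []
```

A check like this only approximates what the platform's schema validation does; it is meant to show which keys the new option expects.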
@@ -12,7 +12,7 @@ data:
connectorSubtype: file
connectorType: source
definitionId: fdaaba68-4875-4ed9-8fcd-4ae1e0a25093
dockerImageTag: 0.4.4
dockerImageTag: 0.5.4
Review comment (Member):

Suggested change
dockerImageTag: 0.5.4
dockerImageTag: 0.5.0

Please bump the version to 0.5.0.

dockerRepository: airbyte/source-azure-blob-storage
documentationUrl: https://docs.airbyte.com/integrations/sources/azure-blob-storage
githubIssueLabel: source-azure-blob-storage
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "0.4.4"
version = "0.5.4"
name = "source-azure-blob-storage"
description = "Source implementation for Azure Blob Storage."
authors = [ "Airbyte <[email protected]>",]
@@ -36,6 +36,25 @@ class Config(OneOfOptionConfig):
)


class ClientCredentials(BaseModel):
    class Config(OneOfOptionConfig):
        title = "Authenticate via Client Credentials"
        discriminator = "auth_type"

    auth_type: Literal["client_credentials"] = Field("client_credentials", const=True)
    app_tenant_id: str = Field(title="Tenant ID", description="Tenant ID of the Microsoft Azure Application", airbyte_secret=True)
    app_client_id: str = Field(
        title="Client ID",
        description="Client ID of your Microsoft developer application",
        airbyte_secret=True,
    )
    app_client_secret: str = Field(
        title="Client Secret",
        description="Client Secret of your Microsoft developer application",
        airbyte_secret=True,
    )


class StorageAccountKey(BaseModel):
class Config(OneOfOptionConfig):
title = "Authenticate via Storage Account Key"
@@ -61,7 +80,7 @@ class SourceAzureBlobStorageSpec(AbstractFileBasedSpec):
def documentation_url(cls) -> AnyUrl:
return AnyUrl("https://docs.airbyte.com/integrations/sources/azure-blob-storage", scheme="https")

credentials: Union[Oauth2, StorageAccountKey] = Field(
credentials: Union[Oauth2, ClientCredentials, StorageAccountKey] = Field(
title="Authentication",
description="Credentials for connecting to the Azure Blob Storage",
discriminator="auth_type",
@@ -3,10 +3,10 @@

import logging
from io import IOBase
from typing import Iterable, List, Optional, Union
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

import pytz
from azure.core.credentials import AccessToken
from azure.core.credentials import AccessToken, TokenCredential
from azure.core.exceptions import ResourceNotFoundError
from azure.storage.blob import BlobServiceClient, ContainerClient
from smart_open import open
@@ -19,7 +19,46 @@
from .spec import SourceAzureBlobStorageSpec


class AzureOauth2Authenticator(Oauth2Authenticator):
class AzureClientCredentialsAuthenticator(Oauth2Authenticator, TokenCredential):
    def __init__(self, tenant_id: str, client_id: str, client_secret: str, **kwargs):
        super().__init__(
            token_refresh_endpoint=f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token",
            client_id=client_id,
            client_secret=client_secret,
            grant_type="client_credentials",
            scopes=["https://storage.azure.com/.default"],
            refresh_token=None,
        )

    def build_refresh_request_body(self) -> Mapping[str, Any]:
        """
        Returns the request body to set on the refresh request

        Override to define additional parameters
        """
        payload: MutableMapping[str, Any] = {
            "grant_type": self.get_grant_type(),
            "client_id": self.get_client_id(),
            "client_secret": self.get_client_secret(),
        }

        if self.get_scopes():
            payload["scope"] = " ".join(self.get_scopes())

        if self.get_refresh_request_body():
            for key, val in self.get_refresh_request_body().items():
                # We defer to existing oauth constructs over custom configured fields
                if key not in payload:
                    payload[key] = val

        return payload

    def get_token(self, *args, **kwargs) -> AccessToken:
        """Parent class handles Oauth Refresh token logic."""
        return AccessToken(token=self.get_access_token(), expires_on=int(self.get_token_expiry_date().timestamp()))


class AzureOauth2Authenticator(Oauth2Authenticator, TokenCredential):
"""
Authenticator for Azure Blob Storage SDK to align with azure.core.credentials.TokenCredential protocol
"""
@@ -63,17 +102,24 @@ def azure_blob_service_client(self):
return BlobServiceClient(self.account_url, credential=self._credentials)

@property
def azure_credentials(self) -> Union[str, AzureOauth2Authenticator]:
def azure_credentials(self) -> Union[str, AzureOauth2Authenticator, AzureClientCredentialsAuthenticator]:
if not self._credentials:
if self.config.credentials.auth_type == "storage_account_key":
self._credentials = self.config.credentials.azure_blob_storage_account_key
else:
elif self.config.credentials.auth_type == "oauth2":
self._credentials = AzureOauth2Authenticator(
token_refresh_endpoint=f"https://login.microsoftonline.com/{self.config.credentials.tenant_id}/oauth2/v2.0/token",
client_id=self.config.credentials.client_id,
client_secret=self.config.credentials.client_secret,
refresh_token=self.config.credentials.refresh_token,
)
elif self.config.credentials.auth_type == "client_credentials":
self._credentials = AzureClientCredentialsAuthenticator(
tenant_id=self.config.credentials.app_tenant_id,
client_id=self.config.credentials.app_client_id,
client_secret=self.config.credentials.app_client_secret,
)

return self._credentials

def get_matching_files(
@@ -2,7 +2,7 @@


from azure.core.credentials import AccessToken
from source_azure_blob_storage.stream_reader import AzureOauth2Authenticator
from source_azure_blob_storage.stream_reader import AzureClientCredentialsAuthenticator, AzureOauth2Authenticator


def test_custom_authenticator(requests_mock):
@@ -24,3 +24,23 @@ def test_custom_authenticator(requests_mock):
new_token = authenticator.get_token()
assert isinstance(new_token, AccessToken)
assert new_token.token == "access_token"


def test_client_authenticator(requests_mock):
    authenticator = AzureClientCredentialsAuthenticator(
        token_refresh_endpoint="https://login.microsoftonline.com/tenant_id/oauth2/v2.0/token",
        tenant_id="tenant_id",
        client_id="client_id",
        client_secret="client_secret",
    )
    token_response = {
        "token_type": "Bearer",
        "scope": "https://storage.azure.com/.default",
        "expires_in": 3600,
        "ext_expires_in": 3600,
        "access_token": "access_token_123",
    }
    requests_mock.post("https://login.microsoftonline.com/tenant_id/oauth2/v2.0/token", json=token_response)
    new_token = authenticator.get_token()
    assert isinstance(new_token, AccessToken)
    assert new_token.token == "access_token_123"
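
The test above only checks the returned token string. As a rough illustration of what `get_token` has to produce, the sketch below turns a token response like the mocked one into a `(token, expires_on)` pair. `FakeAccessToken` and `to_access_token` are hypothetical stand-ins so the snippet runs without the Azure SDK or the Airbyte CDK; in the PR itself, refresh and expiry tracking are delegated to `Oauth2Authenticator`.

```python
import time
from typing import Any, Mapping, NamedTuple, Optional


class FakeAccessToken(NamedTuple):
    """Stand-in with the same two fields as azure.core.credentials.AccessToken."""

    token: str
    expires_on: int


def to_access_token(response: Mapping[str, Any], now: Optional[float] = None) -> FakeAccessToken:
    """Convert a token-endpoint response into a token plus absolute expiry (epoch seconds)."""
    issued_at = time.time() if now is None else now
    return FakeAccessToken(
        token=response["access_token"],
        expires_on=int(issued_at + response["expires_in"]),
    )


token_response = {"token_type": "Bearer", "expires_in": 3600, "access_token": "access_token_123"}
token = to_access_token(token_response, now=1_700_000_000)
print(token.token, token.expires_on)  # prints: access_token_123 1700003600
```

An assertion on `expires_on` along these lines might be a useful addition to the unit test, since the Azure SDK relies on that value to decide when to request a new token.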
@@ -3,22 +3,35 @@

import json
import os
from pathlib import Path
from shutil import copytree
from tempfile import TemporaryDirectory
from typing import Any, Mapping

from pytest import fixture
from source_azure_blob_storage import SourceAzureBlobStorage, SourceAzureBlobStorageSpec, SourceAzureBlobStorageStreamReader
from source_azure_blob_storage.config_migrations import MigrateCredentials, MigrateLegacyConfig

from airbyte_cdk.sources.file_based.stream.cursor import DefaultFileBasedCursor


@fixture
def temp_configs():
    config_path = f"{os.path.dirname(__file__)}/test_configs/"
    with TemporaryDirectory() as _tempdir:
        configs_dir = Path(_tempdir) / "test_configs"
        copytree(config_path, configs_dir)
        yield configs_dir


# HELPERS
def load_config(config_path: str) -> Mapping[str, Any]:
    with open(config_path, "r") as config:
        return json.load(config)


def test_legacy_config_migration():
config_path = f"{os.path.dirname(__file__)}/test_configs/test_legacy_config.json"
def test_legacy_config_migration(temp_configs):
config_path = str((Path(temp_configs) / "test_legacy_config.json").resolve())
migration_instance = MigrateLegacyConfig
source = SourceAzureBlobStorage(
SourceAzureBlobStorageStreamReader(),
@@ -47,9 +60,11 @@ def test_legacy_config_migration():
assert test_migrated_config == expected_config


def test_credentials_config_migration():
config_path = f"{os.path.dirname(__file__)}/test_configs/test_config_without_credentials.json"
def test_credentials_config_migration(temp_configs):
config_path = str((Path(temp_configs) / "test_config_without_credentials.json").resolve())
initial_config = load_config(config_path)
expected = initial_config["azure_blob_storage_account_key"]

migration_instance = MigrateCredentials
source = SourceAzureBlobStorage(
SourceAzureBlobStorageStreamReader(),
@@ -61,4 +76,4 @@ def test_credentials_config_migration():
)
migration_instance.migrate(["check", "--config", config_path], source)
test_migrated_config = load_config(config_path)
assert test_migrated_config["credentials"]["azure_blob_storage_account_key"] == initial_config["azure_blob_storage_account_key"]
assert test_migrated_config["credentials"]["azure_blob_storage_account_key"] == expected
33 changes: 26 additions & 7 deletions docs/integrations/sources/azure-blob-storage.md
@@ -2,7 +2,7 @@

<HideInUI>

This page contains the setup guide and reference information for the [Azure Blob Storage](https://learn.microsoft.com/en-us/azure/?product=popular) source connector.
This page contains the setup guide and reference information for the [Azure Blob Storage](https://azure.microsoft.com/en-us/products/storage/blobs/) source connector.

</HideInUI>

@@ -41,10 +41,10 @@ Minimum permissions (role [Storage Blob Data Reader](https://learn.microsoft.com

### Step 1: Set up Azure Blob Storage

- Create a storage account with the permissions [details](https://learn.microsoft.com/en-us/azure/storage/common/storage-account-create?tabs=azure-portal)
- Create a storage account and grant roles [details](https://learn.microsoft.com/en-us/azure/storage/common/storage-account-create?tabs=azure-portal)

:::warning
To use Oauth 2.0 Authentication method, Access Control (IAM) should be setup.
To use the OAuth2 or Client Credentials authentication methods, Access Control (IAM) must be set up.
It is recommended
to use role [Storage Blob Data Reader](https://learn.microsoft.com/en-gb/azure/storage/blobs/assign-azure-role-data-access?tabs=portal)

@@ -62,6 +62,20 @@ Follow these steps to set up an IAM role:
</details>
:::

<details>
<summary>
Follow these steps to set up a Service Principal to use the Client Credentials authentication method.
</summary>

In the Azure portal, navigate to your Service Principal's App Registration.

Note the `Directory (tenant) ID` and `Application (client) ID` in the Overview panel.

In the `Manage / Certificates & secrets` panel, click `Client Secrets` and create a new secret. Note the `Value` of the secret.

</details>
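
If you want to sanity-check the three values before entering them in Airbyte, the request that the client credentials flow sends to Microsoft's token endpoint can be sketched as follows. This is a minimal sketch: `build_token_request` is a hypothetical helper, the placeholder values are made up, and it only builds the URL and form body without sending anything. The endpoint and scope match the ones used by the connector code in this pull request.

```python
from urllib.parse import urlencode


def build_token_request(tenant_id: str, client_id: str, client_secret: str):
    """Build the token endpoint URL and form-encoded body for the client credentials grant."""
    url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
    body = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        # Scope requested for Azure Storage data access.
        "scope": "https://storage.azure.com/.default",
    }
    return url, urlencode(body)


# Placeholder values; substitute the IDs and secret noted from the app registration.
url, body = build_token_request("my-tenant-id", "my-client-id", "my-secret")
print(url)
print(body)
```

Posting that body to the URL with any HTTP client should return a JSON document containing `access_token` if the app registration and secret are valid.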


### Step 2: Set up the Azure Blob Storage connector in Airbyte

<!-- env:cloud -->
@@ -93,10 +107,14 @@ Follow these steps to set up an IAM role:
2. Click Sources and then click + New source.
3. On the Set up the source page, select Azure Blob Storage from the Source type dropdown.
4. Enter a name for the Azure Blob Storage connector.
5. Enter the name of your Azure **Account**.
6. Enter your Tenant ID and Click **Authenticate your Azure Blob Storage account**.
7. Log in and authorize the Azure Blob Storage account.
8. Enter the name of the **Container** containing your files to replicate.
5. Enter the name of your Azure **Storage Account** and **container**.
6. Choose the Authentication method.
   1. If you are accessing through a Storage Account Key, choose `Authenticate via Storage Account Key` and enter the key.
   2. If you are accessing through a Service Principal, choose `Authenticate via Client Credentials`.
      1. See [above](#step-1-set-up-azure-blob-storage) for how to set IAM role bindings for the Service Principal and where to find the details of the app registration.
      2. Enter the `Directory (tenant) ID` value from the app registration in the Azure portal into the `Tenant ID` field.
      3. Enter the `Application (client) ID` from the Azure portal into the `Client ID` field. Note that this is **not** the secret ID.
      4. Enter the Secret `Value` from the Azure portal into the `Client Secret` field.
9. Add a stream
1. Write the **File Type**
2. In the **Format** box, use the dropdown menu to select the format of the files you'd like to replicate. The supported formats are **CSV**, **Parquet**, **Avro** and **JSONL**. Toggling the **Optional fields** button within the **Format** box will allow you to enter additional configurations based on the selected format. For a detailed breakdown of these settings, refer to the [File Format section](#file-format-settings) below.
@@ -283,6 +301,7 @@ The Azure Blob Storage connector should not encounter any [Microsoft API limitat

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------------------------|
| 0.5.4 | 2025-01-01 | [50398](https://github.com/airbytehq/airbyte/pull/50398) | Add client_credentials auth for Azure Service Principals |
| 0.4.4 | 2024-06-06 | [39275](https://github.com/airbytehq/airbyte/pull/39275) | [autopull] Upgrade base image to v1.2.2 |
| 0.4.3 | 2024-05-29 | [38701](https://github.com/airbytehq/airbyte/pull/38701) | Avoid error on empty stream when running discover |
| 0.4.2 | 2024-04-23 | [37504](https://github.com/airbytehq/airbyte/pull/37504) | Update specification |