Initial ADLS gen2 support (#453)
* minimal ADLS gen2 support

* add rigs back

* Make mocked tests work with adls

* add rigs back; make explicit no dirs

* Update testing and hns key

* format

* update mocked tests

* windows agnostic

* set gen2 var in CI

* new adls functionality; better tests and instantiation

* Code review comments

* Tweak HISTORY.md

* TEMP: debug test code

* don't close non-existent file

* Revert "TEMP: debug test code"

This reverts commit bb36a52.

---------

Co-authored-by: Jay Qi <[email protected]>
pjbull and jayqi committed Aug 28, 2024
1 parent f3605a6 commit 5656879
Showing 19 changed files with 635 additions and 134 deletions.
4 changes: 4 additions & 0 deletions .env.example
@@ -5,6 +5,10 @@ AWS_SECRET_ACCESS_KEY=your_secret_access_key

AZURE_STORAGE_CONNECTION_STRING=DefaultEndpointsProtocol=https;AccountName=your_account_name;AccountKey=your_account_key;EndpointSuffix=core.windows.net

# if testing with ADLS Gen2 storage, set credentials for that account here
AZURE_STORAGE_GEN2_CONNECTION_STRING=DefaultEndpointsProtocol=https;AccountName=your_account_name;AccountKey=your_account_key;EndpointSuffix=core.windows.net


GOOGLE_APPLICATION_CREDENTIALS=.gscreds.json
# or
GCP_PROJECT_ID=your_project_id
1 change: 1 addition & 0 deletions .github/workflows/tests.yml
@@ -102,6 +102,7 @@ jobs:
env:
LIVE_AZURE_CONTAINER: ${{ secrets.LIVE_AZURE_CONTAINER }}
AZURE_STORAGE_CONNECTION_STRING: ${{ secrets.AZURE_STORAGE_CONNECTION_STRING }}
AZURE_STORAGE_GEN2_CONNECTION_STRING: ${{ secrets.AZURE_STORAGE_GEN2_CONNECTION_STRING }}
LIVE_GS_BUCKET: ${{ secrets.LIVE_GS_BUCKET }}
LIVE_S3_BUCKET: ${{ secrets.LIVE_S3_BUCKET }}
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
2 changes: 1 addition & 1 deletion .gitignore
@@ -6,7 +6,7 @@ docs/docs/changelog.md
docs/docs/contributing.md

# perf output
perf-results.csv
perf-*.csv

## GitHub Python .gitignore ##
# https://github.com/github/gitignore/blob/master/Python.gitignore
9 changes: 9 additions & 0 deletions CONTRIBUTING.md
@@ -81,6 +81,15 @@ Finally, you may want to run your tests against live servers to ensure that the
make test-live-cloud
```

#### Azure live backend tests

For Azure, you can test against both Azure Blob Storage backends and Azure Data Lake Storage Gen2 backends. To run these tests, provide a connection string for each backend via the following environment variables (in your `.env` file for local development). If `AZURE_STORAGE_GEN2_CONNECTION_STRING` is not set, only the Blob Storage backend will be tested. To set up a storage account with ADLS Gen2, go through the normal storage account creation flow in the Azure portal and select "Enable Hierarchical Namespace" in the "Advanced" tab when configuring the account.

```bash
AZURE_STORAGE_CONNECTION_STRING=your_connection_string
AZURE_STORAGE_GEN2_CONNECTION_STRING=your_connection_string
```
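
If you only want to exercise the Azure rigs against live servers, one hedged way to do so is shown below; `USE_LIVE_CLOUD=1` is what `tests/conftest.py` checks, while the `-k` filter assumes the rig name appears in the generated test ids.

```bash
# run only the Azure-backed tests against the live backends (filter is an assumption)
USE_LIVE_CLOUD=1 python -m pytest -k "azure" tests/
```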

You can copy `.env.example` to `.env` and fill in the credentials and bucket/container names for the providers you want to test against. **Note that the live tests will create and delete files on the cloud provider.**

You can also skip providers you do not have accounts for by commenting them out in the `rig` and `s3_like_rig` variables defined at the end of `tests/conftest.py`.
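
For instance, a hedged sketch of the `rig` union with one provider commented out (mirroring the `fixture_union` that appears later in this commit's `tests/conftest.py` diff; trailing entries are omitted here):

```python
# tests/conftest.py (sketch) -- comment out providers you cannot test against
rig = fixture_union(
    "rig",
    [
        azure_rig,       # azure_rig0
        azure_gen2_rig,  # azure_rig1
        # gs_rig,        # skipped: no GCS credentials available
        s3_rig,
        custom_s3_rig,
        # ... additional rigs omitted in this sketch
    ],
)
```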
1 change: 1 addition & 0 deletions HISTORY.md
@@ -15,6 +15,7 @@
- Changed `LocalClient` so that client instances using the default storage access the default local storage directory through the `get_default_storage_dir` rather than having an explicit reference to the path set at instantiation. This means that calling `get_default_storage_dir` will reset the local storage for all clients using the default local storage, whether the client has already been instantiated or is instantiated after resetting. This fixes unintuitive behavior where `reset_local_storage` did not reset local storage when using the default client. (Issue [#414](https://github.com/drivendataorg/cloudpathlib/issues/414))
- Added a new `local_storage_dir` property to `LocalClient`. This will return the current local storage directory used by that client instance.
by reference through the `get_default_ rather than with an explicit.
- Added Azure Data Lake Storage Gen2 support (Issue [#161](https://github.com/drivendataorg/cloudpathlib/issues/161), PR [#450](https://github.com/drivendataorg/cloudpathlib/pull/450)), thanks to [@M0dEx](https://github.com/M0dEx) for PR [#447](https://github.com/drivendataorg/cloudpathlib/pull/447) and PR [#449](https://github.com/drivendataorg/cloudpathlib/pull/449)

## v0.18.1 (2024-02-26)

223 changes: 179 additions & 44 deletions cloudpathlib/azure/azblobclient.py

Large diffs are not rendered by default.
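
The large `azblobclient.py` diff is not shown here. As a rough sketch of the hierarchical-namespace detection it introduces (inferred from the `_check_hns` call in `azblobpath.py` and the `get_account_information().get("is_hns_enabled", ...)` usage in `conftest.py` below; the real client has a much richer constructor and more responsibilities), the core idea is roughly:

```python
from typing import Optional

from azure.storage.blob import BlobServiceClient
from azure.storage.filedatalake import DataLakeServiceClient


class SketchAzureBlobClient:
    """Illustrative only -- not the actual cloudpathlib implementation."""

    def __init__(self, connection_string: str):
        # both SDK clients can be built from the same connection string
        self.service_client = BlobServiceClient.from_connection_string(connection_string)
        self.data_lake_client = DataLakeServiceClient.from_connection_string(connection_string)
        self._hns_enabled: Optional[bool] = None

    def _check_hns(self) -> bool:
        # ask the account once whether hierarchical namespace (ADLS Gen2) is
        # enabled and cache the answer; directory-aware operations branch on it
        if self._hns_enabled is None:
            account_info = self.service_client.get_account_information()
            self._hns_enabled = bool(account_info.get("is_hns_enabled", False))
        return self._hns_enabled
```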

16 changes: 14 additions & 2 deletions cloudpathlib/azure/azblobpath.py
@@ -3,6 +3,8 @@
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING

from cloudpathlib.exceptions import CloudPathIsADirectoryError

try:
from azure.core.exceptions import ResourceNotFoundError
except ImportError:
@@ -44,8 +46,7 @@ def is_file(self) -> bool:
return self.client._is_file_or_dir(self) == "file"

def mkdir(self, parents=False, exist_ok=False):
# not possible to make empty directory on blob storage
pass
self.client._mkdir(self, parents=parents, exist_ok=exist_ok)

def touch(self, exist_ok: bool = True):
if self.exists():
@@ -84,6 +85,17 @@ def stat(self):
)
)

def replace(self, target: "AzureBlobPath") -> "AzureBlobPath":
try:
return super().replace(target)

# we can rename directories on ADLS Gen2
except CloudPathIsADirectoryError:
if self.client._check_hns():
return self.client._move_file(self, target)
else:
raise

@property
def container(self) -> str:
return self._no_prefix.split("/", 1)[0]
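
A hedged usage sketch of the `replace` override above: on an ADLS Gen2 (hierarchical namespace) account a directory path can now be renamed server-side, while on a flat Blob Storage account `CloudPathIsADirectoryError` still propagates. The container name and connection string below are placeholders.

```python
from cloudpathlib import AzureBlobClient, AzureBlobPath
from cloudpathlib.exceptions import CloudPathIsADirectoryError

client = AzureBlobClient(connection_string="...")  # placeholder connection string

src = AzureBlobPath("az://my-container/reports/2023/", client=client)
dst = AzureBlobPath("az://my-container/archive/2023/", client=client)

try:
    # on an HNS-enabled account this falls through to the ADLS rename;
    # on plain Blob Storage the directory error is re-raised
    src.replace(dst)
except CloudPathIsADirectoryError:
    print("flat Blob Storage: directories cannot be renamed in place")
```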
2 changes: 1 addition & 1 deletion cloudpathlib/cloudpath.py
@@ -251,7 +251,7 @@ def client(self):

def __del__(self) -> None:
# make sure that file handle to local path is closed
if self._handle is not None:
if self._handle is not None and self._local.exists():
self._handle.close()

# ensure file removed from cache when cloudpath object deleted
7 changes: 7 additions & 0 deletions docs/docs/authentication.md
@@ -211,6 +211,13 @@ client.set_as_default_client()
cp3 = CloudPath("s3://cloudpathlib-test-bucket/")
```

## Accessing Azure DataLake Storage Gen2 (ADLS Gen2) storage with hierarchical namespace enabled

Some Azure storage accounts are configured with "hierarchical namespace" enabled. This means that the storage account is backed by the Azure DataLake Storage Gen2 product rather than Azure Blob Storage. For many operations, the two are interchangeable and the Azure Blob Storage API can be used. However, for some operations a developer needs to use the Azure DataLake Storage API. The `AzureBlobClient` class implemented in cloudpathlib detects whether hierarchical namespace is enabled and uses the Azure DataLake Storage API where it is necessary or where it provides a performance improvement. Usually, a user of cloudpathlib does not need to know whether hierarchical namespace is enabled or whether the storage account is backed by Azure DataLake Storage Gen2 or Azure Blob Storage.

If needed, the Azure SDK `DataLakeServiceClient` object can be accessed via `AzureBlobClient.data_lake_client`, and the Azure SDK `BlobServiceClient` object via `AzureBlobClient.service_client`.
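
A brief illustration of reaching both underlying SDK clients described above (the connection string is a placeholder):

```python
from cloudpathlib import AzureBlobClient

client = AzureBlobClient(connection_string="...")  # placeholder

blob_sdk = client.service_client      # azure.storage.blob.BlobServiceClient
adls_sdk = client.data_lake_client    # azure.storage.filedatalake.DataLakeServiceClient

# e.g., account-level metadata through the Blob SDK client
print(blob_sdk.get_account_information())
```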


## Pickling `CloudPath` objects

You can pickle and unpickle `CloudPath` objects normally, for example:
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -33,7 +33,7 @@ dependencies = [
]

[project.optional-dependencies]
azure = ["azure-storage-blob>=12"]
azure = ["azure-storage-blob>=12", "azure-storage-file-datalake>=12"]
gs = ["google-cloud-storage"]
s3 = ["boto3>=1.34.0"]
all = ["cloudpathlib[azure]", "cloudpathlib[gs]", "cloudpathlib[s3]"]
87 changes: 73 additions & 14 deletions tests/conftest.py
@@ -1,9 +1,13 @@
import os
from pathlib import Path, PurePosixPath
import shutil
from tempfile import TemporaryDirectory
from typing import Dict, Optional

from azure.storage.blob import BlobServiceClient
from azure.storage.filedatalake import (
DataLakeServiceClient,
)
import boto3
import botocore
from dotenv import find_dotenv, load_dotenv
@@ -26,8 +30,10 @@
LocalS3Path,
)
import cloudpathlib.azure.azblobclient
from cloudpathlib.azure.azblobclient import _hns_rmtree
import cloudpathlib.s3.s3client
from .mock_clients.mock_azureblob import mocked_client_class_factory, DEFAULT_CONTAINER_NAME
from .mock_clients.mock_azureblob import MockBlobServiceClient, DEFAULT_CONTAINER_NAME
from .mock_clients.mock_adls_gen2 import MockedDataLakeServiceClient
from .mock_clients.mock_gs import (
mocked_client_class_factory as mocked_gsclient_class_factory,
DEFAULT_GS_BUCKET_NAME,
@@ -109,17 +115,20 @@ def create_test_dir_name(request) -> str:
return test_dir


@fixture()
def azure_rig(request, monkeypatch, assets_dir):
def _azure_fixture(conn_str_env_var, adls_gen2, request, monkeypatch, assets_dir):
drive = os.getenv("LIVE_AZURE_CONTAINER", DEFAULT_CONTAINER_NAME)
test_dir = create_test_dir_name(request)

live_server = os.getenv("USE_LIVE_CLOUD") == "1"

connection_kwargs = dict()
tmpdir = TemporaryDirectory()

if live_server:
# Set up test assets
blob_service_client = BlobServiceClient.from_connection_string(
os.getenv("AZURE_STORAGE_CONNECTION_STRING")
blob_service_client = BlobServiceClient.from_connection_string(os.getenv(conn_str_env_var))
data_lake_service_client = DataLakeServiceClient.from_connection_string(
os.getenv(conn_str_env_var)
)
test_files = [
f for f in assets_dir.glob("**/*") if f.is_file() and f.name not in UPLOAD_IGNORE_LIST
Expand All @@ -130,13 +139,25 @@ def azure_rig(request, monkeypatch, assets_dir):
blob=str(f"{test_dir}/{PurePosixPath(test_file.relative_to(assets_dir))}"),
)
blob_client.upload_blob(test_file.read_bytes(), overwrite=True)

connection_kwargs["connection_string"] = os.getenv(conn_str_env_var)
else:
monkeypatch.setenv("AZURE_STORAGE_CONNECTION_STRING", "")
# Mock cloud SDK
# pass key mocked params to clients via connection string
monkeypatch.setenv(
"AZURE_STORAGE_CONNECTION_STRING", f"{Path(tmpdir.name) / test_dir};{adls_gen2}"
)
monkeypatch.setenv("AZURE_STORAGE_GEN2_CONNECTION_STRING", "")

monkeypatch.setattr(
cloudpathlib.azure.azblobclient,
"BlobServiceClient",
mocked_client_class_factory(test_dir),
MockBlobServiceClient,
)

monkeypatch.setattr(
cloudpathlib.azure.azblobclient,
"DataLakeServiceClient",
MockedDataLakeServiceClient,
)

rig = CloudProviderTestRig(
@@ -145,19 +166,47 @@
drive=drive,
test_dir=test_dir,
live_server=live_server,
required_client_kwargs=connection_kwargs,
)

rig.client_class().set_as_default_client() # set default client
rig.client_class(**connection_kwargs).set_as_default_client() # set default client

# add flag for adls gen2 rig to skip some tests
rig.is_adls_gen2 = adls_gen2
rig.connection_string = os.getenv(conn_str_env_var) # used for client instantiation tests

yield rig

rig.client_class._default_client = None # reset default client

if live_server:
# Clean up test dir
container_client = blob_service_client.get_container_client(drive)
to_delete = container_client.list_blobs(name_starts_with=test_dir)
container_client.delete_blobs(*to_delete)
if blob_service_client.get_account_information().get("is_hns_enabled", False):
_hns_rmtree(data_lake_service_client, drive, test_dir)

else:
# Clean up test dir
container_client = blob_service_client.get_container_client(drive)
to_delete = container_client.list_blobs(name_starts_with=test_dir)
to_delete = sorted(to_delete, key=lambda b: len(b.name.split("/")), reverse=True)

container_client.delete_blobs(*to_delete)

else:
tmpdir.cleanup()


@fixture()
def azure_rig(request, monkeypatch, assets_dir):
yield from _azure_fixture(
"AZURE_STORAGE_CONNECTION_STRING", False, request, monkeypatch, assets_dir
)


@fixture()
def azure_gen2_rig(request, monkeypatch, assets_dir):
yield from _azure_fixture(
"AZURE_STORAGE_GEN2_CONNECTION_STRING", True, request, monkeypatch, assets_dir
)


@fixture()
@@ -420,10 +469,20 @@ def local_s3_rig(request, monkeypatch, assets_dir):
rig.client_class.reset_default_storage_dir() # reset local storage directory


# create azure fixtures for both blob and gen2 storage
azure_rigs = fixture_union(
"azure_rigs",
[
azure_rig, # azure_rig0
azure_gen2_rig, # azure_rig1
],
)

rig = fixture_union(
"rig",
[
azure_rig,
azure_rig, # azure_rig0
azure_gen2_rig, # azure_rig1
gs_rig,
s3_rig,
custom_s3_rig,
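
A hedged sketch of how a test might consume the new `azure_rigs` union fixture so it is collected once per Azure backend; `create_cloud_path` is assumed to be a helper on `CloudProviderTestRig`, and the asset path is assumed to be among the uploaded test files.

```python
def test_collected_for_both_azure_backends(azure_rigs):
    rig = azure_rigs  # either the blob-backed rig or the ADLS Gen2 rig

    p = rig.create_cloud_path("dir_0/file0_0.txt")  # assumed helper and asset path
    assert p.exists()

    # the flag set in _azure_fixture lets tests branch on backend capabilities
    assert isinstance(rig.is_adls_gen2, bool)
```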
110 changes: 110 additions & 0 deletions tests/mock_clients/mock_adls_gen2.py
@@ -0,0 +1,110 @@
from datetime import datetime
from pathlib import Path, PurePosixPath
from shutil import rmtree
from azure.core.exceptions import ResourceNotFoundError
from azure.storage.filedatalake import FileProperties

from tests.mock_clients.mock_azureblob import _JsonCache, DEFAULT_CONTAINER_NAME


class MockedDataLakeServiceClient:
def __init__(self, test_dir, adls):
# root is parent of the test-specific directory
self.root = test_dir.parent
self.test_dir = test_dir
self.adls = adls
self.metadata_cache = _JsonCache(self.root / ".metadata")

@classmethod
def from_connection_string(cls, conn_str, credential):
# configured in conftest.py
test_dir, adls = conn_str.split(";")
adls = adls == "True"
test_dir = Path(test_dir)
return cls(test_dir, adls)

def get_file_system_client(self, file_system):
return MockedFileSystemClient(self.root, self.metadata_cache)


class MockedFileSystemClient:
def __init__(self, root, metadata_cache):
self.root = root
self.metadata_cache = metadata_cache

def get_file_client(self, key):
return MockedFileClient(key, self.root, self.metadata_cache)

def get_directory_client(self, key):
return MockedDirClient(key, self.root)

def get_paths(self, path, recursive=False):
yield from (
MockedFileClient(
PurePosixPath(f.relative_to(self.root)), self.root, self.metadata_cache
).get_file_properties()
for f in (self.root / path).glob("**/*" if recursive else "*")
)


class MockedFileClient:
def __init__(self, key, root, metadata_cache) -> None:
self.key = key
self.root = root
self.metadata_cache = metadata_cache

def get_file_properties(self):
path = self.root / self.key

if path.exists() and path.is_dir():
fp = FileProperties(
**{
"name": self.key,
"size": 0,
"ETag": "etag",
"Last-Modified": datetime.fromtimestamp(path.stat().st_mtime),
"metadata": {"hdi_isfolder": True},
}
)
fp["is_directory"] = True # not part of object def, but still in API responses...
return fp

elif path.exists():
fp = FileProperties(
**{
"name": self.key,
"size": path.stat().st_size,
"ETag": "etag",
"Last-Modified": datetime.fromtimestamp(path.stat().st_mtime),
"metadata": {"hdi_isfolder": False},
"Content-Type": self.metadata_cache.get(self.root / self.key, None),
}
)

fp["is_directory"] = False
return fp
else:
raise ResourceNotFoundError

def rename_file(self, new_name):
new_path = self.root / new_name[len(DEFAULT_CONTAINER_NAME + "/") :]
(self.root / self.key).rename(new_path)


class MockedDirClient:
def __init__(self, key, root) -> None:
self.key = key
self.root = root

def delete_directory(self):
rmtree(self.root / self.key)

def exists(self):
return (self.root / self.key).exists()

def create_directory(self):
(self.root / self.key).mkdir(parents=True, exist_ok=True)

def rename_directory(self, new_name):
new_path = self.root / new_name[len(DEFAULT_CONTAINER_NAME + "/") :]
(self.root / self.key).rename(new_path)
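
For reference, the conftest fixture encodes its mocked "connection string" as `"<local test dir>;<adls flag>"`, which `from_connection_string` above splits back apart. A tiny sketch of that round trip (paths are placeholders, and it assumes `_JsonCache` can write inside the temporary directory):

```python
from pathlib import Path
from tempfile import TemporaryDirectory

from tests.mock_clients.mock_adls_gen2 import MockedDataLakeServiceClient

with TemporaryDirectory() as tmp:
    test_dir = Path(tmp) / "test_dir"

    mock_client = MockedDataLakeServiceClient.from_connection_string(
        f"{test_dir};True", credential=None
    )

    assert mock_client.adls is True
    assert mock_client.root == Path(tmp)  # parent of the test-specific directory
```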