Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Add AzureBlobStorageContainer block (#139)
Browse files Browse the repository at this point in the history
* Adds initial block implementation

* Fixes a couple of bugs

* Adds docstrings

* Fix static analysis

* Adds tests

* Fixes formatting

* Fixes unrelated failing test

* Remove unneeded import

* Adds additonal error handling for better error messages

* Fix typo

* Adds check for existing directory

* Expand try except to catch missing blobs
desertaxle authored Mar 14, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 8a2b87c commit 90f6ae5
Showing 4 changed files with 785 additions and 19 deletions.
538 changes: 533 additions & 5 deletions prefect_azure/blob_storage.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,29 @@
"""Tasks for interacting with Azure Blob Storage"""
"""Integrations for interacting with Azure Blob Storage"""

import uuid
from typing import TYPE_CHECKING, List, Union
from io import BytesIO
from pathlib import Path
from typing import TYPE_CHECKING, Any, BinaryIO, Coroutine, Dict, List, Optional, Union

from azure.core.exceptions import ResourceNotFoundError

if TYPE_CHECKING:
from azure.storage.blob import BlobProperties

from prefect import task
from prefect.blocks.abstract import ObjectStorageBlock
from prefect.filesystems import WritableDeploymentStorage, WritableFileSystem
from prefect.logging import get_run_logger
from prefect.utilities.asyncutils import sync_compatible
from prefect.utilities.filesystem import filter_files
from pydantic import VERSION as PYDANTIC_VERSION

if TYPE_CHECKING:
from prefect_azure.credentials import AzureBlobStorageCredentials
if PYDANTIC_VERSION.startswith("2."):
from pydantic.v1 import Field
else:
from pydantic import Field

from prefect_azure.credentials import AzureBlobStorageCredentials


@task
@@ -125,7 +139,7 @@ async def blob_storage_list(
blob_storage_credentials: "AzureBlobStorageCredentials",
name_starts_with: str = None,
include: Union[str, List[str]] = None,
**kwargs
**kwargs,
) -> List["BlobProperties"]:
"""
List objects from a given Blob Storage container.
@@ -177,3 +191,517 @@ def example_blob_storage_list_flow():
]

return blobs


class AzureBlobStorageContainer(
ObjectStorageBlock, WritableFileSystem, WritableDeploymentStorage
):
"""
Represents a container in Azure Blob Storage.
This class provides methods for downloading and uploading files and folders
to and from the Azure Blob Storage container.
Attributes:
container_name: The name of the Azure Blob Storage container.
credentials: The credentials to use for authentication with Azure.
base_folder: A base path to a folder within the container to use
for reading and writing objects.
"""

_block_type_name = "Azure Blob Storage Container"
_logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/54e3fa7e00197a4fbd1d82ed62494cb58d08c96a-250x250.png" # noqa
_documentation_url = "https://prefecthq.github.io/prefect-azure/blob_storage/#prefect_azure.blob_storabe.AzureBlobStorageContainer" # noqa

container_name: str = Field(
default=..., description="The name of a Azure Blob Storage container."
)
credentials: AzureBlobStorageCredentials = Field(
default_factory=AzureBlobStorageCredentials,
description="The credentials to use for authentication with Azure.",
)
base_folder: Optional[str] = Field(
default=None,
description=(
"A base path to a folder within the container to use "
"for reading and writing objects."
),
)

def _get_path_relative_to_base_folder(self, path: Optional[str] = None) -> str:
if path is None and self.base_folder is None:
return ""
if path is None:
return self.base_folder
if self.base_folder is None:
return path
return (Path(self.base_folder) / Path(path)).as_posix()

@sync_compatible
async def download_folder_to_path(
self,
from_folder: str,
to_folder: Union[str, Path],
**download_kwargs: Dict[str, Any],
) -> Coroutine[Any, Any, Path]:
"""Download a folder from the container to a local path.
Args:
from_folder: The folder path in the container to download.
to_folder: The local path to download the folder to.
**download_kwargs: Additional keyword arguments passed into
`BlobClient.download_blob`.
Returns:
The local path where the folder was downloaded.
Example:
Download the contents of container folder `folder` from the container
to the local folder `local_folder`:
```python
from prefect_azure import AzureBlobStorageCredentials
from prefect_azure.blob_storage import AzureBlobStorageContainer
credentials = AzureBlobStorageCredentials(
connection_string="connection_string",
)
block = AzureBlobStorageContainer(
container_name="container",
credentials=credentials,
)
block.download_folder_to_path(
from_folder="folder",
to_folder="local_folder"
)
```
"""
self.logger.info(
"Downloading folder from container %s to path %s",
self.container_name,
to_folder,
)
full_container_path = self._get_path_relative_to_base_folder(from_folder)
async with self.credentials.get_container_client(
self.container_name
) as container_client:
try:
async for blob in container_client.list_blobs(
name_starts_with=full_container_path
):
blob_path = blob.name
local_path = Path(to_folder) / Path(blob_path).relative_to(
full_container_path
)
local_path.parent.mkdir(parents=True, exist_ok=True)
async with container_client.get_blob_client(
blob_path
) as blob_client:
blob_obj = await blob_client.download_blob(**download_kwargs)

with local_path.open(mode="wb") as to_file:
await blob_obj.readinto(to_file)
except ResourceNotFoundError as exc:
raise RuntimeError(
"An error occurred when attempting to download from container"
f" {self.container_name}: {exc.reason}"
) from exc

return Path(to_folder)

@sync_compatible
async def download_object_to_file_object(
self,
from_path: str,
to_file_object: BinaryIO,
**download_kwargs: Dict[str, Any],
) -> Coroutine[Any, Any, BinaryIO]:
"""
Downloads an object from the container to a file object.
Args:
from_path : The path of the object to download within the container.
to_file_object: The file object to download the object to.
**download_kwargs: Additional keyword arguments for the download
operation.
Returns:
The file object that the object was downloaded to.
Example:
Download the object `object` from the container to a file object:
```python
from prefect_azure import AzureBlobStorageCredentials
from prefect_azure.blob_storage import AzureBlobStorageContainer
credentials = AzureBlobStorageCredentials(
connection_string="connection_string",
)
block = AzureBlobStorageContainer(
container_name="container",
credentials=credentials,
)
with open("file.txt", "wb") as f:
block.download_object_to_file_object(
from_path="object",
to_file_object=f
)
```
"""
self.logger.info(
"Downloading object from container %s to file object", self.container_name
)
full_container_path = self._get_path_relative_to_base_folder(from_path)
async with self.credentials.get_blob_client(
self.container_name, full_container_path
) as blob_client:
try:
blob_obj = await blob_client.download_blob(**download_kwargs)
await blob_obj.download_to_stream(to_file_object)
except ResourceNotFoundError as exc:
raise RuntimeError(
"An error occurred when attempting to download from container"
f" {self.container_name}: {exc.reason}"
) from exc

return to_file_object

@sync_compatible
async def download_object_to_path(
self,
from_path: str,
to_path: Union[str, Path],
**download_kwargs: Dict[str, Any],
) -> Coroutine[Any, Any, Path]:
"""
Downloads an object from a container to a specified path.
Args:
from_path: The path of the object in the container.
to_path: The path where the object will be downloaded to.
**download_kwargs (Dict[str, Any]): Additional keyword arguments
for the download operation.
Returns:
The path where the object was downloaded to.
Example:
Download the object `object` from the container to the local path
`file.txt`:
```python
from prefect_azure import AzureBlobStorageCredentials
from prefect_azure.blob_storage import AzureBlobStorageContainer
credentials = AzureBlobStorageCredentials(
connection_string="connection_string",
)
block = AzureBlobStorageContainer(
container_name="container",
credentials=credentials,
)
block.download_object_to_path(
from_path="object",
to_path="file.txt"
)
```
"""
self.logger.info(
"Downloading object from container %s to path %s",
self.container_name,
to_path,
)
full_container_path = self._get_path_relative_to_base_folder(from_path)
async with self.credentials.get_blob_client(
self.container_name, full_container_path
) as blob_client:
try:
blob_obj = await blob_client.download_blob(**download_kwargs)

path = Path(to_path)

path.parent.mkdir(parents=True, exist_ok=True)

with path.open(mode="wb") as to_file:
await blob_obj.readinto(to_file)
except ResourceNotFoundError as exc:
raise RuntimeError(
"An error occurred when attempting to download from container"
f" {self.container_name}: {exc.reason}"
) from exc
return Path(to_path)

@sync_compatible
async def upload_from_file_object(
self, from_file_object: BinaryIO, to_path: str, **upload_kwargs: Dict[str, Any]
) -> Coroutine[Any, Any, str]:
"""
Uploads an object from a file object to the specified path in the blob
storage container.
Args:
from_file_object: The file object to upload.
to_path: The path in the blob storage container to upload the
object to.
**upload_kwargs: Additional keyword arguments to pass to the
upload_blob method.
Returns:
The path where the object was uploaded to.
Example:
Upload a file object to the container at the path `object`:
```python
from prefect_azure import AzureBlobStorageCredentials
from prefect_azure.blob_storage import AzureBlobStorageContainer
credentials = AzureBlobStorageCredentials(
connection_string="connection_string",
)
block = AzureBlobStorageContainer(
container_name="container",
credentials=credentials,
)
with open("file.txt", "rb") as f:
block.upload_from_file_object(
from_file_object=f,
to_path="object"
)
```
"""
self.logger.info(
"Uploading object to container %s with key %s", self.container_name, to_path
)
full_container_path = self._get_path_relative_to_base_folder(to_path)
async with self.credentials.get_blob_client(
self.container_name, full_container_path
) as blob_client:
try:
await blob_client.upload_blob(from_file_object, **upload_kwargs)
except ResourceNotFoundError as exc:
raise RuntimeError(
"An error occurred when attempting to upload from container"
f" {self.container_name}: {exc.reason}"
) from exc

return to_path

@sync_compatible
async def upload_from_path(
self, from_path: Union[str, Path], to_path: str, **upload_kwargs: Dict[str, Any]
) -> Coroutine[Any, Any, str]:
"""
Uploads an object from a local path to the specified destination path in the
blob storage container.
Args:
from_path: The local path of the object to upload.
to_path: The destination path in the blob storage container.
**upload_kwargs: Additional keyword arguments to pass to the
`upload_blob` method.
Returns:
The destination path in the blob storage container.
Example:
Upload a file from the local path `file.txt` to the container
at the path `object`:
```python
from prefect_azure import AzureBlobStorageCredentials
from prefect_azure.blob_storage import AzureBlobStorageContainer
credentials = AzureBlobStorageCredentials(
connection_string="connection_string",
)
block = AzureBlobStorageContainer(
container_name="container",
credentials=credentials,
)
block.upload_from_path(
from_path="file.txt",
to_path="object"
)
```
"""
self.logger.info(
"Uploading object to container %s with key %s", self.container_name, to_path
)
full_container_path = self._get_path_relative_to_base_folder(to_path)
async with self.credentials.get_blob_client(
self.container_name, full_container_path
) as blob_client:
try:
with open(from_path, "rb") as f:
await blob_client.upload_blob(f, **upload_kwargs)
except ResourceNotFoundError as exc:
raise RuntimeError(
"An error occurred when attempting to upload to container"
f" {self.container_name}: {exc.reason}"
) from exc

return to_path

@sync_compatible
async def upload_from_folder(
self,
from_folder: Union[str, Path],
to_folder: str,
**upload_kwargs: Dict[str, Any],
) -> Coroutine[Any, Any, str]:
"""
Uploads files from a local folder to a specified folder in the Azure
Blob Storage container.
Args:
from_folder: The path to the local folder containing the files to upload.
to_folder: The destination folder in the Azure Blob Storage container.
**upload_kwargs: Additional keyword arguments to pass to the
`upload_blob` method.
Returns:
The full path of the destination folder in the container.
Example:
Upload the contents of the local folder `local_folder` to the container
folder `folder`:
```python
from prefect_azure import AzureBlobStorageCredentials
from prefect_azure.blob_storage import AzureBlobStorageContainer
credentials = AzureBlobStorageCredentials(
connection_string="connection_string",
)
block = AzureBlobStorageContainer(
container_name="container",
credentials=credentials,
)
block.upload_from_folder(
from_folder="local_folder",
to_folder="folder"
)
```
"""
self.logger.info(
"Uploading folder to container %s with key %s",
self.container_name,
to_folder,
)
full_container_path = self._get_path_relative_to_base_folder(to_folder)
async with self.credentials.get_container_client(
self.container_name
) as container_client:
if not Path(from_folder).is_dir():
raise ValueError(f"{from_folder} is not a directory")
for path in Path(from_folder).rglob("*"):
if path.is_file():
blob_path = Path(full_container_path) / path.relative_to(
from_folder
)
async with container_client.get_blob_client(
blob_path.as_posix()
) as blob_client:
try:
await blob_client.upload_blob(
path.read_bytes(), **upload_kwargs
)
except ResourceNotFoundError as exc:
raise RuntimeError(
"An error occurred when attempting to upload to "
f"container {self.container_name}: {exc.reason}"
) from exc
return full_container_path

@sync_compatible
async def get_directory(
self, from_path: str = None, local_path: str = None
) -> None:
"""
Downloads the contents of a direry from the blob storage to a local path.
Used to enable flow code storage for deployments.
Args:
from_path: The path of the directory in the blob storage.
local_path: The local path where the directory will be downloaded.
"""
await self.download_folder_to_path(from_path, local_path)

@sync_compatible
async def put_directory(
self, local_path: str = None, to_path: str = None, ignore_file: str = None
) -> None:
"""
Uploads a directory to the blob storage.
Used to enable flow code storage for deployments.
Args:
local_path: The local path of the directory to upload. Defaults to
current directory.
to_path: The destination path in the blob storage. Defaults to
root directory.
ignore_file: The path to a file containing patterns to ignore
during upload.
"""
to_path = "" if to_path is None else to_path

if local_path is None:
local_path = "."

included_files = None
if ignore_file:
with open(ignore_file, "r") as f:
ignore_patterns = f.readlines()

included_files = filter_files(local_path, ignore_patterns)

for local_file_path in Path(local_path).expanduser().rglob("*"):
if (
included_files is not None
and str(local_file_path.relative_to(local_path)) not in included_files
):
continue
elif not local_file_path.is_dir():
remote_file_path = Path(to_path) / local_file_path.relative_to(
local_path
)
with open(local_file_path, "rb") as local_file:
local_file_content = local_file.read()

await self.write_path(
remote_file_path.as_posix(), content=local_file_content
)

@sync_compatible
async def read_path(self, path: str) -> bytes:
"""
Reads the contents of a file at the specified path and returns it as bytes.
Used to enable results storage.
Args:
path: The path of the file to read.
Returns:
The contents of the file as bytes.
"""
file_obj = BytesIO()
await self.download_object_to_file_object(path, file_obj)
return file_obj.getvalue()

@sync_compatible
async def write_path(self, path: str, content: bytes) -> None:
"""
Writes the content to the specified path in the blob storage.
Used to enable results storage.
Args:
path: The path where the content will be written.
content: The content to be written.
"""
await self.upload_from_file_object(BytesIO(content), path)
48 changes: 44 additions & 4 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -2,8 +2,11 @@

import pytest
from azure.core.exceptions import ResourceExistsError
from azure.storage.blob.aio import ContainerClient
from prefect.testing.utilities import AsyncMock, prefect_test_harness

from prefect_azure.credentials import AzureBlobStorageCredentials


@pytest.fixture(scope="session", autouse=True)
def prefect_db():
@@ -34,7 +37,10 @@ async def __aiter__(self):
yield item


mock_container = {"prefect.txt": b"prefect_works"}
mock_container = {
"prefect.txt": b"prefect_works",
"folder/prefect.txt": b"prefect_works",
}


class BlobStorageClientMethodsMock:
@@ -53,13 +59,23 @@ async def __aexit__(self, *exc):

async def download_blob(self):
return AsyncMock(
content_as_bytes=AsyncMock(return_value=mock_container.get(self.blob))
name="blob_obj",
content_as_bytes=AsyncMock(return_value=mock_container.get(self.blob)),
download_to_stream=AsyncMock(
side_effect=lambda f: f.write(mock_container.get(self.blob))
),
readinto=AsyncMock(
side_effect=lambda f: f.write(mock_container.get(self.blob))
),
)

async def upload_blob(self, data, overwrite):
async def upload_blob(self, data, overwrite=False):
if not overwrite and self.blob in mock_container:
raise ResourceExistsError("Cannot overwrite existing blob")
mock_container[self.blob] = data
if isinstance(data, (str, bytes)):
mock_container[self.blob] = data
else:
mock_container[self.blob] = data.read()
return self.blob

def list_blobs(self, name_starts_with=None, include=None, **kwargs):
@@ -97,6 +113,30 @@ def blob_storage_credentials():
return blob_storage_credentials


@pytest.fixture
def mock_blob_storage_credentials():
blob_storage_credentials = AzureBlobStorageCredentials(connection_string="mock")
mock_get_blob_client_with_container = MagicMock(
side_effect=lambda container, blob: BlobStorageClientMethodsMock(blob)
)

blob_storage_credentials.get_blob_client = mock_get_blob_client_with_container

mock_get_blob_client_without_container = MagicMock(
side_effect=lambda blob: BlobStorageClientMethodsMock(blob)
)

blob_storage_credentials.get_container_client = MagicMock(spec=ContainerClient)
block_mock = MagicMock()
block_mock.name = "folder/prefect.txt"
blob_storage_credentials.get_container_client().__aenter__.return_value = AsyncMock(
list_blobs=MagicMock(return_value=AsyncIter([block_mock])),
get_blob_client=mock_get_blob_client_without_container,
)

return blob_storage_credentials


class CosmosDbClientMethodsMock:
def query_items(self, *args, **kwargs):
return [{"name": "Someone", "age": 23}]
200 changes: 200 additions & 0 deletions tests/test_blob_storage.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import uuid
from io import BytesIO
from pathlib import Path

import pytest
from azure.core.exceptions import ResourceExistsError
from prefect import flow

from prefect_azure.blob_storage import (
AzureBlobStorageContainer,
blob_storage_download,
blob_storage_list,
blob_storage_upload,
@@ -111,3 +114,200 @@ async def blob_storage_list_flow():
assert len(blobs) == 5
for blob_data in blobs:
assert isinstance(blob_data["metadata"], dict)


class TestAzureBlobStorageContainer:
async def test_download_folder_to_path(
self, mock_blob_storage_credentials, tmp_path
):
container = AzureBlobStorageContainer(
container_name="container",
credentials=mock_blob_storage_credentials,
)
await container.download_folder_to_path("folder", tmp_path / "folder")

assert (tmp_path / "folder").exists()
assert (tmp_path / "folder" / "prefect.txt").exists()
with open(tmp_path / "folder" / "prefect.txt", "rb") as f:
assert f.read() == b"prefect_works"

async def test_download_object_to_file_object(
self, mock_blob_storage_credentials, tmp_path
):
container = AzureBlobStorageContainer(
container_name="container",
credentials=mock_blob_storage_credentials,
)
file_path = tmp_path / "file.txt"
with open(file_path, "wb") as f:
await container.download_object_to_file_object(
from_path="prefect.txt", to_file_object=f
)

assert file_path.exists()
with open(file_path, "rb") as f:
assert f.read() == b"prefect_works"

async def test_download_object_to_path(
self, mock_blob_storage_credentials, tmp_path
):
container = AzureBlobStorageContainer(
container_name="container",
credentials=mock_blob_storage_credentials,
)
from_path = "prefect.txt"
to_path = tmp_path / "file.txt"

await container.download_object_to_path(from_path, to_path)

assert to_path.exists()
with open(to_path, "rb") as f:
assert f.read() == b"prefect_works"

async def test_upload_from_file_object(
self, mock_blob_storage_credentials, tmp_path
):
container = AzureBlobStorageContainer(
container_name="container",
credentials=mock_blob_storage_credentials,
)
file_content = b"prefect_works_again"
file_object = BytesIO(file_content)
to_path = "object"

uploaded_path = await container.upload_from_file_object(
from_file_object=file_object,
to_path=to_path,
)

assert uploaded_path == to_path

await container.download_object_to_path("object", tmp_path / "file.txt")

with open(tmp_path / "file.txt", "rb") as f:
assert f.read() == b"prefect_works_again"

async def test_upload_from_path(self, mock_blob_storage_credentials, tmp_path):
container = AzureBlobStorageContainer(
container_name="container",
credentials=mock_blob_storage_credentials,
)
from_path = tmp_path / "file.txt"
to_path = "object-from-path"

with open(from_path, "wb") as f:
f.write(b"prefect_works_yet_again")

uploaded_path = await container.upload_from_path(
from_path=from_path,
to_path=to_path,
)

assert uploaded_path == to_path

await container.download_object_to_path(to_path, tmp_path / "file.txt")

with open(tmp_path / "file.txt", "rb") as f:
assert f.read() == b"prefect_works_yet_again"

async def test_upload_from_folder(
self, mock_blob_storage_credentials, tmp_path: Path
):
container = AzureBlobStorageContainer(
container_name="container",
credentials=mock_blob_storage_credentials,
)
from_folder = tmp_path / "local_folder"
from_folder.mkdir(parents=True, exist_ok=True)
to_folder = "folder"

file1_path = from_folder / "file1.txt"
file2_path = from_folder / "file2.txt"
file1_path.write_bytes(b"file1_content")
file2_path.write_bytes(b"file2_content")

await container.upload_from_folder(
from_folder=from_folder,
to_folder=to_folder,
)

await container.download_object_to_path(
"folder/file1.txt", tmp_path / "read_file1.txt"
)

with open(tmp_path / "read_file1.txt", "rb") as f:
assert f.read() == b"file1_content"

await container.download_object_to_path(
"folder/file2.txt", tmp_path / "read_file2.txt"
)

with open(tmp_path / "read_file2.txt", "rb") as f:
assert f.read() == b"file2_content"

async def test_get_directory(slef, mock_blob_storage_credentials, tmp_path):
container = AzureBlobStorageContainer(
container_name="container",
credentials=mock_blob_storage_credentials,
)
from_path = "folder"
local_path = str(tmp_path / "local_directory")

await container.get_directory(from_path, local_path)

assert (tmp_path / "local_directory").exists()
assert (tmp_path / "local_directory" / "prefect.txt").exists()
with open(tmp_path / "local_directory" / "prefect.txt", "rb") as f:
assert f.read() == b"prefect_works"

async def test_put_directory(self, mock_blob_storage_credentials, tmp_path):
container = AzureBlobStorageContainer(
container_name="container",
credentials=mock_blob_storage_credentials,
)
local_path = tmp_path / "local_directory"
to_path = "destination_directory"

local_path.mkdir()
file1_path = local_path / "file1.txt"
file2_path = local_path / "file2.txt"
file1_path.write_bytes(b"file1_content")
file2_path.write_bytes(b"file2_content")

await container.put_directory(local_path=str(local_path), to_path=to_path)

await container.download_object_to_path(
"destination_directory/file1.txt", tmp_path / "read_file1.txt"
)
with open(tmp_path / "read_file1.txt", "rb") as f:
assert f.read() == b"file1_content"

await container.download_object_to_path(
"destination_directory/file2.txt", tmp_path / "read_file2.txt"
)
with open(tmp_path / "read_file2.txt", "rb") as f:
assert f.read() == b"file2_content"

async def test_read_path(self, mock_blob_storage_credentials):
container = AzureBlobStorageContainer(
container_name="container",
credentials=mock_blob_storage_credentials,
)
path = "file.txt"
file_content = b"prefect_works"
await container.upload_from_file_object(BytesIO(file_content), path)

result = await container.read_path(path)

assert result == file_content

async def test_blob_storage_write_path(self, mock_blob_storage_credentials):
container = AzureBlobStorageContainer(
container_name="prefect",
credentials=mock_blob_storage_credentials,
)
await container.write_path("prefect-write-path.txt", b"write_path_works")

result = await container.read_path("prefect-write-path.txt")

assert result == b"write_path_works"
18 changes: 8 additions & 10 deletions tests/test_ml_datastore.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from pathlib import Path

from prefect import flow

from prefect_azure.ml_datastore import (
@@ -40,37 +38,37 @@ async def ml_get_datastore_flow():
assert result.datastore_name == "default"


async def test_ml_upload_datastore_flow(ml_credentials, datastore):
async def test_ml_upload_datastore_flow(ml_credentials, datastore, tmp_path):
@flow
async def ml_upload_datastore_flow():
result = await ml_upload_datastore(
"tests/",
str(tmp_path),
ml_credentials,
target_path="target_path",
overwrite=True,
)
return result

result = await ml_upload_datastore_flow()
assert result["src_dir"] == "tests/"
assert result["src_dir"] == str(tmp_path)
assert result["target_path"] == "target_path"
assert result["overwrite"]


async def test_ml_upload_datastore_flow_pathlib(ml_credentials, datastore):
async def test_ml_upload_datastore_flow_pathlib(ml_credentials, datastore, tmp_path):
@flow
async def ml_upload_datastore_flow():
result = await ml_upload_datastore(
Path("tests/"),
tmp_path,
ml_credentials,
target_path=Path("target/path"),
target_path="target_path",
overwrite=True,
)
return result

result = await ml_upload_datastore_flow()
assert result["src_dir"] == "tests"
assert result["target_path"] == "target/path"
assert result["src_dir"] == str(tmp_path)
assert result["target_path"] == "target_path"
assert result["overwrite"]


0 comments on commit 90f6ae5

Please sign in to comment.