Skip to content

Commit

Permalink
Lakefs client update (#19)
Browse files Browse the repository at this point in the history
* Bump lakefs_client to 0.113.0

* Fix mocks

* Bump lakefs_client to 1.0
  • Loading branch information
limx0 authored Oct 21, 2023
1 parent c6d5ce9 commit 40d98da
Show file tree
Hide file tree
Showing 12 changed files with 236 additions and 279 deletions.
48 changes: 1 addition & 47 deletions prefect_lakefs/commits.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from datetime import datetime
from typing import Any, Dict

from lakefs_client.models import Commit, CommitCreation, CommitList
from lakefs_client.models import Commit, CommitCreation
from prefect import task
from prefect.utilities.asyncutils import run_sync_in_worker_thread

Expand Down Expand Up @@ -110,49 +110,3 @@ def get_commit_for_example_repo():
commit_id=commit_id,
**lakefs_kwargs,
)


@task
async def log_branch_commits(
repository: str,
branch: str,
lakefs_credentials: LakeFSCredentials,
**lakefs_kwargs: Dict[str, Any],
) -> CommitList:
"""get commit log for a lakefs branch provided.
Args:
lakefs_credentials: `LakeFSCredentials` block for creating
authenticated LakeFS API clients.
repository: name of a lakefs repository.
branch: branch to fetch the commit log for.
**lakefs_kwargs: Optional extra keyword arguments to pass to the LakeFS API.
Returns:
A LakeFS `CommitList` object.
Example:
get commit log from repository named `example` given branch `main`:
```python
from prefect import flow
from prefect_lakefs import LakeFSCredentials
from prefect_lakefs.tasks import log_branch_commits
@flow
def log_branch_commits_for_main_example_repo():
commit = log_branch_commits(
lakefs_credentials=LakeFSCredentials.load("lakefs-creds"),
repository="example",
branch="main",
)
```
"""

with lakefs_credentials.get_client("commits") as commits:
return await run_sync_in_worker_thread(
commits.log_branch_commits,
repository=repository,
branch=branch,
**lakefs_kwargs,
)
4 changes: 3 additions & 1 deletion prefect_lakefs/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ class LakeFSCredentials(Block):
@contextmanager
def get_client(
self,
client_type: Literal["branches", "commits", "objects", "repository", "refs"],
client_type: Literal[
"branches", "commits", "objects", "repository", "refs", "internal"
],
) -> Generator[LakeFSClient, None, None]:
"""Convenience method for retrieving a LakeFS API client for deployment resources.
Expand Down
162 changes: 162 additions & 0 deletions prefect_lakefs/internal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
"""Module for interacting with LakeFS internal api from Prefect flows."""
from typing import Any, Dict

from lakefs_client.models import ObjectStageCreation, ObjectStats, RefsDump
from prefect import task
from prefect.utilities.asyncutils import run_sync_in_worker_thread

from prefect_lakefs.credentials import LakeFSCredentials


@task
async def dump_refs(
repository: str,
lakefs_credentials: LakeFSCredentials,
**lakefs_kwargs: Dict[str, Any],
) -> RefsDump:
"""Dump repository refs (tags, commits, branches) to object store.
Args:
lakefs_credentials: `LakeFSCredentials` block for creating
authenticated LakeFS API clients.
repository: name of a lakefs repository.
**lakefs_kwargs: Optional extra keyword arguments to pass to the LakeFS API.
Returns:
A LakeFS `RefsDump` object.
Example:
dump refs for repository named `example`:
```python
from prefect import flow
from prefect_lakefs import LakeFSCredentials
from prefect_lakefs.tasks import dump_refs
@flow
def dump_refs_example_repo():
dump = dump_refs(
lakefs_credentials=LakeFSCredentials.load("lakefs-creds"),
repository="example",
)
```
"""

with lakefs_credentials.get_client("internal") as refs:
return await run_sync_in_worker_thread(
refs.dump_refs,
repository=repository,
**lakefs_kwargs,
)


@task
async def stage_object(
repository: str,
ref: str,
path: str,
physical_address: str,
checksum: str,
size_bytes: int,
lakefs_credentials: LakeFSCredentials,
**lakefs_kwargs: Dict[str, Any],
) -> ObjectStats:
"""stage object's metadata at given ref prefix.
Args:
lakefs_credentials: `LakeFSCredentials` block for creating
authenticated LakeFS API clients.
repository: name of a lakefs repository.
ref: branch/ref name for which the objects to be staged for.
**lakefs_kwargs: Optional extra keyword arguments to pass to the LakeFS API.
Returns:
ObjectStats
Example:
get object's existence at given path in branch `main`:
```python
from prefect import flow
from prefect_lakefs import LakeFSCredentials
from prefect_lakefs.tasks import stage_object
@flow
def stage_object_for_main_example_repo():
object_stat = await stage_object(
lakefs_credentials=LakeFSCredentials.load("lakefs-creds"),
repository="example",
ref="main",
path="obj/path",
physical_address="node_addr",
checksum="checksum_hash",
size_bytes=11,
)
```
"""

# TODO: should this be the expected behaviour? or fetch ObjectStats from elsewhere?
with lakefs_credentials.get_client("internal") as objects:
return await run_sync_in_worker_thread(
objects.stage_object,
repository=repository,
ref=ref,
path=path,
object_stage_creation=ObjectStageCreation(
physical_address=physical_address,
checksum=checksum,
size_bytes=size_bytes,
),
**lakefs_kwargs,
)


@task
async def restore_refs(
repository: str,
refs_dump: RefsDump,
lakefs_credentials: LakeFSCredentials,
**lakefs_kwargs: Dict[str, Any],
) -> None:
"""Restore repository refs form object store.
Args:
lakefs_credentials: `LakeFSCredentials` block for creating
authenticated LakeFS API clients.
repository: name of a lakefs repository.
refs_dump: LakeFS RefsDump Model.
**lakefs_kwargs: Optional extra keyword arguments to pass to the LakeFS API.
Returns:
None
Example:
restore refs dump for repository named `example`:
```python
from prefect import flow
from prefect_lakefs import LakeFSCredentials
from prefect_lakefs.tasks import dump_refs
from prefect_lakefs.tasks import restore_refs
@flow
def restore_refs_example_repo():
dump = dump_refs(
lakefs_credentials=LakeFSCredentials.load("lakefs-creds"),
repository="example",
)
restore_refs(
lakefs_credentials=LakeFSCredentials.load("lakefs-creds"),
repository="example",
refs_dump=dump,
)
```
"""

with lakefs_credentials.get_client("internal") as refs:
return await run_sync_in_worker_thread(
refs.restore_refs,
repository=repository,
refs_dump=refs_dump,
**lakefs_kwargs,
)
62 changes: 0 additions & 62 deletions prefect_lakefs/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from lakefs_client.models import (
ObjectCopyCreation,
ObjectErrorList,
ObjectStageCreation,
ObjectStats,
ObjectStatsList,
PathList,
Expand Down Expand Up @@ -377,67 +376,6 @@ def list_objects_for_main_example_repo():
)


@task
async def stage_object(
repository: str,
ref: str,
path: str,
physical_address: str,
checksum: str,
size_bytes: int,
lakefs_credentials: LakeFSCredentials,
**lakefs_kwargs: Dict[str, Any],
) -> ObjectStats:
"""stage object's metadata at given ref prefix.
Args:
lakefs_credentials: `LakeFSCredentials` block for creating
authenticated LakeFS API clients.
repository: name of a lakefs repository.
ref: branch/ref name for which the objects to be staged for.
**lakefs_kwargs: Optional extra keyword arguments to pass to the LakeFS API.
Returns:
ObjectStats
Example:
get object's existence at given path in branch `main`:
```python
from prefect import flow
from prefect_lakefs import LakeFSCredentials
from prefect_lakefs.tasks import stage_object
@flow
def stage_object_for_main_example_repo():
object_stat = await stage_object(
lakefs_credentials=LakeFSCredentials.load("lakefs-creds"),
repository="example",
ref="main",
path="obj/path",
physical_address="node_addr",
checksum="checksum_hash",
size_bytes=11,
)
```
"""

# TODO: should this be the expected behaviour? or fetch ObjectStats from elsewhere?
with lakefs_credentials.get_client("objects") as objects:
return await run_sync_in_worker_thread(
objects.stage_object,
repository=repository,
ref=ref,
path=path,
object_stage_creation=ObjectStageCreation(
physical_address=physical_address,
checksum=checksum,
size_bytes=size_bytes,
),
**lakefs_kwargs,
)


@task
async def stat_object(
repository: str,
Expand Down
Loading

0 comments on commit 40d98da

Please sign in to comment.