Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat] Add CLI for containers #2892

Merged
merged 4 commits into from
Jul 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/aimcore/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from aimcore.cli.server import commands as server_commands
from aimcore.cli.telemetry import commands as telemetry_commands
from aimcore.cli.package import commands as package_commands
from aimcore.cli.conatiners import commands as container_commands

core._verify_python3_env = lambda: None

Expand All @@ -22,3 +23,4 @@ def cli_entry_point():
cli_entry_point.add_command(server_commands.server)
cli_entry_point.add_command(telemetry_commands.telemetry)
cli_entry_point.add_command(package_commands.package)
cli_entry_point.add_command(container_commands.containers)
Empty file.
134 changes: 134 additions & 0 deletions src/aimcore/cli/conatiners/commands.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import click

from aimcore.cli.conatiners.utils import match_runs
from aim._sdk.repo import Repo


@click.group()
@click.option('--repo', type=str, default='aim://0.0.0.0:53800', required=False)
@click.pass_context
def containers(ctx, repo):
    """Manage containers in aim repository."""
    # NOTE: the docstring above is shown by click as the group's --help text.
    # Make sure a dict is available on the context, then stash the repo
    # location in it so every sub-command can read it via ctx.obj['repo'].
    ctx.ensure_object(dict)
    shared_state = ctx.obj
    shared_state['repo'] = repo


@containers.command(name='ls')
@click.pass_context
def list_containers(ctx):
    """List Containers available in Repo."""
    # NOTE: the docstring above is shown by click as the command's --help text.
    # TODO [MV]: add more useful information
    repo_path = ctx.obj['repo']

    # The existence check is only performed for non-remote paths.
    if not Repo.is_remote_path(repo_path) and not Repo.exists(repo_path):
        # Report on stderr and leave through click's own exit mechanism:
        # the bare `exit()` builtin is injected by the `site` module and is
        # not guaranteed to exist in every interpreter setup.
        click.echo(f'\'{repo_path}\' is not a valid aim repo.', err=True)
        ctx.exit(1)

    container_hashes = Repo.from_path(repo_path).container_hashes

    click.echo('\t'.join(container_hashes))
    click.echo(f'Total {len(container_hashes)} containers.')


@containers.command(name='rm')
@click.argument('hashes', nargs=-1, type=str)
@click.pass_context
@click.option('-y', '--yes', is_flag=True, help='Automatically confirm prompt')
def remove_containers(ctx, hashes, yes):
    """Remove Container data for given container hashes."""
    # NOTE: the docstring above is shown by click as the command's --help text.
    #
    # hashes: container hashes or '*' wildcard expressions (expanded by match_runs).
    # yes:    skip the interactive confirmation prompt.
    if not hashes:
        # `ctx.exit` instead of the `site`-provided `exit()` builtin, which is
        # not guaranteed to be available; errors go to stderr.
        click.echo('Please specify at least one Container to delete.', err=True)
        ctx.exit(1)

    repo_path = ctx.obj['repo']
    repo = Repo.from_path(repo_path)
    matched_hashes = match_runs(repo, hashes)

    confirmed = yes or click.confirm(
        f'This command will permanently delete {len(matched_hashes)} containers'
        f' from aim repo located at \'{repo_path}\'. Do you want to proceed?'
    )
    if not confirmed:
        return

    success, remaining_containers = repo.delete_containers(matched_hashes)
    if success:
        click.echo(f'Successfully deleted {len(matched_hashes)} containers.')
        return

    # Keep the whole failure report on stderr and signal the failure to the
    # calling shell with a non-zero exit status.
    click.echo('Something went wrong while deleting containers. Remaining containers are:', err=True)
    click.secho('\t'.join(remaining_containers), fg='yellow', err=True)
    ctx.exit(1)


@containers.command(name='cp')
@click.option('--destination', required=True, type=str)
@click.argument('hashes', nargs=-1, type=str)
@click.pass_context
def copy_containers(ctx, destination, hashes):
    """Copy Container data for given container hashes to destination Repo."""
    # NOTE: the docstring above is shown by click as the command's --help text.
    #
    # destination: path of the repo that receives the copies.
    # hashes:      container hashes or '*' wildcard expressions (expanded by match_runs).
    if not hashes:
        # `ctx.exit` instead of the `site`-provided `exit()` builtin, which is
        # not guaranteed to be available; errors go to stderr.
        click.echo('Please specify at least one Container to copy.', err=True)
        ctx.exit(1)

    source_repo = Repo.from_path(ctx.obj['repo'])
    destination_repo = Repo.from_path(destination)

    matched_hashes = match_runs(source_repo, hashes)
    success, remaining_containers = source_repo.copy_containers(matched_hashes, destination_repo)
    if success:
        click.echo(f'Successfully copied {len(matched_hashes)} containers.')
        return

    # Keep the whole failure report on stderr and signal the failure to the
    # calling shell with a non-zero exit status.
    click.echo('Something went wrong while copying containers. Remaining containers are:', err=True)
    click.secho('\t'.join(remaining_containers), fg='yellow', err=True)
    ctx.exit(1)


@containers.command(name='mv')
@click.option('--destination', required=True,
              type=str)
@click.argument('hashes', nargs=-1, type=str)
@click.pass_context
def move_containers(ctx, destination, hashes):
    """Move Container data for given container hashes to destination Repo."""
    # NOTE: the docstring above is shown by click as the command's --help text.
    #
    # destination: path of the repo that receives the containers.
    # hashes:      container hashes or '*' wildcard expressions (expanded by match_runs).
    if not hashes:
        # `ctx.exit` instead of the `site`-provided `exit()` builtin, which is
        # not guaranteed to be available; errors go to stderr.
        click.echo('Please specify at least one Container to move.', err=True)
        ctx.exit(1)

    source_repo = Repo.from_path(ctx.obj['repo'])
    destination_repo = Repo.from_path(destination)

    matched_hashes = match_runs(source_repo, hashes)
    success, remaining_containers = source_repo.move_containers(matched_hashes, destination_repo)
    if success:
        click.echo(f'Successfully moved {len(matched_hashes)} containers.')
        return

    # Keep the whole failure report on stderr and signal the failure to the
    # calling shell with a non-zero exit status.
    click.echo('Something went wrong while moving containers. Remaining containers are:', err=True)
    click.secho('\t'.join(remaining_containers), fg='yellow', err=True)
    ctx.exit(1)


@containers.command(name='close')
@click.argument('hashes', nargs=-1, type=str)
@click.pass_context
@click.option('-y', '--yes', is_flag=True, help='Automatically confirm prompt')
def close_containers(ctx, hashes, yes):
    """Close failed/stalled containers."""
    # NOTE: the docstring above is shown by click as the command's --help text.
    #
    # hashes: container hashes to close (passed through as given, no wildcard expansion).
    # yes:    skip the interactive confirmation prompt.

    # Validate the arguments before touching the repo so that a usage error
    # does not open a repo (possibly a remote one) for nothing.
    if not hashes:
        # `ctx.exit` instead of the `site`-provided `exit()` builtin, which is
        # not guaranteed to be available; errors go to stderr.
        click.echo('Please specify at least one Container to close.', err=True)
        ctx.exit(1)

    repo_path = ctx.obj['repo']
    repo = Repo.from_path(repo_path)

    click.secho(f'This command will forcefully close {len(hashes)} Containers from Aim Repo \'{repo_path}\'. '
                f'Please make sure Containers are not active.')
    if not (yes or click.confirm('Do you want to proceed?')):
        return

    for container_hash in hashes:
        repo._close_container(container_hash)
26 changes: 26 additions & 0 deletions src/aimcore/cli/conatiners/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import fnmatch

from typing import List, TYPE_CHECKING

if TYPE_CHECKING:
from aim._sdk.repo import Repo


def match_runs(repo: 'Repo', hashes: List[str]) -> List[str]:
    """Expand a list of hashes / wildcard expressions into concrete hashes.

    Entries containing ``*`` are treated as ``fnmatch`` patterns and matched
    against ``repo.container_hashes``; every other entry is passed through
    unchanged (it is not checked against the repo).

    Args:
        repo: object exposing a ``container_hashes`` iterable of strings.
        hashes: literal hashes and/or wildcard expressions.

    Returns:
        The de-duplicated matches, in no particular order. If any expression
        is exactly ``'*'``, a list of every hash in the repo is returned
        immediately, as reported by ``repo.container_hashes``.
    """
    matched_hashes = set()
    all_run_hashes = None  # fetched lazily, at most once
    for run_hash in hashes:
        if '*' not in run_hash:
            matched_hashes.add(run_hash)
            continue

        # Compare against None rather than truthiness: an empty repo listing
        # is a valid cached result and must not trigger another fetch for
        # every subsequent wildcard expression.
        if all_run_hashes is None:
            all_run_hashes = list(repo.container_hashes)
        if run_hash == '*':
            # Hand back a copy so callers cannot mutate the cached listing.
            return list(all_run_hashes)
        matched_hashes.update(fnmatch.filter(all_run_hashes, run_hash))

    return list(matched_hashes)

8 changes: 7 additions & 1 deletion src/aimcore/transport/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ def get_tree(**kwargs):
return ResourceRef(repo.storage_engine.tree(hash_=sub, name=name, read_only=read_only))


def get_repo():
    """Build a ResourceRef around the Repo at the server's mounted repo path.

    The path is read from the environment variable named by
    AIM_SERVER_MOUNTED_REPO_PATH.
    """
    mounted_path = os.environ.get(AIM_SERVER_MOUNTED_REPO_PATH)
    return ResourceRef(Repo.from_path(mounted_path))


def get_khash_array(**kwargs):
tree = kwargs['tree']
path = kwargs['path']
Expand All @@ -60,7 +66,7 @@ def get_lock(**kwargs):
run_hash = kwargs['run_hash']
# TODO Do we need to import SFRunLock here?
from aim._sdk.lock_manager import SFRunLock
return ResourceRef(repo.storage_engine._lock_manager.get_run_lock(run_hash), SFRunLock.release)
return ResourceRef(repo.storage_engine._lock_manager.get_container_lock(run_hash), SFRunLock.release)


def get_file_manager(**kwargs):
Expand Down
2 changes: 2 additions & 0 deletions src/aimcore/transport/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
get_lock,
get_file_manager,
get_dev_package,
get_repo
)
from aimcore.transport.config import AIM_SERVER_BASE_PATH
from aim._core.storage.treeutils import encode_tree, decode_tree
Expand All @@ -27,6 +28,7 @@ def prepare_resource_registry():
registry.register('Lock', get_lock)
registry.register('FileManager', get_file_manager)
registry.register('Package', get_dev_package)
registry.register('Repo', get_repo)
return registry


Expand Down
3 changes: 3 additions & 0 deletions src/python/aim/_sdk/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,9 @@ def __hash__(self) -> int:
def _calc_hash(self):
    # Combine the container hash, the hash of the storage URL and the
    # stringified mode into a single value via hash_auto.
    identity = (self.hash, hash(self.storage.url), str(self.mode))
    return hash_auto(identity)

def close(self):
    """Close the resources object attached to this instance."""
    resources = self._resources
    resources._close()


class ContainerSequenceMap(SequenceMap[Sequence]):
def __init__(self, container: Container, sequence_cls: Type[Sequence]):
Expand Down
2 changes: 1 addition & 1 deletion src/python/aim/_sdk/local_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def tree(self, hash_: Optional[str], name: str, read_only: bool) -> TreeView:
return self.root_tree.subtree(name)

def lock(self, hash_: str, timeout: int = 10) -> 'ContainerLock':
lock = self._lock_manager.get_run_lock(hash_, timeout)
lock = self._lock_manager.get_container_lock(hash_, timeout)
lock.lock()
return lock

Expand Down
6 changes: 3 additions & 3 deletions src/python/aim/_sdk/lock_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def __init__(self, repo_path: Union[str, Path]):
def softlock_fname(name: str) -> str:
return f'{name}.softlock'

def get_run_lock_info(self, run_hash: str) -> LockInfo:
def get_container_lock_info(self, run_hash: str) -> LockInfo:
# check locks created prior to 3.15 version
locked = False
created_at = None
Expand Down Expand Up @@ -115,7 +115,7 @@ def get_run_lock_info(self, run_hash: str) -> LockInfo:

return LockInfo(run_hash=run_hash, locked=locked, created_at=created_at, version=lock_version, type=lock_type)

def get_run_lock(self, run_hash: str, timeout: int = 10) -> ContainerLock:
def get_container_lock(self, run_hash: str, timeout: int = 10) -> ContainerLock:
lock_path = self.locks_path / self.softlock_fname(run_hash)
return SFRunLock(self, run_hash, lock_path, timeout=timeout)

Expand Down Expand Up @@ -151,7 +151,7 @@ def release_locks(self, run_hash: str, force: bool) -> bool:
if lock_path.exists():
lock_path.unlink()
else:
lock_info = self.get_run_lock_info(run_hash)
lock_info = self.get_container_lock_info(run_hash)
if lock_info.locked and lock_info.version == LockingVersion.LEGACY:
success = False
elif lock_info.locked and self.is_stalled_lock(lock_path):
Expand Down
29 changes: 28 additions & 1 deletion src/python/aim/_sdk/remote_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,4 +531,31 @@ def remove(self, file):
return self._rpc_client.run_instruction(self._hash, self._handler, 'remove', (file,), is_write_only=True)

def move(self, src, dest):
return self._rpc_client.run_instruction(self._hash, self._handler, 'move', (src, dest), is_write_only=True)
return self._rpc_client.run_instruction(self._hash, self._handler, 'move', (src, dest), is_write_only=True)


class RemoteRepoProxy:
    """Proxy that forwards a few repo-level calls to a 'Repo' resource over RPC.

    On construction a resource handler of type 'Repo' is requested from the
    client; each public method then issues a `run_instruction` call against
    that handler.
    """

    class AutoClean(RemoteResourceAutoClean):
        PRIORITY = 60

    def __init__(self, client: 'Client'):
        self._rpc_client = client

        # The 'Repo' resource is requested with an empty argument tree.
        self.init_args = pack_args(encode_tree({}))
        self.resource_type = 'Repo'

        handler = self._rpc_client.get_resource_handler(self, self.resource_type, args=self.init_args)

        # Give the auto-clean helper what it needs to reach the handler.
        resources = RemoteRepoProxy.AutoClean(self)
        self._resources = resources
        resources.rpc_client = client
        resources.handler = handler
        self._handler = handler

    def _invoke(self, method_name, *call_args):
        # Every instruction is issued with -1 in the hash slot of run_instruction.
        return self._rpc_client.run_instruction(-1, self._handler, method_name, list(call_args))

    def _delete_container(self, hash_):
        return self._invoke('_delete_container', hash_)

    def prune(self):
        return self._invoke('prune')

    def _close_container(self, hash_):
        return self._invoke('_close_container', hash_)
Loading
Loading