Skip to content

Commit

Permalink
[feat] Add CLI for containers (#2892)
Browse files Browse the repository at this point in the history
  • Loading branch information
mihran113 authored Jul 19, 2023
1 parent 91e7f49 commit 91d7beb
Show file tree
Hide file tree
Showing 12 changed files with 467 additions and 8 deletions.
2 changes: 2 additions & 0 deletions src/aimcore/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from aimcore.cli.server import commands as server_commands
from aimcore.cli.telemetry import commands as telemetry_commands
from aimcore.cli.package import commands as package_commands
from aimcore.cli.conatiners import commands as container_commands

core._verify_python3_env = lambda: None

Expand All @@ -22,3 +23,4 @@ def cli_entry_point():
cli_entry_point.add_command(server_commands.server)
cli_entry_point.add_command(telemetry_commands.telemetry)
cli_entry_point.add_command(package_commands.package)
cli_entry_point.add_command(container_commands.containers)
Empty file.
134 changes: 134 additions & 0 deletions src/aimcore/cli/conatiners/commands.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import click

from aimcore.cli.conatiners.utils import match_runs
from aim._sdk.repo import Repo


@click.group()
@click.option('--repo', required=False,
default='aim://0.0.0.0:53800',
type=str)
@click.pass_context
def containers(ctx, repo):
"""Manage containers in aim repository."""
ctx.ensure_object(dict)
ctx.obj['repo'] = repo


@containers.command(name='ls')
@click.pass_context
def list_containers(ctx):
"""List Containers available in Repo."""
#TODO [MV]: add more useful information
repo_path = ctx.obj['repo']
if not Repo.is_remote_path(repo_path):
if not Repo.exists(repo_path):
click.echo(f'\'{repo_path}\' is not a valid aim repo.')
exit(1)

repo = Repo.from_path(repo_path)
container_hashes = repo.container_hashes

click.echo('\t'.join(container_hashes))
click.echo(f'Total {len(container_hashes)} containers.')


@containers.command(name='rm')
@click.argument('hashes', nargs=-1, type=str)
@click.pass_context
@click.option('-y', '--yes', is_flag=True, help='Automatically confirm prompt')
def remove_containers(ctx, hashes, yes):
"""Remove Container data for given container hashes."""
if len(hashes) == 0:
click.echo('Please specify at least one Container to delete.')
exit(1)
repo_path = ctx.obj['repo']
repo = Repo.from_path(repo_path)

matched_hashes = match_runs(repo, hashes)
if yes:
confirmed = True
else:
confirmed = click.confirm(f'This command will permanently delete {len(matched_hashes)} containers'
f' from aim repo located at \'{repo_path}\'. Do you want to proceed?')
if not confirmed:
return

success, remaining_containers = repo.delete_containers(matched_hashes)
if success:
click.echo(f'Successfully deleted {len(matched_hashes)} containers.')
else:
click.echo('Something went wrong while deleting containers. Remaining containers are:', err=True)
click.secho('\t'.join(remaining_containers), fg='yellow')


@containers.command(name='cp')
@click.option('--destination', required=True, type=str)
@click.argument('hashes', nargs=-1, type=str)
@click.pass_context
def copy_containers(ctx, destination, hashes):
"""Copy Container data for given container hashes to destination Repo."""
if len(hashes) == 0:
click.echo('Please specify at least one Container to copy.')
exit(1)
source = ctx.obj['repo']
source_repo = Repo.from_path(source)
destination_repo = Repo.from_path(destination)

matched_hashes = match_runs(source_repo, hashes)
success, remaining_containers = source_repo.copy_containers(matched_hashes, destination_repo)
if success:
click.echo(f'Successfully copied {len(matched_hashes)} containers.')
else:
click.echo('Something went wrong while copying containers. Remaining containers are:', err=True)
click.secho('\t'.join(remaining_containers), fg='yellow')


@containers.command(name='mv')
@click.option('--destination', required=True,
type=str)
@click.argument('hashes', nargs=-1, type=str)
@click.pass_context
def move_containers(ctx, destination, hashes):
"""Move Container data for given container hashes to destination Repo."""
if len(hashes) == 0:
click.echo('Please specify at least one Container to move.')
exit(1)
source = ctx.obj['repo']
source_repo = Repo.from_path(source)
destination_repo = Repo.from_path(destination)

matched_hashes = match_runs(source_repo, hashes)

success, remaining_containers = source_repo.move_containers(matched_hashes, destination_repo)
if success:
click.echo(f'Successfully moved {len(matched_hashes)} containers.')
else:
click.echo('Something went wrong while moving containers. Remaining containers are:', err=True)
click.secho('\t'.join(remaining_containers), fg='yellow')


@containers.command(name='close')
@click.argument('hashes', nargs=-1, type=str)
@click.pass_context
@click.option('-y', '--yes', is_flag=True, help='Automatically confirm prompt')
def close_containers(ctx, hashes, yes):
"""Close failed/stalled containers."""
repo_path = ctx.obj['repo']
repo = Repo.from_path(repo_path)

if len(hashes) == 0:
click.echo('Please specify at least one Container to close.')
exit(1)

click.secho(f'This command will forcefully close {len(hashes)} Containers from Aim Repo \'{repo_path}\'. '
f'Please make sure Containers are not active.')
if yes:
confirmed = True
else:
confirmed = click.confirm('Do you want to proceed?')
if not confirmed:
return

for container_hash in hashes:
repo._close_container(container_hash)
26 changes: 26 additions & 0 deletions src/aimcore/cli/conatiners/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import fnmatch

from typing import List, TYPE_CHECKING

if TYPE_CHECKING:
from aim._sdk.repo import Repo


def match_runs(repo: 'Repo', hashes: List[str]) -> List[str]:
matched_hashes = set()
all_run_hashes = None
for run_hash in hashes:
if '*' in run_hash:
expr = run_hash # for the sake of readability
# avoiding multiple or unnecessary list_runs() calls
if not all_run_hashes:
all_run_hashes = repo.container_hashes
if expr == '*':
return all_run_hashes
# update the matches set with current expression matches
matched_hashes.update(fnmatch.filter(all_run_hashes, expr))
else:
matched_hashes.add(run_hash)

return list(matched_hashes)

8 changes: 7 additions & 1 deletion src/aimcore/transport/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ def get_tree(**kwargs):
return ResourceRef(repo.storage_engine.tree(hash_=sub, name=name, read_only=read_only))


def get_repo():
repo_path = os.environ.get(AIM_SERVER_MOUNTED_REPO_PATH)
repo = Repo.from_path(repo_path)
return ResourceRef(repo)


def get_khash_array(**kwargs):
tree = kwargs['tree']
path = kwargs['path']
Expand All @@ -60,7 +66,7 @@ def get_lock(**kwargs):
run_hash = kwargs['run_hash']
# TODO Do we need to import SFRunLock here?
from aim._sdk.lock_manager import SFRunLock
return ResourceRef(repo.storage_engine._lock_manager.get_run_lock(run_hash), SFRunLock.release)
return ResourceRef(repo.storage_engine._lock_manager.get_container_lock(run_hash), SFRunLock.release)


def get_file_manager(**kwargs):
Expand Down
2 changes: 2 additions & 0 deletions src/aimcore/transport/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
get_lock,
get_file_manager,
get_dev_package,
get_repo
)
from aimcore.transport.config import AIM_SERVER_BASE_PATH
from aim._core.storage.treeutils import encode_tree, decode_tree
Expand All @@ -27,6 +28,7 @@ def prepare_resource_registry():
registry.register('Lock', get_lock)
registry.register('FileManager', get_file_manager)
registry.register('Package', get_dev_package)
registry.register('Repo', get_repo)
return registry


Expand Down
3 changes: 3 additions & 0 deletions src/python/aim/_sdk/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,9 @@ def __hash__(self) -> int:
def _calc_hash(self):
return hash_auto((self.hash, hash(self.storage.url), str(self.mode)))

def close(self):
self._resources._close()


class ContainerSequenceMap(SequenceMap[Sequence]):
def __init__(self, container: Container, sequence_cls: Type[Sequence]):
Expand Down
2 changes: 1 addition & 1 deletion src/python/aim/_sdk/local_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def tree(self, hash_: Optional[str], name: str, read_only: bool) -> TreeView:
return self.root_tree.subtree(name)

def lock(self, hash_: str, timeout: int = 10) -> 'ContainerLock':
lock = self._lock_manager.get_run_lock(hash_, timeout)
lock = self._lock_manager.get_container_lock(hash_, timeout)
lock.lock()
return lock

Expand Down
6 changes: 3 additions & 3 deletions src/python/aim/_sdk/lock_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def __init__(self, repo_path: Union[str, Path]):
def softlock_fname(name: str) -> str:
return f'{name}.softlock'

def get_run_lock_info(self, run_hash: str) -> LockInfo:
def get_container_lock_info(self, run_hash: str) -> LockInfo:
# check locks created prior to 3.15 version
locked = False
created_at = None
Expand Down Expand Up @@ -115,7 +115,7 @@ def get_run_lock_info(self, run_hash: str) -> LockInfo:

return LockInfo(run_hash=run_hash, locked=locked, created_at=created_at, version=lock_version, type=lock_type)

def get_run_lock(self, run_hash: str, timeout: int = 10) -> ContainerLock:
def get_container_lock(self, run_hash: str, timeout: int = 10) -> ContainerLock:
lock_path = self.locks_path / self.softlock_fname(run_hash)
return SFRunLock(self, run_hash, lock_path, timeout=timeout)

Expand Down Expand Up @@ -151,7 +151,7 @@ def release_locks(self, run_hash: str, force: bool) -> bool:
if lock_path.exists():
lock_path.unlink()
else:
lock_info = self.get_run_lock_info(run_hash)
lock_info = self.get_container_lock_info(run_hash)
if lock_info.locked and lock_info.version == LockingVersion.LEGACY:
success = False
elif lock_info.locked and self.is_stalled_lock(lock_path):
Expand Down
29 changes: 28 additions & 1 deletion src/python/aim/_sdk/remote_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,4 +531,31 @@ def remove(self, file):
return self._rpc_client.run_instruction(self._hash, self._handler, 'remove', (file,), is_write_only=True)

def move(self, src, dest):
return self._rpc_client.run_instruction(self._hash, self._handler, 'move', (src, dest), is_write_only=True)
return self._rpc_client.run_instruction(self._hash, self._handler, 'move', (src, dest), is_write_only=True)


class RemoteRepoProxy:
class AutoClean(RemoteResourceAutoClean):
PRIORITY = 60

def __init__(self, client: 'Client'):
self._rpc_client = client

self.init_args = pack_args(encode_tree({}))
self.resource_type = 'Repo'

handler = self._rpc_client.get_resource_handler(self, self.resource_type, args=self.init_args)

self._resources = RemoteRepoProxy.AutoClean(self)
self._resources.rpc_client = client
self._resources.handler = handler
self._handler = handler

def _delete_container(self, hash_):
return self._rpc_client.run_instruction(-1, self._handler, '_delete_container', [hash_])

def prune(self):
return self._rpc_client.run_instruction(-1, self._handler, 'prune', [])

def _close_container(self, hash_):
return self._rpc_client.run_instruction(-1, self._handler, '_close_container', [hash_])
Loading

0 comments on commit 91d7beb

Please sign in to comment.