Skip to content

Commit

Permalink
[feat] Check and skip corrupted runs (#3152)
Browse files Browse the repository at this point in the history
  • Loading branch information
alberttorosyan authored Jun 3, 2024
1 parent d300686 commit fa17ba5
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 44 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

### Fixes
- Handle index db corruption and warn in UI (mihran113)
- Handle and skip corrupted runs (alberttorosyan)

## 3.19.3 Apr 17, 2024
- Resolve issue with new runs after tracking queue shutdown (mihran113)
Expand Down
27 changes: 18 additions & 9 deletions aim/cli/runs/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ def runs(ctx, repo):


@runs.command(name='ls')
@click.option('--corrupted', is_flag=True, help='List corrupted runs only')
@click.pass_context
def list_runs(ctx):
def list_runs(ctx, corrupted):
"""List Runs available in Repo."""
repo_path = ctx.obj['repo']
if not Repo.is_remote_path(repo_path):
Expand All @@ -31,7 +32,7 @@ def list_runs(ctx):
exit(1)

repo = Repo.from_path(repo_path)
run_hashes = repo.list_all_runs()
run_hashes = repo.list_corrupted_runs() if corrupted else repo.list_all_runs()

click.echo('\t'.join(run_hashes))
click.echo(f'Total {len(run_hashes)} runs.')
Expand All @@ -40,27 +41,35 @@ def list_runs(ctx):
@runs.command(name='rm')
@click.argument('hashes', nargs=-1, type=str)
@click.pass_context
@click.option('--corrupted', is_flag=True, help='Remove all corrupted runs')
@click.option('-y', '--yes', is_flag=True, help='Automatically confirm prompt')
def remove_runs(ctx, hashes, yes):
def remove_runs(ctx, hashes, corrupted, yes):
"""Remove Run data for given run hashes."""
if len(hashes) == 0:
click.echo('Please specify at least one Run to delete.')
if len(hashes) == 0 and corrupted is False:
click.echo('Please specify Run hashes or `--corrupted` flag to delete runs.')
exit(1)
repo_path = ctx.obj['repo']
repo = Repo.from_path(repo_path)

matched_hashes = match_runs(repo, hashes)
if corrupted:
run_hashes = repo.list_corrupted_runs()
else:
run_hashes = match_runs(repo, hashes)
if len(run_hashes) == 0:
click.echo('No matching runs found.')
exit(0)

if yes:
confirmed = True
else:
confirmed = click.confirm(f'This command will permanently delete {len(matched_hashes)} runs from aim repo '
confirmed = click.confirm(f'This command will permanently delete {len(run_hashes)} runs from aim repo '
f'located at \'{repo_path}\'. Do you want to proceed?')
if not confirmed:
return

success, remaining_runs = repo.delete_runs(matched_hashes)
success, remaining_runs = repo.delete_runs(run_hashes)
if success:
click.echo(f'Successfully deleted {len(matched_hashes)} runs.')
click.echo(f'Successfully deleted {len(run_hashes)} runs.')
else:
click.echo('Something went wrong while deleting runs. Remaining runs are:', err=True)
click.secho('\t'.join(remaining_runs), fg='yellow')
Expand Down
21 changes: 14 additions & 7 deletions aim/sdk/index_manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import contextlib
import time
import datetime

import aimrocks.errors
import pytz
import logging
import os
Expand Down Expand Up @@ -41,6 +43,7 @@ def __init__(self, repo: Repo):

self._indexing_in_progress = False
self._reindex_thread: Thread = None
self._corrupted_runs = set()

@property
def repo_status(self):
Expand Down Expand Up @@ -82,7 +85,7 @@ def _run_forever(self):
time.sleep(sleep_interval)

def _runs_with_progress(self) -> Iterable[str]:
    """Return the entries of the progress directory, least recently modified first.

    Names already recorded in ``self._corrupted_runs`` are left out of the result.
    """
    def modified_at(run_hash):
        return os.path.getmtime(os.path.join(self.progress_dir, run_hash))

    healthy_runs = [name for name in os.listdir(self.progress_dir) if name not in self._corrupted_runs]
    healthy_runs.sort(key=modified_at)
    return healthy_runs

Expand Down Expand Up @@ -168,10 +171,14 @@ def index(self, run_hash, ) -> bool:
lock = RefreshLock(self._index_lock_path(), timeout=10)
with self.lock_index(lock):
index = self.repo._get_index_tree('meta', 0).view(())
meta_tree = self.repo.request_tree(
'meta', run_hash, read_only=True, from_union=False, no_cache=True).subtree('meta')
meta_run_tree = meta_tree.subtree('chunks').subtree(run_hash)
meta_run_tree.finalize(index=index)
if meta_run_tree['end_time'] is None:
index['meta', 'chunks', run_hash, 'end_time'] = datetime.datetime.now(pytz.utc).timestamp()
try:
meta_tree = self.repo.request_tree(
'meta', run_hash, read_only=True, from_union=False, no_cache=True).subtree('meta')
meta_run_tree = meta_tree.subtree('chunks').subtree(run_hash)
meta_run_tree.finalize(index=index)
if meta_run_tree['end_time'] is None:
index['meta', 'chunks', run_hash, 'end_time'] = datetime.datetime.now(pytz.utc).timestamp()
except (aimrocks.errors.RocksIOError, aimrocks.errors.Corruption):
logger.warning(f'Indexing thread detected corrupted run \'{run_hash}\'. Skipping.')
self._corrupted_runs.add(run_hash)
return True
22 changes: 18 additions & 4 deletions aim/sdk/repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,10 @@ def iter_runs_from_cache(self, offset: str = None) -> Iterator['Run']:
def run_exists(self, run_hash: str) -> bool:
return run_hash in self._all_run_hashes()

def is_index_corrupted(self) -> bool:
    """Tell whether the index db has been flagged as corrupted.

    The flag is a ``.corrupted`` marker file under ``<repo path>/meta/index``;
    this only checks that the marker exists.
    """
    return os.path.exists(os.path.join(self.path, 'meta', 'index', '.corrupted'))

@ttl_cache(ttl=0.5)
def _all_run_hashes(self) -> Set[str]:
if self.is_remote_repo:
Expand All @@ -420,6 +424,15 @@ def _all_run_hashes(self) -> Set[str]:
def list_all_runs(self) -> List[str]:
    """Return the hashes produced by ``_all_run_hashes()`` materialized as a list."""
    all_hashes = self._all_run_hashes()
    return [*all_hashes]

def list_corrupted_runs(self) -> List[str]:
    """List the hashes of runs whose meta db chunk was reported as corrupted.

    Opens the repo's ``meta`` union container read-only and, for every prefix
    in its ``corrupted_dbs``, takes the last component of the decoded path as
    the run hash.
    """
    from aim.storage.encoding import decode_path

    meta_container = RocksUnionContainer(os.path.join(self.path, 'meta'), read_only=True)
    return [decode_path(db_prefix)[-1] for db_prefix in meta_container.corrupted_dbs]

def _active_run_hashes(self) -> Set[str]:
if self.is_remote_repo:
return set(self._remote_repo_proxy.list_active_runs())
Expand Down Expand Up @@ -792,10 +805,6 @@ def _delete_run(self, run_hash):
if self.is_remote_repo:
return self._remote_repo_proxy.delete_run(run_hash)

# check run lock info. in progress runs can't be deleted
if self._lock_manager.get_run_lock_info(run_hash).locked:
raise RuntimeError(f'Cannot delete Run \'{run_hash}\'. Run is locked.')

with self.structured_db: # rollback db entity delete if subsequent actions fail.
# remove database entry
self.structured_db.delete_run(run_hash)
Expand All @@ -818,6 +827,11 @@ def _delete_run(self, run_hash):
else:
shutil.rmtree(seqs_path, ignore_errors=True)

# remove dangling locks
lock_path = os.path.join(self.path, 'locks', f'{run_hash}.softlock')
if os.path.exists(lock_path):
os.remove(lock_path)

def _copy_run(self, run_hash, dest_repo):
def copy_trees():
# copy run meta tree
Expand Down
70 changes: 59 additions & 11 deletions aim/storage/union.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ from aim.storage.container import Container, ContainerItemsIterator
from aim.storage.prefixview import PrefixView
from aim.storage.rockscontainer import RocksContainer, optimize_db_for_read

from typing import Dict, List, NamedTuple, Tuple
from typing import Dict, List, NamedTuple, Tuple, Set


logger = logging.getLogger(__name__)
Expand All @@ -29,10 +29,11 @@ class Racer(NamedTuple):


class ItemsIterator(ContainerItemsIterator):
def __init__(self, dbs: Dict[bytes, "aimrocks.DB"], *args, **kwargs):
def __init__(self, dbs: Dict[bytes, "aimrocks.DB"], corrupted_dbs: Set[bytes], *args, **kwargs):
self.args = args
self.kwargs = kwargs
self._iterators = {}
self._corrupted_dbs = corrupted_dbs
for key, value in dbs.items():
self._iterators[key] = value.iteritems(*args, **kwargs)
self._priority: Dict[bytes, int] = {
Expand All @@ -48,23 +49,59 @@ class ItemsIterator(ContainerItemsIterator):
raise NotImplementedError

def _seek_iterators(self, seek_one):
    """Position every chunk iterator, dropping the chunks that turn out corrupted.

    ``seek_one`` is called with each underlying iterator and performs the
    actual seek. A chunk whose seek raises ``RocksIOError`` or ``Corruption``
    is added to the shared ``self._corrupted_dbs`` set and removed from both
    ``self._iterators`` and ``self._priority``, so iteration continues over
    the remaining chunks only. The heap is NOT rebuilt here; callers do that.
    """
    corrupted_dbs = set()
    for prefix, iterator in self._iterators.items():
        try:
            seek_one(iterator)
        except (aimrocks.errors.RocksIOError, aimrocks.errors.Corruption):
            logger.debug(f'Detected corrupted db chunk \'{prefix}\'.')
            corrupted_dbs.add(prefix)
    # Removal is deferred until after the loop: the dict must not change size
    # while it is being iterated.
    self._corrupted_dbs.update(corrupted_dbs)
    for prefix in corrupted_dbs:
        del self._iterators[prefix]
        del self._priority[prefix]

def seek_to_first(self):
    """Move all healthy chunk iterators to their first entry and rebuild the heap."""
    self._seek_iterators(lambda iterator: iterator.seek_to_first())
    self._init_heap()

def seek_to_last(self):
    """Move all healthy chunk iterators to their last entry and rebuild the heap."""
    self._seek_iterators(lambda iterator: iterator.seek_to_last())
    self._init_heap()

def seek(self, key: bytes):
    """Seek all healthy chunk iterators to ``key`` and rebuild the heap."""
    self._seek_iterators(lambda iterator: iterator.seek(key))
    self._init_heap()

def seek_for_prev(self, key):
    """Run ``seek_for_prev(key)`` on all healthy chunk iterators, then re-seek.

    After the per-chunk seek, the key returned by ``_init_heap()`` is used for
    a regular ``seek`` across all chunks.
    """
    self._seek_iterators(lambda iterator: iterator.seek_for_prev(key))
    max_key = self._init_heap()
    self.seek(max_key)

Expand Down Expand Up @@ -155,6 +192,7 @@ class DB(object):
self.db_name = db_name
self.opts = opts
self._dbs: Dict[bytes, aimrocks.DB] = dict()
self._corrupted_dbs: Set[bytes] = set()

def _get_db(
self,
Expand Down Expand Up @@ -187,7 +225,7 @@ class DB(object):
index_db = self._get_db(index_prefix, index_path, self._dbs)
# do a random read to check if index db is corrupted or not
index_db.get(index_prefix)
except aimrocks.errors.RocksIOError:
except (aimrocks.errors.RocksIOError, aimrocks.errors.Corruption):
# delete index db and mark as corrupted
corruption_marker = Path(index_path) / '.corrupted'
if not corruption_marker.exists():
Expand All @@ -209,7 +247,11 @@ class DB(object):
for prefix in self._list_dir(db_dir):
path = os.path.join(self.db_path, self.db_name, "chunks", prefix)
prefix = encode_path((self.db_name, "chunks", prefix))
self._get_db(prefix, path, self._dbs, new_dbs)
try:
self._get_db(prefix, path, self._dbs, new_dbs)
except (aimrocks.errors.RocksIOError, aimrocks.errors.Corruption):
logger.debug(f'Detected corrupted db chunk \'{prefix}\'.')
self._corrupted_dbs.add(prefix)

if index_db is not None:
new_dbs[b""] = index_db
Expand All @@ -229,17 +271,17 @@ class DB(object):
def iteritems(
self, *args, **kwargs
) -> "ItemsIterator":
# Build an ItemsIterator over the chunk dbs in `self.dbs`; positional and keyword
# arguments are forwarded unchanged.
# Diff view: the first `return` is the pre-change line and the second its replacement.
# The replacement also hands over `self._corrupted_dbs`, the set the iterator updates
# when a chunk fails during a seek.
return ItemsIterator(self.dbs, *args, **kwargs)
return ItemsIterator(self.dbs, self._corrupted_dbs, *args, **kwargs)

def iterkeys(
self, *args, **kwargs
) -> "KeysIterator":
# Build a KeysIterator over the chunk dbs in `self.dbs`, forwarding all arguments.
# Diff view: the first `return` is the pre-change line and the second its replacement,
# which also passes `self._corrupted_dbs`.
# NOTE(review): KeysIterator is not defined in the visible hunks; presumably it accepts
# the same (dbs, corrupted_dbs, ...) constructor arguments as ItemsIterator. Confirm.
return KeysIterator(self.dbs, *args, **kwargs)
return KeysIterator(self.dbs, self._corrupted_dbs, *args, **kwargs)

def itervalues(
self, *args, **kwargs
) -> "ValuesIterator":
# Build a ValuesIterator over the chunk dbs in `self.dbs`, forwarding all arguments.
# Diff view: the first `return` is the pre-change line and the second its replacement,
# which also passes `self._corrupted_dbs`.
# NOTE(review): ValuesIterator is not defined in the visible hunks; presumably it accepts
# the same (dbs, corrupted_dbs, ...) constructor arguments as ItemsIterator. Confirm.
return ValuesIterator(self.dbs, *args, **kwargs)
return ValuesIterator(self.dbs, self._corrupted_dbs, *args, **kwargs)


class RocksUnionContainer(RocksContainer):
Expand All @@ -265,6 +307,12 @@ class RocksUnionContainer(RocksContainer):

return self._db

@property
def corrupted_dbs(self) -> Set[bytes]:
    """Return the prefixes of the db chunks recorded as corrupted.

    A fresh items iterator is positioned at its first entry before the set is
    read, so that the corruption checks performed while seeking have run.
    """
    # trigger db corruption checks
    probe = self.db.iteritems()
    probe.seek_to_first()
    return self.db._corrupted_dbs

def view(
self,
prefix: bytes = b''
Expand Down
1 change: 1 addition & 0 deletions aim/web/api/projects/pydantic_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class ProjectApiOut(BaseModel):
description: str
telemetry_enabled: int
warn_index: Optional[bool] = False
warn_runs: Optional[bool] = False


class ProjectParamsOut(BaseModel):
Expand Down
21 changes: 14 additions & 7 deletions aim/web/api/projects/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,26 @@ async def project_api():
raise HTTPException(status_code=404)

# check if the index db was corrupted and deleted
corruption_marker = os.path.join(project.repo_path, 'meta', 'index', '.corrupted')
warning_message = ''
if os.path.exists(corruption_marker):
warning_message = 'Index db was corrupted and deleted. ' \
'Please run `aim storage reindex` command to restore optimal performance.'
logger.warning(warning_message)
index_corrupted = project.repo.is_index_corrupted()
if index_corrupted:
runs_corrupted = False # prevent multiple alert banners in UI
logger.warning('Index db was corrupted and deleted. '
'Please run `aim storage reindex` command to restore optimal performance.')
else:
# check whether there are any corrupted run chunks
runs_corrupted = len(project.repo.list_corrupted_runs()) > 0
if runs_corrupted:
logger.warning('Corrupted Runs were detected. '
'Please run `aim runs rm --corrupted` command to remove corrupted runs. '
'You can list corrupted run hashes using `aim runs ls --corrupted` command.')

return {
'name': project.name,
'path': project.path,
'description': project.description,
'telemetry_enabled': 0,
'warn_index': bool(warning_message),
'warn_index': index_corrupted,
'warn_runs': runs_corrupted,
}


Expand Down
12 changes: 6 additions & 6 deletions aim/web/ui/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,18 @@ function App(): React.FunctionComponentElement<React.ReactNode> {
<BrowserRouter basename={basePath}>
<ProjectWrapper />
<Theme>
{isVisibleCacheBanner && (
<AlertBanner type='warning' isVisiblePermanently={true}>
You are using UI from notebook env, please make sure to
<b>keep server running</b> for a better experience
</AlertBanner>
)}
{projectsData?.project?.warn_index && (
<AlertBanner type='warning'>
Index db was corrupted and deleted. Please run
<b>`aim storage reindex`</b> command to restore optimal performance.
</AlertBanner>
)}
{projectsData?.project?.warn_runs && (
<AlertBanner type='warning'>
Corrupted runs were detected. Please run
<b>`aim runs rm --corrupted`</b> command to remove corrupted runs.
</AlertBanner>
)}
<div className='pageContainer'>
<ErrorBoundary>
<SideBar />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export interface IProject {
path?: string;
telemetry_enabled?: string | boolean;
warn_index?: boolean;
warn_runs?: boolean;
}

export interface IProjectParamsMetrics {
Expand Down

0 comments on commit fa17ba5

Please sign in to comment.