Skip to content

Commit

Permalink
[feat] Add generic error handler (#2958)
Browse files Browse the repository at this point in the history
  • Loading branch information
alberttorosyan authored Sep 4, 2023
1 parent 34e5c2c commit cc5ab0c
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 114 deletions.
2 changes: 1 addition & 1 deletion pkgs/aimstack/ml/training_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def exception_raised(
"""Is called when exception is raised from Aim codebase. """

def handle_exceptions(self):
from aim._ext.exception_resistant import set_exception_callback
from aimcore.error_handling import set_exception_callback

def callback(e: Exception, func: callable):
self.exception_raised(exception=e, function=func)
Expand Down
46 changes: 46 additions & 0 deletions src/aimcore/error_handling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import logging

from functools import wraps
from typing import Type, Optional

logger = logging.getLogger(__name__)


class _SafeModeConfig:
@staticmethod
def log_exception(e: Exception, func: callable):
logger.warning(f'Exception "{str(e)}" raised in function "{func.__name__}"')

@staticmethod
def reraise_exception(e: Exception, func: callable):
raise e

exception_callback = reraise_exception


def enable_safe_mode():
_SafeModeConfig.exception_callback = _SafeModeConfig.log_exception


def disable_safe_mode():
_SafeModeConfig.exception_callback = _SafeModeConfig.reraise_exception


def set_exception_callback(callback: callable):
_SafeModeConfig.exception_callback = callback


def handle_exception(exc_type: Type[Exception], error_message: Optional[str] = None):
def inner(func):
@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except exc_type as e: # noqa
if error_message is not None:
logger.error(error_message)
raise RuntimeError(error_message)
else: # silent mode
pass
return wrapper
return inner
3 changes: 3 additions & 0 deletions src/aimcore/transport/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from typing import Tuple
from websockets.sync.client import connect

from aimcore.error_handling import handle_exception
from aimcore.transport.message_utils import raise_exception, pack_args, unpack_stream, unpack_args
from aimcore.transport.rpc_queue import RpcQueueWithRetry
from aimcore.transport.heartbeat import HeartbeatSender
Expand Down Expand Up @@ -124,6 +125,8 @@ def client_heartbeat(self):

return response

@handle_exception(requests.ConnectionError,
error_message='Failed to connect to Aim Server. Have you forgot to run `aim server` command?')
def connect(self):
endpoint = f'{self._http_protocol}{self._client_endpoint}/connect/{self.uri}/'
response = requests.get(endpoint)
Expand Down
39 changes: 1 addition & 38 deletions src/python/aim/_core/storage/rockscontainer.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import aimrocks
from typing import Iterator, Optional, Tuple

from aimcore.cleanup import AutoClean
from aim._ext.exception_resistant import exception_resistant
from aim._core.storage.locking import SoftFileLock, NoopLock
from aim._core.storage.types import BLOB
from aim._core.storage.container import Container, ContainerKey, ContainerValue, ContainerItemsIterator
Expand Down Expand Up @@ -156,15 +155,14 @@ class RocksContainer(Container):
self._lock.acquire()
secondary_path = None
else:
self.optimize_for_read()
secondary_path = tempfile.mkdtemp()

self._db = aimrocks.DB(str(self.path),
aimrocks.Options(**self._db_opts),
read_only=self.read_only, secondary_path=secondary_path)

return self._db


def finalize(self, index: Container):
"""Finalize the Container.
Expand Down Expand Up @@ -552,41 +550,6 @@ class RocksContainer(Container):
return key, value
def optimize_for_read(self):
optimize_db_for_read(self.path, self._db_opts, run_compactions=self._extra_opts.get('compaction', False))
@exception_resistant(silent=True)
def optimize_db_for_read(path: Path, options: dict, run_compactions: bool = False):
"""
This function will try to open rocksdb db in write mode and force WAL files recovery. Once done the underlying
db will contain .sst files only which will significantly reduce further open and read operations. Further
optimizations can be done by running compactions but this is a costly operation to be performed online.


Args:
path (:obj:`Path`): Path to rocksdb.
options (:obj:`dict`): options to be passed to aimrocks.DB object __init__.
run_compactions (:obj:`bool`, optional): Flag used to run rocksdb range compactions. False by default.
"""
def non_empty_wal():
for wal_path in path.glob('*.log'):
if os.path.getsize(wal_path) > 0:
return True
return False
if non_empty_wal():
lock_path = prepare_lock_path(path)
with SoftFileLock(lock_path, timeout=0):
wdb = aimrocks.DB(str(path), aimrocks.Options(**options), read_only=False)
wdb.flush()
wdb.flush_wal()
if run_compactions:
wdb.compact_range()
del wdb
def prepare_lock_path(path: Path):
"""
Expand Down
3 changes: 1 addition & 2 deletions src/python/aim/_core/storage/union.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ from pathlib import Path
from aim._core.storage.encoding import encode_path
from aim._core.storage.container import Container, ContainerItemsIterator
from aim._core.storage.prefixview import PrefixView
from aim._core.storage.rockscontainer import RocksContainer, optimize_db_for_read
from aim._core.storage.rockscontainer import RocksContainer

from typing import Dict, List, NamedTuple, Tuple

Expand Down Expand Up @@ -161,7 +161,6 @@ class DB(object):
):
db = cache.get(prefix)
if db is None:
optimize_db_for_read(Path(path), self.opts)
db = aimrocks.DB(path, opts=aimrocks.Options(**self.opts), read_only=True)
if store is not None:
store[prefix] = db
Expand Down
1 change: 0 additions & 1 deletion src/python/aim/_core/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@
from aim._ext.exception_resistant import enable_safe_mode, disable_safe_mode # noqa
72 changes: 0 additions & 72 deletions src/python/aim/_ext/exception_resistant.py

This file was deleted.

0 comments on commit cc5ab0c

Please sign in to comment.