From cc5ab0ce334fba48722f2d519d58afc75e46914c Mon Sep 17 00:00:00 2001 From: Albert Torosyan <32957250+alberttorosyan@users.noreply.github.com> Date: Mon, 4 Sep 2023 09:48:19 +0400 Subject: [PATCH] [feat] Add generic error handler (#2958) --- pkgs/aimstack/ml/training_flow.py | 2 +- src/aimcore/error_handling.py | 46 ++++++++++++ src/aimcore/transport/client.py | 3 + .../aim/_core/storage/rockscontainer.pyx | 39 +--------- src/python/aim/_core/storage/union.pyx | 3 +- src/python/aim/_core/utils/__init__.py | 1 - src/python/aim/_ext/exception_resistant.py | 72 ------------------- 7 files changed, 52 insertions(+), 114 deletions(-) create mode 100644 src/aimcore/error_handling.py delete mode 100644 src/python/aim/_ext/exception_resistant.py diff --git a/pkgs/aimstack/ml/training_flow.py b/pkgs/aimstack/ml/training_flow.py index 8559798d88..d1682f4ce7 100644 --- a/pkgs/aimstack/ml/training_flow.py +++ b/pkgs/aimstack/ml/training_flow.py @@ -69,7 +69,7 @@ def exception_raised( """Is called when exception is raised from Aim codebase. """ def handle_exceptions(self): - from aim._ext.exception_resistant import set_exception_callback + from aimcore.error_handling import set_exception_callback def callback(e: Exception, func: callable): self.exception_raised(exception=e, function=func) diff --git a/src/aimcore/error_handling.py b/src/aimcore/error_handling.py new file mode 100644 index 0000000000..3722f2facc --- /dev/null +++ b/src/aimcore/error_handling.py @@ -0,0 +1,46 @@ +import logging + +from functools import wraps +from typing import Type, Optional + +logger = logging.getLogger(__name__) + + +class _SafeModeConfig: + @staticmethod + def log_exception(e: Exception, func: callable): + logger.warning(f'Exception "{str(e)}" raised in function "{func.__name__}"') + + @staticmethod + def reraise_exception(e: Exception, func: callable): + raise e + + exception_callback = reraise_exception + + +def enable_safe_mode(): + _SafeModeConfig.exception_callback = _SafeModeConfig.log_exception + + +def disable_safe_mode(): + _SafeModeConfig.exception_callback = _SafeModeConfig.reraise_exception + + +def set_exception_callback(callback: callable): + _SafeModeConfig.exception_callback = callback + + +def handle_exception(exc_type: Type[Exception], error_message: Optional[str] = None): + def inner(func): + @wraps(func) + def wrapper(*args, **kwargs): + try: + return func(*args, **kwargs) + except exc_type as e: # noqa + if error_message is not None: + logger.error(error_message) + raise RuntimeError(error_message) + else: # silent mode + pass + return wrapper + return inner diff --git a/src/aimcore/transport/client.py b/src/aimcore/transport/client.py index debafd399c..1db20904ce 100644 --- a/src/aimcore/transport/client.py +++ b/src/aimcore/transport/client.py @@ -9,6 +9,7 @@ from typing import Tuple from websockets.sync.client import connect +from aimcore.error_handling import handle_exception from aimcore.transport.message_utils import raise_exception, pack_args, unpack_stream, unpack_args from aimcore.transport.rpc_queue import RpcQueueWithRetry from aimcore.transport.heartbeat import HeartbeatSender @@ -124,6 +125,8 @@ def client_heartbeat(self): return response + @handle_exception(requests.ConnectionError, + error_message='Failed to connect to Aim Server. Have you forgot to run `aim server` command?') def connect(self): endpoint = f'{self._http_protocol}{self._client_endpoint}/connect/{self.uri}/' response = requests.get(endpoint) diff --git a/src/python/aim/_core/storage/rockscontainer.pyx b/src/python/aim/_core/storage/rockscontainer.pyx index 039137a351..ffa9b9266d 100644 --- a/src/python/aim/_core/storage/rockscontainer.pyx +++ b/src/python/aim/_core/storage/rockscontainer.pyx @@ -9,7 +9,6 @@ import aimrocks from typing import Iterator, Optional, Tuple from aimcore.cleanup import AutoClean -from aim._ext.exception_resistant import exception_resistant from aim._core.storage.locking import SoftFileLock, NoopLock from aim._core.storage.types import BLOB from aim._core.storage.container import Container, ContainerKey, ContainerValue, ContainerItemsIterator @@ -156,15 +155,14 @@ class RocksContainer(Container): self._lock.acquire() secondary_path = None else: - self.optimize_for_read() secondary_path = tempfile.mkdtemp() + self._db = aimrocks.DB(str(self.path), aimrocks.Options(**self._db_opts), read_only=self.read_only, secondary_path=secondary_path) return self._db - def finalize(self, index: Container): """Finalize the Container. @@ -552,41 +550,6 @@ class RocksContainer(Container): return key, value - def optimize_for_read(self): - optimize_db_for_read(self.path, self._db_opts, run_compactions=self._extra_opts.get('compaction', False)) - - -@exception_resistant(silent=True) -def optimize_db_for_read(path: Path, options: dict, run_compactions: bool = False): - """ - This function will try to open rocksdb db in write mode and force WAL files recovery. Once done the underlying - db will contain .sst files only which will significantly reduce further open and read operations. Further - optimizations can be done by running compactions but this is a costly operation to be performed online. - - - Args: - path (:obj:`Path`): Path to rocksdb. - options (:obj:`dict`): options to be passed to aimrocks.DB object __init__. - run_compactions (:obj:`bool`, optional): Flag used to run rocksdb range compactions. False by default. - """ - - def non_empty_wal(): - for wal_path in path.glob('*.log'): - if os.path.getsize(wal_path) > 0: - return True - return False - - if non_empty_wal(): - lock_path = prepare_lock_path(path) - - with SoftFileLock(lock_path, timeout=0): - wdb = aimrocks.DB(str(path), aimrocks.Options(**options), read_only=False) - wdb.flush() - wdb.flush_wal() - if run_compactions: - wdb.compact_range() - del wdb - def prepare_lock_path(path: Path): """ diff --git a/src/python/aim/_core/storage/union.pyx b/src/python/aim/_core/storage/union.pyx index de895bbdd9..e089de5a7c 100644 --- a/src/python/aim/_core/storage/union.pyx +++ b/src/python/aim/_core/storage/union.pyx @@ -10,7 +10,7 @@ from pathlib import Path from aim._core.storage.encoding import encode_path from aim._core.storage.container import Container, ContainerItemsIterator from aim._core.storage.prefixview import PrefixView -from aim._core.storage.rockscontainer import RocksContainer, optimize_db_for_read +from aim._core.storage.rockscontainer import RocksContainer from typing import Dict, List, NamedTuple, Tuple @@ -161,7 +161,6 @@ class DB(object): ): db = cache.get(prefix) if db is None: - optimize_db_for_read(Path(path), self.opts) db = aimrocks.DB(path, opts=aimrocks.Options(**self.opts), read_only=True) if store is not None: store[prefix] = db diff --git a/src/python/aim/_core/utils/__init__.py b/src/python/aim/_core/utils/__init__.py index aa19fa452a..e69de29bb2 100644 --- a/src/python/aim/_core/utils/__init__.py +++ b/src/python/aim/_core/utils/__init__.py @@ -1 +0,0 @@ -from aim._ext.exception_resistant import enable_safe_mode, disable_safe_mode # noqa diff --git a/src/python/aim/_ext/exception_resistant.py b/src/python/aim/_ext/exception_resistant.py deleted file mode 100644 index 347bf53cf7..0000000000 --- a/src/python/aim/_ext/exception_resistant.py +++ /dev/null @@ -1,72 +0,0 @@ -import logging - -from functools import wraps - - -logger = logging.getLogger(__name__) - - -def exception_resistant(silent: bool): - def inner(func): - if not silent: - num_fails = 0 - max_fails = 6 - - @wraps(func) - def wrapper(*args, **kwargs): - nonlocal num_fails - func_name = func.__name__ - try: - return func(*args, **kwargs) - except Exception as e: - num_fails += 1 - if num_fails == 1: - print('Something went wrong in `{}`. The process will continue to execute.'.format(func_name)) - if num_fails <= max_fails: - print('`{}`: {}'.format(func_name, e)) - elif num_fails == max_fails + 1: - print('The rest of the `{}` errors are hidden.'.format(func_name)) - else: - @wraps(func) - def wrapper(*args, **kwargs): - try: - return func(*args, **kwargs) - except Exception: - pass - return wrapper - return inner - - -class _SafeModeConfig: - @staticmethod - def log_exception(e: Exception, func: callable): - logger.warning(f'Exception "{str(e)}" raised in function "{func.__name__}"') - - @staticmethod - def reraise_exception(e: Exception, func: callable): - raise e - - exception_callback = reraise_exception - - -def enable_safe_mode(): - _SafeModeConfig.exception_callback = _SafeModeConfig.log_exception - - -def disable_safe_mode(): - _SafeModeConfig.exception_callback = _SafeModeConfig.reraise_exception - - -def set_exception_callback(callback: callable): - _SafeModeConfig.exception_callback = callback - - -def noexcept(func): - @wraps(func) - def wrapper(*args, **kwargs): - try: - return func(*args, **kwargs) - except Exception as e: - _SafeModeConfig.exception_callback(e, func) - - return wrapper