From d44a9b553be914c3676b639e9ff0921d2c3f0371 Mon Sep 17 00:00:00 2001
From: Tom White
Date: Thu, 28 Mar 2024 10:26:06 +0000
Subject: [PATCH] Move retry logic to lithops

Lithops doesn't have group names so use a dict to track

---
 cubed/runtime/executors/lithops.py           |  22 ++-
 cubed/runtime/executors/lithops_retries.py   | 187 --------------------
 cubed/tests/runtime/test_lithops.py          |   2 +-
 cubed/tests/runtime/test_lithops_retries.py  |   2 +-
 docs/requirements.txt                        |   2 +-
 pyproject.toml                               |   6 +-
 6 files changed, 16 insertions(+), 205 deletions(-)
 delete mode 100644 cubed/runtime/executors/lithops_retries.py

diff --git a/cubed/runtime/executors/lithops.py b/cubed/runtime/executors/lithops.py
index f9974fa32..771c14c7e 100644
--- a/cubed/runtime/executors/lithops.py
+++ b/cubed/runtime/executors/lithops.py
@@ -17,14 +17,11 @@
 )
 
 from lithops.executors import FunctionExecutor
+from lithops.retries import RetryingFunctionExecutor, RetryingFuture
 from lithops.wait import ALWAYS, ANY_COMPLETED
 from networkx import MultiDiGraph
 
 from cubed.runtime.backup import should_launch_backup, use_backups_default
-from cubed.runtime.executors.lithops_retries import (
-    RetryingFunctionExecutor,
-    RetryingFuture,
-)
 from cubed.runtime.pipeline import visit_node_generations, visit_nodes
 from cubed.runtime.types import Callback, DagExecutor
 from cubed.runtime.utils import (
@@ -79,6 +76,7 @@ def map_unordered(
     return_when = ALWAYS if use_backups else ANY_COMPLETED
     wait_dur_sec = wait_dur_sec or 1
 
+    future_to_group_name: Dict[str, str] = {}
     group_name_to_function: Dict[str, Callable[..., Any]] = {}
     # backups are launched based on task start and end times for the group
     start_times: Dict[str, Dict[RetryingFuture, float]] = {}
@@ -97,13 +95,13 @@ def map_unordered(
 
         futures = lithops_function_executor.map(
             partial_map_function,
-            map_iterdata,
+            list(map_iterdata),  # lithops requires a list
             timeout=timeout,
             include_modules=include_modules,
             retries=retries,
-            group_name=group_name,
         )
         start_times[group_name] = {k: time.monotonic() for k in futures}
+        future_to_group_name.update({k: group_name for k in futures})
         pending.extend(futures)
 
     while pending:
@@ -122,10 +120,10 @@ def map_unordered(
                     if not backup.done or not backup.error:
                         continue
                 future.status(throw_except=True)
-            group_name = future.group_name  # type: ignore[assignment]
+            group_name = future_to_group_name[future]  # type: ignore[assignment]
             end_times[group_name][future] = time.monotonic()
             if return_stats:
-                yield future.result(), standardise_lithops_stats(future)
+                yield future.result(), standardise_lithops_stats(group_name, future)
             else:
                 yield future.result()
 
@@ -142,7 +140,7 @@ def map_unordered(
         if use_backups:
             now = time.monotonic()
             for future in copy.copy(pending):
-                group_name = future.group_name  # type: ignore[assignment]
+                group_name = future_to_group_name[future]  # type: ignore[assignment]
                 if future not in backups and should_launch_backup(
                     future, now, start_times[group_name], end_times[group_name]
                 ):
@@ -154,11 +152,11 @@ def map_unordered(
                         timeout=timeout,
                         include_modules=include_modules,
                         retries=0,  # don't retry backup tasks
-                        group_name=group_name,
                     )
                     start_times[group_name].update(
                         {k: time.monotonic() for k in futures}
                     )
+                    future_to_group_name.update({k: group_name for k in futures})
                     pending.extend(futures)
                     backup = futures[0]
                     backups[future] = backup
@@ -237,10 +235,10 @@ def execute_dag(
                     handle_callbacks(callbacks, result, stats)
 
 
-def standardise_lithops_stats(future: RetryingFuture) -> Dict[str, Any]:
+def standardise_lithops_stats(name: str, future: RetryingFuture) -> Dict[str, Any]:
     stats = future.stats
     return dict(
-        name=future.group_name,
+        name=name,
         task_create_tstamp=stats["host_job_create_tstamp"],
         function_start_tstamp=stats["worker_func_start_tstamp"],
         function_end_tstamp=stats["worker_func_end_tstamp"],
diff --git a/cubed/runtime/executors/lithops_retries.py b/cubed/runtime/executors/lithops_retries.py
deleted file mode 100644
index ba9f29c93..000000000
--- a/cubed/runtime/executors/lithops_retries.py
+++ /dev/null
@@ -1,187 +0,0 @@
-from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union
-
-from lithops import FunctionExecutor
-from lithops.future import ResponseFuture
-from lithops.wait import (
-    ALL_COMPLETED,
-    ALWAYS,
-    ANY_COMPLETED,
-    THREADPOOL_SIZE,
-    WAIT_DUR_SEC,
-)
-from six import reraise
-
-
-class RetryingFuture:
-    """
-    A wrapper around Lithops `ResponseFuture` that takes care of retries.
-    """
-
-    def __init__(
-        self,
-        response_future: ResponseFuture,
-        map_function: Callable[..., Any],
-        input: Any,
-        map_kwargs: Any = None,
-        retries: Optional[int] = None,
-        group_name: Optional[str] = None,
-    ):
-        self.response_future = response_future
-        self.map_function = map_function
-        self.input = input
-        self.map_kwargs = map_kwargs or {}
-        self.retries = retries or 0
-        self.group_name = group_name
-        self.failure_count = 0
-        self.cancelled = False
-
-    def _inc_failure_count(self):
-        self.failure_count += 1
-
-    def _should_retry(self):
-        return not self.cancelled and self.failure_count <= self.retries
-
-    def _retry(self, function_executor: FunctionExecutor):
-        inputs = [self.input]
-        futures_list = function_executor.map(
-            self.map_function, inputs, **self.map_kwargs
-        )
-        self.response_future = futures_list[0]
-
-    def cancel(self):
-        # cancelling will prevent any further retries, but won't affect any running tasks
-        self.cancelled = True
-
-    @property
-    def done(self):
-        return self.response_future.done
-
-    @property
-    def error(self):
-        return self.response_future.error
-
-    @property
-    def _exception(self):
-        return self.response_future._exception
-
-    @property
-    def stats(self):
-        return self.response_future.stats
-
-    def status(
-        self,
-        throw_except: bool = True,
-        internal_storage: Any = None,
-        check_only: bool = False,
-    ):
-        stat = self.response_future.status(
-            throw_except=throw_except,
-            internal_storage=internal_storage,
-            check_only=check_only,
-        )
-        if self.response_future.error:
-            reraise(*self.response_future._exception)
-        return stat
-
-    def result(self, throw_except: bool = True, internal_storage: Any = None):
-        res = self.response_future.result(
-            throw_except=throw_except, internal_storage=internal_storage
-        )
-        if self.response_future.error:
-            reraise(*self.response_future._exception)
-        return res
-
-
-class RetryingFunctionExecutor:
-    """
-    A wrapper around Lithops `FunctionExecutor` that supports retries.
-    """
-
-    def __init__(self, executor: FunctionExecutor):
-        self.executor = executor
-
-    def __enter__(self):
-        self.executor.__enter__()
-        return self
-
-    def __exit__(self, exc_type, exc_value, traceback):
-        self.executor.__exit__(exc_type, exc_value, traceback)
-
-    def map(
-        self,
-        map_function: Callable[..., Any],
-        map_iterdata: Iterable[Union[List[Any], Tuple[Any, ...], Dict[str, Any]]],
-        *,
-        retries: Optional[int] = None,
-        group_name: Optional[str] = None,
-        **kwargs,
-    ) -> List[RetryingFuture]:
-        inputs = list(map_iterdata)
-        futures_list = self.executor.map(
-            map_function,
-            inputs,
-            **kwargs,
-        )
-        return [
-            RetryingFuture(
-                f,
-                map_function=map_function,
-                input=i,
-                map_kwargs=kwargs,
-                retries=retries,
-                group_name=group_name,
-            )
-            for i, f in zip(inputs, futures_list)
-        ]
-
-    def wait(
-        self,
-        fs: List[RetryingFuture],
-        throw_except: Optional[bool] = True,
-        return_when: Optional[Any] = ALL_COMPLETED,
-        download_results: Optional[bool] = False,
-        timeout: Optional[int] = None,
-        threadpool_size: Optional[int] = THREADPOOL_SIZE,
-        wait_dur_sec: Optional[int] = WAIT_DUR_SEC,
-        show_progressbar: Optional[bool] = True,
-    ) -> Tuple[List[RetryingFuture], List[RetryingFuture]]:
-        lookup = {f.response_future: f for f in fs}
-
-        while True:
-            response_futures = [f.response_future for f in fs]
-
-            done, pending = self.executor.wait(
-                response_futures,
-                throw_except=throw_except,
-                return_when=return_when,
-                download_results=download_results,
-                timeout=timeout,
-                threadpool_size=threadpool_size,
-                wait_dur_sec=wait_dur_sec,
-                show_progressbar=show_progressbar,
-            )
-
-            retrying_done = []
-            retrying_pending = [lookup[response_future] for response_future in pending]
-            for response_future in done:
-                retrying_future = lookup[response_future]
-                if response_future.error:
-                    retrying_future._inc_failure_count()
-                    if retrying_future._should_retry():
-                        retrying_future._retry(self.executor)
-                        # put back into pending since we are retrying this input
-                        retrying_pending.append(retrying_future)
-                        lookup[retrying_future.response_future] = retrying_future
-                    else:
-                        retrying_done.append(retrying_future)
-                else:
-                    retrying_done.append(retrying_future)
-
-            if return_when == ALWAYS:
-                break
-            elif return_when == ANY_COMPLETED and len(retrying_done) > 0:
-                break
-            elif return_when == ALL_COMPLETED and len(retrying_pending) == 0:
-                break
-
-        return retrying_done, retrying_pending
diff --git a/cubed/tests/runtime/test_lithops.py b/cubed/tests/runtime/test_lithops.py
index 56838f061..723bf3641 100644
--- a/cubed/tests/runtime/test_lithops.py
+++ b/cubed/tests/runtime/test_lithops.py
@@ -8,9 +8,9 @@
 pytest.importorskip("lithops")
 
 from lithops.executors import LocalhostExecutor
+from lithops.retries import RetryingFunctionExecutor
 
 from cubed.runtime.executors.lithops import map_unordered
-from cubed.runtime.executors.lithops_retries import RetryingFunctionExecutor
 
 
 def run_test(function, input, retries, timeout=10, use_backups=False):
diff --git a/cubed/tests/runtime/test_lithops_retries.py b/cubed/tests/runtime/test_lithops_retries.py
index 749e78491..e51e43e37 100644
--- a/cubed/tests/runtime/test_lithops_retries.py
+++ b/cubed/tests/runtime/test_lithops_retries.py
@@ -3,8 +3,8 @@
 pytest.importorskip("lithops")
 
 from lithops.executors import LocalhostExecutor
+from lithops.retries import RetryingFunctionExecutor
 
-from cubed.runtime.executors.lithops_retries import RetryingFunctionExecutor
 from cubed.tests.runtime.utils import check_invocation_counts, deterministic_failure
 
 
diff --git a/docs/requirements.txt b/docs/requirements.txt
index f9fea4a2b..163a74128 100644
--- a/docs/requirements.txt
+++ b/docs/requirements.txt
@@ -1,7 +1,7 @@
 # cubed
 apache-beam
 fsspec
-lithops[aws] >= 2.7.0
+lithops[aws] >= 3.3.0
 modal
 mypy_extensions # for rechunker
 networkx != 2.8.3, != 2.8.4, != 2.8.5, != 2.8.6, != 2.8.7, != 2.8.8, != 3.0.*, != 3.1.*, != 3.2.*
diff --git a/pyproject.toml b/pyproject.toml
index a840fc282..bea9f52a5 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -54,15 +54,15 @@ beam = ["apache-beam", "gcsfs"]
 dask = ["dask < 2024.12.0"]
 dask-distributed = ["distributed < 2024.12.0"]
 icechunk = ["icechunk"]
-lithops = ["lithops[aws] >= 2.7.0"]
+lithops = ["lithops[aws] >= 3.3.0"]
 lithops-aws = [
     "cubed[diagnostics]",
-    "lithops[aws]",
+    "lithops[aws] >= 3.3.0",
     "s3fs",
 ]
 lithops-gcp = [
     "cubed[diagnostics]",
-    "lithops[gcp]",
+    "lithops[gcp] >= 3.3.0",
     "gcsfs",
 ]
 modal = [