Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Move retry logic to lithops #673

Merged
merged 1 commit into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 10 additions & 12 deletions cubed/runtime/executors/lithops.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,11 @@
)

from lithops.executors import FunctionExecutor
from lithops.retries import RetryingFunctionExecutor, RetryingFuture
from lithops.wait import ALWAYS, ANY_COMPLETED
from networkx import MultiDiGraph

from cubed.runtime.backup import should_launch_backup, use_backups_default
from cubed.runtime.executors.lithops_retries import (
RetryingFunctionExecutor,
RetryingFuture,
)
from cubed.runtime.pipeline import visit_node_generations, visit_nodes
from cubed.runtime.types import Callback, DagExecutor
from cubed.runtime.utils import (
Expand Down Expand Up @@ -79,6 +76,7 @@ def map_unordered(
return_when = ALWAYS if use_backups else ANY_COMPLETED
wait_dur_sec = wait_dur_sec or 1

future_to_group_name: Dict[str, str] = {}
group_name_to_function: Dict[str, Callable[..., Any]] = {}
# backups are launched based on task start and end times for the group
start_times: Dict[str, Dict[RetryingFuture, float]] = {}
Expand All @@ -97,13 +95,13 @@ def map_unordered(

futures = lithops_function_executor.map(
partial_map_function,
map_iterdata,
list(map_iterdata), # lithops requires a list
timeout=timeout,
include_modules=include_modules,
retries=retries,
group_name=group_name,
)
start_times[group_name] = {k: time.monotonic() for k in futures}
future_to_group_name.update({k: group_name for k in futures})
pending.extend(futures)

while pending:
Expand All @@ -122,10 +120,10 @@ def map_unordered(
if not backup.done or not backup.error:
continue
future.status(throw_except=True)
group_name = future.group_name # type: ignore[assignment]
group_name = future_to_group_name[future] # type: ignore[assignment]
end_times[group_name][future] = time.monotonic()
if return_stats:
yield future.result(), standardise_lithops_stats(future)
yield future.result(), standardise_lithops_stats(group_name, future)
else:
yield future.result()

Expand All @@ -142,7 +140,7 @@ def map_unordered(
if use_backups:
now = time.monotonic()
for future in copy.copy(pending):
group_name = future.group_name # type: ignore[assignment]
group_name = future_to_group_name[future] # type: ignore[assignment]
if future not in backups and should_launch_backup(
future, now, start_times[group_name], end_times[group_name]
):
Expand All @@ -154,11 +152,11 @@ def map_unordered(
timeout=timeout,
include_modules=include_modules,
retries=0, # don't retry backup tasks
group_name=group_name,
)
start_times[group_name].update(
{k: time.monotonic() for k in futures}
)
future_to_group_name.update({k: group_name for k in futures})
pending.extend(futures)
backup = futures[0]
backups[future] = backup
Expand Down Expand Up @@ -237,10 +235,10 @@ def execute_dag(
handle_callbacks(callbacks, result, stats)


def standardise_lithops_stats(future: RetryingFuture) -> Dict[str, Any]:
def standardise_lithops_stats(name: str, future: RetryingFuture) -> Dict[str, Any]:
stats = future.stats
return dict(
name=future.group_name,
name=name,
task_create_tstamp=stats["host_job_create_tstamp"],
function_start_tstamp=stats["worker_func_start_tstamp"],
function_end_tstamp=stats["worker_func_end_tstamp"],
Expand Down
187 changes: 0 additions & 187 deletions cubed/runtime/executors/lithops_retries.py

This file was deleted.

2 changes: 1 addition & 1 deletion cubed/tests/runtime/test_lithops.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
pytest.importorskip("lithops")

from lithops.executors import LocalhostExecutor
from lithops.retries import RetryingFunctionExecutor

from cubed.runtime.executors.lithops import map_unordered
from cubed.runtime.executors.lithops_retries import RetryingFunctionExecutor


def run_test(function, input, retries, timeout=10, use_backups=False):
Expand Down
2 changes: 1 addition & 1 deletion cubed/tests/runtime/test_lithops_retries.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
pytest.importorskip("lithops")

from lithops.executors import LocalhostExecutor
from lithops.retries import RetryingFunctionExecutor

from cubed.runtime.executors.lithops_retries import RetryingFunctionExecutor
from cubed.tests.runtime.utils import check_invocation_counts, deterministic_failure


Expand Down
2 changes: 1 addition & 1 deletion docs/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# cubed
apache-beam
fsspec
lithops[aws] >= 2.7.0
lithops[aws] >= 3.3.0
modal
mypy_extensions # for rechunker
networkx != 2.8.3, != 2.8.4, != 2.8.5, != 2.8.6, != 2.8.7, != 2.8.8, != 3.0.*, != 3.1.*, != 3.2.*
Expand Down
6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ beam = ["apache-beam", "gcsfs"]
dask = ["dask < 2024.12.0"]
dask-distributed = ["distributed < 2024.12.0"]
icechunk = ["icechunk"]
lithops = ["lithops[aws] >= 2.7.0"]
lithops = ["lithops[aws] >= 3.3.0"]
lithops-aws = [
"cubed[diagnostics]",
"lithops[aws]",
"lithops[aws] >= 3.3.0",
"s3fs",
]
lithops-gcp = [
"cubed[diagnostics]",
"lithops[gcp]",
"lithops[gcp] >= 3.3.0",
"gcsfs",
]
modal = [
Expand Down
Loading