Skip to content

Commit 9221117

Browse files
committed
Add function to create executors by name and keyword arguments
1 parent 2a55838 commit 9221117

File tree

3 files changed

+67
-28
lines changed

3 files changed

+67
-28
lines changed

cubed/runtime/create.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
from typing import Optional
2+
3+
from cubed.runtime.types import Executor
4+
5+
6+
def create_executor(name: str, executor_options: Optional[dict] = None) -> Executor:
7+
"""Create an executor from an executor name."""
8+
executor_options = executor_options or {}
9+
if name == "beam":
10+
from cubed.runtime.executors.beam import BeamDagExecutor
11+
12+
return BeamDagExecutor(**executor_options)
13+
elif name == "coiled":
14+
from cubed.runtime.executors.coiled import CoiledFunctionsDagExecutor
15+
16+
return CoiledFunctionsDagExecutor(**executor_options)
17+
elif name == "dask":
18+
from cubed.runtime.executors.dask_distributed_async import (
19+
AsyncDaskDistributedExecutor,
20+
)
21+
22+
return AsyncDaskDistributedExecutor(**executor_options)
23+
elif name == "lithops":
24+
from cubed.runtime.executors.lithops import LithopsDagExecutor
25+
26+
return LithopsDagExecutor(**executor_options)
27+
elif name == "modal":
28+
from cubed.runtime.executors.modal_async import AsyncModalDagExecutor
29+
30+
return AsyncModalDagExecutor(**executor_options)
31+
elif name == "modal-sync":
32+
from cubed.runtime.executors.modal import ModalDagExecutor
33+
34+
return ModalDagExecutor(**executor_options)
35+
elif name == "single-threaded":
36+
from cubed.runtime.executors.python import PythonDagExecutor
37+
38+
return PythonDagExecutor(**executor_options)
39+
elif name == "threads":
40+
from cubed.runtime.executors.python_async import AsyncPythonDagExecutor
41+
42+
return AsyncPythonDagExecutor(**executor_options)
43+
else:
44+
raise ValueError(f"Unrecognized executor name: {name}")

cubed/spec.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from typing import Optional, Union
22

3+
from cubed.runtime.create import create_executor
34
from cubed.runtime.types import Executor
45
from cubed.utils import convert_to_bytes
56

@@ -13,6 +14,8 @@ def __init__(
1314
allowed_mem: Union[int, str, None] = None,
1415
reserved_mem: Union[int, str, None] = 0,
1516
executor: Union[Executor, None] = None,
17+
executor_name: Optional[str] = None,
18+
executor_options: Optional[dict] = None,
1619
storage_options: Union[dict, None] = None,
1720
):
1821
"""
@@ -45,7 +48,13 @@ def __init__(
4548
else:
4649
self._allowed_mem = convert_to_bytes(allowed_mem)
4750

48-
self._executor = executor
51+
if executor is not None:
52+
self._executor = executor
53+
elif executor_name is not None:
54+
self._executor = create_executor(executor_name, executor_options)
55+
else:
56+
self._executor = None
57+
4958
self._storage_options = storage_options
5059

5160
@property

cubed/tests/utils.py

Lines changed: 13 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -5,60 +5,46 @@
55
import numpy as np
66
import zarr
77

8-
from cubed.runtime.executors.python import PythonDagExecutor
9-
from cubed.runtime.executors.python_async import AsyncPythonDagExecutor
8+
from cubed.runtime.create import create_executor
109
from cubed.runtime.types import Callback
1110

1211
LITHOPS_LOCAL_CONFIG = {"lithops": {"backend": "localhost", "storage": "localhost"}}
1312

14-
ALL_EXECUTORS = [PythonDagExecutor()]
13+
ALL_EXECUTORS = [create_executor("single-threaded")]
1514

1615
# don't run all tests on every executor as it's too slow, so just have a subset
17-
MAIN_EXECUTORS = [PythonDagExecutor()]
16+
MAIN_EXECUTORS = [create_executor("single-threaded")]
1817

1918

2019
if platform.system() != "Windows":
2120
# AsyncPythonDagExecutor calls `peak_measured_mem` which is not supported on Windows
22-
ALL_EXECUTORS.append(AsyncPythonDagExecutor())
21+
ALL_EXECUTORS.append(create_executor("threads"))
2322

2423

2524
try:
26-
from cubed.runtime.executors.beam import BeamDagExecutor
27-
28-
ALL_EXECUTORS.append(BeamDagExecutor())
29-
30-
MAIN_EXECUTORS.append(BeamDagExecutor())
25+
ALL_EXECUTORS.append(create_executor("beam"))
26+
MAIN_EXECUTORS.append(create_executor("beam"))
3127
except ImportError:
3228
pass
3329

3430
try:
35-
from cubed.runtime.executors.dask_distributed_async import (
36-
AsyncDaskDistributedExecutor,
37-
)
38-
39-
ALL_EXECUTORS.append(AsyncDaskDistributedExecutor())
40-
41-
MAIN_EXECUTORS.append(AsyncDaskDistributedExecutor())
31+
ALL_EXECUTORS.append(create_executor("dask"))
32+
MAIN_EXECUTORS.append(create_executor("dask"))
4233
except ImportError:
4334
pass
4435

4536
try:
46-
from cubed.runtime.executors.lithops import LithopsDagExecutor
47-
48-
ALL_EXECUTORS.append(LithopsDagExecutor(config=LITHOPS_LOCAL_CONFIG))
49-
50-
MAIN_EXECUTORS.append(LithopsDagExecutor(config=LITHOPS_LOCAL_CONFIG))
37+
executor_options = dict(config=LITHOPS_LOCAL_CONFIG)
38+
ALL_EXECUTORS.append(create_executor("lithops", executor_options))
39+
MAIN_EXECUTORS.append(create_executor("lithops", executor_options))
5140
except ImportError:
5241
pass
5342

5443
MODAL_EXECUTORS = []
5544

5645
try:
57-
from cubed.runtime.executors.modal import ModalDagExecutor
58-
from cubed.runtime.executors.modal_async import AsyncModalDagExecutor
59-
60-
MODAL_EXECUTORS.append(AsyncModalDagExecutor())
61-
MODAL_EXECUTORS.append(ModalDagExecutor())
46+
MODAL_EXECUTORS.append(create_executor("modal"))
47+
MODAL_EXECUTORS.append(create_executor("modal-sync"))
6248
except ImportError:
6349
pass
6450

0 commit comments

Comments
 (0)