Skip to content

Commit

Permalink
Change Backend Names (#500)
Browse files Browse the repository at this point in the history
* Change Backend Names

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Update __init__.py

* fix flux tests

* fix flux submission test

* fix input checks

* fix notebooks

* Add more tests

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
jan-janssen and pre-commit-ci[bot] authored Nov 15, 2024
1 parent c1b8b22 commit 294965d
Show file tree
Hide file tree
Showing 9 changed files with 36 additions and 28 deletions.
2 changes: 1 addition & 1 deletion executorlib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ def __new__(
resource_dict.update(
{k: v for k, v in default_resource_dict.items() if k not in resource_dict}
)
if "pysqa_" in backend and not plot_dependency_graph:
if "_submission" in backend and not plot_dependency_graph:
from executorlib.cache.executor import create_file_executor

return create_file_executor(
Expand Down
4 changes: 2 additions & 2 deletions executorlib/cache/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def __init__(

def create_file_executor(
max_workers: int = 1,
backend: str = "pysqa_flux",
backend: str = "flux_submission",
max_cores: int = 1,
cache_directory: Optional[str] = None,
resource_dict: Optional[dict] = None,
Expand Down Expand Up @@ -113,6 +113,6 @@ def create_file_executor(
cache_directory=cache_directory,
resource_dict=resource_dict,
pysqa_config_directory=pysqa_config_directory,
backend=backend.split("pysqa_")[-1],
backend=backend.split("_submission")[0],
disable_dependencies=disable_dependencies,
)
14 changes: 9 additions & 5 deletions executorlib/interactive/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,13 @@ def create_executor(
init_function (None): optional function to preset arguments for functions which are submitted later
"""
check_init_function(block_allocation=block_allocation, init_function=init_function)
if flux_executor is not None and backend != "flux":
backend = "flux"
if flux_executor is not None and backend != "flux_allocation":
backend = "flux_allocation"
check_pmi(backend=backend, pmi=flux_executor_pmi_mode)
cores_per_worker = resource_dict["cores"]
resource_dict["cache_directory"] = cache_directory
resource_dict["hostname_localhost"] = hostname_localhost
if backend == "flux":
if backend == "flux_allocation":
check_oversubscribe(oversubscribe=resource_dict["openmpi_oversubscribe"])
check_command_line_argument_lst(
command_line_argument_lst=resource_dict["slurm_cmd_args"]
Expand Down Expand Up @@ -233,7 +233,7 @@ def create_executor(
executor_kwargs=resource_dict,
spawner=FluxPythonSpawner,
)
elif backend == "slurm":
elif backend == "slurm_allocation":
check_executor(executor=flux_executor)
check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
if block_allocation:
Expand All @@ -255,7 +255,7 @@ def create_executor(
executor_kwargs=resource_dict,
spawner=SrunSpawner,
)
else: # backend="local"
elif backend == "local":
check_executor(executor=flux_executor)
check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
check_threads_per_core(threads_per_core=resource_dict["threads_per_core"])
Expand Down Expand Up @@ -285,3 +285,7 @@ def create_executor(
executor_kwargs=resource_dict,
spawner=MpiExecSpawner,
)
else:
raise ValueError(
"The supported backends are slurm_allocation, slurm_submission, flux_allocation, flux_submission and local."
)
4 changes: 2 additions & 2 deletions executorlib/standalone/inputcheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ def check_pmi(backend: str, pmi: Optional[str]) -> None:
"""
Check if pmi is valid for the selected backend and raise a ValueError if it is not.
"""
if backend != "flux" and pmi is not None:
if backend != "flux_allocation" and pmi is not None:
raise ValueError("The pmi parameter is currently only implemented for flux.")
elif backend == "flux" and pmi not in ["pmix", "pmi1", "pmi2", None]:
elif backend == "flux_allocation" and pmi not in ["pmix", "pmi1", "pmi2", None]:
raise ValueError(
"The pmi parameter supports [pmix, pmi1, pmi2], but not: " + pmi
)
Expand Down
22 changes: 11 additions & 11 deletions notebooks/examples.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
"source": [
"from executorlib import Executor\n",
"\n",
"with Executor(max_cores=1, backend=\"flux\") as exe:\n",
"with Executor(max_cores=1, backend=\"flux_allocation\") as exe:\n",
" future = exe.submit(sum, [1, 1])\n",
" print(future.result())"
]
Expand Down Expand Up @@ -103,7 +103,7 @@
" return sum(*args)\n",
"\n",
"\n",
"with Executor(max_cores=2, backend=\"flux\") as exe:\n",
"with Executor(max_cores=2, backend=\"flux_allocation\") as exe:\n",
" fs_1 = exe.submit(calc, [2, 1])\n",
" fs_2 = exe.submit(calc, [2, 2])\n",
" fs_3 = exe.submit(calc, [2, 3])\n",
Expand Down Expand Up @@ -159,7 +159,7 @@
" return sum(*args)\n",
"\n",
"\n",
"with Executor(max_cores=2, backend=\"flux\") as exe:\n",
"with Executor(max_cores=2, backend=\"flux_allocation\") as exe:\n",
" print(list(exe.map(calc, [[2, 1], [2, 2], [2, 3], [2, 4]])))"
]
},
Expand Down Expand Up @@ -205,7 +205,7 @@
" with Executor(\n",
" # Resource definition on the executor level\n",
" max_workers=2, # total number of cores available to the Executor\n",
" backend=\"flux\", # optional in case the backend is not recognized\n",
" backend=\"flux_allocation\", # optional in case the backend is not recognized\n",
" # Optional resource definition\n",
" resource_dict={\n",
" \"cores\": 1,\n",
Expand Down Expand Up @@ -277,7 +277,7 @@
" # Resource definition on the executor level\n",
" max_cores=2, # total number of cores available to the Executor\n",
" block_allocation=True, # reuse python processes\n",
" backend=\"flux\",\n",
" backend=\"flux_allocation\",\n",
") as exe:\n",
" future_obj = exe.submit(\n",
" calc_function,\n",
Expand Down Expand Up @@ -332,7 +332,7 @@
"with Executor(\n",
" max_cores=1,\n",
" init_function=init_function,\n",
" backend=\"flux\",\n",
" backend=\"flux_allocation\",\n",
" block_allocation=True,\n",
") as exe:\n",
" fs = exe.submit(calc, 2, j=5)\n",
Expand Down Expand Up @@ -462,7 +462,7 @@
"with Executor(\n",
" max_cores=2,\n",
" resource_dict={\"cores\": 2},\n",
" backend=\"flux\",\n",
" backend=\"flux_allocation\",\n",
" flux_executor_pmi_mode=\"pmix\",\n",
") as exe:\n",
" fs = exe.submit(calc, 3)\n",
Expand Down Expand Up @@ -519,7 +519,7 @@
"with Executor(\n",
" max_workers=2, \n",
" gpus_per_worker=1,\n",
" backend=\"flux\",\n",
" backend=\"flux_allocation\",\n",
") as exe:\n",
" fs_1 = exe.submit(get_available_gpus)\n",
" fs_2 = exe.submit(get_available_gpus)\n",
Expand Down Expand Up @@ -627,7 +627,7 @@
" return parameter_a + parameter_b\n",
"\n",
"\n",
"with Executor(max_cores=2, backend=\"flux\") as exe:\n",
"with Executor(max_cores=2, backend=\"flux_allocation\") as exe:\n",
" future_1 = exe.submit(\n",
" calc_function,\n",
" 1,\n",
Expand Down Expand Up @@ -672,7 +672,7 @@
"```\n",
"from executorlib import Executor\n",
"\n",
"with Executor(max_cores=1, backend=\"slurm\") as exe:\n",
"with Executor(max_cores=1, backend=\"slurm_allocation\") as exe:\n",
" future = exe.submit(sum, [1,1])\n",
" print(future.result())\n",
"```"
Expand All @@ -683,7 +683,7 @@
"id": "ae8dd860-f90f-47b4-b3e5-664f5c949350",
"metadata": {},
"source": [
"The `backend=\"slurm\"` parameter is optional as `executorlib` automatically recognizes if [flux framework](https://flux-framework.org)\n",
"The `backend=\"slurm_allocation\"` parameter is optional as `executorlib` automatically recognizes if [flux framework](https://flux-framework.org)\n",
"or SLURM are available. \n",
"\n",
"In addition, the SLURM backend introduces the `command_line_argument_lst=[]` parameter, which allows the user to provide\n",
Expand Down
2 changes: 1 addition & 1 deletion tests/test_cache_executor_pysqa_flux.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def mpi_funct(i):
class TestCacheExecutorPysqa(unittest.TestCase):
def test_executor(self):
with Executor(
backend="pysqa_flux",
backend="flux_submission",
resource_dict={"cores": 2, "cwd": "cache"},
block_allocation=False,
cache_directory="cache",
Expand Down
4 changes: 4 additions & 0 deletions tests/test_dependencies_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ def test_executor_dependency_plot(self):
self.assertEqual(len(nodes), 5)
self.assertEqual(len(edges), 4)

def test_create_executor_error(self):
with self.assertRaises(ValueError):
create_executor(backend="toast", resource_dict={"cores": 1})

def test_dependency_steps(self):
cloudpickle_register(ind=1)
fs1 = Future()
Expand Down
10 changes: 5 additions & 5 deletions tests/test_executor_backend_flux.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def test_flux_executor_serial(self):
with Executor(
max_cores=2,
flux_executor=self.executor,
backend="flux",
backend="flux_allocation",
block_allocation=True,
) as exe:
fs_1 = exe.submit(calc, 1)
Expand All @@ -62,7 +62,7 @@ def test_flux_executor_threads(self):
max_cores=1,
resource_dict={"threads_per_core": 2},
flux_executor=self.executor,
backend="flux",
backend="flux_allocation",
block_allocation=True,
) as exe:
fs_1 = exe.submit(calc, 1)
Expand All @@ -77,7 +77,7 @@ def test_flux_executor_parallel(self):
max_cores=2,
resource_dict={"cores": 2},
flux_executor=self.executor,
backend="flux",
backend="flux_allocation",
block_allocation=True,
flux_executor_pmi_mode=pmi,
) as exe:
Expand All @@ -90,7 +90,7 @@ def test_single_task(self):
max_cores=2,
resource_dict={"cores": 2},
flux_executor=self.executor,
backend="flux",
backend="flux_allocation",
block_allocation=True,
flux_executor_pmi_mode=pmi,
) as p:
Expand All @@ -106,7 +106,7 @@ def test_internal_memory(self):
resource_dict={"cores": 1},
init_function=set_global,
flux_executor=self.executor,
backend="flux",
backend="flux_allocation",
block_allocation=True,
) as p:
f = p.submit(get_global)
Expand Down
2 changes: 1 addition & 1 deletion tests/test_shared_input_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def test_check_pmi(self):
with self.assertRaises(ValueError):
check_pmi(backend="test", pmi="test")
with self.assertRaises(ValueError):
check_pmi(backend="flux", pmi="test")
check_pmi(backend="flux_allocation", pmi="test")

def test_check_nested_flux_executor(self):
with self.assertRaises(ValueError):
Expand Down

0 comments on commit 294965d

Please sign in to comment.