Skip to content

Commit

Permalink
Rename JobQueue to Scheduler and move job_queue module to scheduler m…
Browse files Browse the repository at this point in the history
…odule
  • Loading branch information
xjules committed Nov 28, 2023
1 parent a87339f commit ff3a55b
Show file tree
Hide file tree
Showing 31 changed files with 134 additions and 151 deletions.
2 changes: 1 addition & 1 deletion src/ert/cli/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
from typing import TYPE_CHECKING

from ert.job_queue import WorkflowRunner
from ert.scheduler import WorkflowRunner

if TYPE_CHECKING:
from ert.enkf_main import EnKFMain
Expand Down
16 changes: 3 additions & 13 deletions src/ert/enkf_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,17 @@
from copy import copy
from datetime import datetime
from pathlib import Path
from typing import (
TYPE_CHECKING,
Dict,
Iterable,
List,
Mapping,
Optional,
Union,
)
from typing import TYPE_CHECKING, Dict, Iterable, List, Mapping, Optional, Union

import numpy as np
from numpy.random import SeedSequence

from .analysis.configuration import UpdateConfiguration, UpdateStep
from .config import (
ParameterConfig,
)
from .job_queue import WorkflowRunner
from .config import ParameterConfig
from .realization_state import RealizationState
from .run_context import RunContext
from .runpaths import Runpaths
from .scheduler import WorkflowRunner
from .substitution_list import SubstitutionList

if TYPE_CHECKING:
Expand Down
20 changes: 10 additions & 10 deletions src/ert/ensemble_evaluator/_builder/_legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from ert.async_utils import get_event_loop
from ert.ensemble_evaluator import identifiers
from ert.job_queue import JobQueue
from ert.scheduler import Scheduler

from .._wait_for_evaluator import wait_for_evaluator
from ._ensemble import Ensemble
Expand Down Expand Up @@ -42,7 +42,7 @@ def __init__(
raise ValueError(f"{self} needs queue_config")
if not analysis_config:
raise ValueError(f"{self} needs analysis_config")
self._job_queue = JobQueue(queue_config)
self._scheduler = Scheduler(queue_config)
self._analysis_config = analysis_config
self._config: Optional[EvaluatorServerConfig] = None

Expand Down Expand Up @@ -147,10 +147,10 @@ async def _evaluate_inner( # pylint: disable=too-many-branches
"""
This (inner) coroutine does the actual work of evaluating the ensemble. It
prepares and executes the necessary bookkeeping, prepares and executes
the JobQueue, and dispatches pertinent events.
the Scheduler, and dispatches pertinent events.
Before returning, it always dispatches a CloudEvent describing
the final result of executing all its jobs through a JobQueue.
the final result of executing all its jobs through a Scheduler.
cloudevent_unary_send determines how CloudEvents are dispatched. This
is a function (or bound method) that only takes a CloudEvent as a positional
Expand All @@ -176,7 +176,7 @@ async def _evaluate_inner( # pylint: disable=too-many-branches
)

for real in self.active_reals:
self._job_queue.add_realization(real, callback_timeout=on_timeout)
self._scheduler.add_realization(real, callback_timeout=on_timeout)

# TODO: this is sort of a callback being preemptively called.
# It should be lifted out of the queue/evaluate, into the evaluator. If
Expand All @@ -190,12 +190,12 @@ async def _evaluate_inner( # pylint: disable=too-many-branches
):
queue_evaluators = [
partial(
self._job_queue.stop_long_running_realizations,
self._scheduler.stop_long_running_realizations,
self._analysis_config.minimum_required_realizations,
)
]

self._job_queue.set_ee_info(
self._scheduler.set_ee_info(
ee_uri=self._config.dispatch_uri,
ens_id=self.id_,
ee_cert=self._config.cert,
Expand All @@ -204,9 +204,9 @@ async def _evaluate_inner( # pylint: disable=too-many-branches

# Tell queue to pass info to the jobs-file
# NOTE: This touches files on disk...
self._job_queue.add_dispatch_information_to_jobs_file()
self._scheduler.add_dispatch_information_to_jobs_file()

result: str = await self._job_queue.execute(
result: str = await self._scheduler.execute(
queue_evaluators # type: ignore
)
print(result)
Expand All @@ -230,5 +230,5 @@ def cancellable(self) -> bool:
return True

def cancel(self) -> None:
self._job_queue.kill_all_jobs()
self._scheduler.kill_all_jobs()
logger.debug("evaluator cancelled")
2 changes: 1 addition & 1 deletion src/ert/gui/tools/plugins/plugin_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing import TYPE_CHECKING

from ert.config import CancelPluginException
from ert.job_queue import WorkflowJobRunner
from ert.scheduler import WorkflowJobRunner

from .process_job_dialog import ProcessJobDialog

Expand Down
2 changes: 1 addition & 1 deletion src/ert/gui/tools/workflows/run_workflow_widget.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from ert.gui.ertwidgets import CaseSelector
from ert.gui.tools.workflows.workflow_dialog import WorkflowDialog
from ert.job_queue import WorkflowRunner
from ert.scheduler import WorkflowRunner

if TYPE_CHECKING:
from ert.gui.ertnotifier import ErtNotifier
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
"""
The job_queue package contains modules and classes for running
The scheduler package contains modules and classes for running
external commands.
"""

# Getting LSF to work properly is quite painful. The situation
# is a mix of build complexity and LSF specific requirements:
#
# 1. The LSF libraries are accessed from the libjob_queue.so
# 1. The LSF libraries are accessed from the libscheduler.so
# library, but observe that the dependancy on the liblsf and
# libbat libraries is through dlopen(), i.e. runtime. This module
# will therefore load happily without access to the lsf libraries.
Expand Down Expand Up @@ -58,13 +58,13 @@ def setenv(var: str, value: str) -> None:


from .driver import Driver
from .queue import JobQueue
from .realization_state import QueueableRealization, RealizationState
from .scheduler import Scheduler
from .workflow_runner import WorkflowJobRunner, WorkflowRunner

__all__ = [
"Driver",
"JobQueue",
"Scheduler",
"QueueableRealization",
"WorkflowJobRunner",
"WorkflowRunner",
Expand Down
13 changes: 2 additions & 11 deletions src/ert/job_queue/driver.py → src/ert/scheduler/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,13 @@
import re
import shlex
from abc import ABC, abstractmethod
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Callable,
Dict,
List,
Optional,
Tuple,
)
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Optional, Tuple

from ert.config.parsing.queue_system import QueueSystem

if TYPE_CHECKING:
from ert.config import QueueConfig
from ert.job_queue import RealizationState
from ert.scheduler import RealizationState


logger = logging.getLogger(__name__)
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
from ert.realization_state import RealizationState as RealizationStorageState

if TYPE_CHECKING:
from ert.job_queue import JobQueue
from ert.run_arg import RunArg
from ert.scheduler import Scheduler


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -54,9 +54,9 @@ class RealizationState(StateMachine): # type: ignore
UNKNOWN = State("UNKNOWN")

def __init__(
self, jobqueue: "JobQueue", realization: QueueableRealization, retries: int = 1
self, jobqueue: "Scheduler", realization: QueueableRealization, retries: int = 1
):
self.jobqueue: "JobQueue" = (
self.jobqueue: "Scheduler" = (
jobqueue # For direct callbacks. Consider only supplying needed callbacks.
)
self.realization: QueueableRealization = realization
Expand Down
8 changes: 4 additions & 4 deletions src/ert/job_queue/queue.py → src/ert/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def _queue_state_event_type(state: str) -> str:
return _queue_state_to_event_type_map[state]


class JobQueue:
class Scheduler:
"""Represents a queue of realizations (aka Jobs) to be executed on a
cluster."""

Expand Down Expand Up @@ -260,7 +260,7 @@ async def _publish_statechanges(
assert self._ens_id is not None
events = deque(
[
JobQueue._translate_change_to_cloudevent(self._ens_id, real_id, status)
Scheduler._translate_change_to_cloudevent(self._ens_id, real_id, status)
for real_id, status in changes.items()
]
)
Expand All @@ -279,7 +279,7 @@ async def _realization_statechange_publisher(self) -> None:
while (
change := await self._statechanges_to_publish.get()
) != CLOSE_PUBLISHER_SENTINEL:
logger.warning(f"State change in JobQueue.execute(): {change}")
logger.warning(f"State change in Scheduler.execute(): {change}")
return

async for ee_connection in connect(
Expand All @@ -301,7 +301,7 @@ async def _realization_statechange_publisher(self) -> None:
await self._publish_statechanges(change, ee_connection)
except ConnectionClosed:
logger.debug(
"Websocket connection from JobQueue "
"Websocket connection from Scheduler "
"to EnsembleEvaluator closed, will retry."
)
continue
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion src/ert/shared/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

from ert.enkf_main import EnKFMain
from ert.gui.ertnotifier import ErtNotifier
from ert.job_queue import WorkflowJobRunner
from ert.libres_facade import LibresFacade
from ert.scheduler import WorkflowJobRunner

logger = logging.getLogger(__name__)

Expand Down
2 changes: 1 addition & 1 deletion src/ert/simulator/batch_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ def start(
self._setup_sim(sim_id, controls, ensemble)

# The input should be validated before we instantiate the BatchContext
# object, at that stage a job_queue object with multiple threads is
# object, at that stage a scheduler object with multiple threads is
# started, and things will typically be in a quite sorry state if an
# exception occurs.
itr = 0
Expand Down
12 changes: 6 additions & 6 deletions src/ert/simulator/batch_simulator_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import numpy as np

from ert.job_queue import RealizationState
from ert.scheduler import RealizationState

from .simulation_context import SimulationContext

Expand Down Expand Up @@ -55,11 +55,11 @@ def status(self) -> Status:
Will return the state of the simulations.
"""
return Status(
running=self.job_queue.count_realization_state(RealizationState.RUNNING),
waiting=self.job_queue.count_realization_state(RealizationState.WAITING),
pending=self.job_queue.count_realization_state(RealizationState.PENDING),
complete=self.job_queue.count_realization_state(RealizationState.SUCCESS),
failed=self.job_queue.count_realization_state(RealizationState.FAILED),
running=self.scheduler.count_realization_state(RealizationState.RUNNING),
waiting=self.scheduler.count_realization_state(RealizationState.WAITING),
pending=self.scheduler.count_realization_state(RealizationState.PENDING),
complete=self.scheduler.count_realization_state(RealizationState.SUCCESS),
failed=self.scheduler.count_realization_state(RealizationState.FAILED),
)

def results(self) -> List[Optional[Dict[str, "npt.NDArray[np.float64]"]]]:
Expand Down
Loading

0 comments on commit ff3a55b

Please sign in to comment.