Skip to content

Commit

Permalink
Remove JobQueueManager
Browse files Browse the repository at this point in the history
It was only a thin wrapper with no value left. Move
its functionality down to the JobQueue object.
  • Loading branch information
berland committed Nov 3, 2023
1 parent c71cb32 commit 07b9e87
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 264 deletions.
2 changes: 0 additions & 2 deletions src/ert/job_queue/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ def __init__(self, prototype: str, bind: bool = True) -> None:
updateAbortSignals()

from .driver import Driver # noqa
from .job_queue_manager import JobQueueManager # noqa
from .job_queue_node import JobQueueNode # noqa
from .job_status import JobStatus # noqa
from .queue import JobQueue # noqa
Expand All @@ -98,7 +97,6 @@ def __init__(self, prototype: str, bind: bool = True) -> None:
__all__ = [
"Driver",
"JobQueue",
"JobQueueManager",
"JobQueueNode",
"JobStatus",
"SubmitStatus",
Expand Down
78 changes: 0 additions & 78 deletions src/ert/job_queue/job_queue_manager.py

This file was deleted.

14 changes: 9 additions & 5 deletions src/ert/job_queue/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import threading
import time
from collections import deque
from threading import Semaphore
from threading import BoundedSemaphore, Semaphore
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -49,6 +49,11 @@
logger = logging.getLogger(__name__)

LONG_RUNNING_FACTOR = 1.25
"""If STOP_LONG_RUNNING is true, realizations taking more time than the average
times this factor will be killed."""
CONCURRENT_INTERNALIZATION = 1
"""How many realizations are allowed to be internalized concurrently using
threads."""

EVTYPE_REALIZATION_FAILURE = "com.equinor.ert.realization.failure"
EVTYPE_REALIZATION_PENDING = "com.equinor.ert.realization.pending"
Expand Down Expand Up @@ -110,6 +115,7 @@ def __init__(self, driver: "Driver", max_submit: int = 2):
self.driver = driver
self._differ = QueueDiffer()
self._max_submit = max_submit
self._pool_sema = BoundedSemaphore(value=CONCURRENT_INTERNALIZATION)

def get_max_running(self) -> int:
return self.driver.get_max_running()
Expand Down Expand Up @@ -211,11 +217,9 @@ def launch_jobs(self, pool_sema: Semaphore) -> None:
max_submit=self.max_submit,
)

def execute_queue(
self, pool_sema: Semaphore, evaluators: Optional[Iterable[Callable[[], None]]]
) -> None:
def execute_queue(self, evaluators: Optional[Iterable[Callable[[], None]]]) -> None:
while self.is_active() and not self.stopped:
self.launch_jobs(pool_sema)
self.launch_jobs(self._pool_sema)

time.sleep(1)

Expand Down
40 changes: 21 additions & 19 deletions src/ert/simulator/simulation_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from ert.config import HookRuntime
from ert.enkf_main import create_run_path
from ert.job_queue import Driver, JobQueue, JobQueueManager
from ert.job_queue import Driver, JobQueue, JobStatus
from ert.realization_state import RealizationState
from ert.run_context import RunContext
from ert.runpaths import Runpaths
Expand All @@ -20,7 +20,6 @@
import numpy.typing as npt

from ert.enkf_main import EnKFMain
from ert.job_queue import JobStatus
from ert.run_arg import RunArg
from ert.storage import EnsembleAccessor

Expand Down Expand Up @@ -73,8 +72,7 @@ def _run_forward_model(
)
]

jqm = JobQueueManager(job_queue, queue_evaluators)
jqm.execute_queue()
job_queue.execute_queue(queue_evaluators)

run_context.sim_fs.sync()

Expand All @@ -91,11 +89,10 @@ def __init__(
self._ert = ert
self._mask = mask

job_queue = JobQueue(
self._job_queue = JobQueue(
Driver.create_driver(ert.ert_config.queue_config),
max_submit=ert.ert_config.queue_config.max_submit,
)
self._queue_manager = JobQueueManager(job_queue)
# fill in the missing geo_id data
global_substitutions = ert.ert_config.substitution_list
global_substitutions["<CASE_NAME>"] = _slug(sim_fs.name)
Expand Down Expand Up @@ -126,7 +123,7 @@ def __init__(

# Wait until the queue is active before we finish the creation
# to ensure sane job status while running
while self.isRunning() and not self._queue_manager.isRunning():
while self.isRunning() and not self._job_queue.is_active():
sleep(0.1)

def get_run_args(self, iens: int) -> "RunArg":
Expand All @@ -144,7 +141,7 @@ def get_run_args(self, iens: int) -> "RunArg":
def _run_simulations_simple_step(self) -> Thread:
sim_thread = Thread(
target=lambda: _run_forward_model(
self._ert, self._queue_manager.queue, self._run_context
self._ert, self._job_queue, self._run_context
)
)
sim_thread.start()
Expand All @@ -155,28 +152,28 @@ def __len__(self) -> int:

def isRunning(self) -> bool:
# TODO: Should distinguish between running jobs and having loaded all data
return self._sim_thread.is_alive() or self._queue_manager.isRunning()
return self._sim_thread.is_alive() or self._job_queue.is_active()

def getNumPending(self) -> int:
return self._queue_manager.getNumPending()
return self._job_queue.count_status(JobStatus.PENDING) # type: ignore

def getNumRunning(self) -> int:
return self._queue_manager.getNumRunning()
return self._job_queue.count_status(JobStatus.RUNNING) # type: ignore

def getNumSuccess(self) -> int:
return self._queue_manager.getNumSuccess()
return self._job_queue.count_status(JobStatus.SUCCESS) # type: ignore

def getNumFailed(self) -> int:
return self._queue_manager.getNumFailed()
return self._job_queue.count_status(JobStatus.FAILED) # type: ignore

def getNumWaiting(self) -> int:
return self._queue_manager.getNumWaiting()
return self._job_queue.count_status(JobStatus.WAITING) # type: ignore

def didRealizationSucceed(self, iens: int) -> bool:
queue_index = self.get_run_args(iens).queue_index
if queue_index is None:
raise ValueError("Queue index not set")
return self._queue_manager.didJobSucceed(queue_index)
return self._job_queue.job_list[queue_index].queue_status == JobStatus.SUCCESS

def didRealizationFail(self, iens: int) -> bool:
# For the purposes of this class, a failure should be anything (killed
Expand All @@ -188,7 +185,11 @@ def isRealizationFinished(self, iens: int) -> bool:

queue_index = run_arg.queue_index
if queue_index is not None:
return self._queue_manager.isJobComplete(queue_index)
return not (
self._job_queue.job_list[queue_index].is_running()
or self._job_queue.job_list[queue_index].queue_status
== JobStatus.WAITING
)
else:
# job was not submitted
return False
Expand All @@ -208,7 +209,7 @@ def get_sim_fs(self) -> EnsembleAccessor:
return self._run_context.sim_fs

def stop(self) -> None:
self._queue_manager.stop_queue()
self._job_queue.kill_all_jobs()
self._sim_thread.join()

def job_progress(self, iens: int) -> Optional[ForwardModelStatus]:
Expand Down Expand Up @@ -239,7 +240,7 @@ def job_progress(self, iens: int) -> Optional[ForwardModelStatus]:
if queue_index is None:
# job was not submitted
return None
if self._queue_manager.isJobWaiting(queue_index):
if self._job_queue.job_list[queue_index].queue_status == JobStatus.WAITING:
return None

return ForwardModelStatus.load(run_arg.runpath)
Expand All @@ -257,4 +258,5 @@ def job_status(self, iens: int) -> Optional["JobStatus"]:
if queue_index is None:
# job was not submitted
return None
return self._queue_manager.getJobStatus(queue_index)
int_status = self._job_queue.job_list[queue_index].queue_status
return JobStatus(int_status)
Loading

0 comments on commit 07b9e87

Please sign in to comment.