diff --git a/src/ert/cli/main.py b/src/ert/cli/main.py index bdc9602eeba..3820f0fb7c2 100644 --- a/src/ert/cli/main.py +++ b/src/ert/cli/main.py @@ -2,9 +2,9 @@ import contextlib import logging import os +import queue import sys import threading -import time from typing import Any, TextIO from ert.cli import ( @@ -89,12 +89,15 @@ def run_cli(args: Namespace, _: Any = None) -> None: observations=ert_config.observations, ) + status_queue = queue.SimpleQueue() + try: model = create_model( ert_config, storage, args, experiment.id, + status_queue, ) except ValueError as e: raise ErtCliError(e) from e @@ -132,12 +135,9 @@ def run_cli(args: Namespace, _: Any = None) -> None: else: out = sys.stderr monitor = Monitor(out=out, color_always=args.color_always) - monitor.start() - model.add_send_event_callback(monitor.on_event) thread.start() try: - while not monitor.done: - time.sleep(0.5) + monitor.monitor(status_queue) except (SystemExit, KeyboardInterrupt): print("\nKilling simulations...") # tracker.request_termination() diff --git a/src/ert/cli/model_factory.py b/src/ert/cli/model_factory.py index d8cb33ed205..6c02ec7a98e 100644 --- a/src/ert/cli/model_factory.py +++ b/src/ert/cli/model_factory.py @@ -1,6 +1,7 @@ from __future__ import annotations import logging +from queue import SimpleQueue from typing import TYPE_CHECKING from uuid import UUID @@ -55,6 +56,7 @@ def create_model( storage: StorageAccessor, args: Namespace, experiment_id: UUID, + status_queue: SimpleQueue, ) -> BaseRunModel: logger = logging.getLogger(__name__) logger.info( @@ -75,20 +77,24 @@ def create_model( ) if args.mode == TEST_RUN_MODE: - return _setup_single_test_run(config, storage, args, experiment_id) + return _setup_single_test_run( + config, storage, args, experiment_id, status_queue + ) elif args.mode == ENSEMBLE_EXPERIMENT_MODE: - return _setup_ensemble_experiment(config, storage, args, experiment_id) + return _setup_ensemble_experiment( + config, storage, args, experiment_id, status_queue + ) elif args.mode == ENSEMBLE_SMOOTHER_MODE: return _setup_ensemble_smoother( - config, storage, args, experiment_id, update_settings + config, storage, args, experiment_id, update_settings, status_queue ) elif args.mode == ES_MDA_MODE: return _setup_multiple_data_assimilation( - config, storage, args, experiment_id, update_settings + config, storage, args, experiment_id, update_settings, status_queue ) elif args.mode == ITERATIVE_ENSEMBLE_SMOOTHER_MODE: return _setup_iterative_ensemble_smoother( - config, storage, args, experiment_id, update_settings + config, storage, args, experiment_id, update_settings, status_queue ) else: @@ -96,7 +102,11 @@ def create_model( def _setup_single_test_run( - config: ErtConfig, storage: StorageAccessor, args: Namespace, experiment_id: UUID + config: ErtConfig, + storage: StorageAccessor, + args: Namespace, + experiment_id: UUID, + status_queue: SimpleQueue, ) -> SingleTestRun: return SingleTestRun( SingleTestRunArguments( @@ -113,7 +123,11 @@ def _setup_single_test_run( def _setup_ensemble_experiment( - config: ErtConfig, storage: StorageAccessor, args: Namespace, experiment_id: UUID + config: ErtConfig, + storage: StorageAccessor, + args: Namespace, + experiment_id: UUID, + status_queue: SimpleQueue, ) -> EnsembleExperiment: min_realizations_count = config.analysis_config.minimum_required_realizations active_realizations = _realizations(args, config.model_config.num_realizations) @@ -140,6 +154,7 @@ def _setup_ensemble_experiment( storage, config.queue_config, experiment_id, + status_queue, ) @@ -149,6 +164,7 @@ def _setup_ensemble_smoother( args: Namespace, experiment_id: UUID, update_settings: UpdateSettings, + status_queue: SimpleQueue, ) -> EnsembleSmoother: return EnsembleSmoother( ESRunArguments( @@ -168,6 +184,7 @@ def _setup_ensemble_smoother( experiment_id, es_settings=config.analysis_config.es_module, update_settings=update_settings, + status_queue=status_queue, ) @@ -177,6 +194,7 @@ def _setup_multiple_data_assimilation( args: Namespace, experiment_id: UUID, update_settings: UpdateSettings, + status_queue: SimpleQueue, ) -> MultipleDataAssimilation: # Because the configuration of the CLI is different from the gui, we # have a different way to get the restart information. @@ -207,6 +225,7 @@ def _setup_multiple_data_assimilation( prior_ensemble, es_settings=config.analysis_config.es_module, update_settings=update_settings, + status_queue=status_queue, ) @@ -216,6 +235,7 @@ def _setup_iterative_ensemble_smoother( args: Namespace, id_: UUID, update_settings: UpdateSettings, + status_queue: SimpleQueue, ) -> IteratedEnsembleSmoother: return IteratedEnsembleSmoother( SIESRunArguments( diff --git a/src/ert/cli/monitor.py b/src/ert/cli/monitor.py index 9756e600d24..bb91e0acdd9 100644 --- a/src/ert/cli/monitor.py +++ b/src/ert/cli/monitor.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- import sys from datetime import datetime, timedelta +from queue import SimpleQueue from typing import Dict, Optional, TextIO, Tuple from tqdm import tqdm @@ -60,8 +61,25 @@ def __init__(self, out: TextIO = sys.stdout, color_always: bool = False) -> None self.dot = "" self.done = False - def start(self) -> None: + def monitor( + self, + event_queue: SimpleQueue, + ) -> None: self._start_time = datetime.now() + while True: + event = event_queue.get() + if isinstance(event, FullSnapshotEvent): + if event.snapshot is not None: + self._snapshots[event.iteration] = event.snapshot + self._progress = event.progress + elif isinstance(event, SnapshotUpdateEvent): + if event.partial_snapshot is not None: + self._snapshots[event.iteration].merge_event(event.partial_snapshot) + self._print_progress(event) + if isinstance(event, EndEvent): + self._print_result(event.failed, event.failed_msg) + self._print_job_errors() + return def on_event( self, diff --git a/src/ert/gui/simulation/queue_emitter.py b/src/ert/gui/simulation/queue_emitter.py new file mode 100644 index 00000000000..6d1a996dce8 --- /dev/null +++ b/src/ert/gui/simulation/queue_emitter.py @@ -0,0 +1,56 @@ +import logging +from queue import SimpleQueue + +from qtpy.QtCore import QObject, Signal, Slot + +from ert.ensemble_evaluator import EndEvent, FullSnapshotEvent, SnapshotUpdateEvent +from ert.gui.model.snapshot import SnapshotModel + +logger = logging.getLogger(__name__) + + +class QueueEmitter(QObject): + """A worker that emits items put on a queue to qt subscribers.""" + + new_event = Signal(object) + done = Signal() + + def __init__( + self, + event_queue: SimpleQueue, + parent=None, + ): + super().__init__(parent) + logger.debug("init QueueEmitter") + self._event_queue = event_queue + self._stopped = False + + @Slot() + def consume_and_emit(self): + logger.debug("tracking...") + while True: + event = self._event_queue.get() + if self._stopped: + logger.debug("stopped") + break + + # pre-rendering in this thread to avoid work in main rendering thread + if isinstance(event, FullSnapshotEvent) and event.snapshot: + SnapshotModel.prerender(event.snapshot) + elif isinstance(event, SnapshotUpdateEvent) and event.partial_snapshot: + SnapshotModel.prerender(event.partial_snapshot) + + logger.debug(f"emit {event}") + self.new_event.emit(event) + + if isinstance(event, EndEvent): + logger.debug("got end event") + break + + self.done.emit() + logger.debug("tracking done.") + + @Slot() + def stop(self): + logger.debug("stopping...") + self._stopped = True diff --git a/src/ert/gui/simulation/run_dialog.py b/src/ert/gui/simulation/run_dialog.py index 1d25bb66e8b..2e506164c9c 100644 --- a/src/ert/gui/simulation/run_dialog.py +++ b/src/ert/gui/simulation/run_dialog.py @@ -1,9 +1,10 @@ import logging +from queue import SimpleQueue from threading import Thread from typing import Optional from PyQt5.QtWidgets import QAbstractItemView -from qtpy.QtCore import QModelIndex, QSize, Qt, QTimer, Signal, Slot +from qtpy.QtCore import QModelIndex, QSize, Qt, QThread, QTimer, Signal, Slot from qtpy.QtGui import QMovie from qtpy.QtWidgets import ( QDialog, @@ -42,6 +43,7 @@ ) from ert.shared.status.utils import format_running_time +from .queue_emitter import QueueEmitter from .view import LegendView, ProgressView, RealizationWidget, UpdateWidget _TOTAL_PROGRESS_TEMPLATE = "Total progress {total_progress}% — {phase_name}" @@ -55,6 +57,7 @@ def __init__( self, config_file: str, run_model: BaseRunModel, + event_queue: SimpleQueue, notifier: ErtNotifier, parent=None, ): @@ -66,6 +69,7 @@ def __init__( self._snapshot_model = SnapshotModel(self) self._run_model = run_model + self._event_queue = event_queue self._notifier = notifier self._isDetailedDialog = False @@ -174,7 +178,6 @@ def __init__( self._setSimpleDialog() self.finished.connect(self._on_finished) - self._run_model.add_send_event_callback(self.on_run_model_event.emit) self.on_run_model_event.connect(self._on_event) def _current_tab_changed(self, index: int) -> None: @@ -274,8 +277,20 @@ def startSimulation(self): args=(evaluator_server_config,), ) - simulation_thread.start() + worker = QueueEmitter(self._event_queue) + worker_thread = QThread() + self._worker = worker + self._worker_thread = worker_thread + + worker.done.connect(worker_thread.quit) + worker.new_event.connect(self._on_event) + worker.moveToThread(worker_thread) + self.simulation_done.connect(worker.stop) + worker_thread.started.connect(worker.consume_and_emit) + self._ticker.start(1000) + self._worker_thread.start() + simulation_thread.start() self._notifier.set_is_simulation_running(True) def killJobs(self): @@ -287,9 +302,7 @@ def killJobs(self): if kill_job == QMessageBox.Yes: # Normally this slot would be invoked by the signal/slot system, # but the worker is busy tracking the evaluation. - # self._tracker.request_termination() - # self._worker_thread.quit() - # self._worker_thread.wait() + self._run_model.cancel() self._on_finished() self.finished.emit(-1) return kill_job @@ -329,7 +342,6 @@ def _on_event(self, event: object): self._show_done_button() elif isinstance(event, FullSnapshotEvent): if event.snapshot is not None: - SnapshotModel.prerender(event.snapshot) self._snapshot_model._add_snapshot(event.snapshot, event.iteration) self._progress_view.setIndeterminate(event.indeterminate) progress = int(event.progress * 100) @@ -343,7 +355,6 @@ def _on_event(self, event: object): elif isinstance(event, SnapshotUpdateEvent): if event.partial_snapshot is not None: - SnapshotModel.prerender(event.partial_snapshot) self._snapshot_model._add_partial_snapshot( event.partial_snapshot, event.iteration ) diff --git a/src/ert/gui/simulation/simulation_panel.py b/src/ert/gui/simulation/simulation_panel.py index 9c8f09269a4..e333acfbd54 100644 --- a/src/ert/gui/simulation/simulation_panel.py +++ b/src/ert/gui/simulation/simulation_panel.py @@ -1,4 +1,5 @@ from collections import OrderedDict +from queue import SimpleQueue from typing import Any, Dict from qtpy.QtCore import QSize, Qt @@ -181,6 +182,7 @@ def runSimulation(self): abort = False QApplication.setOverrideCursor(Qt.CursorShape.WaitCursor) config = self.facade.config + event_queue = SimpleQueue() try: experiment = self._notifier.storage.create_experiment( parameters=config.ensemble_config.parameter_configuration, @@ -194,6 +196,7 @@ def runSimulation(self): self._notifier.storage, args, experiment.id, + event_queue, ) experiment.write_simulation_arguments(model.simulation_arguments) @@ -254,7 +257,7 @@ def runSimulation(self): QApplication.restoreOverrideCursor() dialog = RunDialog( - self._config_file, model, self._notifier, self.parent() + self._config_file, model, event_queue, self._notifier, self.parent() ) self.run_button.setEnabled(False) self.run_button.setText(EXPERIMENT_IS_RUNNING_BUTTON_MESSAGE) diff --git a/src/ert/gui/tools/run_analysis/run_analysis_tool.py b/src/ert/gui/tools/run_analysis/run_analysis_tool.py index 4bf823d2bf9..3f0b1f932e4 100644 --- a/src/ert/gui/tools/run_analysis/run_analysis_tool.py +++ b/src/ert/gui/tools/run_analysis/run_analysis_tool.py @@ -63,7 +63,7 @@ def run(self): update_settings, config.analysis_config.es_module, rng, - self.smoother_event_callback, + self.send_smoother_event, log_path=config.analysis_config.log_path, ) except ErtAnalysisError as e: @@ -73,7 +73,7 @@ def run(self): self.finished.emit(error, self._source_fs.name) - def smoother_event_callback(self, event: AnalysisEvent) -> None: + def send_smoother_event(self, event: AnalysisEvent) -> None: if isinstance(event, AnalysisStatusEvent): self.progress_update.emit(RunModelStatusEvent(iteration=0, msg=event.msg)) elif isinstance(event, AnalysisTimeEvent): diff --git a/src/ert/run_models/base_run_model.py b/src/ert/run_models/base_run_model.py index b20f27d3722..78c0023c10f 100644 --- a/src/ert/run_models/base_run_model.py +++ b/src/ert/run_models/base_run_model.py @@ -8,9 +8,9 @@ import uuid from contextlib import contextmanager from pathlib import Path +from queue import SimpleQueue from typing import ( TYPE_CHECKING, - Callable, Dict, Generator, List, @@ -129,6 +129,7 @@ def __init__( storage: StorageAccessor, queue_config: QueueConfig, experiment_id: uuid.UUID, + status_queue: SimpleQueue, phase_count: int = 1, ): """ @@ -178,16 +179,13 @@ def __init__( substitute=self.substitution_list.substitute_real_iter, ) self._iter_snapshot: Dict[int, Snapshot] = {} - self._send_event_callback: Optional[Callable[[StatusEvents], None]] = None - - def add_send_event_callback(self, func: Callable[[StatusEvents], None]) -> None: - self._send_event_callback = func + self._status_queue = status_queue + self._end_queue = SimpleQueue() def send_event(self, event: StatusEvents) -> None: - if self._send_event_callback: - self._send_event_callback(event) + self._status_queue.put(event) - def smoother_event_callback(self, iteration: int, event: AnalysisEvent) -> None: + def send_smoother_event(self, iteration: int, event: AnalysisEvent) -> None: if isinstance(event, AnalysisStatusEvent): self.send_event(RunModelStatusEvent(iteration=iteration, msg=event.msg)) elif isinstance(event, AnalysisTimeEvent): @@ -211,6 +209,9 @@ def simulation_arguments(self) -> RunArgumentsType: def _ensemble_size(self) -> int: return len(self._initial_realizations_mask) + def cancel(self) -> None: + self._end_queue.put("END") + def reset(self) -> None: self._failed = False self._error_messages = [] @@ -487,6 +488,10 @@ def send_snapshot_event(self, event: CloudEvent) -> None: def run_ensemble_evaluator( self, run_context: RunContext, ee_config: EvaluatorServerConfig ) -> List[int]: + if not self._end_queue.empty(): + event_logger.debug("Run model canceled - pre evaluation") + self._end_queue.get() + return [] ensemble = self._build_ensemble(run_context) evaluator = EnsembleEvaluator( ensemble, @@ -523,6 +528,14 @@ def run_ensemble_evaluator( return [] elif event["type"] == EVTYPE_EE_TERMINATED: event_logger.debug("got terminator event") + + if not self._end_queue.empty(): + event_logger.debug("Run model canceled - during evaluation") + self._end_queue.get() + monitor.signal_cancel() + event_logger.debug( + "Run model canceled - during evaluation - cancel sent" + ) # This sleep needs to be there. Refer to issue #1250: `Authority # on information about evaluations/experiments` # time.sleep(self._next_ensemble_evaluator_wait_time) @@ -546,6 +559,10 @@ def run_ensemble_evaluator( event_logger.debug("tasks complete") evaluator.join() + if not self._end_queue.empty(): + event_logger.debug("Run model canceled - post evaluation") + self._end_queue.get() + return [] return evaluator.get_successful_realizations() def _build_ensemble( diff --git a/src/ert/run_models/ensemble_experiment.py b/src/ert/run_models/ensemble_experiment.py index 0d8eddacd3c..659dae3e3ec 100644 --- a/src/ert/run_models/ensemble_experiment.py +++ b/src/ert/run_models/ensemble_experiment.py @@ -1,6 +1,7 @@ from __future__ import annotations from pathlib import Path +from queue import SimpleQueue from typing import TYPE_CHECKING, Union from uuid import UUID @@ -34,6 +35,7 @@ def __init__( storage: StorageAccessor, queue_config: QueueConfig, id_: UUID, + status_queue: SimpleQueue, ): super().__init__( simulation_arguments, @@ -41,6 +43,7 @@ def __init__( storage, queue_config, id_, + status_queue, ) def runSimulations__( diff --git a/src/ert/run_models/ensemble_smoother.py b/src/ert/run_models/ensemble_smoother.py index 957e55c0374..947ee12eb7e 100644 --- a/src/ert/run_models/ensemble_smoother.py +++ b/src/ert/run_models/ensemble_smoother.py @@ -2,6 +2,7 @@ import functools import logging +from queue import SimpleQueue from typing import TYPE_CHECKING from uuid import UUID @@ -37,6 +38,7 @@ def __init__( experiment_id: UUID, es_settings: ESSettings, update_settings: UpdateSettings, + status_queue: SimpleQueue, ): super().__init__( simulation_arguments, @@ -44,6 +46,7 @@ def __init__( storage, queue_config, experiment_id, + status_queue, phase_count=2, ) self.es_settings = es_settings @@ -128,7 +131,7 @@ def run_experiment( es_settings=self.es_settings, updatestep=self.ert.update_configuration, rng=self.rng, - progress_callback=functools.partial(self.smoother_event_callback, 0), + progress_callback=functools.partial(self.send_smoother_event, 0), log_path=self.ert_config.analysis_config.log_path, ) diff --git a/src/ert/run_models/iterated_ensemble_smoother.py b/src/ert/run_models/iterated_ensemble_smoother.py index ffc9cd7d1d8..f262e41fd03 100644 --- a/src/ert/run_models/iterated_ensemble_smoother.py +++ b/src/ert/run_models/iterated_ensemble_smoother.py @@ -2,6 +2,7 @@ import functools import logging +from queue import SimpleQueue from typing import TYPE_CHECKING from uuid import UUID @@ -42,6 +43,7 @@ def __init__( experiment_id: UUID, analysis_config: IESSettings, update_settings: UpdateSettings, + status_queue: SimpleQueue, ): super().__init__( simulation_arguments, @@ -49,6 +51,7 @@ def __init__( storage, queue_config, experiment_id, + status_queue, phase_count=2, ) self.support_restart = False @@ -99,7 +102,7 @@ def analyzeStep( initial_mask=initial_mask, rng=self.rng, progress_callback=functools.partial( - self.smoother_event_callback, iteration + self.send_smoother_event, iteration ), log_path=self.ert_config.analysis_config.log_path, ) diff --git a/src/ert/run_models/multiple_data_assimilation.py b/src/ert/run_models/multiple_data_assimilation.py index afb682d1965..5c0be5d5771 100644 --- a/src/ert/run_models/multiple_data_assimilation.py +++ b/src/ert/run_models/multiple_data_assimilation.py @@ -2,6 +2,7 @@ import functools import logging +from queue import SimpleQueue from typing import TYPE_CHECKING, List, Optional from uuid import UUID @@ -44,6 +45,7 @@ def __init__( prior_ensemble: Optional[EnsembleAccessor], es_settings: ESSettings, update_settings: UpdateSettings, + status_queue: SimpleQueue, ): super().__init__( simulation_arguments, @@ -51,6 +53,7 @@ def __init__( storage, queue_config, experiment_id, + status_queue, phase_count=2, ) self.weights = MultipleDataAssimilation.default_weights @@ -202,7 +205,7 @@ def update( global_scaling=weight, rng=self.rng, progress_callback=functools.partial( - self.smoother_event_callback, prior_context.iteration + self.send_smoother_event, prior_context.iteration ), log_path=self.ert_config.analysis_config.log_path, )