diff --git a/src/_ert_job_runner/reporting/event.py b/src/_ert_job_runner/reporting/event.py index 64018f76e09..73bd3595e33 100644 --- a/src/_ert_job_runner/reporting/event.py +++ b/src/_ert_job_runner/reporting/event.py @@ -69,7 +69,6 @@ def __init__(self, evaluator_url, token=None, cert_path=None): self._ens_id = None self._real_id = None - self._step_id = None self._event_queue = queue.Queue() self._event_publisher_thread = threading.Thread(target=self._event_publisher) self._sentinel = object() # notifying the queue's ended @@ -124,20 +123,17 @@ def _dump_event(self, attributes: Dict[str, str], data: Any = None): logger.debug(f'Schedule {type(event)} "{event["type"]}" for delivery') self._event_queue.put(event) - def _step_path(self): - return f"/ert/ensemble/{self._ens_id}/real/{self._real_id}/step/{self._step_id}" - def _init_handler(self, msg): self._ens_id = msg.ens_id self._real_id = msg.real_id - self._step_id = msg.step_id self._event_publisher_thread.start() def _job_handler(self, msg: Message): job_name = msg.job.name() job_msg_attrs = { _JOB_SOURCE: ( - f"{self._step_path()}/job/{msg.job.index}" f"/index/{msg.job.index}" + f"/ert/ensemble/{self._ens_id}/real/{self._real_id}/" + f"job/{msg.job.index}/index/{msg.job.index}" ), _CONTENT_TYPE: "application/json", } diff --git a/src/ert/cli/monitor.py b/src/ert/cli/monitor.py index 1bdf7975002..b2c346e716a 100644 --- a/src/ert/cli/monitor.py +++ b/src/ert/cli/monitor.py @@ -84,11 +84,10 @@ def _print_job_errors(self) -> None: failed_jobs: Dict[Optional[str], int] = {} for snapshot in self._snapshots.values(): for real in snapshot.reals.values(): - for step in real.steps.values(): - for job in step.jobs.values(): - if job.status == JOB_STATE_FAILURE: - result = failed_jobs.get(job.error, 0) - failed_jobs[job.error] = result + 1 + for job in real.jobs.values(): + if job.status == JOB_STATE_FAILURE: + result = failed_jobs.get(job.error, 0) + failed_jobs[job.error] = result + 1 for error, number_of_jobs in failed_jobs.items(): print(f"{number_of_jobs} jobs failed due to the error: {error}") diff --git a/src/ert/ensemble_evaluator/_builder/_ensemble.py b/src/ert/ensemble_evaluator/_builder/_ensemble.py index 248c6f899dc..752bc000a6c 100644 --- a/src/ert/ensemble_evaluator/_builder/_ensemble.py +++ b/src/ert/ensemble_evaluator/_builder/_ensemble.py @@ -161,16 +161,13 @@ def _create_snapshot(self) -> Snapshot: active=True, status=state.REALIZATION_STATE_WAITING, ) - for step in real.steps: - reals[str(real.iens)].steps[str(step.id_)] = Step( - status=state.STEP_STATE_UNKNOWN + reals[str(real.iens)].step = Step(status=state.STEP_STATE_UNKNOWN) + for job in real.jobs: + reals[str(real.iens)].jobs[str(job.id_)] = Job( + status=state.JOB_STATE_START, + index=job.index, + name=job.name, ) - for job in step.jobs: - reals[str(real.iens)].steps[str(step.id_)].jobs[str(job.id_)] = Job( - status=state.JOB_STATE_START, - index=job.index, - name=job.name, - ) top = SnapshotDict( reals=reals, status=state.ENSEMBLE_STATE_UNKNOWN, diff --git a/src/ert/ensemble_evaluator/_builder/_legacy.py b/src/ert/ensemble_evaluator/_builder/_legacy.py index f2cfc91450f..85bfd09d854 100644 --- a/src/ert/ensemble_evaluator/_builder/_legacy.py +++ b/src/ert/ensemble_evaluator/_builder/_legacy.py @@ -63,7 +63,7 @@ def generate_event_creator( def event_builder(status: str, real_id: Optional[int] = None) -> CloudEvent: source = f"/ert/ensemble/{self.id_}" if real_id is not None: - source += f"/real/{real_id}/step/0" + source += f"/real/{real_id}" return CloudEvent( { "type": status, @@ -191,7 +191,7 @@ async def _evaluate_inner( # pylint: disable=too-many-branches # Submit all jobs to queue and inform queue when done for real in self.active_reals: - self._job_queue.add_ee_stage(real.steps[0], callback_timeout=on_timeout) + self._job_queue.add_ee_stage(real.step, callback_timeout=on_timeout) # TODO: this is sort of a callback being preemptively called. # It should be lifted out of the queue/evaluate, into the evaluator. If diff --git a/src/ert/ensemble_evaluator/_builder/_realization.py b/src/ert/ensemble_evaluator/_builder/_realization.py index 4deb2d28115..437f94ef6c0 100644 --- a/src/ert/ensemble_evaluator/_builder/_realization.py +++ b/src/ert/ensemble_evaluator/_builder/_realization.py @@ -1,9 +1,9 @@ import logging -from typing import List, Optional, Sequence +from typing import Optional, Sequence from typing_extensions import Self -from ._step import LegacyStep +from ._step import LegacyJob, LegacyStep SOURCE_TEMPLATE_REAL = "/real/{iens}" @@ -14,34 +14,41 @@ class Realization: def __init__( self, iens: int, - steps: Sequence[LegacyStep], + step: Optional[LegacyStep], + jobs: Sequence[LegacyJob], active: bool, ): if iens is None: raise ValueError(f"{self} needs iens") - if steps is None: - raise ValueError(f"{self} needs steps") + if jobs is None: + raise ValueError(f"{self} needs jobs") if active is None: raise ValueError(f"{self} needs to be set either active or not") self.iens = iens - self.steps = steps + self.step = step + self.jobs = jobs self.active = active class RealizationBuilder: def __init__(self) -> None: - self._steps: List[LegacyStep] = [] + self._step: Optional[LegacyStep] = None self._active: Optional[bool] = None self._iens: Optional[int] = None self._parent_source: Optional[str] = None + self._jobs: Sequence[LegacyJob] = [] def active(self, active: bool) -> Self: self._active = active return self - def add_step(self, step: LegacyStep) -> Self: - self._steps.append(step) + def set_step(self, step: LegacyStep) -> Self: + self._step = step + return self + + def set_jobs(self, jobs: Sequence[LegacyJob]) -> Self: + self._jobs = jobs return self def set_iens(self, iens: int) -> Self: @@ -58,6 +65,7 @@ def build(self) -> Realization: return Realization( self._iens, - self._steps, + self._step, + self._jobs, self._active, ) diff --git a/src/ert/ensemble_evaluator/_builder/_step.py b/src/ert/ensemble_evaluator/_builder/_step.py index 9d49e8606d5..e2f73331db2 100644 --- a/src/ert/ensemble_evaluator/_builder/_step.py +++ b/src/ert/ensemble_evaluator/_builder/_step.py @@ -1,11 +1,10 @@ from __future__ import annotations from dataclasses import dataclass -from typing import TYPE_CHECKING, Optional, Sequence +from typing import TYPE_CHECKING, Optional from ert.config.ext_job import ExtJob -SOURCE_TEMPLATE_STEP = "/step/{step_id}" if TYPE_CHECKING: from ert.run_arg import RunArg @@ -20,8 +19,6 @@ class LegacyJob: @dataclass class LegacyStep: - id_: str - jobs: Sequence[LegacyJob] name: str max_runtime: Optional[int] run_arg: "RunArg" diff --git a/src/ert/ensemble_evaluator/snapshot.py b/src/ert/ensemble_evaluator/snapshot.py index 42fd45495d5..bfd65eac445 100644 --- a/src/ert/ensemble_evaluator/snapshot.py +++ b/src/ert/ensemble_evaluator/snapshot.py @@ -24,10 +24,6 @@ def _get_real_id(source: str) -> str: return _match_token("real", source) -def _get_step_id(source: str) -> str: - return _match_token("step", source) - - def _get_job_id(source: str) -> str: return _match_token("job", source) @@ -93,17 +89,17 @@ def __init__(self, snapshot: Optional["Snapshot"] = None) -> None: start_time (datetime), end_time (datetime) and status (str).""" self._step_states: Dict[ - Tuple[str, str], Dict[str, Union[str, datetime.datetime]] + str, Dict[str, Union[str, datetime.datetime]] ] = defaultdict(dict) - """A shallow dictionary of step states. The key is a tuple of two strings with - realization id and step id, pointing to a dict with the same members as the Step - class, except Jobs""" + """A shallow dictionary of step states. The key is a string + realization id, pointing to a dict with the same members as the Step + class.""" self._job_states: Dict[ - Tuple[str, str, str], Dict[str, Union[str, datetime.datetime]] + Tuple[str, str], Dict[str, Union[str, datetime.datetime]] ] = defaultdict(dict) - """A shallow dictionary of job states. The key is a tuple of three - strings with realization id, step id and job id, pointing to a dict with + """A shallow dictionary of job states. The key is a tuple of two + strings with realization id and job id, pointing to a dict with the same members as the Job.""" self._ensemble_state: Optional[str] = None @@ -122,10 +118,7 @@ def update_metadata(self, metadata: Dict[str, Any]) -> None: snapshot's metadata""" self._metadata.update(_filter_nones(metadata)) - def update_step( - self, real_id: str, step_id: str, step: "Step" - ) -> "PartialSnapshot": - step_idx = (real_id, step_id) + def update_step(self, real_id: str, step: "Step") -> "PartialSnapshot": step_update = _filter_nones( { "status": step.status, @@ -133,20 +126,19 @@ def update_step( "end_time": step.end_time, } ) - self._step_states[step_idx].update(step_update) + self._step_states[real_id].update(step_update) if self._snapshot: - self._snapshot._my_partial._step_states[step_idx].update(step_update) - self._check_state_after_step_update(step_idx[0], step_idx[1]) + self._snapshot._my_partial._step_states[real_id].update(step_update) + self._propagate_step_update_to_realization(real_id) return self def update_job( self, real_id: str, - step_id: str, job_id: str, job: "Job", ) -> "PartialSnapshot": - job_idx = (real_id, step_id, job_id) + job_idx = (real_id, job_id) job_update = _filter_nones(job.dict()) self._job_states[job_idx].update(job_update) @@ -154,10 +146,8 @@ def update_job( self._snapshot._my_partial._job_states[job_idx].update(job_update) return self - def _check_state_after_step_update( - self, real_id: str, step_id: str - ) -> "PartialSnapshot": - step = self._step_states[(real_id, step_id)] + def _propagate_step_update_to_realization(self, real_id: str) -> "PartialSnapshot": + step = self._step_states[real_id] step_status = step.get("status") assert isinstance(step_status, str) assert self._snapshot is not None @@ -165,39 +155,38 @@ def _check_state_after_step_update( real_state = self._realization_states[real_id] if real_state.get("status") == state.REALIZATION_STATE_FAILED: return self - if step_status in _STEP_STATE_TO_REALIZATION_STATE: + if ( + step_status in _STEP_STATE_TO_REALIZATION_STATE + ): # All but the finished state self._realization_states[real_id].update( {"status": _STEP_STATE_TO_REALIZATION_STATE[step_status]} ) elif ( step_status == state.REALIZATION_STATE_FINISHED - and self._snapshot.all_steps_finished(real_id) + and self._snapshot.step_finished(real_id) ): real_state["status"] = state.REALIZATION_STATE_FINISHED elif ( step_status == state.STEP_STATE_SUCCESS - and not self._snapshot.all_steps_finished(real_id) + and not self._snapshot.step_finished(real_id) ): pass else: - raise ValueError( - f"unknown step status {step_status} for real: {real_id} step: " - + f"{step_id}" - ) + raise ValueError(f"unknown step status {step_status} for real: {real_id}") return self def get_jobs( self, - ) -> Mapping[Tuple[str, str, str], "Job"]: + ) -> Mapping[Tuple[str, str], "Job"]: if self._snapshot: return self._snapshot.get_jobs() return {} - def get_job_status_for_all_reals_and_steps( + def get_job_status_for_all_reals( self, - ) -> Mapping[Tuple[str, str, str], Union[str, datetime.datetime]]: + ) -> Mapping[Tuple[str, str], Union[str, datetime.datetime]]: if self._snapshot: - return self._snapshot.get_job_status_for_all_reals_and_steps() + return self._snapshot.get_job_status_for_all_reals() return {} @property @@ -224,32 +213,9 @@ def get_real_ids(self) -> Sequence[str]: def metadata(self) -> Mapping[str, Any]: return self._metadata - def get_jobs_for_real_and_step( - self, real_id: str, step_id: str - ) -> Mapping[str, "Job"]: - jobs = {} - for idx, job_state in self._job_states.items(): - if real_id != idx[0] or step_id != idx[1]: - continue - job_id = idx[2] - jobs[job_id] = Job(**job_state) - return jobs - - def get_steps_for_real(self, real_id: str) -> Mapping[str, "Step"]: - steps = {} - for step_index_tuple, step_state in self._step_states.items(): - if real_id != step_index_tuple[0]: - continue - step_id = step_index_tuple[1] - steps[step_id] = Step(**step_state) - return steps - def get_real(self, real_id: str) -> "RealizationSnapshot": return RealizationSnapshot(**self._realization_states[real_id]) - def get_step(self, real_id: str, step_id: str) -> "Step": - return Step(**self._step_states[(real_id, step_id)]) - def to_dict(self) -> Dict[str, Any]: """used to send snapshot updates - for thread safety, this method should not access the _snapshot property""" @@ -261,31 +227,25 @@ def to_dict(self) -> Dict[str, Any]: if self._realization_states: _dict["reals"] = self._realization_states - for step_index_tuple, step_state in self._step_states.items(): - real_id = step_index_tuple[0] - step_id = step_index_tuple[1] + for real_id, step_state in self._step_states.items(): if "reals" not in _dict: _dict["reals"] = {real_id: {}} - if "steps" not in _dict["reals"][real_id]: - _dict["reals"][real_id]["steps"] = {} - if step_id not in _dict["reals"][real_id]["steps"]: - _dict["reals"][real_id]["steps"][step_id] = step_state - _dict["reals"][real_id]["steps"][step_id]["jobs"] = {} + if "step" not in _dict["reals"][real_id]: + _dict["reals"][real_id]["step"] = step_state for job_tuple, job_values_dict in self._job_states.items(): real_id = job_tuple[0] - step_id = job_tuple[1] if "reals" not in _dict: _dict["reals"] = {} if real_id not in _dict["reals"]: _dict["reals"][real_id] = {} - if "steps" not in _dict["reals"][real_id]: - _dict["reals"][real_id]["steps"] = {} - if step_id not in _dict["reals"][real_id]["steps"]: - _dict["reals"][real_id]["steps"][step_id] = {"jobs": {}} + if "step" not in _dict["reals"][real_id]: + _dict["reals"][real_id]["step"] = {} + if "jobs" not in _dict["reals"][real_id]: + _dict["reals"][real_id]["jobs"] = {} - job_id = job_tuple[2] - _dict["reals"][real_id]["steps"][step_id]["jobs"][job_id] = job_values_dict + job_id = job_tuple[1] + _dict["reals"][real_id]["jobs"][job_id] = job_values_dict return _dict @@ -298,8 +258,8 @@ def _merge(self, other: "PartialSnapshot") -> "PartialSnapshot": self._ensemble_state = other._ensemble_state for real_id, other_real_data in other._realization_states.items(): self._realization_states[real_id].update(other_real_data) - for step_id, other_step_data in other._step_states.items(): - self._step_states[step_id].update(other_step_data) + for real_id, other_step_data in other._step_states.items(): + self._step_states[real_id].update(other_step_data) for job_id, other_job_data in other._job_states.items(): self._job_states[job_id].update(other_job_data) return self @@ -326,10 +286,8 @@ def from_cloudevent(self, event: CloudEvent) -> "PartialSnapshot": ids.EVTYPE_FM_STEP_TIMEOUT, }: end_time = convert_iso8601_to_datetime(timestamp) - self.update_step( _get_real_id(e_source), - _get_step_id(e_source), Step( **_filter_nones( { @@ -342,14 +300,10 @@ def from_cloudevent(self, event: CloudEvent) -> "PartialSnapshot": ) if e_type == ids.EVTYPE_FM_STEP_TIMEOUT: - step = self._snapshot.get_step( - _get_real_id(e_source), _get_step_id(e_source) - ) - for job_id, job in step.jobs.items(): + for job_id, job in self._snapshot.jobs(_get_real_id(e_source)).items(): if job.status != state.JOB_STATE_FINISHED: real_id = _get_real_id(e_source) - step_id = _get_step_id(e_source) - job_idx = (real_id, step_id, job_id) + job_idx = (real_id, job_id) if job_idx not in self._job_states: self._job_states[job_idx] = {} self._job_states[job_idx].update( @@ -387,7 +341,6 @@ def from_cloudevent(self, event: CloudEvent) -> "PartialSnapshot": job_dict["error"] = event.data.get(ids.ERROR_MSG) self.update_job( _get_real_id(e_source), - _get_step_id(e_source), _get_job_id(e_source), Job(**job_dict), ) @@ -428,15 +381,15 @@ def metadata(self) -> Mapping[str, Any]: def get_jobs( self, - ) -> Mapping[Tuple[str, str, str], "Job"]: + ) -> Mapping[Tuple[str, str], "Job"]: return { idx: Job(**job_state) for idx, job_state in self._my_partial._job_states.items() } - def get_job_status_for_all_reals_and_steps( + def get_job_status_for_all_reals( self, - ) -> Mapping[Tuple[str, str, str], Union[str, datetime.datetime]]: + ) -> Mapping[Tuple[str, str], Union[str, datetime.datetime]]: return { idx: job_state["status"] for idx, job_state in self._my_partial._job_states.items() @@ -446,34 +399,28 @@ def get_job_status_for_all_reals_and_steps( def reals(self) -> Mapping[str, "RealizationSnapshot"]: return self._my_partial.reals - def steps(self, real_id: str) -> Dict[str, "Step"]: - return { - step_idx[1]: Step(**step_data) - for step_idx, step_data in self._my_partial._step_states.items() - if step_idx[0] == real_id - } + def step(self, real_id: str) -> Dict[str, Union[str, datetime.datetime]]: + return self._my_partial._step_states[real_id] - def jobs(self, real_id: str, step_id: str) -> Dict[str, "Job"]: + def jobs(self, real_id: str) -> Dict[str, "Job"]: return { - job_idx[2]: Job(**job_data) + job_idx[1]: Job(**job_data) for job_idx, job_data in self._my_partial._job_states.items() - if job_idx[0] == real_id and job_idx[1] == step_id + if job_idx[0] == real_id } def get_real(self, real_id: str) -> "RealizationSnapshot": return RealizationSnapshot(**self._my_partial._realization_states[real_id]) - def get_step(self, real_id: str, step_id: str) -> "Step": - return Step(**self._my_partial._step_states[(real_id, step_id)]) + def get_step(self, real_id: str) -> "Step": + return Step(**self._my_partial._step_states[real_id]) - def get_job(self, real_id: str, step_id: str, job_id: str) -> "Job": - return Job(**self._my_partial._job_states[(real_id, step_id, job_id)]) + def get_job(self, real_id: str, job_id: str) -> "Job": + return Job(**self._my_partial._job_states[(real_id, job_id)]) - def all_steps_finished(self, real_id: str) -> bool: - return all( - step["status"] == state.STEP_STATE_SUCCESS - for real_step_id, step in self._my_partial._step_states.items() - if real_step_id[0] == real_id + def step_finished(self, real_id: str) -> bool: + return ( + self._my_partial._step_states[real_id]["status"] == state.STEP_STATE_SUCCESS ) def get_successful_realizations(self) -> int: @@ -515,7 +462,6 @@ class Step(BaseModel): status: Optional[str] start_time: Optional[datetime.datetime] end_time: Optional[datetime.datetime] - jobs: Dict[str, Job] = {} class RealizationSnapshot(BaseModel): @@ -523,7 +469,8 @@ class RealizationSnapshot(BaseModel): active: Optional[bool] start_time: Optional[datetime.datetime] end_time: Optional[datetime.datetime] - steps: Dict[str, Step] = {} + step: Optional[Step] + jobs: Dict[str, Job] = {} class SnapshotDict(BaseModel): @@ -533,7 +480,8 @@ class SnapshotDict(BaseModel): class SnapshotBuilder(BaseModel): - steps: Dict[str, Step] = {} + step: Optional[Step] = None + jobs: Dict[str, Job] = {} metadata: Dict[str, Any] = {} def build( @@ -547,7 +495,8 @@ def build( for r_id in real_ids: top.reals[r_id] = RealizationSnapshot( active=True, - steps=self.steps, + step=self.step, + jobs=self.jobs, start_time=start_time, end_time=end_time, status=status, @@ -556,19 +505,15 @@ def build( def add_step( self, - step_id: str, status: Optional[str], start_time: Optional[datetime.datetime] = None, end_time: Optional[datetime.datetime] = None, ) -> "SnapshotBuilder": - self.steps[step_id] = Step( - status=status, start_time=start_time, end_time=end_time - ) + self.step = Step(status=status, start_time=start_time, end_time=end_time) return self def add_job( self, - step_id: str, job_id: str, index: str, name: Optional[str], @@ -580,8 +525,7 @@ def add_job( stdout: Optional[str] = None, stderr: Optional[str] = None, ) -> "SnapshotBuilder": - step = self.steps[step_id] - step.jobs[job_id] = Job( + self.jobs[job_id] = Job( status=status, index=index, start_time=start_time, @@ -610,16 +554,17 @@ def _from_nested_dict(data: Mapping[str, Any]) -> PartialSnapshot: "end_time": realization_data.get("end_time"), } ) - for step_id, step_data in data["reals"][real_id].get("steps", {}).items(): - partial._step_states[(real_id, step_id)] = _filter_nones( + step_data = data["reals"][real_id].get("step", {}) + if step_data is not None: + partial._step_states[real_id] = _filter_nones( { "status": step_data.get("status"), "start_time": step_data.get("start_time"), "end_time": step_data.get("end_time"), } ) - for job_id, job in step_data.get("jobs", {}).items(): - job_idx = (real_id, step_id, job_id) - partial._job_states[job_idx] = job + for job_id, job in realization_data.get("jobs", {}).items(): + job_idx = (real_id, job_id) + partial._job_states[job_idx] = job return partial diff --git a/src/ert/gui/model/job_list.py b/src/ert/gui/model/job_list.py index 31f4928c1b5..03c9f167d59 100644 --- a/src/ert/gui/model/job_list.py +++ b/src/ert/gui/model/job_list.py @@ -17,39 +17,36 @@ IsEnsembleRole, IsJobRole, IsRealizationRole, - IsStepRole, NodeRole, ) class JobListProxyModel(QAbstractProxyModel): + """This proxy model presents two-dimensional views (row-column) of + forward model data for a specific realization in a specific iteration.""" + def __init__( self, parent: Optional[QObject], iter_: int, real: int, - stage: int, - step: int, ) -> None: super().__init__(parent=parent) self._iter = iter_ self._real = real - self._stage = stage - self._step = step - step_changed = Signal(int, int, int, int) + real_changed = Signal(int, int) - @Slot(int, int, int, int) - def set_step(self, iter_: int, real: int, stage: int, step: int): + @Slot(int, int) + def set_real(self, iter_: int, real: int): + """Called when the user clicks a specific realization in the run_dialog window.""" self._disconnect() self.modelAboutToBeReset.emit() self._iter = iter_ self._real = real - self._stage = stage - self._step = step self.modelReset.emit() self._connect() - self.step_changed.emit(iter_, real, stage, step) + self.real_changed.emit(iter_, real) def _disconnect(self): source_model = self.sourceModel() @@ -91,7 +88,7 @@ def headerData( if role != Qt.DisplayRole: return QVariant() if orientation == Qt.Horizontal: - return COLUMNS[NodeType.STEP][section][0] + return COLUMNS[NodeType.REAL][section][0] if orientation == Qt.Vertical: return section return QVariant() @@ -125,8 +122,7 @@ def index(self, row: int, column: int, parent=None) -> QModelIndex: if parent.isValid(): return QModelIndex() job_index = self.mapToSource(self.createIndex(row, column, parent)) - ret_index = self.createIndex(row, column, job_index.data(NodeRole)) - return ret_index + return self.createIndex(row, column, job_index.data(NodeRole)) def mapToSource(self, proxyIndex: QModelIndex) -> QModelIndex: if not proxyIndex.isValid(): @@ -138,11 +134,8 @@ def mapToSource(self, proxyIndex: QModelIndex) -> QModelIndex: real_index = source_model.index(self._real, 0, iter_index) if not real_index.isValid() or not source_model.hasChildren(real_index): return QModelIndex() - step_index = source_model.index(self._step, 0, real_index) - if not step_index.isValid() or not source_model.hasChildren(step_index): - return QModelIndex() job_index = source_model.index( - proxyIndex.row(), proxyIndex.column(), step_index + proxyIndex.row(), proxyIndex.column(), real_index ) return job_index @@ -167,18 +160,17 @@ def _source_data_changed( def _accept_index(self, index: QModelIndex) -> bool: if index.internalPointer() is None: return False + # This model should only consist of job indices, so anything else mean # the index is not on "our branch" of the state graph. if not index.data(IsJobRole): return False - # traverse upwards and check step, real and iter against parents of + # traverse upwards and check real and iter against parents of # this index. while index.isValid() and index.internalPointer() is not None: - if ( - (index.data(IsStepRole) and (index.row() != self._step)) - or (index.data(IsRealizationRole) and (index.row() != self._real)) - or (index.data(IsEnsembleRole) and (index.row() != self._iter)) + if (index.data(IsRealizationRole) and (index.row() != self._real)) or ( + index.data(IsEnsembleRole) and (index.row() != self._iter) ): return False index = index.parent() diff --git a/src/ert/gui/model/node.py b/src/ert/gui/model/node.py index 412bd20a28e..5f1f2c96fa5 100644 --- a/src/ert/gui/model/node.py +++ b/src/ert/gui/model/node.py @@ -6,7 +6,6 @@ class NodeType(Enum): ROOT = auto() ITER = auto() REAL = auto() - STEP = auto() JOB = auto() diff --git a/src/ert/gui/model/snapshot.py b/src/ert/gui/model/snapshot.py index e064c75584b..62a76db32cc 100644 --- a/src/ert/gui/model/snapshot.py +++ b/src/ert/gui/model/snapshot.py @@ -27,18 +27,17 @@ # Indicates what type the underlying data is IsEnsembleRole = Qt.UserRole + 8 IsRealizationRole = Qt.UserRole + 9 -IsStepRole = Qt.UserRole + 10 -IsJobRole = Qt.UserRole + 11 -StatusRole = Qt.UserRole + 12 - -STEP_COLUMN_NAME = "Name" -STEP_COLUMN_ERROR = "Error" -STEP_COLUMN_STATUS = "Status" -STEP_COLUMN_DURATION = "Duration" -STEP_COLUMN_STDOUT = "STDOUT" -STEP_COLUMN_STDERR = "STDERR" -STEP_COLUMN_CURRENT_MEMORY_USAGE = "Current memory usage" -STEP_COLUMN_MAX_MEMORY_USAGE = "Max memory usage" +IsJobRole = Qt.UserRole + 10 +StatusRole = Qt.UserRole + 11 + +JOB_COLUMN_NAME = "Name" +JOB_COLUMN_ERROR = "Error" +JOB_COLUMN_STATUS = "Status" +JOB_COLUMN_DURATION = "Duration" +JOB_COLUMN_STDOUT = "STDOUT" +JOB_COLUMN_STDERR = "STDERR" +JOB_COLUMN_CURRENT_MEMORY_USAGE = "Current memory usage" +JOB_COLUMN_MAX_MEMORY_USAGE = "Max memory usage" SORTED_REALIZATION_IDS = "_sorted_real_ids" SORTED_JOB_IDS = "_sorted_job_ids" @@ -50,19 +49,18 @@ COLUMNS: Dict[NodeType, Sequence[Union[str, Tuple[str, str]]]] = { NodeType.ROOT: ["Name", "Status"], NodeType.ITER: ["Name", "Status", "Active"], - NodeType.REAL: ["Name", "Status"], - NodeType.STEP: [ - (STEP_COLUMN_NAME, ids.NAME), - (STEP_COLUMN_ERROR, ids.ERROR), - (STEP_COLUMN_STATUS, ids.STATUS), + NodeType.REAL: [ + (JOB_COLUMN_NAME, ids.NAME), + (JOB_COLUMN_ERROR, ids.ERROR), + (JOB_COLUMN_STATUS, ids.STATUS), ( - STEP_COLUMN_DURATION, + JOB_COLUMN_DURATION, DURATION, ), # Duration is based on two data fields, not coming directly from ert - (STEP_COLUMN_STDOUT, ids.STDOUT), - (STEP_COLUMN_STDERR, ids.STDERR), - (STEP_COLUMN_CURRENT_MEMORY_USAGE, ids.CURRENT_MEMORY_USAGE), - (STEP_COLUMN_MAX_MEMORY_USAGE, ids.MAX_MEMORY_USAGE), + (JOB_COLUMN_STDOUT, ids.STDOUT), + (JOB_COLUMN_STDERR, ids.STDERR), + (JOB_COLUMN_CURRENT_MEMORY_USAGE, ids.CURRENT_MEMORY_USAGE), + (JOB_COLUMN_MAX_MEMORY_USAGE, ids.MAX_MEMORY_USAGE), ], NodeType.JOB: [], } @@ -106,7 +104,7 @@ def prerender( so it has to be called.""" reals = snapshot.reals - job_states = snapshot.get_job_status_for_all_reals_and_steps() + job_states = snapshot.get_job_status_for_all_reals() if not reals and not job_states: return None @@ -121,13 +119,13 @@ def prerender( metadata[SORTED_REALIZATION_IDS] = sorted(snapshot.reals.keys(), key=int) metadata[SORTED_JOB_IDS] = defaultdict(dict) for idx, _ in job_states.items(): - real_id, step_id, job_id = idx - if step_id not in metadata[SORTED_JOB_IDS][real_id]: - metadata[SORTED_JOB_IDS][real_id][step_id] = [] - metadata[SORTED_JOB_IDS][real_id][step_id].append(job_id) + real_id, job_id = idx + if real_id not in metadata[SORTED_JOB_IDS]: + metadata[SORTED_JOB_IDS][real_id] = [] + metadata[SORTED_JOB_IDS][real_id].append(job_id) for idx, job_status in job_states.items(): - real_id, step_id, job_id = idx + real_id, job_id = idx # partial snapshot may contain only information about job state if real_id in reals and reals[real_id].status: @@ -187,19 +185,13 @@ def _add_partial_snapshot(self, partial: PartialSnapshot, iter_: int): ] reals_changed.append(real_node.row()) - jobs_changed_by_real_and_step: Mapping[ - Tuple[str, str], Sequence[int] - ] = defaultdict(list) - - for job_idx, job in partial.get_jobs().items(): - real_id = job_idx[0] - step_id = job_idx[1] - job_id = job_idx[2] + jobs_changed_by_real: Mapping[str, Sequence[int]] = defaultdict(list) + for (real_id, job_id), job in partial.get_jobs().items(): real_node = iter_node.children[real_id] - step_node = real_node.children[step_id] - job_node = step_node.children[job_id] - jobs_changed_by_real_and_step[(real_id, step_id)].append(job_node.row()) + job_node = real_node.children[job_id] + + jobs_changed_by_real[real_id].append(job_node.row()) if job.status: job_node.data[ids.STATUS] = job.status @@ -221,22 +213,15 @@ def _add_partial_snapshot(self, partial: PartialSnapshot, iter_: int): # Errors may be unset as the queue restarts the job job_node.data[ids.ERROR] = job.error if job.error else "" - for idx, changed_jobs in jobs_changed_by_real_and_step.items(): - real_id, step_id = idx - real_node = iter_node.children[real_id] - step_node = real_node.children[step_id] + for real_idx, changed_jobs in jobs_changed_by_real.items(): + real_node = iter_node.children[real_idx] real_index = self.index(real_node.row(), 0, iter_index) - step = partial.get_step(real_id, step_id) - if step.status: - step_node.data[ids.STATUS] = step.status - step_index = self.index(step_node.row(), 0, real_index) - - job_top_left = self.index(min(changed_jobs), 0, step_index) + job_top_left = self.index(min(changed_jobs), 0, real_index) job_bottom_right = self.index( max(changed_jobs), - self.columnCount(step_index) - 1, - step_index, + self.columnCount(real_index) - 1, + real_index, ) stack.callback(self.dataChanged.emit, job_top_left, job_bottom_right) @@ -275,15 +260,11 @@ def _add_snapshot(self, snapshot: Snapshot, iter_: int): NodeType.REAL, ) snapshot_tree.add_child(real_node) - for step_id, step in snapshot.steps(real_id).items(): - step_node = Node(step_id, {ids.STATUS: step.status}, NodeType.STEP) - real_node.add_child(step_node) - if real_id in metadata[SORTED_JOB_IDS]: - for job_id in metadata[SORTED_JOB_IDS][real_id][step_id]: - job = snapshot.get_job(real_id, step_id, job_id) - job_dict = dict(job) - job_node = Node(job_id, job_dict, NodeType.JOB) - step_node.add_child(job_node) + + for job_id in metadata[SORTED_JOB_IDS][real_id]: + job = snapshot.get_job(real_id, job_id) + job_node = Node(job_id, dict(job), NodeType.JOB) + real_node.add_child(job_node) if iter_ in self.root.children: self.modelAboutToBeReset.emit() @@ -303,8 +284,10 @@ def columnCount(self, parent: QModelIndex = None): parent = QModelIndex() parent_node = parent.internalPointer() if parent_node is None: - return len(COLUMNS[NodeType.ROOT]) - return len(COLUMNS[parent_node.type]) + count = len(COLUMNS[NodeType.ROOT]) + else: + count = len(COLUMNS[parent_node.type]) + return count def rowCount(self, parent: QModelIndex = None): if parent is None: @@ -320,18 +303,11 @@ def parent(self, index: QModelIndex): if not index.isValid(): return QModelIndex() - child_item = index.internalPointer() - if not hasattr(child_item, "parent"): - raise ValueError( - f"index r{index.row()}/c{index.column()} pointed to parent-less item " - + f"{child_item}" - ) - parentItem = child_item.parent - - if parentItem == self.root: + parent_item = index.internalPointer().parent + if parent_item == self.root: return QModelIndex() - return self.createIndex(parentItem.row(), 0, parentItem) + return self.createIndex(parent_item.row(), 0, parent_item) def data(self, index: QModelIndex, role=Qt.DisplayRole): if not index.isValid(): @@ -349,8 +325,6 @@ def data(self, index: QModelIndex, role=Qt.DisplayRole): return node.type == NodeType.ITER if role == IsRealizationRole: return node.type == NodeType.REAL - if role == IsStepRole: - return node.type == NodeType.STEP if role == IsJobRole: return node.type == NodeType.JOB @@ -389,17 +363,16 @@ def _real_data(self, _index: QModelIndex, node: Node, role: int): if COLOR_RUNNING in node.data[REAL_JOB_STATUS_AGGREGATED].values(): is_running = True - for step_id in node.parent.data[SORTED_JOB_IDS][node.id]: - for job_id in node.parent.data[SORTED_JOB_IDS][node.id][step_id]: - # if queue system status is WAIT, jobs should indicate WAIT - if ( - node.data[REAL_JOB_STATUS_AGGREGATED][job_id] == COLOR_PENDING - and node.data[REAL_STATUS_COLOR] == COLOR_WAITING - and not is_running - ): - colors.append(COLOR_WAITING) - else: - colors.append(node.data[REAL_JOB_STATUS_AGGREGATED][job_id]) + for job_id in node.parent.data[SORTED_JOB_IDS][node.id]: + # if queue system status is WAIT, jobs should indicate WAIT + if ( + node.data[REAL_JOB_STATUS_AGGREGATED][job_id] == COLOR_PENDING + and node.data[REAL_STATUS_COLOR] == COLOR_WAITING + and not is_running + ): + colors.append(COLOR_WAITING) + else: + colors.append(node.data[REAL_JOB_STATUS_AGGREGATED][job_id]) return colors if role == RealLabelHint: @@ -415,8 +388,7 @@ def _real_data(self, _index: QModelIndex, node: Node, role: int): def _job_data(self, index: QModelIndex, node: Node, role: int): if role == Qt.BackgroundRole: assert node.parent # mypy - assert node.parent.parent # mypy - real = node.parent.parent + real = node.parent if COLOR_RUNNING in real.data[REAL_JOB_STATUS_AGGREGATED].values(): return real.data[REAL_JOB_STATUS_AGGREGATED][node.id] @@ -428,8 +400,9 @@ def _job_data(self, index: QModelIndex, node: Node, role: int): ): return COLOR_WAITING return real.data[REAL_JOB_STATUS_AGGREGATED][node.id] + if role == Qt.DisplayRole: - _, data_name = COLUMNS[NodeType.STEP][index.column()] + _, data_name = COLUMNS[NodeType.REAL][index.column()] if data_name in [ids.CURRENT_MEMORY_USAGE, ids.MAX_MEMORY_USAGE]: data = node.data.get(ids.DATA) _bytes = data.get(data_name) if data else None @@ -447,11 +420,12 @@ def _job_data(self, index: QModelIndex, node: Node, role: int): # There is no method for truncating microseconds, so we remove them delta -= datetime.timedelta(microseconds=delta.microseconds) return str(delta) - real = node.parent.parent + real = node.parent if COLOR_RUNNING in real.data[REAL_JOB_STATUS_AGGREGATED].values(): return node.data.get(data_name) + real = node.parent # if queue system status is WAIT, jobs should indicate WAIT if ( data_name in [ids.STATUS] @@ -461,13 +435,13 @@ def _job_data(self, index: QModelIndex, node: Node, role: int): return str("Waiting") return node.data.get(data_name) if role == FileRole: - _, data_name = COLUMNS[NodeType.STEP][index.column()] + _, data_name = COLUMNS[NodeType.REAL][index.column()] if data_name in [ids.STDOUT, ids.STDERR]: return ( node.data.get(data_name) if node.data.get(data_name) else QVariant() ) if role == Qt.ToolTipRole: - _, data_name = COLUMNS[NodeType.STEP][index.column()] + _, data_name = COLUMNS[NodeType.REAL][index.column()] data = None if data_name == ids.ERROR: data = node.data.get(data_name) @@ -487,10 +461,12 @@ def index(self, row: int, column: int, parent: QModelIndex = None) -> QModelInde if parent is None: parent = QModelIndex() if not self.hasIndex(row, column, parent): + # we end here when parent is real_index, that is wrong!!! + + # we only have up to row==2 and column==1, we should have row==2 and column==7 return QModelIndex() parent_item = self.root if not parent.isValid() else parent.internalPointer() - child_item = None try: child_item = list(parent_item.children.values())[row] diff --git a/src/ert/gui/simulation/run_dialog.py b/src/ert/gui/simulation/run_dialog.py index 51f995c0614..e69ab8282ea 100644 --- a/src/ert/gui/simulation/run_dialog.py +++ b/src/ert/gui/simulation/run_dialog.py @@ -96,7 +96,7 @@ def __init__( self._job_label = QLabel(self) - self._job_model = JobListProxyModel(self, 0, 0, 0, 0) + self._job_model = JobListProxyModel(self, 0, 0) self._job_model.setSourceModel(self._snapshot_model) self._job_view = QTableView(self) @@ -226,11 +226,9 @@ def _job_clicked(self, index): @Slot(QModelIndex) def _select_real(self, index): - step = 0 - stage = 0 real = index.row() iter_ = index.model().get_iter() - self._job_model.set_step(iter_, real, stage, step) + self._job_model.set_real(iter_, real) self._job_label.setText( f"Realization id {index.data(RealIens)} in iteration {iter_}" ) diff --git a/src/ert/job_queue/queue.py b/src/ert/job_queue/queue.py index 4b9722135c2..9529b290795 100644 --- a/src/ert/job_queue/queue.py +++ b/src/ert/job_queue/queue.py @@ -236,7 +236,7 @@ def _translate_change_to_cloudevent( return CloudEvent( { "type": _queue_state_event_type(status), - "source": f"/ert/ensemble/{ens_id}/real/{real_id}/step/{0}", + "source": f"/ert/ensemble/{ens_id}/real/{real_id}", "datacontenttype": "application/json", }, { @@ -478,7 +478,6 @@ def add_dispatch_information_to_jobs_file( data["ens_id"] = ens_id data["real_id"] = self._differ.qindex_to_iens(q_index) - data["step_id"] = 0 data["dispatch_url"] = dispatch_url data["ee_token"] = token data["ee_cert_path"] = cert_path if cert is not None else None diff --git a/src/ert/run_models/base_run_model.py b/src/ert/run_models/base_run_model.py index c9c3bc0a9fc..6d651db6bff 100644 --- a/src/ert/run_models/base_run_model.py +++ b/src/ert/run_models/base_run_model.py @@ -339,21 +339,21 @@ def _build_ensemble( active = run_context.is_active(iens) real = RealizationBuilder().set_iens(iens).active(active) if active: - jobs = [ - LegacyJob( - id_=str(index), - index=str(index), - name=ext_job.name, - ext_job=ext_job, - ) - for index, ext_job in enumerate( - self.ert.resConfig().forward_model_list - ) - ] - real.add_step( + real.set_jobs( + [ + LegacyJob( + id_=str(index), + index=str(index), + name=ext_job.name, + ext_job=ext_job, + ) + for index, ext_job in enumerate( + self.ert.resConfig().forward_model_list + ) + ] + ) + real.set_step( LegacyStep( - id_="0", - jobs=jobs, name="legacy step", max_runtime=self.ert.analysisConfig().max_runtime, run_arg=run_arg, diff --git a/test-data/poly_example/poly_eval.py b/test-data/poly_example/poly_eval.py index 80d2e45e4ae..aad999cb454 100755 --- a/test-data/poly_example/poly_eval.py +++ b/test-data/poly_example/poly_eval.py @@ -1,5 +1,4 @@ #!/usr/bin/env python - import json diff --git a/tests/performance_tests/test_snapshot.py b/tests/performance_tests/test_snapshot.py index bdf3738a900..381a4351043 100644 --- a/tests/performance_tests/test_snapshot.py +++ b/tests/performance_tests/test_snapshot.py @@ -71,9 +71,9 @@ def simulate_forward_model_event_handling( active=True, status=state.REALIZATION_STATE_WAITING, ) - reals[str(real)].steps["0"] = Step(status=state.STEP_STATE_UNKNOWN) + reals[str(real)].step = Step(status=state.STEP_STATE_UNKNOWN) for job_idx in range(forward_models): - reals[f"{real}"].steps["0"].jobs[str(job_idx)] = Job( + reals[f"{real}"].jobs[str(job_idx)] = Job( status=state.JOB_STATE_START, index=job_idx, name=f"FM_{job_idx}", @@ -101,7 +101,7 @@ def simulate_forward_model_event_handling( partial.from_cloudevent( CloudEvent( { - "source": f"/ert/ensemble/{ens_id}/real/{real}/step/0", + "source": f"/ert/ensemble/{ens_id}/real/{real}", "type": ids.EVTYPE_FM_STEP_WAITING, "id": str(uuid.uuid1()), } @@ -114,7 +114,7 @@ def simulate_forward_model_event_handling( CloudEvent( attributes={ "source": f"/ert/ensemble/{ens_id}/" - f"real/{real}/step/0/job/{job_idx}", + f"real/{real}/job/{job_idx}", "type": ids.EVTYPE_FM_JOB_START, "id": str(uuid.uuid1()), }, @@ -127,7 +127,7 @@ def simulate_forward_model_event_handling( CloudEvent( attributes={ "source": f"/ert/ensemble/{ens_id}/" - f"real/{real}/step/0/job/{job_idx}", + f"real/{real}/job/{job_idx}", "type": ids.EVTYPE_FM_JOB_RUNNING, "id": str(uuid.uuid1()), }, @@ -142,7 +142,7 @@ def simulate_forward_model_event_handling( CloudEvent( attributes={ "source": f"/ert/ensemble/{ens_id}/" - f"real/{real}/step/0/job/{job_idx}", + f"real/{real}/job/{job_idx}", "type": ids.EVTYPE_FM_JOB_SUCCESS, "id": str(uuid.uuid1()), }, @@ -153,7 +153,7 @@ def simulate_forward_model_event_handling( partial.from_cloudevent( CloudEvent( { - "source": f"/ert/ensemble/{ens_id}/real/{real}/step/0", + "source": f"/ert/ensemble/{ens_id}/real/{real}", "type": ids.EVTYPE_FM_STEP_SUCCESS, "id": str(uuid.uuid1()), } diff --git a/tests/unit_tests/ensemble_evaluator/conftest.py b/tests/unit_tests/ensemble_evaluator/conftest.py index dcc2f52ef83..482d03f8fa4 100644 --- a/tests/unit_tests/ensemble_evaluator/conftest.py +++ b/tests/unit_tests/ensemble_evaluator/conftest.py @@ -23,30 +23,26 @@ def snapshot(): return ( SnapshotBuilder() - .add_step(step_id="0", status="Unknown") + .add_step(status="Unknown") .add_job( - step_id="0", job_id="0", index="0", name="job0", status="Unknown", ) .add_job( - step_id="0", job_id="1", index="1", name="job1", status="Unknown", ) .add_job( - step_id="0", job_id="2", index="2", name="job2", status="Unknown", ) .add_job( - step_id="0", job_id="3", index="3", name="job3", @@ -123,7 +119,6 @@ def _make_ensemble_builder(monkeypatch, tmpdir, num_reals, num_jobs, job_sleep=0 ) step = ert.ensemble_evaluator.LegacyStep( - id_="0", job_script="job_dispatch.py", max_runtime=10, run_arg=RunArg( @@ -138,22 +133,23 @@ def _make_ensemble_builder(monkeypatch, tmpdir, num_reals, num_jobs, job_sleep=0 # from the run_arg, the queue wants to access the iens prop num_cpu=1, name="dummy step", - jobs=[ - ert.ensemble_evaluator.LegacyJob( - id_=str(index), - index=str(index), - name=f"dummy job {index}", - ext_job=job, - ) - for index, job in enumerate(ext_job_list) - ], ) + jobs = [ + ert.ensemble_evaluator.LegacyJob( + id_=str(index), + index=str(index), + name=f"dummy job {index}", + ext_job=job, + ) + for index, job in enumerate(ext_job_list) + ] builder.add_realization( ert.ensemble_evaluator.RealizationBuilder() .active(True) .set_iens(iens) - .add_step(step) + .set_step(step) + .set_jobs(jobs) ) analysis_config = Mock() diff --git a/tests/unit_tests/ensemble_evaluator/ensemble_evaluator_utils.py b/tests/unit_tests/ensemble_evaluator/ensemble_evaluator_utils.py index 455c9961c5d..c13ef3cad68 100644 --- a/tests/unit_tests/ensemble_evaluator/ensemble_evaluator_utils.py +++ b/tests/unit_tests/ensemble_evaluator/ensemble_evaluator_utils.py @@ -44,10 +44,10 @@ def send_dispatch_event(client, event_type, source, event_id, data, **extra_attr class TestEnsemble(Ensemble): __test__ = False - def __init__(self, _iter, reals, steps, jobs, id_): + def __init__(self, _iter, reals, step, jobs, id_): self.iter = _iter self.test_reals = reals - self.steps = steps + self.step = step self.jobs = jobs self.fail_jobs = [] self.result = None @@ -57,25 +57,21 @@ def __init__(self, _iter, reals, steps, jobs, id_): the_reals = [ Realization( real_no, - steps=[ - LegacyStep( - id_=step_no, - jobs=[ - LegacyJob( - id_=job_no, - index=job_no, - name=f"job-{job_no}", - ext_job=None, - ) - for job_no in range(0, jobs) - ], - name=f"step-{step_no}", - max_runtime=0, - num_cpu=0, - run_arg=None, - job_script=None, + step=LegacyStep( + name="thestep", + max_runtime=0, + num_cpu=0, + run_arg=None, + job_script=None, + ), + jobs=[ + LegacyJob( + id_=job_no, + index=job_no, + name=f"job-{job_no}", + ext_job=None, ) - for step_no in range(0, steps) + for job_no in range(0, jobs) ], active=True, ) @@ -106,67 +102,61 @@ def _evaluate(self, url): event_id = event_id + 1 for real in range(0, self.test_reals): - for step in range(0, self.steps): - job_failed = False + job_failed = False + send_dispatch_event( + dispatch, + identifiers.EVTYPE_FM_STEP_UNKNOWN, + f"/ert/ensemble/{self.id_}/real/{real}", + f"event-{event_id}", + None, + ) + event_id = event_id + 1 + for job in range(0, self.jobs): send_dispatch_event( dispatch, - identifiers.EVTYPE_FM_STEP_UNKNOWN, - f"/ert/ensemble/{self.id_}/real/{real}/step/{step}", + identifiers.EVTYPE_FM_JOB_RUNNING, + f"/ert/ensemble/{self.id_}/real/" + f"{real}//job/{job}", f"event-{event_id}", - None, + {"current_memory_usage": 1000}, ) event_id = event_id + 1 - for job in range(0, self.jobs): + if self._shouldFailJob(real, job): send_dispatch_event( dispatch, - identifiers.EVTYPE_FM_JOB_RUNNING, - f"/ert/ensemble/{self.id_}/real/" - + f"{real}/step/{step}/job/{job}", - f"event-{event_id}", - {"current_memory_usage": 1000}, - ) - event_id = event_id + 1 - if self._shouldFailJob(real, step, job): - send_dispatch_event( - dispatch, - identifiers.EVTYPE_FM_JOB_FAILURE, - f"/ert/ensemble/{self.id_}/real/" - + f"{real}/step/{step}/job/{job}", - f"event-{event_id}", - {}, - ) - event_id = event_id + 1 - job_failed = True - break - send_dispatch_event( - dispatch, - identifiers.EVTYPE_FM_JOB_SUCCESS, - f"/ert/ensemble/{self.id_}/real/" - + f"{real}/step/{step}/job/{job}", - f"event-{event_id}", - {"current_memory_usage": 1000}, - ) - event_id = event_id + 1 - if job_failed: - send_dispatch_event( - dispatch, - identifiers.EVTYPE_FM_STEP_FAILURE, - f"/ert/ensemble/{self.id_}/real/" - + f"{real}/step/{step}/job/{job}", - f"event-{event_id}", - {}, - ) - event_id = event_id + 1 - else: - send_dispatch_event( - dispatch, - identifiers.EVTYPE_FM_STEP_SUCCESS, - f"/ert/ensemble/{self.id_}/real/" - + f"{real}/step/{step}/job/{job}", + identifiers.EVTYPE_FM_JOB_FAILURE, + f"/ert/ensemble/{self.id_}/real/" + f"{real}/job/{job}", f"event-{event_id}", {}, ) event_id = event_id + 1 + job_failed = True + break + send_dispatch_event( + dispatch, + identifiers.EVTYPE_FM_JOB_SUCCESS, + f"/ert/ensemble/{self.id_}/real/" + f"{real}/job/{job}", + f"event-{event_id}", + {"current_memory_usage": 1000}, + ) + event_id = event_id + 1 + if job_failed: + send_dispatch_event( + dispatch, + identifiers.EVTYPE_FM_STEP_FAILURE, + f"/ert/ensemble/{self.id_}/real/" + f"{real}/job/{job}", + f"event-{event_id}", + {}, + ) + event_id = event_id + 1 + else: + send_dispatch_event( + dispatch, + identifiers.EVTYPE_FM_STEP_SUCCESS, + f"/ert/ensemble/{self.id_}/real/" + f"{real}/job/{job}", + f"event-{event_id}", + {}, + ) + event_id = event_id + 1 data = self.result if self.result else None extra_attrs = {} @@ -194,11 +184,11 @@ def evaluate(self, config): def start(self): self._eval_thread.start() - def _shouldFailJob(self, real, step, job): - return (real, 0, step, job) in self.fail_jobs + def _shouldFailJob(self, real, job): + return (real, 0, job) in self.fail_jobs - def addFailJob(self, real, step, job): - self.fail_jobs.append((real, 0, step, job)) + def addFailJob(self, real, job): + self.fail_jobs.append((real, 0, job)) def with_result(self, result, datacontenttype): self.result = result diff --git a/tests/unit_tests/ensemble_evaluator/test_async_queue_execution.py b/tests/unit_tests/ensemble_evaluator/test_async_queue_execution.py index 60a41b4d490..d2ad301460b 100644 --- a/tests/unit_tests/ensemble_evaluator/test_async_queue_execution.py +++ b/tests/unit_tests/ensemble_evaluator/test_async_queue_execution.py @@ -55,7 +55,7 @@ async def test_happy_path( Driver.create_driver(queue_config), max_submit=queue_config.max_submit ) for real in ensemble.reals: - queue.add_ee_stage(real.steps[0], None) + queue.add_ee_stage(real.step, None) await queue.execute_queue_via_websockets( url, "ee_0", threading.BoundedSemaphore(value=10), None @@ -69,7 +69,7 @@ async def test_happy_path( assert mock_ws_task.done() event_0 = from_json(mock_ws_task.result()[0]) - assert event_0["source"] == "/ert/ensemble/ee_0/real/0/step/0" + assert event_0["source"] == "/ert/ensemble/ee_0/real/0" assert event_0["type"] == "com.equinor.ert.forward_model_step.waiting" assert event_0.data == {"queue_event_type": "WAITING"} diff --git a/tests/unit_tests/ensemble_evaluator/test_ensemble_builder.py b/tests/unit_tests/ensemble_evaluator/test_ensemble_builder.py index 841e6a40ad1..b18d72f31f4 100644 --- a/tests/unit_tests/ensemble_evaluator/test_ensemble_builder.py +++ b/tests/unit_tests/ensemble_evaluator/test_ensemble_builder.py @@ -21,24 +21,25 @@ def test_build_ensemble(active_real): .add_realization( RealizationBuilder() .set_iens(2) - .add_step( + .set_step( LegacyStep( run_arg=MagicMock(), job_script="job_script", num_cpu=1, - jobs=[ - LegacyJob( - ext_job=MagicMock(), - id_="4", - index="5", - name="echo_command", - ) - ], - id_="3", name="some_step", max_runtime=0, ) ) + .set_jobs( + [ + LegacyJob( + ext_job=MagicMock(), + id_="4", + index="5", + name="echo_command", + ) + ] + ) .active(active_real) ) .set_id("1") diff --git a/tests/unit_tests/ensemble_evaluator/test_ensemble_evaluator.py b/tests/unit_tests/ensemble_evaluator/test_ensemble_evaluator.py index 387891a0af7..a077c0c6742 100644 --- a/tests/unit_tests/ensemble_evaluator/test_ensemble_evaluator.py +++ b/tests/unit_tests/ensemble_evaluator/test_ensemble_evaluator.py @@ -70,9 +70,9 @@ def test_new_monitor_can_pick_up_where_we_left_off(evaluator): evt = next(events) snapshot = Snapshot(evt.data) - assert snapshot.get_job("0", "0", "0").status == JOB_STATE_RUNNING - assert snapshot.get_job("1", "0", "0").status == JOB_STATE_RUNNING - assert snapshot.get_job("1", "0", "1").status == JOB_STATE_RUNNING + assert snapshot.get_job("0", "0").status == JOB_STATE_RUNNING + assert snapshot.get_job("1", "0").status == JOB_STATE_RUNNING + assert snapshot.get_job("1", "1").status == JOB_STATE_RUNNING # take down first monitor by leaving context with Client( @@ -112,9 +112,9 @@ def test_new_monitor_can_pick_up_where_we_left_off(evaluator): assert full_snapshot_event["type"] == identifiers.EVTYPE_EE_SNAPSHOT snapshot = Snapshot(full_snapshot_event.data) assert snapshot.status == ENSEMBLE_STATE_UNKNOWN - assert snapshot.get_job("0", "0", "0").status == JOB_STATE_RUNNING - assert snapshot.get_job("1", "0", "0").status == JOB_STATE_FINISHED - assert snapshot.get_job("1", "0", "1").status == JOB_STATE_FAILURE + assert snapshot.get_job("0", "0").status == JOB_STATE_RUNNING + assert snapshot.get_job("1", "0").status == JOB_STATE_FINISHED + assert snapshot.get_job("1", "1").status == JOB_STATE_FAILURE def test_dispatch_endpoint_clients_can_connect_and_monitor_can_shut_down_evaluator( @@ -183,9 +183,9 @@ def test_dispatch_endpoint_clients_can_connect_and_monitor_can_shut_down_evaluat ) evt = next(events) snapshot = Snapshot(evt.data) - assert snapshot.get_job("1", "0", "0").status == JOB_STATE_FINISHED - assert snapshot.get_job("0", "0", "0").status == JOB_STATE_RUNNING - assert snapshot.get_job("1", "0", "1").status == JOB_STATE_FAILURE + assert snapshot.get_job("1", "0").status == JOB_STATE_FINISHED + assert snapshot.get_job("0", "0").status == JOB_STATE_RUNNING + assert snapshot.get_job("1", "1").status == JOB_STATE_FAILURE # a second monitor connects with Monitor(evaluator._config.get_connection_info()) as monitor2: @@ -194,9 +194,9 @@ def test_dispatch_endpoint_clients_can_connect_and_monitor_can_shut_down_evaluat assert full_snapshot_event["type"] == identifiers.EVTYPE_EE_SNAPSHOT snapshot = Snapshot(full_snapshot_event.data) assert snapshot.status == ENSEMBLE_STATE_UNKNOWN - assert snapshot.get_job("1", "0", "0").status == JOB_STATE_FINISHED - assert snapshot.get_job("0", "0", "0").status == JOB_STATE_RUNNING - assert snapshot.get_job("1", "0", "1").status == JOB_STATE_FAILURE + assert snapshot.get_job("1", "0").status == JOB_STATE_FINISHED + assert snapshot.get_job("0", "0").status == JOB_STATE_RUNNING + assert snapshot.get_job("1", "1").status == JOB_STATE_FAILURE # one monitor requests that server exit monitor.signal_cancel() diff --git a/tests/unit_tests/ensemble_evaluator/test_evaluator_tracker.py b/tests/unit_tests/ensemble_evaluator/test_evaluator_tracker.py index 117d2ff6681..310cf831a09 100644 --- a/tests/unit_tests/ensemble_evaluator/test_evaluator_tracker.py +++ b/tests/unit_tests/ensemble_evaluator/test_evaluator_tracker.py @@ -19,7 +19,7 @@ def build_snapshot(real_list: Optional[List[str]] = None): real_list = ["0"] return ( SnapshotBuilder() - .add_step(step_id="0", status=state.STEP_STATE_UNKNOWN) + .add_step(status=state.STEP_STATE_UNKNOWN) .build(real_list, state.REALIZATION_STATE_UNKNOWN) ) @@ -65,9 +65,7 @@ def _track(): data={ **( build_partial(["0", "1"]) - .update_step( - "0", "0", Step(status=state.STEP_STATE_SUCCESS) - ) + .update_step("0", Step(status=state.STEP_STATE_SUCCESS)) .to_dict() ), "iter": 2, @@ -93,9 +91,7 @@ def _track(): data={ **( build_partial(["0", "1"]) - .update_step( - "0", "0", Step(status=state.STEP_STATE_SUCCESS) - ) + .update_step("0", Step(status=state.STEP_STATE_SUCCESS)) .to_dict() ), "iter": 0, @@ -121,9 +117,7 @@ def _track(): data={ **( build_partial(["0", "1"]) - .update_step( - "0", "0", Step(status=state.STEP_STATE_SUCCESS) - ) + .update_step("0", Step(status=state.STEP_STATE_SUCCESS)) .to_dict() ), "iter": 0, @@ -149,12 +143,8 @@ def _track(): data={ **( build_partial(["0", "1"]) - .update_step( - "0", "0", Step(status=state.STEP_STATE_SUCCESS) - ) - .update_step( - "1", "0", Step(status=state.STEP_STATE_SUCCESS) - ) + .update_step("0", Step(status=state.STEP_STATE_SUCCESS)) + .update_step("1", Step(status=state.STEP_STATE_SUCCESS)) .to_dict() ), "iter": 0, @@ -172,9 +162,7 @@ def _track(): data={ **( build_partial(["0", "1"]) - .update_step( - "0", "0", Step(status=state.STEP_STATE_SUCCESS) - ) + .update_step("0", Step(status=state.STEP_STATE_SUCCESS)) .to_dict() ), "iter": 1, @@ -205,12 +193,8 @@ def _track(): data={ **( build_partial(["0", "1"]) - .update_step( - "0", "0", Step(status=state.STEP_STATE_SUCCESS) - ) - .update_step( - "1", "0", Step(status=state.STEP_STATE_SUCCESS) - ) + .update_step("0", Step(status=state.STEP_STATE_SUCCESS)) + .update_step("1", Step(status=state.STEP_STATE_SUCCESS)) .to_dict() ), "iter": 0, @@ -228,12 +212,8 @@ def _track(): data={ **( build_partial(["0", "1"]) - .update_step( - "0", "0", Step(status=state.STEP_STATE_SUCCESS) - ) - .update_step( - "1", "0", Step(status=state.STEP_STATE_SUCCESS) - ) + .update_step("0", Step(status=state.STEP_STATE_SUCCESS)) + .update_step("1", Step(status=state.STEP_STATE_SUCCESS)) .to_dict() ), "iter": 1, @@ -264,12 +244,8 @@ def _track(): data={ **( build_partial(["0", "1"]) - .update_step( - "0", "0", Step(status=state.STEP_STATE_SUCCESS) - ) - .update_step( - "1", "0", Step(status=state.STEP_STATE_SUCCESS) - ) + .update_step("0", Step(status=state.STEP_STATE_SUCCESS)) + .update_step("1", Step(status=state.STEP_STATE_SUCCESS)) .to_dict() ), "iter": 1, @@ -287,12 +263,8 @@ def _track(): data={ **( build_partial(["0", "1"]) - .update_step( - "0", "0", Step(status=state.STEP_STATE_SUCCESS) - ) - .update_step( - "1", "0", Step(status=state.STEP_STATE_SUCCESS) - ) + .update_step("0", Step(status=state.STEP_STATE_SUCCESS)) + .update_step("1", Step(status=state.STEP_STATE_SUCCESS)) .to_dict() ), "iter": 2, diff --git a/tests/unit_tests/ensemble_evaluator/test_snapshot.py b/tests/unit_tests/ensemble_evaluator/test_snapshot.py index fefc43457de..8a17b5ddc6b 100644 --- a/tests/unit_tests/ensemble_evaluator/test_snapshot.py +++ b/tests/unit_tests/ensemble_evaluator/test_snapshot.py @@ -12,7 +12,6 @@ SnapshotBuilder, _get_job_id, _get_real_id, - _get_step_id, ) @@ -20,7 +19,6 @@ def test_snapshot_merge(snapshot: Snapshot): update_event = PartialSnapshot(snapshot) update_event.update_job( real_id="1", - step_id="0", job_id="0", job=Job( status="Finished", @@ -31,7 +29,6 @@ def test_snapshot_merge(snapshot: Snapshot): ) update_event.update_job( real_id="1", - step_id="0", job_id="1", job=Job( status="Running", @@ -41,7 +38,6 @@ def test_snapshot_merge(snapshot: Snapshot): ) update_event.update_job( real_id="9", - step_id="0", job_id="0", job=Job( status="Running", @@ -54,7 +50,7 @@ def test_snapshot_merge(snapshot: Snapshot): assert snapshot.status == state.ENSEMBLE_STATE_UNKNOWN - assert snapshot.get_job(real_id="1", step_id="0", job_id="0") == Job( + assert snapshot.get_job(real_id="1", job_id="0") == Job( status="Finished", index="0", start_time=datetime(year=2020, month=10, day=27), @@ -62,15 +58,15 @@ def test_snapshot_merge(snapshot: Snapshot): name="job0", ) - assert snapshot.get_job(real_id="1", step_id="0", job_id="1") == Job( + assert snapshot.get_job(real_id="1", job_id="1") == Job( status="Running", index="1", start_time=datetime(year=2020, month=10, day=27), name="job1", ) - assert snapshot.get_job(real_id="9", step_id="0", job_id="0").status == "Running" - assert snapshot.get_job(real_id="9", step_id="0", job_id="0") == Job( + assert snapshot.get_job(real_id="9", job_id="0").status == "Running" + assert snapshot.get_job(real_id="9", job_id="0") == Job( status="Running", index="0", start_time=datetime(year=2020, month=10, day=27), @@ -82,30 +78,29 @@ def test_snapshot_merge(snapshot: Snapshot): "source_string, expected_ids", [ ( - "/ert/ee/0/real/1111/step/asd123ASD/job/0", - {"real": "1111", "step": "asd123ASD", "job": "0"}, + "/ert/ee/0/real/1111/job/0", + {"real": "1111", "job": "0"}, ), ( - "/ert/ee/0/real/1111/step/asd123ASD", - {"real": "1111", "step": "asd123ASD", "job": None}, + "/ert/ee/0/real/1111", + {"real": "1111", "job": None}, ), ( "/ert/ee/0/real/1111", - {"real": "1111", "step": None, "job": None}, + {"real": "1111", "job": None}, ), ( "/ert/ee/0/real/1111", - {"real": "1111", "step": None, "job": None}, + {"real": "1111", "job": None}, ), ( "/ert/ee/0", - {"real": None, "step": None, "job": None}, + {"real": None, "job": None}, ), ], ) def test_source_get_ids(source_string, expected_ids): assert _get_real_id(source_string) == expected_ids["real"] - assert _get_step_id(source_string) == expected_ids["step"] assert _get_job_id(source_string) == expected_ids["job"] @@ -116,7 +111,7 @@ def test_update_partial_from_multiple_cloudevents(snapshot): attributes={ "id": "0", "type": ids.EVTYPE_FM_JOB_RUNNING, - "source": "/real/0/step/0/job/0", + "source": "/real/0/job/0", }, data={ "current_memory_usage": 5, @@ -129,7 +124,7 @@ def test_update_partial_from_multiple_cloudevents(snapshot): { "id": "0", "type": ids.EVTYPE_FM_JOB_FAILURE, - "source": "/real/0/step/0/job/0", + "source": "/real/0/job/0", }, {ids.ERROR_MSG: "failed"}, ) @@ -139,11 +134,11 @@ def test_update_partial_from_multiple_cloudevents(snapshot): { "id": "1", "type": ids.EVTYPE_FM_JOB_SUCCESS, - "source": "/real/0/step/0/job/1", + "source": "/real/0/job/1", } ) ) - jobs = partial.to_dict()["reals"]["0"]["steps"]["0"]["jobs"] + jobs = partial.to_dict()["reals"]["0"]["jobs"] assert jobs["0"]["status"] == state.JOB_STATE_FAILURE assert jobs["1"]["status"] == state.JOB_STATE_FINISHED @@ -153,9 +148,7 @@ def test_multiple_cloud_events_trigger_non_communicated_change(): explicitly send an event that changes the realization status. It should happen by virtue of the steps being completed.""" snapshot = ( - SnapshotBuilder() - .add_step(step_id="0", status="Unknown") - .build(["0"], status="Unknown") + SnapshotBuilder().add_step(status="Unknown").build(["0"], status="Unknown") ) partial = PartialSnapshot(snapshot) partial.from_cloudevent( @@ -163,7 +156,7 @@ def test_multiple_cloud_events_trigger_non_communicated_change(): { "id": "0", "type": ids.EVTYPE_FM_STEP_SUCCESS, - "source": "/real/0/step/0", + "source": "/real/0", } ) ) diff --git a/tests/unit_tests/gui/conftest.py b/tests/unit_tests/gui/conftest.py index 497e6fca048..ffc0ce38092 100644 --- a/tests/unit_tests/gui/conftest.py +++ b/tests/unit_tests/gui/conftest.py @@ -234,46 +234,46 @@ def full_snapshot() -> Snapshot: steps={ "0": Step( status="", - jobs={ - "0": Job( - start_time=dt.now(), - end_time=dt.now(), - name="poly_eval", - index="0", - status=JOB_STATE_START, - error="error", - stdout="std_out_file", - stderr="std_err_file", - current_memory_usage="123", - max_memory_usage="312", - ), - "1": Job( - start_time=dt.now(), - end_time=dt.now(), - name="poly_postval", - index="1", - status=JOB_STATE_START, - error="error", - stdout="std_out_file", - stderr="std_err_file", - current_memory_usage="123", - max_memory_usage="312", - ), - "2": Job( - start_time=dt.now(), - end_time=None, - name="poly_post_mortem", - index="2", - status=JOB_STATE_START, - error="error", - stdout="std_out_file", - stderr="std_err_file", - current_memory_usage="123", - max_memory_usage="312", - ), - }, ) }, + jobs={ + "0": Job( + start_time=dt.now(), + end_time=dt.now(), + name="poly_eval", + index="0", + status=JOB_STATE_START, + error="error", + stdout="std_out_file", + stderr="std_err_file", + current_memory_usage="123", + max_memory_usage="312", + ), + "1": Job( + start_time=dt.now(), + end_time=dt.now(), + name="poly_postval", + index="1", + status=JOB_STATE_START, + error="error", + stdout="std_out_file", + stderr="std_err_file", + current_memory_usage="123", + max_memory_usage="312", + ), + "2": Job( + start_time=dt.now(), + end_time=None, + name="poly_post_mortem", + index="2", + status=JOB_STATE_START, + error="error", + stdout="std_out_file", + stderr="std_err_file", + current_memory_usage="123", + max_memory_usage="312", + ), + }, ) snapshot = SnapshotDict( status=ENSEMBLE_STATE_STARTED, @@ -287,10 +287,9 @@ def full_snapshot() -> Snapshot: @pytest.fixture() def large_snapshot() -> Snapshot: - builder = SnapshotBuilder().add_step(step_id="0", status=STEP_STATE_UNKNOWN) + builder = SnapshotBuilder().add_step(status=STEP_STATE_UNKNOWN) for i in range(0, 150): builder.add_job( - step_id="0", job_id=str(i), index=str(i), name=f"job_{i}", @@ -308,10 +307,9 @@ def large_snapshot() -> Snapshot: @pytest.fixture() def small_snapshot() -> Snapshot: - builder = SnapshotBuilder().add_step(step_id="0", status=STEP_STATE_UNKNOWN) + builder = SnapshotBuilder().add_step(status=STEP_STATE_UNKNOWN) for i in range(0, 2): builder.add_job( - step_id="0", job_id=str(i), index=str(i), name=f"job_{i}", diff --git a/tests/unit_tests/gui/model/gui_models_utils.py b/tests/unit_tests/gui/model/gui_models_utils.py index dae2b0f9de2..4fd60b26fe4 100644 --- a/tests/unit_tests/gui/model/gui_models_utils.py +++ b/tests/unit_tests/gui/model/gui_models_utils.py @@ -5,5 +5,5 @@ def partial_snapshot(snapshot) -> PartialSnapshot: partial = PartialSnapshot(snapshot) partial._realization_states["0"].update({"status": JOB_STATE_FINISHED}) - partial.update_job("0", "0", "0", Job(status=JOB_STATE_FINISHED)) + partial.update_job("0", "0", Job(status=JOB_STATE_FINISHED)) return partial diff --git a/tests/unit_tests/gui/model/test_job_list.py b/tests/unit_tests/gui/model/test_job_list.py index b40ac7da3dc..b928031449f 100644 --- a/tests/unit_tests/gui/model/test_job_list.py +++ b/tests/unit_tests/gui/model/test_job_list.py @@ -21,7 +21,7 @@ def _id_to_col(identifier): - for col, fields in enumerate(COLUMNS[NodeType.STEP]): + for col, fields in enumerate(COLUMNS[NodeType.REAL]): if fields[1] == identifier: return col raise ValueError(f"{identifier} not a column in {COLUMNS}") @@ -31,7 +31,7 @@ def test_using_qt_model_tester(qtmodeltester, full_snapshot): partial = partial_snapshot(full_snapshot) source_model = SnapshotModel() - model = JobListProxyModel(None, 0, 0, 0, 0) + model = JobListProxyModel(None, 0, 0) model.setSourceModel(source_model) reporting_mode = qt_api.QtTest.QAbstractItemModelTester.FailureReportingMode.Warning @@ -52,7 +52,7 @@ def test_using_qt_model_tester(qtmodeltester, full_snapshot): def test_changes(full_snapshot): source_model = SnapshotModel() - model = JobListProxyModel(None, 0, 0, 0, 0) + model = JobListProxyModel(None, 0, 0) model.setSourceModel(source_model) reporting_mode = qt_api.QtTest.QAbstractItemModelTester.FailureReportingMode.Warning @@ -61,15 +61,12 @@ def test_changes(full_snapshot): ) source_model._add_snapshot(SnapshotModel.prerender(full_snapshot), 0) - assert ( - model.index(0, _id_to_col(ids.STATUS), QModelIndex()).data() == JOB_STATE_START - ) + assert model.index(0, _id_to_col(ids.STATUS)).data() == JOB_STATE_START partial = PartialSnapshot(full_snapshot) start_time = datetime.datetime(year=2020, month=10, day=27, hour=12) end_time = datetime.datetime(year=2020, month=10, day=28, hour=13) partial.update_job( - "0", "0", "0", job=Job( @@ -94,7 +91,7 @@ def test_changes(full_snapshot): def test_duration(mock_datetime, timezone, full_snapshot): source_model = SnapshotModel() - model = JobListProxyModel(None, 0, 0, 0, 0) + model = JobListProxyModel(None, 0, 0) model.setSourceModel(source_model) reporting_mode = qt_api.QtTest.QAbstractItemModelTester.FailureReportingMode.Warning @@ -123,7 +120,6 @@ def test_duration(mock_datetime, timezone, full_snapshot): tzinfo=timezone, ) partial.update_job( - "0", "0", "2", job=Job( @@ -142,7 +138,7 @@ def test_duration(mock_datetime, timezone, full_snapshot): def test_no_cross_talk(full_snapshot): source_model = SnapshotModel() - model = JobListProxyModel(None, 0, 0, 0, 0) + model = JobListProxyModel(None, 0, 0) model.setSourceModel(source_model) reporting_mode = qt_api.QtTest.QAbstractItemModelTester.FailureReportingMode.Warning @@ -153,13 +149,13 @@ def test_no_cross_talk(full_snapshot): # Test that changes to iter=1 does not bleed into iter=0 partial = PartialSnapshot(full_snapshot) - partial.update_job("0", "0", "0", job=Job(status=JOB_STATE_FAILURE)) + partial.update_job("0", "0", job=Job(status=JOB_STATE_FAILURE)) source_model._add_partial_snapshot(SnapshotModel.prerender(partial), 1) assert ( model.index(0, _id_to_col(ids.STATUS), QModelIndex()).data() == JOB_STATE_START ) - model.set_step(1, 0, 0, 0) + model.set_real(1, 0) assert ( model.index(0, _id_to_col(ids.STATUS), QModelIndex()).data() == JOB_STATE_FAILURE diff --git a/tests/unit_tests/gui/model/test_snapshot.py b/tests/unit_tests/gui/model/test_snapshot.py index f76a757f09c..44866f0f34d 100644 --- a/tests/unit_tests/gui/model/test_snapshot.py +++ b/tests/unit_tests/gui/model/test_snapshot.py @@ -45,7 +45,7 @@ def test_realization_job_hint(full_snapshot): model._add_snapshot(SnapshotModel.prerender(full_snapshot), 0) partial = PartialSnapshot(full_snapshot) - partial.update_job("0", "0", "0", Job(status=JOB_STATE_RUNNING)) + partial.update_job("0", "0", Job(status=JOB_STATE_RUNNING)) model._add_partial_snapshot(SnapshotModel.prerender(partial), 0) first_real = model.index(0, 0, model.index(0, 0)) diff --git a/tests/unit_tests/gui/simulation/test_run_dialog.py b/tests/unit_tests/gui/simulation/test_run_dialog.py index 0afd3bb7be5..2d6e92655f0 100644 --- a/tests/unit_tests/gui/simulation/test_run_dialog.py +++ b/tests/unit_tests/gui/simulation/test_run_dialog.py @@ -131,9 +131,8 @@ def test_large_snapshot( FullSnapshotEvent( snapshot=( SnapshotBuilder() - .add_step(step_id="0", status=state.STEP_STATE_UNKNOWN) + .add_step(status=state.STEP_STATE_UNKNOWN) .add_job( - step_id="0", job_id="0", index="0", name="job_0", @@ -171,9 +170,8 @@ def test_large_snapshot( FullSnapshotEvent( snapshot=( SnapshotBuilder() - .add_step(step_id="0", status=state.STEP_STATE_UNKNOWN) + .add_step(status=state.STEP_STATE_UNKNOWN) .add_job( - step_id="0", job_id="0", index="0", name="job_0", @@ -193,7 +191,7 @@ def test_large_snapshot( SnapshotUpdateEvent( partial_snapshot=PartialSnapshot( SnapshotBuilder() - .add_step(step_id="0", status=state.STEP_STATE_SUCCESS) + .add_step(status=state.STEP_STATE_SUCCESS) .build(["0"], status=state.REALIZATION_STATE_FINISHED) ), phase_name="Foo", @@ -213,16 +211,14 @@ def test_large_snapshot( FullSnapshotEvent( snapshot=( SnapshotBuilder() - .add_step(step_id="0", status=state.STEP_STATE_UNKNOWN) + .add_step(status=state.STEP_STATE_UNKNOWN) .add_job( - step_id="0", job_id="0", index="0", name="job_0", status=state.JOB_STATE_START, ) .add_job( - step_id="0", job_id="1", index="1", name="job_1", @@ -240,9 +236,8 @@ def test_large_snapshot( SnapshotUpdateEvent( partial_snapshot=PartialSnapshot( SnapshotBuilder() - .add_step(step_id="0", status=state.STEP_STATE_SUCCESS) + .add_step(status=state.STEP_STATE_SUCCESS) .add_job( - step_id="0", job_id="0", index="0", status=state.JOB_STATE_FINISHED, @@ -260,9 +255,8 @@ def test_large_snapshot( SnapshotUpdateEvent( partial_snapshot=PartialSnapshot( SnapshotBuilder() - .add_step(step_id="0", status=state.STEP_STATE_FAILURE) + .add_step(status=state.STEP_STATE_FAILURE) .add_job( - step_id="0", job_id="1", index="1", status=state.JOB_STATE_FAILURE, @@ -287,9 +281,8 @@ def test_large_snapshot( FullSnapshotEvent( snapshot=( SnapshotBuilder() - .add_step(step_id="0", status=state.STEP_STATE_UNKNOWN) + .add_step(status=state.STEP_STATE_UNKNOWN) .add_job( - step_id="0", job_id="0", index="0", name="job_0", @@ -307,9 +300,8 @@ def test_large_snapshot( FullSnapshotEvent( snapshot=( SnapshotBuilder() - .add_step(step_id="0", status=state.STEP_STATE_UNKNOWN) + .add_step(status=state.STEP_STATE_UNKNOWN) .add_job( - step_id="0", job_id="0", index="0", name="job_0", diff --git a/tests/unit_tests/job_queue/test_job_queue.py b/tests/unit_tests/job_queue/test_job_queue.py index 0ac4e503e16..9f222274550 100644 --- a/tests/unit_tests/job_queue/test_job_queue.py +++ b/tests/unit_tests/job_queue/test_job_queue.py @@ -224,7 +224,6 @@ def test_add_dispatch_info(tmpdir, monkeypatch, simple_script): for runpath in runpaths: job_file_path = runpath / "jobs.json" content: dict = json.loads(job_file_path.read_text(encoding="utf-8")) - assert content["step_id"] == 0 assert content["dispatch_url"] == dispatch_url assert content["ee_token"] == token assert content["experiment_id"] == "experiment_id" @@ -254,7 +253,6 @@ def test_add_dispatch_info_cert_none(tmpdir, monkeypatch, simple_script): for runpath in runpaths: job_file_path = runpath / "jobs.json" content: dict = json.loads(job_file_path.read_text(encoding="utf-8")) - assert content["step_id"] == 0 assert content["dispatch_url"] == dispatch_url assert content["ee_token"] == token assert content["experiment_id"] is None diff --git a/tests/unit_tests/job_runner/test_event_reporter.py b/tests/unit_tests/job_runner/test_event_reporter.py index 93cbe97996b..7205dbb9682 100644 --- a/tests/unit_tests/job_runner/test_event_reporter.py +++ b/tests/unit_tests/job_runner/test_event_reporter.py @@ -35,14 +35,14 @@ def test_report_with_successful_start_message_argument(unused_tcp_port): job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0) lines = [] with _mock_ws_thread(host, unused_tcp_port, lines): - reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0, step_id=0)) + reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0)) reporter.report(Start(job1)) reporter.report(Finish()) assert len(lines) == 1 event = json.loads(lines[0]) assert event["type"] == _FM_JOB_START - assert event["source"] == "/ert/ensemble/ens_id/real/0/step/0/job/0/index/0" + assert event["source"] == "/ert/ensemble/ens_id/real/0/job/0/index/0" assert os.path.basename(event["data"]["stdout"]) == "stdout" assert os.path.basename(event["data"]["stderr"]) == "stderr" @@ -56,7 +56,7 @@ def test_report_with_failed_start_message_argument(unused_tcp_port): lines = [] with _mock_ws_thread(host, unused_tcp_port, lines): - reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0, step_id=0)) + reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0)) msg = Start(job1).with_error("massive_failure") @@ -77,7 +77,7 @@ def test_report_with_successful_exit_message_argument(unused_tcp_port): lines = [] with _mock_ws_thread(host, unused_tcp_port, lines): - reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0, step_id=0)) + reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0)) reporter.report(Exited(job1, 0)) reporter.report(Finish().with_error("failed")) @@ -94,7 +94,7 @@ def test_report_with_failed_exit_message_argument(unused_tcp_port): lines = [] with _mock_ws_thread(host, unused_tcp_port, lines): - reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0, step_id=0)) + reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0)) reporter.report(Exited(job1, 1).with_error("massive_failure")) reporter.report(Finish()) @@ -112,7 +112,7 @@ def test_report_with_running_message_argument(unused_tcp_port): lines = [] with _mock_ws_thread(host, unused_tcp_port, lines): - reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0, step_id=0)) + reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0)) reporter.report(Running(job1, 100, 10)) reporter.report(Finish()) @@ -131,7 +131,7 @@ def test_report_only_job_running_for_successful_run(unused_tcp_port): lines = [] with _mock_ws_thread(host, unused_tcp_port, lines): - reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0, step_id=0)) + reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0)) reporter.report(Running(job1, 100, 10)) reporter.report(Finish()) @@ -146,7 +146,7 @@ def test_report_with_failed_finish_message_argument(unused_tcp_port): lines = [] with _mock_ws_thread(host, unused_tcp_port, lines): - reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0, step_id=0)) + reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0)) reporter.report(Running(job1, 100, 10)) reporter.report(Finish().with_error("massive_failure")) @@ -187,7 +187,7 @@ def mock_send(msg): lines = [] with _mock_ws_thread(host, unused_tcp_port, lines): with patch("_ert_job_runner.client.Client.send", lambda x, y: mock_send(y)): - reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0, step_id=0)) + reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0)) reporter.report(Running(job1, 100, 10)) reporter.report(Running(job1, 100, 10)) reporter.report(Running(job1, 100, 10)) @@ -225,7 +225,7 @@ def send_func(msg): with patch("_ert_job_runner.client.Client.send") as patched_send: patched_send.side_effect = send_func - reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0, step_id=0)) + reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0)) reporter.report(Running(job1, 100, 10)) reporter.report(Running(job1, 200, 10)) reporter.report(Running(job1, 300, 10)) @@ -262,7 +262,7 @@ def mock_send(msg): job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0) lines = [] with _mock_ws_thread(host, unused_tcp_port, lines): - reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0, step_id=0)) + reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0)) reporter.report(Running(job1, 100, 10)) reporter.report(Running(job1, 200, 10)) diff --git a/tests/unit_tests/status/test_tracking_integration.py b/tests/unit_tests/status/test_tracking_integration.py index 18c99db92c6..e8d0d275dee 100644 --- a/tests/unit_tests/status/test_tracking_integration.py +++ b/tests/unit_tests/status/test_tracking_integration.py @@ -68,10 +68,10 @@ def check_expression(original, path_expression, expected, msg_start): 1, 1.0, [ - (".*", "reals.*.steps.*.jobs.*.status", JOB_STATE_FAILURE), + (".*", "reals.*.jobs.*.status", JOB_STATE_FAILURE), ( ".*", - "reals.*.steps.*.jobs.*.error", + "reals.*.jobs.*.error", "The run is cancelled due to reaching MAX_RUNTIME", ), ], @@ -90,7 +90,7 @@ def check_expression(original, path_expression, expected, msg_start): 2, 1, 1.0, - [(".*", "reals.*.steps.*.jobs.*.status", JOB_STATE_FINISHED)], + [(".*", "reals.*.jobs.*.status", JOB_STATE_FINISHED)], [RealizationState.HAS_DATA] * 2, id="ee_poly_experiment", ), @@ -108,7 +108,7 @@ def check_expression(original, path_expression, expected, msg_start): 2, 2, 1.0, - [(".*", "reals.*.steps.*.jobs.*.status", JOB_STATE_FINISHED)], + [(".*", "reals.*.jobs.*.status", JOB_STATE_FINISHED)], [RealizationState.HAS_DATA] * 2, id="ee_poly_smoother", ), @@ -128,9 +128,9 @@ def check_expression(original, path_expression, expected, msg_start): # Fails halfway, due to unable to run update 0.5, [ - ("0", "reals.'0'.steps.*.jobs.'0'.status", JOB_STATE_FAILURE), - ("0", "reals.'0'.steps.*.jobs.'1'.status", JOB_STATE_START), - (".*", "reals.'1'.steps.*.jobs.*.status", JOB_STATE_FINISHED), + ("0", "reals.'0'.jobs.'0'.status", JOB_STATE_FAILURE), + ("0", "reals.'0'.jobs.'1'.status", JOB_STATE_START), + (".*", "reals.'1'.jobs.*.status", JOB_STATE_FINISHED), ], [ RealizationState.LOAD_FAILURE,