Skip to content

Commit

Permalink
Remove the possibility for multiple Steps in a Realization
Browse files Browse the repository at this point in the history
Step "id" is removed, as there is now only one step (the "zero" step).
  • Loading branch information
berland committed Oct 20, 2023
1 parent cca7fc8 commit 2c8ddb1
Show file tree
Hide file tree
Showing 30 changed files with 410 additions and 545 deletions.
8 changes: 2 additions & 6 deletions src/_ert_job_runner/reporting/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ def __init__(self, evaluator_url, token=None, cert_path=None):

self._ens_id = None
self._real_id = None
self._step_id = None
self._event_queue = queue.Queue()
self._event_publisher_thread = threading.Thread(target=self._event_publisher)
self._sentinel = object() # notifying the queue's ended
Expand Down Expand Up @@ -124,20 +123,17 @@ def _dump_event(self, attributes: Dict[str, str], data: Any = None):
logger.debug(f'Schedule {type(event)} "{event["type"]}" for delivery')
self._event_queue.put(event)

def _step_path(self):
return f"/ert/ensemble/{self._ens_id}/real/{self._real_id}/step/{self._step_id}"

def _init_handler(self, msg):
self._ens_id = msg.ens_id
self._real_id = msg.real_id
self._step_id = msg.step_id
self._event_publisher_thread.start()

def _job_handler(self, msg: Message):
job_name = msg.job.name()
job_msg_attrs = {
_JOB_SOURCE: (
f"{self._step_path()}/job/{msg.job.index}" f"/index/{msg.job.index}"
f"/ert/ensemble/{self._ens_id}/real/{self._real_id}/"
f"job/{msg.job.index}/index/{msg.job.index}"
),
_CONTENT_TYPE: "application/json",
}
Expand Down
9 changes: 4 additions & 5 deletions src/ert/cli/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,10 @@ def _print_job_errors(self) -> None:
failed_jobs: Dict[Optional[str], int] = {}
for snapshot in self._snapshots.values():
for real in snapshot.reals.values():
for step in real.steps.values():
for job in step.jobs.values():
if job.status == JOB_STATE_FAILURE:
result = failed_jobs.get(job.error, 0)
failed_jobs[job.error] = result + 1
for job in real.jobs.values():
if job.status == JOB_STATE_FAILURE:
result = failed_jobs.get(job.error, 0)
failed_jobs[job.error] = result + 1
for error, number_of_jobs in failed_jobs.items():
print(f"{number_of_jobs} jobs failed due to the error: {error}")

Expand Down
15 changes: 6 additions & 9 deletions src/ert/ensemble_evaluator/_builder/_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,16 +161,13 @@ def _create_snapshot(self) -> Snapshot:
active=True,
status=state.REALIZATION_STATE_WAITING,
)
for step in real.steps:
reals[str(real.iens)].steps[str(step.id_)] = Step(
status=state.STEP_STATE_UNKNOWN
reals[str(real.iens)].step = Step(status=state.STEP_STATE_UNKNOWN)
for job in real.jobs:
reals[str(real.iens)].jobs[str(job.id_)] = Job(
status=state.JOB_STATE_START,
index=job.index,
name=job.name,
)
for job in step.jobs:
reals[str(real.iens)].steps[str(step.id_)].jobs[str(job.id_)] = Job(
status=state.JOB_STATE_START,
index=job.index,
name=job.name,
)
top = SnapshotDict(
reals=reals,
status=state.ENSEMBLE_STATE_UNKNOWN,
Expand Down
4 changes: 2 additions & 2 deletions src/ert/ensemble_evaluator/_builder/_legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def generate_event_creator(
def event_builder(status: str, real_id: Optional[int] = None) -> CloudEvent:
source = f"/ert/ensemble/{self.id_}"
if real_id is not None:
source += f"/real/{real_id}/step/0"
source += f"/real/{real_id}"
return CloudEvent(
{
"type": status,
Expand Down Expand Up @@ -191,7 +191,7 @@ async def _evaluate_inner( # pylint: disable=too-many-branches

# Submit all jobs to queue and inform queue when done
for real in self.active_reals:
self._job_queue.add_ee_stage(real.steps[0], callback_timeout=on_timeout)
self._job_queue.add_ee_stage(real.step, callback_timeout=on_timeout)

# TODO: this is sort of a callback being preemptively called.
# It should be lifted out of the queue/evaluate, into the evaluator. If
Expand Down
26 changes: 17 additions & 9 deletions src/ert/ensemble_evaluator/_builder/_realization.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from typing_extensions import Self

from ._step import LegacyStep
from ._step import LegacyStep, LegacyJob

SOURCE_TEMPLATE_REAL = "/real/{iens}"

Expand All @@ -14,34 +14,41 @@ class Realization:
def __init__(
self,
iens: int,
steps: Sequence[LegacyStep],
step: LegacyStep,
jobs: Sequence[LegacyJob],
active: bool,
):
if iens is None:
raise ValueError(f"{self} needs iens")
if steps is None:
raise ValueError(f"{self} needs steps")
if jobs is None:
raise ValueError(f"{self} needs jobs")
if active is None:
raise ValueError(f"{self} needs to be set either active or not")

self.iens = iens
self.steps = steps
self.step = step
self.jobs = jobs
self.active = active


class RealizationBuilder:
def __init__(self) -> None:
self._steps: List[LegacyStep] = []
self._step: Optional[LegacyStep] = None
self._active: Optional[bool] = None
self._iens: Optional[int] = None
self._parent_source: Optional[str] = None
self._jobs: Sequence[LegacyJob] = []

def active(self, active: bool) -> Self:
self._active = active
return self

def add_step(self, step: LegacyStep) -> Self:
self._steps.append(step)
def set_step(self, step: LegacyStep) -> Self:
self._step = step
return self

def set_jobs(self, jobs: Sequence[LegacyJob]) -> Self:
self._jobs = jobs
return self

def set_iens(self, iens: int) -> Self:
Expand All @@ -58,6 +65,7 @@ def build(self) -> Realization:

return Realization(
self._iens,
self._steps,
self._step,

Check failure on line 68 in src/ert/ensemble_evaluator/_builder/_realization.py

View workflow job for this annotation

GitHub Actions / type-checking (3.11)

Argument 2 to "Realization" has incompatible type "LegacyStep | None"; expected "LegacyStep"
self._jobs,
self._active,
)
3 changes: 0 additions & 3 deletions src/ert/ensemble_evaluator/_builder/_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

from ert.config.ext_job import ExtJob

SOURCE_TEMPLATE_STEP = "/step/{step_id}"
if TYPE_CHECKING:
from ert.run_arg import RunArg

Expand All @@ -20,8 +19,6 @@ class LegacyJob:

@dataclass
class LegacyStep:
id_: str
jobs: Sequence[LegacyJob]
name: str
max_runtime: Optional[int]
run_arg: "RunArg"
Expand Down
Loading

0 comments on commit 2c8ddb1

Please sign in to comment.