From baff72bed2946cceea767e9d62f79e8aa4140190 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Therese=20Natter=C3=B8y?= <61694854+tnatt@users.noreply.github.com> Date: Wed, 3 Apr 2024 14:59:01 +0200 Subject: [PATCH] CLN: Use pydantic in FmuProvider --- src/fmu/dataio/_metadata.py | 6 +- src/fmu/dataio/_utils.py | 4 +- src/fmu/dataio/aggregation.py | 2 +- src/fmu/dataio/dataio.py | 17 +- .../datastructure/_internal/internal.py | 12 +- src/fmu/dataio/providers/_fmu.py | 441 +++++++----------- tests/conftest.py | 16 + .../ertrun1/share/metadata/fmu_case.yml | 2 +- tests/test_units/test_fmuprovider_class.py | 164 ++++--- 9 files changed, 318 insertions(+), 346 deletions(-) diff --git a/src/fmu/dataio/_metadata.py b/src/fmu/dataio/_metadata.py index cad97a271..81cec7b1b 100644 --- a/src/fmu/dataio/_metadata.py +++ b/src/fmu/dataio/_metadata.py @@ -107,10 +107,6 @@ def _get_meta_masterdata(masterdata: dict) -> meta.Masterdata: return meta.Masterdata.model_validate(masterdata) -def _get_meta_fmu(fmudata: FmuProvider) -> internal.FMUClassMetaData: - return internal.FMUClassMetaData.model_validate(fmudata.get_metadata()) - - def _get_meta_display(dataio: ExportData, objdata: ObjectDataProvider) -> meta.Display: return meta.Display(name=dataio.display_name or objdata.name) @@ -173,7 +169,7 @@ def generate_export_metadata( version=VERSION, source=SOURCE, class_=objdata.classname, - fmu=_get_meta_fmu(fmudata) if fmudata else None, + fmu=fmudata.get_metadata() if fmudata else None, masterdata=_get_meta_masterdata(masterdata) if masterdata else None, access=_get_meta_access(access) if access else None, data=_get_meta_objectdata(objdata), diff --git a/src/fmu/dataio/_utils.py b/src/fmu/dataio/_utils.py index 6cda44d93..6300a24cc 100644 --- a/src/fmu/dataio/_utils.py +++ b/src/fmu/dataio/_utils.py @@ -211,9 +211,9 @@ def size(fname: str) -> int: return Path(fname).stat().st_size -def uuid_from_string(string: str) -> str: +def uuid_from_string(string: str) -> uuid.UUID: """Produce valid and repeteable UUID4 as a hash of given string""" - return str(uuid.UUID(hashlib.md5(string.encode("utf-8")).hexdigest())) + return uuid.UUID(hashlib.md5(string.encode("utf-8")).hexdigest()) def read_parameters_txt(pfile: Path | str) -> types.Parameters: diff --git a/src/fmu/dataio/aggregation.py b/src/fmu/dataio/aggregation.py index ab2b2c947..1f60e383c 100644 --- a/src/fmu/dataio/aggregation.py +++ b/src/fmu/dataio/aggregation.py @@ -75,7 +75,7 @@ def __post_init__(self) -> None: @staticmethod def _generate_aggr_uuid(uuids: list[str]) -> str: """Unless aggregation_id; use existing UUIDs to generate a new UUID.""" - return _utils.uuid_from_string("".join(sorted(uuids))) + return str(_utils.uuid_from_string("".join(sorted(uuids)))) def _update_settings(self, newsettings: dict) -> None: """Update instance settings (properties) from other routines.""" diff --git a/src/fmu/dataio/dataio.py b/src/fmu/dataio/dataio.py index 48b6555ee..c550ceed7 100644 --- a/src/fmu/dataio/dataio.py +++ b/src/fmu/dataio/dataio.py @@ -589,6 +589,14 @@ def _validate_and_establish_fmucontext(self) -> None: ) self.fmu_context = FmuContext.NON_FMU + if self.fmu_context != FmuContext.CASE and env_fmu_context == FmuContext.CASE: + warn( + "fmu_context is set to 'realization', but unable to detect " + "ERT runpath from environment variable. " + "Did you mean fmu_context='case'?", + UserWarning, + ) + def _update_fmt_flag(self) -> None: # treat special handling of "xtgeo" in format name: if self.points_fformat == "csv|xtgeo" or self.polygons_fformat == "csv|xtgeo": @@ -712,7 +720,7 @@ def _get_fmu_provider(self) -> FmuProvider: return FmuProvider( model=self.config.get("model"), fmu_context=self.fmu_context, - casepath_proposed=self.casepath or "", + casepath_proposed=Path(self.casepath) if self.casepath else None, include_ertjobs=self.include_ertjobs, forced_realization=self.realization, workflow=self.workflow, @@ -769,13 +777,12 @@ def generate_metadata( self._update_fmt_flag() fmudata = self._get_fmu_provider() if self._fmurun else None - # update rootpath based on fmurun or not # TODO: Move to ExportData init when/if users are # disallowed to update class settings on the export. - self._rootpath = Path( - fmudata.get_casepath() if fmudata else str(self._rootpath.absolute()) - ) + if fmudata and (casepath := fmudata.get_casepath()): + self._rootpath = casepath + self._rootpath = self._rootpath.absolute() logger.debug("Rootpath is now %s", self._rootpath) # TODO: refactor the argument list for generate_export_metadata; we do not need diff --git a/src/fmu/dataio/datastructure/_internal/internal.py b/src/fmu/dataio/datastructure/_internal/internal.py index 06216b249..1576f1d3e 100644 --- a/src/fmu/dataio/datastructure/_internal/internal.py +++ b/src/fmu/dataio/datastructure/_internal/internal.py @@ -17,7 +17,13 @@ Model as GlobalConfigurationModel, ) from fmu.dataio.datastructure.meta import meta -from pydantic import AnyHttpUrl, BaseModel, Field, TypeAdapter, model_validator +from pydantic import ( + AnyHttpUrl, + BaseModel, + Field, + TypeAdapter, + model_validator, +) def seismic_warn() -> None: @@ -163,7 +169,7 @@ class PreprocessedInfo(BaseModel): subfolder: str -class Context(BaseModel): +class Context(BaseModel, use_enum_values=True): stage: FmuContext @@ -211,5 +217,5 @@ class CaseSchema(JsonSchemaMetadata): masterdata: meta.Masterdata access: meta.Access fmu: FMUModel - description: Optional[List[str]] + description: Optional[List[str]] = Field(default=None) tracklog: List[meta.TracklogEvent] diff --git a/src/fmu/dataio/providers/_fmu.py b/src/fmu/dataio/providers/_fmu.py index fd10fae9b..a0385ffb3 100644 --- a/src/fmu/dataio/providers/_fmu.py +++ b/src/fmu/dataio/providers/_fmu.py @@ -30,18 +30,21 @@ import json import os -from copy import deepcopy from dataclasses import dataclass, field from enum import Enum, auto -from os import environ from pathlib import Path -from typing import Final, Optional +from typing import TYPE_CHECKING, Final, Optional from warnings import warn from fmu.config import utilities as ut from fmu.dataio import _utils from fmu.dataio._definitions import FmuContext from fmu.dataio._logging import null_logger +from fmu.dataio.datastructure._internal import internal +from fmu.dataio.datastructure.meta import meta + +if TYPE_CHECKING: + from uuid import UUID # case metadata relative to casepath ERT_RELATIVE_CASE_METADATA_FILE: Final = "share/metadata/fmu_case.yml" @@ -59,6 +62,15 @@ def get_fmu_context_from_environment() -> FmuContext: return FmuContext.NON_FMU +def _casepath_has_metadata(casepath: Path) -> bool: + """Check if a proposed casepath has a metadata file""" + if (casepath / ERT_RELATIVE_CASE_METADATA_FILE).exists(): + logger.debug("Found metadata for proposed casepath <%s>", casepath) + return True + logger.debug("Did not find metadata for proposed casepath <%s>", casepath) + return False + + class FmuEnv(Enum): EXPERIMENT_ID = auto() ENSEMBLE_ID = auto() @@ -84,61 +96,60 @@ class FmuProvider: Args: model: Name of the model (usually from global config) - rootpath: .... fmu_context: The FMU context this is ran in; see FmuContext enum class - casepath_proposed: Proposed casepath ... needed? + casepath_proposed: Proposed casepath. Needed if FmuContext is CASE include_ertjobs: True if we want to include .... forced_realization: If we want to force the realization (use case?) workflow: Descriptive work flow info """ - model: dict | None = field(default_factory=dict) + model: dict | None = None fmu_context: FmuContext = FmuContext.REALIZATION include_ertjobs: bool = True - casepath_proposed: str | Path = "" + casepath_proposed: Optional[Path] = None forced_realization: Optional[int] = None workflow: Optional[dict[str, str]] = None # private properties for this class - _runpath: Path | str = field(default="", init=False) - _casepath: Path | str = field(default="", init=False) # actual casepath - _provider: str = field(default="", init=False) + _runpath: Optional[Path] = field(default_factory=Path, init=False) + _casepath: Optional[Path] = field(default_factory=Path, init=False) _iter_name: str = field(default="", init=False) _iter_id: int = field(default=0, init=False) - _iter_path: Path | str = field(default="", init=False) _real_name: str = field(default="", init=False) _real_id: int = field(default=0, init=False) - _real_path: Path | str = field(default="", init=False) _case_name: str = field(default="", init=False) - _user_name: str = field(default="", init=False) - _ert_info: dict = field(default_factory=dict, init=False) - _case_metadata: dict = field(default_factory=dict, init=False) - _metadata: dict = field(default_factory=dict, init=False) def __post_init__(self) -> None: logger.info("Initialize %s...", self.__class__) logger.debug("Case path is initially <%s>...", self.casepath_proposed) - if not FmuEnv.ENSEMBLE_ID.value: - logger.debug( - "No ERT environment variables detected, provider will be empty" - ) - return # not an FMU run - - self._provider = "ERT" - - self._detect_absolute_runpath() - self._detect_and_update_casepath() - self._parse_folder_info() - self._read_case_metadata() - - # the next ones will not be read if case metadata is empty, or stage is FMU CASE - self._read_optional_restart_data() - self._read_ert_information() - self._generate_ert_metadata() + self._runpath = self._get_runpath_from_env() + self._real_id = ( + int(iter_num) if (iter_num := FmuEnv.REALIZATION_NUMBER.value) else 0 + ) + self._iter_id = ( + int(real_num) if (real_num := FmuEnv.ITERATION_NUMBER.value) else 0 + ) + + self._casepath = self._validate_and_establish_casepath() + if self._casepath: + self._case_name = self._casepath.name + + if self._runpath and self.fmu_context != FmuContext.CASE: + missing_iter_folder = self._casepath == self._runpath.parent + if not missing_iter_folder: + logger.debug("Iteration folder found") + self._iter_name = self._runpath.name + self._real_name = self._runpath.parent.name + else: + logger.debug("No iteration folder found") + raise NotImplementedError( + "No iteration folder found, this is not supported yet" + ) + logger.debug("Found iter name from runpath: %s", self._iter_name) + logger.debug("Found real name from runpath: %s", self._real_name) def get_iter_name(self) -> str: - """The client (metadata) will ask for iter_name""" """Return the iter_name, e.g. 'iter-3' or 'pred'.""" return self._iter_name @@ -146,256 +157,162 @@ def get_real_name(self) -> str: """Return the real_name, e.g. 'realization-23'.""" return self._real_name - def get_casepath(self) -> str: + def get_casepath(self) -> Path | None: """Return updated casepath in a FMU run, will be updated if initially blank.""" - return "" if not self._casepath else str(self._casepath) + return self._casepath - def get_provider(self) -> str | None: - """Return the name of the FMU provider (so far 'ERT' only), or None.""" - return None if not self._provider else self._provider - - def get_metadata(self) -> dict: - """The client (metadata) will ask for complete metadata for FMU section""" - return {} if not self._metadata else self._metadata - - # private methods: - @staticmethod - def _get_folderlist_from_path(current: Path | str) -> list: - """Return a list of pure folder names incl. current casepath up to system root. + def get_metadata(self) -> internal.FMUClassMetaData | None: + """Construct the metadata FMU block for an ERT forward job.""" + logger.debug("Generate ERT metadata...") - For example: current is /scratch/xfield/nn/case/realization-33/iter-1 - shall return ['scratch', 'xfield', 'nn', 'case', 'realization-33', 'iter-1'] - """ - return [folder for folder in str(current).split("/") if folder] + if self._casepath is None or self.model is None: + logger.info("Can't return metadata, missing casepath or model description") + return None - @staticmethod - def _get_folderlist_from_runpath_env() -> list: - """Return a list of pure folder names incl. current from RUNPATH environment, + case_meta = self._get_fmucase_meta() - Derived from _ERT_RUNPATH. + if self.fmu_context != FmuContext.REALIZATION: + return internal.FMUClassMetaData( + case=case_meta, + context=self._get_fmucontext_meta(), + model=self._get_fmumodel_meta(), + workflow=self._get_workflow_meta() if self.workflow else None, + ) - For example: runpath is /scratch/xfield/nn/case/realization-33/iter-1/ - shall return ['scratch', 'xfield', 'nn', 'case', 'realization-33', 'iter-1'] - """ - runpath = FmuEnv.RUNPATH.value - if runpath: - return [folder for folder in runpath.split("/") if folder] - return [] + iter_uuid, real_uuid = self._get_iteration_and_real_uuid(case_meta.uuid) + return internal.FMUClassMetaData( + case=case_meta, + context=self._get_fmucontext_meta(), + model=self._get_fmumodel_meta(), + workflow=self._get_workflow_meta() if self.workflow else None, + iteration=self._get_iteration_meta(iter_uuid), + realization=self._get_realization_meta(real_uuid), + ) - def _detect_absolute_runpath(self) -> None: - """In case _ERT_RUNPATH is relative, an absolute runpath is detected.""" - if FmuEnv.RUNPATH.value: - self._runpath = Path(FmuEnv.RUNPATH.value).resolve() + @staticmethod + def _get_runpath_from_env() -> Path | None: + """get runpath as an absolute path if detected from the enviroment""" + return Path(runpath).resolve() if (runpath := FmuEnv.RUNPATH.value) else None - def _detect_and_update_casepath(self) -> None: + def _validate_and_establish_casepath(self) -> Path | None: """If casepath is not given, then try update _casepath (if in realization). There is also a validation here that casepath contains case metadata, and if not - then a second guess is attempted, looking at `parent` insted of `parent.parent` - is case of unconventional structure. + then a second guess is attempted, looking at `parent` insted of `parent.parent` + is case of missing iteration folder. """ - logger.debug("Try detect casepath, RUNPATH is %s", self._runpath) - logger.debug("Proposed casepath is now <%s>", self.casepath_proposed) - - self._casepath = Path(self.casepath_proposed) if self.casepath_proposed else "" - - if not self._casepath: - try_casepath = Path(self._runpath).parent.parent - logger.debug("Try casepath (first attempt): %s", try_casepath) - - if not (try_casepath / ERT_RELATIVE_CASE_METADATA_FILE).exists(): - logger.debug("Cannot find metadata file, try just one parent...") - try_casepath = Path(self._runpath).parent - logger.debug("Try casepath (second attempt): %s", try_casepath) - self._casepath = try_casepath - - if not (Path(self._casepath) / ERT_RELATIVE_CASE_METADATA_FILE).exists(): - logger.debug("No case metadata, issue a warning!") + if self.casepath_proposed: + if _casepath_has_metadata(self.casepath_proposed): + return self.casepath_proposed warn( - "Case metadata does not exist; will not update initial casepath", - UserWarning, - ) - self._casepath = "" - - def _parse_folder_info(self) -> None: - """Retreive the folders (id's and paths).""" - logger.debug("Parse folder info...") - - folders = self._get_folderlist_from_runpath_env() - if self.fmu_context == FmuContext.CASE and self._casepath: - folders = self._get_folderlist_from_path(self._casepath) # override - logger.debug("Folders to evaluate (case): %s", folders) - - self._iter_path = "" - self._real_path = "" - self._case_name = folders[-1] - self._user_name = folders[-2] - - logger.debug( - "case_name, user_name: %s %s", self._case_name, self._user_name + "Could not detect metadata for the proposed casepath " + f"{self.casepath_proposed}. Will try to detect from runpath." ) - logger.debug("Detecting FMU provider as ERT (case only)") - else: - logger.debug("Folders to evaluate (realization): %s", folders) + if self._runpath: + if _casepath_has_metadata(self._runpath.parent.parent): + return self._runpath.parent.parent - self._case_name = folders[-3] - self._user_name = folders[-4] + if _casepath_has_metadata(self._runpath.parent): + return self._runpath.parent - self._iter_name = folders[-1] - self._real_name = folders[-2] - - self._iter_path = Path("/" + "/".join(folders)) - self._real_path = Path("/" + "/".join(folders[:-1])) - - self._iter_id = int(str(FmuEnv.ITERATION_NUMBER.value)) - self._real_id = int(str(FmuEnv.REALIZATION_NUMBER.value)) - - def _read_case_metadata(self) -> None: - """Check if metadatafile file for CASE exists, and if so parse metadata. - - If file does not exist, still give a proposed file path, but the - self.casepath_proposed_metadata will be {} (empty) and the physical file - will not be made. - """ - logger.debug("Read case metadata, if any...") - if not self._casepath: - logger.info("No case path detected, hence FMU metadata will be empty.") - return - - case_metafile = Path(self._casepath) / ERT_RELATIVE_CASE_METADATA_FILE - if case_metafile.exists(): - logger.debug("Case metadata file exists in file %s", str(case_metafile)) - self._case_metadata = ut.yaml_load(case_metafile, loader="standard") - logger.debug("Case metadata are: %s", self._case_metadata) - else: - logger.debug("Case metadata file does not exists as %s", str(case_metafile)) - warn( - "Cannot read case metadata, hence stop retrieving FMU data!", - UserWarning, + if self.fmu_context == FmuContext.CASE: + raise ValueError( + "Could not auto detect the casepath, please provide it as input." ) - self._case_metadata = {} - - def _read_optional_restart_data(self) -> None: - # Load restart_from information - logger.debug("Read optional restart data, if any, and requested...") - if not self._case_metadata: - return - - if not environ.get(RESTART_PATH_ENVNAME): - return + logger.debug("No case metadata, issue a warning!") + warn("Case metadata does not exist, metadata will be empty!", UserWarning) + return None + def _get_restart_data_uuid(self) -> UUID | None: + """Load restart_from information""" + assert self._runpath is not None logger.debug("Detected a restart run from environment variable") - restart_path = Path(self._iter_path) / environ[RESTART_PATH_ENVNAME] - restart_iter = self._get_folderlist_from_path(restart_path)[-1] + restart_path = self._runpath / os.environ[RESTART_PATH_ENVNAME] restart_case_metafile = ( - restart_path / "../.." / ERT_RELATIVE_CASE_METADATA_FILE + restart_path.parent.parent / ERT_RELATIVE_CASE_METADATA_FILE ).resolve() - if restart_case_metafile.exists(): - restart_metadata = ut.yaml_load(restart_case_metafile, loader="standard") - self._ert_info["restart_from"] = _utils.uuid_from_string( - restart_metadata["fmu"]["case"]["uuid"] + restart_iter - ) - else: - print( - f"{RESTART_PATH_ENVNAME} environment variable is set to " - f"{environ[RESTART_PATH_ENVNAME]} which is invalid. Metadata " - "restart_from will remain empty." - ) - logger.warning( - f"{RESTART_PATH_ENVNAME} environment variable is set to " - f"{environ[RESTART_PATH_ENVNAME]} which is invalid. Metadata " - "restart_from will remain empty." - ) - def _read_ert_information(self) -> None: - """Retrieve information from an ERT (ver 5 and later) run.""" - logger.debug("Read ERT information, if any") - - if not self._case_metadata: - return - - logger.debug("Read ERT information") - if not self._iter_path: - logger.debug("Not _iter_path!") - return - - # store parameters.txt - logger.debug("Read ERT information, if any (continues)") - parameters_file = Path(self._iter_path) / "parameters.txt" - if parameters_file.is_file(): - params = _utils.read_parameters_txt(parameters_file) - # BUG(?): value can contain Nones, loop in fn. below - # does contains check, will fail. - nested_params = _utils.nested_parameters_dict(params) # type: ignore - self._ert_info["params"] = nested_params - logger.debug("parameters.txt parsed.") - else: - self._ert_info["params"] = {} - warn("The parameters.txt file was not found", UserWarning) - - # store jobs.json if required! - if self.include_ertjobs: - jobs_file = Path(self._iter_path) / "jobs.json" - if jobs_file.is_file(): - with open(jobs_file) as stream: - self._ert_info["jobs"] = json.load(stream) - logger.debug("jobs.json parsed.") - else: - logger.debug("jobs.json was not found") - else: - self._ert_info["jobs"] = None - logger.debug("Storing jobs.json is disabled") - - logger.debug("ERT files has been parsed.") - - def _generate_ert_metadata(self) -> None: - """Construct the metadata FMU block for an ERT forward job.""" - if not self._case_metadata: - return - - logger.debug("Generate ERT metadata...") - if not self._case_metadata: - logger.debug("Trigger UserWarning!") + if not restart_case_metafile.exists(): warn( - f"The fmu provider: {self._provider} is found but no case metadata!", + f"{RESTART_PATH_ENVNAME} environment variable is set to " + f"{os.environ[RESTART_PATH_ENVNAME]} which is invalid. Metadata " + "restart_from will remain empty.", UserWarning, ) - - meta = self._metadata # shortform - - meta["model"] = self.model - meta["context"] = {"stage": self.fmu_context.name.lower()} - meta["workflow"] = self.workflow - case_uuid = "not_present" # TODO! not allow missing case metadata? - if self._case_metadata and "fmu" in self._case_metadata: - meta["case"] = deepcopy(self._case_metadata["fmu"]["case"]) - case_uuid = meta["case"]["uuid"] - - if self.fmu_context == FmuContext.REALIZATION: - iter_uuid = _utils.uuid_from_string(case_uuid + str(self._iter_name)) - meta["iteration"] = { - "id": self._iter_id, - "uuid": iter_uuid, - "name": self._iter_name, - **( - {"restart_from": self._ert_info["restart_from"]} - if "restart_from" in self._ert_info - else {} - ), - } - real_uuid = _utils.uuid_from_string( - case_uuid + str(iter_uuid) + str(self._real_id) - ) - - logger.debug( - "Generate ERT metadata continues, and real ID %s", self._real_id - ) - - mreal = meta["realization"] = {} - mreal["id"] = self._real_id - mreal["uuid"] = real_uuid - mreal["name"] = self._real_name - mreal["parameters"] = self._ert_info["params"] - - if self.include_ertjobs: - mreal["jobs"] = self._ert_info["jobs"] + return None + + restart_metadata = internal.CaseSchema.model_validate( + ut.yaml_load(restart_case_metafile, loader="standard") + ) + return _utils.uuid_from_string( + f"{restart_metadata.fmu.case.uuid}{restart_path.name}" + ) + + def _get_ert_parameters(self) -> meta.Parameters | None: + logger.debug("Read ERT parameters") + assert self._runpath is not None + parameters_file = self._runpath / "parameters.txt" + if not parameters_file.exists(): + warn("The parameters.txt file was not found", UserWarning) + return None + + params = _utils.read_parameters_txt(parameters_file) + logger.debug("parameters.txt parsed.") + # BUG(?): value can contain Nones, loop in fn. below + # does contains check, will fail. + return meta.Parameters(root=_utils.nested_parameters_dict(params)) # type: ignore + + def _get_ert_jobs(self) -> dict | None: + logger.debug("Read ERT jobs") + assert self._runpath is not None + jobs_file = self._runpath / "jobs.json" + if not jobs_file.exists(): + logger.debug("jobs.json was not found") + return None + + with open(jobs_file) as stream: + logger.debug("parsing jobs.json.") + return json.load(stream) + + def _get_iteration_and_real_uuid(self, case_uuid: UUID) -> tuple[UUID, UUID]: + iter_uuid = _utils.uuid_from_string(f"{case_uuid}{self._iter_name}") + real_uuid = _utils.uuid_from_string(f"{case_uuid}{iter_uuid}{self._real_id}") + return iter_uuid, real_uuid + + def _get_fmucase_meta(self) -> meta.FMUCase: + """Parse and validate the CASE metadata.""" + logger.debug("Loading case metadata file and return pydantic case model") + assert self._casepath is not None + case_metafile = self._casepath / ERT_RELATIVE_CASE_METADATA_FILE + case_meta = internal.CaseSchema.model_validate( + ut.yaml_load(case_metafile, loader="standard") + ) + return case_meta.fmu.case + + def _get_realization_meta(self, real_uuid: UUID) -> meta.Realization: + return meta.Realization( + id=self._real_id, + name=self._real_name, + parameters=self._get_ert_parameters(), + jobs=self._get_ert_jobs() if self.include_ertjobs else None, + uuid=real_uuid, + ) + + def _get_iteration_meta(self, iter_uuid: UUID) -> meta.Iteration: + return meta.Iteration( + id=self._iter_id, + name=self._iter_name, + uuid=iter_uuid, + restart_from=self._get_restart_data_uuid() + if os.getenv(RESTART_PATH_ENVNAME) + else None, + ) + + def _get_fmucontext_meta(self) -> internal.Context: + return internal.Context(stage=self.fmu_context) + + def _get_fmumodel_meta(self) -> meta.FMUModel: + return meta.FMUModel.model_validate(self.model) + + def _get_workflow_meta(self) -> meta.Workflow: + return meta.Workflow.model_validate(self.workflow) diff --git a/tests/conftest.py b/tests/conftest.py index 2b7f4b2be..09da99706 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -113,6 +113,22 @@ def fixture_fmurun_w_casemetadata(tmp_path_factory, monkeypatch, rootpath): return rootpath +@pytest.fixture(name="fmurun_non_equal_real_and_iter", scope="function") +def fixture_fmurun_non_equal_real_and_iter(tmp_path_factory, monkeypatch, rootpath): + """Create a tmp folder structure for testing; with non eqaul real and iter num!""" + tmppath = tmp_path_factory.mktemp("data3") + newpath = tmppath / ERTRUN + shutil.copytree(rootpath / ERTRUN, newpath) + rootpath = newpath / "realization-1/iter-0" + + monkeypatch.setenv(f"_ERT_{FmuEnv.ITERATION_NUMBER.name}", "0") + monkeypatch.setenv(f"_ERT_{FmuEnv.REALIZATION_NUMBER.name}", "1") + monkeypatch.setenv(f"_ERT_{FmuEnv.RUNPATH.name}", str(rootpath)) + + logger.debug("Ran %s", _current_function_name()) + return rootpath + + @pytest.fixture(name="fmurun_w_casemetadata_pred", scope="function") def fixture_fmurun_w_casemetadata_pred(tmp_path_factory, monkeypatch, rootpath): """Create a tmp folder structure for testing; here existing fmurun w/ case meta!""" diff --git a/tests/data/drogon/ertrun1/share/metadata/fmu_case.yml b/tests/data/drogon/ertrun1/share/metadata/fmu_case.yml index ed5859d23..04d70cf94 100644 --- a/tests/data/drogon/ertrun1/share/metadata/fmu_case.yml +++ b/tests/data/drogon/ertrun1/share/metadata/fmu_case.yml @@ -6,7 +6,7 @@ tracklog: - datetime: "2021-05-05T09:20:40.475110" event: created user: - user_id: peesv + id: peesv fmu: case: name: somecasename diff --git a/tests/test_units/test_fmuprovider_class.py b/tests/test_units/test_fmuprovider_class.py index 23f23a725..be03cae9b 100644 --- a/tests/test_units/test_fmuprovider_class.py +++ b/tests/test_units/test_fmuprovider_class.py @@ -2,7 +2,6 @@ import logging import os -from pathlib import Path import fmu.dataio as dataio import pydantic @@ -14,30 +13,10 @@ logger = logging.getLogger(__name__) -FOLDERTREE = "/scratch/myfield/case/realization-13/iter-2/" WORKFLOW = {"reference": "some_work_flow"} - GLOBAL_CONFIG_MODEL = {"name": "Model2", "revision": "22.1.0"} -def test_get_folderlist_from_path(): - """Test static method on getting folders from a path""" - ftree = Path(FOLDERTREE) - mylist = FmuProvider._get_folderlist_from_path(ftree) - assert mylist[-1] == "iter-2" - assert mylist[-3] == "case" - assert mylist[0] == "scratch" - - -def test_get_folderlist_from_ert_runpath(monkeypatch): - """Test static method on getting folders from a _ERT_RUNPATH env variable""" - logger.debug("Set ENV for RUNPATH as %s", FmuEnv.RUNPATH.keyname) - monkeypatch.setenv(FmuEnv.RUNPATH.keyname, FOLDERTREE) - mylist = FmuProvider._get_folderlist_from_runpath_env() - assert mylist[-1] == "iter-2" - assert mylist[-3] == "case" - - def test_fmuprovider_no_provider(): """Testing the FmuProvider where no ERT context is found from env variables.""" @@ -49,7 +28,7 @@ def test_fmuprovider_no_provider(): forced_realization=None, workflow=WORKFLOW, ) - assert myfmu.get_provider() is None + assert myfmu.get_metadata() is None def test_fmuprovider_model_info_in_metadata(fmurun_w_casemetadata): @@ -58,18 +37,19 @@ def test_fmuprovider_model_info_in_metadata(fmurun_w_casemetadata): myfmu = FmuProvider( model=GLOBAL_CONFIG_MODEL, fmu_context=FmuContext.REALIZATION, - workflow="some work flow", + workflow=WORKFLOW, ) - - assert "model" in myfmu._metadata - assert myfmu._metadata["model"] == GLOBAL_CONFIG_MODEL + meta = myfmu.get_metadata() + assert "model" in meta.model_fields_set + assert meta.model.model_dump(mode="json", exclude_none=True) == GLOBAL_CONFIG_MODEL myfmu = FmuProvider( model=None, fmu_context=FmuContext.REALIZATION, - workflow="some work flow", + workflow=WORKFLOW, ) - assert not myfmu._metadata["model"] + meta = myfmu.get_metadata() + assert meta is None def test_fmuprovider_ert_provider_guess_casemeta_path(fmurun): @@ -88,28 +68,26 @@ def test_fmuprovider_ert_provider_guess_casemeta_path(fmurun): workflow=WORKFLOW, ) - assert myfmu.get_provider() == "ERT" - assert not myfmu.get_metadata() - assert myfmu.get_casepath() == "" + assert myfmu.get_metadata() is None + assert myfmu.get_casepath() is None -def test_fmuprovider_ert_provider_missing_parameter_txt( - fmurun_w_casemetadata, globalconfig1 -): +def test_fmuprovider_ert_provider_missing_parameter_txt(fmurun_w_casemetadata): """Test for an ERT case, when missing file parameter.txt (e.g. pred. run)""" os.chdir(fmurun_w_casemetadata) # delete the file for this test (fmurun_w_casemetadata / "parameters.txt").unlink() - + myfmu = FmuProvider( + model=GLOBAL_CONFIG_MODEL, + fmu_context=FmuContext.REALIZATION, + include_ertjobs=True, + workflow=WORKFLOW, + ) with pytest.warns(UserWarning, match="parameters.txt file was not found"): - myfmu = FmuProvider( - model=GLOBAL_CONFIG_MODEL, - fmu_context=FmuContext.REALIZATION, - include_ertjobs=True, - workflow=WORKFLOW, - ) + myfmu.get_metadata() + assert myfmu._case_name == "ertrun1" assert myfmu._real_name == "realization-0" assert myfmu._real_id == 0 @@ -129,11 +107,28 @@ def test_fmuprovider_arbitrary_iter_name(fmurun_w_casemetadata_pred): assert myfmu._real_name == "realization-0" assert myfmu._real_id == 0 assert myfmu._iter_name == "pred" - assert not myfmu._iter_id - assert ( - myfmu._case_metadata["fmu"]["case"]["uuid"] - == "a40b05e8-e47f-47b1-8fee-f52a5116bd37" + # iter_id should have the default value + assert myfmu._iter_id == 0 + meta = myfmu.get_metadata() + assert str(meta.case.uuid) == "a40b05e8-e47f-47b1-8fee-f52a5116bd37" + + +def test_fmuprovider_arbitrary_iter_name_(fmurun_non_equal_real_and_iter): + """Test that iter and real number is picked up correctly from env""" + + os.chdir(fmurun_non_equal_real_and_iter) + myfmu = FmuProvider( + model=GLOBAL_CONFIG_MODEL, + fmu_context=FmuContext.REALIZATION, + include_ertjobs=True, + workflow=WORKFLOW, ) + assert myfmu._runpath == fmurun_non_equal_real_and_iter + assert myfmu._case_name == "ertrun1" + assert myfmu._real_name == "realization-1" + assert myfmu._real_id == 1 + assert myfmu._iter_name == "iter-0" + assert myfmu._iter_id == 0 def test_fmuprovider_prehook_case(tmp_path, globalconfig2, fmurun_prehook): @@ -186,15 +181,42 @@ def test_fmuprovider_detect_no_case_metadata(fmurun): """ os.chdir(fmurun) - with pytest.warns(UserWarning): + with pytest.warns(UserWarning, match="Case metadata does not exist"): myfmu = FmuProvider( model=GLOBAL_CONFIG_MODEL, fmu_context=FmuContext.REALIZATION, ) - assert myfmu._case_name == "ertrun1" - assert myfmu._real_name == "realization-0" - assert myfmu._real_id == 0 - assert not myfmu._case_metadata + meta = myfmu.get_metadata() + assert meta is None + + +def test_fmuprovider_case_run(fmurun_prehook): + """ + When fmu_context="case" and no runpath can be detected from environment + an error should be raised if no casepath is provided. + """ + logger.info("Active folder is %s", fmurun_prehook) + + os.chdir(fmurun_prehook) + + # make sure that no runpath environment value is present + assert FmuEnv.RUNPATH.value is None + + with pytest.raises(ValueError, match="Could not auto detect the casepath"): + FmuProvider( + model=GLOBAL_CONFIG_MODEL, + fmu_context=FmuContext.CASE, + ) + + # providing the casepath is the solution, and no error is thrown + myfmu = FmuProvider( + model=GLOBAL_CONFIG_MODEL, + fmu_context=FmuContext.CASE, + casepath_proposed=fmurun_prehook, + ) + meta = myfmu.get_metadata() + assert meta.realization is None + assert myfmu._case_name == fmurun_prehook.name def test_fmuprovider_valid_restart_env(monkeypatch, fmurun_w_casemetadata, fmurun_pred): @@ -206,6 +228,7 @@ def test_fmuprovider_valid_restart_env(monkeypatch, fmurun_w_casemetadata, fmuru fmu_restart_from = FmuProvider( model=GLOBAL_CONFIG_MODEL, fmu_context=FmuContext.REALIZATION ) + meta_restart_from = fmu_restart_from.get_metadata() monkeypatch.setenv(RESTART_PATH_ENVNAME, str(fmurun_w_casemetadata)) @@ -214,10 +237,9 @@ def test_fmuprovider_valid_restart_env(monkeypatch, fmurun_w_casemetadata, fmuru model=GLOBAL_CONFIG_MODEL, fmu_context=FmuContext.REALIZATION ) - assert ( - fmu_restart._metadata["iteration"]["restart_from"] - == fmu_restart_from._metadata["iteration"]["uuid"] - ) + meta_restart = fmu_restart.get_metadata() + assert meta_restart.iteration.restart_from is not None + assert meta_restart.iteration.restart_from == meta_restart_from.iteration.uuid def test_fmuprovider_invalid_restart_env( @@ -237,7 +259,8 @@ def test_fmuprovider_invalid_restart_env( fmu_restart = FmuProvider( model=GLOBAL_CONFIG_MODEL, fmu_context=FmuContext.REALIZATION ) - assert "restart_from" not in fmu_restart._metadata["iteration"] + meta = fmu_restart.get_metadata() + assert meta.iteration.restart_from is None def test_fmuprovider_no_restart_env(monkeypatch, fmurun_w_casemetadata, fmurun_pred): @@ -253,13 +276,13 @@ def test_fmuprovider_no_restart_env(monkeypatch, fmurun_w_casemetadata, fmurun_p monkeypatch.delenv(RESTART_PATH_ENVNAME) os.chdir(fmurun_pred) - fmu_restart = FmuProvider( + restart_meta = FmuProvider( model=GLOBAL_CONFIG_MODEL, fmu_context=FmuContext.REALIZATION - ) - assert "restart_from" not in fmu_restart._metadata["iteration"] + ).get_metadata() + assert restart_meta.iteration.restart_from is None -def test_fmuprovider_workflow_reference(fmurun_w_casemetadata, edataobj1): +def test_fmuprovider_workflow_reference(fmurun_w_casemetadata): """Testing the handling of workflow reference input. Metadata definitions of fmu.workflow is that it is a dictionary with 'reference' @@ -268,9 +291,8 @@ def test_fmuprovider_workflow_reference(fmurun_w_casemetadata, edataobj1): to a string which is inserted into the 'workflow' element in the outgoing metadata. Some users still have legacy workflows that give this as a dictionary, so we will continue to allow it, but with a warning. - This test is asserting that when 'workflow' is given in various shapes and forms, - it shall always produce valid metadata. + it shall always produce valid metadata, or give a validation error if not. """ os.chdir(fmurun_w_casemetadata) @@ -279,17 +301,25 @@ def test_fmuprovider_workflow_reference(fmurun_w_casemetadata, edataobj1): edata = dataio.ExportData(workflow="workflow as string") # check that the conversion to dict works assert edata.workflow == {"reference": "workflow as string"} - myfmu = FmuProvider(workflow=edata.workflow) - assert "workflow" in myfmu._metadata - assert myfmu._metadata["workflow"] == edata.workflow + myfmu_meta = FmuProvider( + model=GLOBAL_CONFIG_MODEL, workflow=edata.workflow + ).get_metadata() + assert myfmu_meta.workflow is not None + assert ( + myfmu_meta.workflow.model_dump(mode="json", exclude_none=True) == edata.workflow + ) # workflow input is a correct dict with pytest.warns(FutureWarning, match="The 'workflow' argument"): edata = dataio.ExportData(workflow={"reference": "workflow as dict"}) assert edata.workflow == {"reference": "workflow as dict"} - myfmu = FmuProvider(workflow=edata.workflow) - assert "workflow" in myfmu._metadata - assert myfmu._metadata["workflow"] == edata.workflow + myfmu_meta = FmuProvider( + model=GLOBAL_CONFIG_MODEL, workflow=edata.workflow + ).get_metadata() + assert myfmu_meta.workflow is not None + assert ( + myfmu_meta.workflow.model_dump(mode="json", exclude_none=True) == edata.workflow + ) # workflow input is non-correct dict with pytest.raises(pydantic.ValidationError):