Skip to content

Commit

Permalink
CLN: Use pydantic for internal MetaData
Browse files Browse the repository at this point in the history
  • Loading branch information
tnatt committed Mar 22, 2024
1 parent 1442842 commit 763b6d8
Show file tree
Hide file tree
Showing 11 changed files with 303 additions and 393 deletions.
344 changes: 101 additions & 243 deletions src/fmu/dataio/_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
read_metadata_from_file,
)
from .datastructure._internal import internal
from .datastructure.configuration import global_configuration
from .datastructure.meta import meta
from .providers._filedata import FileDataProvider
from .providers._fmu import FmuProvider
Expand All @@ -42,21 +41,7 @@
logger: Final = null_logger(__name__)


# Generic, being resused several places:


def default_meta_dollars() -> dict[str, str]:
    """Return the default '$'-prefixed metadata fields (schema url, version, source)."""
    jsonschema_meta = internal.JsonSchemaMetadata(
        schema_=TypeAdapter(AnyHttpUrl).validate_strings(SCHEMA),  # type: ignore[call-arg]
        version=VERSION,
        source=SOURCE,
    )
    return jsonschema_meta.model_dump(mode="json", by_alias=True)


def generate_meta_tracklog() -> list[meta.TracklogEvent]:
def generate_meta_tracklog() -> meta.TracklogEvent:
"""Initialize the tracklog with the 'created' event only."""
return [
meta.TracklogEvent.model_construct(
Expand All @@ -80,6 +65,56 @@ def generate_meta_tracklog() -> list[meta.TracklogEvent]:
]


def _get_objectdata_provider(
    object: types.Inferrable,
    dataio: ExportData,
    meta_existing: dict | None = None,
) -> ObjectDataProvider:
    """Build an ObjectDataProvider for the given object and export settings.

    The provider supplies input to the ``data`` block of the metadata, and it
    also carries settings needed later (e.g. when deriving file data).
    Hence it must be created early, before the other providers.
    """
    provider = objectdata_provider_factory(object, dataio, meta_existing)
    provider.derive_metadata()
    return provider


def _get_filedata_provider(
    dataio: ExportData, objdata: ObjectDataProvider, fmudata: FmuProvider | None
) -> FileDataProvider:
    """Build a FileDataProvider with its file data derived and ready for use."""
    # Iteration/realization names are only available when inside an FMU run.
    iter_name = fmudata.get_iter_name() if fmudata else ""
    real_name = fmudata.get_real_name() if fmudata else ""
    provider = FileDataProvider(
        dataio=dataio,
        objdata=objdata,
        rootpath=dataio._rootpath,  # has been updated to case_path if fmurun
        itername=iter_name,
        realname=real_name,
    )
    provider.derive_filedata()
    return provider


def _compute_md5(
    dataio: ExportData, objdata: ObjectDataProvider, object: types.Inferrable
) -> str:
    """Compute and return the MD5 checksum of the object as it will be exported.

    The object is written to a temporary file with the export format flag,
    and the checksum of that file content is returned.
    """

    if not objdata.extension.startswith("."):
        raise ValueError("An extension must start with '.'")
    # buffering=0: write unbuffered so the file content is on disk when hashed.
    with NamedTemporaryFile(
        buffering=0,
        suffix=objdata.extension,
    ) as tf:
        logger.info("Compute MD5 sum for tmp file...: %s", tf.name)
        return export_file_compute_checksum_md5(
            obj=object,
            filename=Path(tf.name),
            flag=dataio._usefmtflag,
        )


@dataclass
class MetaData:
"""Class for sampling, process and holding all metadata in an ExportData instance.
Expand Down Expand Up @@ -111,245 +146,68 @@ class MetaData:
# input variables
obj: types.Inferrable
dataio: ExportData
compute_md5: bool = True

# storage state variables
objdata: ObjectDataProvider | None = field(default=None, init=False)
fmudata: FmuProvider | None = field(default=None, init=False)
iter_name: str = field(default="", init=False)
real_name: str = field(default="", init=False)

meta_class: str = field(default="", init=False)
meta_masterdata: dict = field(default_factory=dict, init=False)
meta_objectdata: dict = field(default_factory=dict, init=False)
meta_dollars: dict = field(default_factory=default_meta_dollars, init=False)
meta_access: dict = field(default_factory=dict, init=False)
meta_file: dict = field(default_factory=dict, init=False)
meta_tracklog: list = field(default_factory=list, init=False)
meta_fmu: dict = field(default_factory=dict, init=False)
# temporary storage for preprocessed data:
meta_xpreprocessed: dict = field(default_factory=dict, init=False)

# relevant when ERT* fmu_context; same as rootpath in the ExportData class!:
rootpath: str = field(default="", init=False)

# if re-using existing metadata
meta_existing: dict = field(default_factory=dict, init=False)

def __post_init__(self) -> None:
logger.info("Initialize _MetaData instance.")

# one special case is that obj is a file path, and dataio.reuse_metadata_rule is
# active. In this case we read the existing metadata here and reuse parts
# according to rule described in string self.reuse_metadata_rule!
if isinstance(self.obj, (str, Path)) and self.dataio.reuse_metadata_rule:
logger.info("Partially reuse existing metadata from %s", self.obj)
self.meta_existing = read_metadata_from_file(self.obj)

self.rootpath = str(self.dataio._rootpath.absolute())

def _populate_meta_objectdata(self) -> None:
"""Analyze the actual object together with input settings.
This will provide input to the ``data`` block of the metas but has also
valuable settings which are needed when providing filedata etc.
        Hence this must be run early or first.
"""
self.objdata = objectdata_provider_factory(
self.obj, self.dataio, self.meta_existing
)
self.objdata.derive_metadata()
self.meta_objectdata = self.objdata.metadata

def _populate_meta_fmu(self) -> None:
"""Populate the fmu block in the metadata.
This block may be missing in case the client is not within a FMU run, e.g.
it runs from RMS interactive
The _FmuDataProvider is ran to provide this information
"""
fmudata = FmuProvider(
model=self.dataio.config.get("model", None),
fmu_context=FmuContext.get(self.dataio.fmu_context),
casepath_proposed=self.dataio.casepath or "",
include_ertjobs=self.dataio.include_ertjobs,
forced_realization=self.dataio.realization,
workflow=self.dataio.workflow,
)
logger.info("FMU provider is %s", fmudata.get_provider())

self.meta_fmu = fmudata.get_metadata()
self.rootpath = fmudata.get_casepath()
self.iter_name = fmudata.get_iter_name()
self.real_name = fmudata.get_real_name()

logger.debug("Rootpath is now %s", self.rootpath)

def _populate_meta_file(self) -> None:
"""Populate the file block in the metadata.
The file block also contains all needed info for doing the actual file export.
It requires that the _ObjectDataProvider is ran first -> self.objdata
fmudata: FmuProvider | None = field(default=None)
compute_md5: bool = field(default=True)

- relative_path, seen from rootpath
- absolute_path, as above but full path
- checksum_md5, if required (a bit special treatment of this)
In additional _optional_ symlink adresses
- relative_path_symlink, seen from rootpath
- absolute_path_symlink, as above but full path
"""

assert self.objdata is not None

fdata = FileDataProvider(
self.dataio,
self.objdata,
Path(self.rootpath),
self.iter_name,
self.real_name,
)
fdata.derive_filedata()

if self.compute_md5:
if not self.objdata.extension.startswith("."):
raise ValueError("A extension must start with '.'")
with NamedTemporaryFile(
buffering=0,
suffix=self.objdata.extension,
) as tf:
logger.info("Compute MD5 sum for tmp file...: %s", tf.name)
checksum_md5 = export_file_compute_checksum_md5(
self.obj,
Path(tf.name),
flag=self.dataio._usefmtflag,
)
else:
logger.info("Do not compute MD5 sum at this stage!")
checksum_md5 = None

self.meta_file = meta.File(
absolute_path=fdata.absolute_path,
relative_path=fdata.relative_path,
checksum_md5=checksum_md5,
relative_path_symlink=fdata.relative_path_symlink,
absolute_path_symlink=fdata.absolute_path_symlink,
).model_dump(
mode="json",
exclude_none=True,
by_alias=True,
)

def _populate_meta_class(self) -> None:
"""Get the general class which is a simple string."""
assert self.objdata is not None
self.meta_class = self.objdata.classname

def _populate_meta_tracklog(self) -> None:
"""Create the tracklog metadata, which here assumes 'created' only."""
self.meta_tracklog = [
x.model_dump(mode="json", exclude_none=True, by_alias=True)
for x in generate_meta_tracklog()
]

def _populate_meta_masterdata(self) -> None:
"""Populate metadata from masterdata section in config."""
self.meta_masterdata = self.dataio.config.get("masterdata", {})

def _populate_meta_access(self) -> None:
"""Populate metadata overall from access section in config + allowed keys.
Access should be possible to change per object, based on user input.
This is done through the access_ssdl input argument.
The "asset" field shall come from the config. This is static information.
The "ssdl" field can come from the config, or be explicitly given through
the "access_ssdl" input argument. If the access_ssdl input argument is present,
        its contents shall take precedence.
"""
self.meta_access = (
global_configuration.Access.model_validate(
self.dataio.config["access"]
).model_dump(mode="json", exclude_none=True)
if self.dataio._config_is_valid
else {}
)

def _populate_meta_display(self) -> None:
"""Populate the display block."""

# display.name
if self.dataio.display_name is not None:
display_name = self.dataio.display_name
else:
assert self.objdata is not None
display_name = self.objdata.name

self.meta_display = {"name": display_name}

def _populate_meta_xpreprocessed(self) -> None:
"""Populate a few necessary 'tmp' metadata needed for preprocessed data."""
if self.dataio.fmu_context == FmuContext.PREPROCESSED:
self.meta_xpreprocessed["name"] = self.dataio.name
self.meta_xpreprocessed["tagname"] = self.dataio.tagname
self.meta_xpreprocessed["subfolder"] = self.dataio.subfolder

def _reuse_existing_metadata(self, meta: dict) -> dict:
def _reuse_existing_metadata(self, meta: dict, meta_existing: dict) -> dict:
"""Perform a merge procedure if the key `reuse_metadata_rule` is active."""
if self.dataio and self.dataio.reuse_metadata_rule:
oldmeta = self.meta_existing
newmeta = meta.copy()
if self.dataio.reuse_metadata_rule == "preprocessed":
return glue_metadata_preprocessed(oldmeta, newmeta)
raise ValueError(
f"The reuse_metadata_rule {self.dataio.reuse_metadata_rule} is not "
"supported."
)
return meta
if self.dataio.reuse_metadata_rule == "preprocessed":
return glue_metadata_preprocessed(meta_existing, meta.copy())
raise ValueError(
f"The reuse_metadata_rule {self.dataio.reuse_metadata_rule} is not "
"supported."
)

def generate_export_metadata(
self, skip_null: bool = True
) -> dict: # TODO! -> skip_null?
"""Main function to generate the full metadata"""

# populate order matters, in particular objectdata provides input to class/file
self._populate_meta_masterdata()
self._populate_meta_access()

if self.dataio._fmurun:
self._populate_meta_fmu()

self._populate_meta_tracklog()
self._populate_meta_objectdata()
self._populate_meta_class()
self._populate_meta_file()
self._populate_meta_display()
self._populate_meta_xpreprocessed()

# glue together metadata, order is as legacy code (but will be screwed if reuse
# of existing metadata...)
meta = self.meta_dollars.copy()
meta["tracklog"] = self.meta_tracklog
meta["class"] = self.meta_class
meta_existing = {}
if isinstance(self.obj, (str, Path)) and self.dataio.reuse_metadata_rule:
logger.info("Partially reuse existing metadata from %s", self.obj)
meta_existing = read_metadata_from_file(self.obj)

meta["fmu"] = self.meta_fmu
meta["file"] = self.meta_file
objdata = _get_objectdata_provider(self.obj, self.dataio, meta_existing)
filedata = _get_filedata_provider(self.dataio, objdata, self.fmudata)

meta["data"] = self.meta_objectdata
meta["display"] = self.meta_display
checksum_md5 = (
_compute_md5(self.dataio, objdata, self.obj) if self.compute_md5 else None
)

meta["access"] = self.meta_access
meta["masterdata"] = self.meta_masterdata
datameta = internal.DataMetaSchema(
schema_=TypeAdapter(AnyHttpUrl).validate_strings(SCHEMA), # type: ignore[call-arg]
version=VERSION,
source=SOURCE,
class_=objdata.classname,
masterdata=self.dataio.config.get("masterdata"),
fmu=self.fmudata.get_metadata() if self.fmudata else None,
access=self.dataio.config.get("access"),
data=objdata.metadata,
file=meta.File(
absolute_path=filedata.absolute_path,
relative_path=filedata.relative_path,
checksum_md5=checksum_md5,
relative_path_symlink=filedata.relative_path_symlink,
absolute_path_symlink=filedata.absolute_path_symlink,
),
tracklog=generate_meta_tracklog(),
display=meta.Display(name=self.dataio.display_name or objdata.name),
preprocessed=internal.PreprocessedInfo(
name=self.dataio.name,
tagname=self.dataio.tagname,
subfolder=self.dataio.subfolder,
)
if self.dataio.fmu_context == FmuContext.PREPROCESSED
else None,
)

if self.dataio.fmu_context == FmuContext.PREPROCESSED:
meta["_preprocessed"] = self.meta_xpreprocessed
meta_ = datameta.model_dump(mode="json", exclude_none=True, by_alias=True)

if skip_null:
meta = drop_nones(meta)
meta_ = drop_nones(meta_)

if self.dataio.reuse_metadata_rule:
return self._reuse_existing_metadata(meta_, meta_existing)

return self._reuse_existing_metadata(meta)
return meta_
4 changes: 2 additions & 2 deletions src/fmu/dataio/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,8 +431,8 @@ def glue_metadata_preprocessed(

meta = oldmeta.copy()

if "_preprocessed" in meta:
del meta["_preprocessed"]
if "preprocessed" in meta:
del meta["preprocessed"]

meta["fmu"] = newmeta["fmu"]
meta["file"] = newmeta["file"]
Expand Down
Loading

0 comments on commit 763b6d8

Please sign in to comment.