Skip to content

Commit

Permalink
CLN: Improve _derive_timedata type annotations
Browse files Browse the repository at this point in the history
  • Loading branch information
JB Lovland committed Feb 13, 2024
1 parent 84157ae commit 5034c83
Showing 1 changed file with 76 additions and 96 deletions.
172 changes: 76 additions & 96 deletions src/fmu/dataio/_objectdata_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@
"""
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime as dt
from dataclasses import asdict, dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Final, NamedTuple, Optional
from warnings import warn
Expand Down Expand Up @@ -116,6 +116,30 @@ def npfloat_to_float(v: Any) -> Any:
return float(v) if isinstance(v, (np.float64, np.float32)) else v


@dataclass
class TimedataValueLabel:
value: str
label: str = field(default="")

@staticmethod
def from_list(arr: list) -> TimedataValueLabel:
return TimedataValueLabel(
value=datetime.strptime(str(arr[0]), "%Y%m%d").isoformat(),
label=arr[1] if len(arr) == 2 else "",
)


@dataclass
class TimedataLegacyFormat:
time: list[TimedataValueLabel]


@dataclass
class TimedataFormat:
t0: Optional[TimedataValueLabel]
t1: Optional[TimedataValueLabel]


@dataclass
class _ObjectDataProvider:
"""Class for providing metadata for data objects in fmu-dataio, e.g. a surface.
Expand Down Expand Up @@ -635,101 +659,57 @@ def _check_index(self, index: list[str]) -> None:
for not_found in not_founds:
raise KeyError(f"{not_found} is not in table")

def _derive_timedata(self) -> dict:
"""Format input timedata to metadata."""
def _derive_timedata(
self,
) -> Optional[TimedataFormat | TimedataLegacyFormat]:
"""Format input timedata to metadata
New format:
When using two dates, input convention is
-[[newestdate, "monitor"], [oldestdate,"base"]]
but it is possible to turn around. But in the metadata the output t0
shall always be older than t1 so need to check, and by general rule the file
will be some--time1_time0 where time1 is the newest (unless a class
variable is set for those who wants it turned around).
"""

tdata = self.dataio.timedata
if not tdata:
return {}

if self.dataio.legacy_time_format:
timedata = self._derive_timedata_legacy()
else:
timedata = self._derive_timedata_newformat()
return timedata
use_legacy_format: bool = self.dataio.legacy_time_format

def _derive_timedata_legacy(self) -> dict[str, Any]:
"""Format input timedata to metadata. legacy version."""
# TODO(JB): Covnert tresult to TypedDict or Dataclass.
tdata = self.dataio.timedata
if not tdata:
return None

tresult: dict[str, Any] = {}
tresult["time"] = []
if len(tdata) == 1:
elem = tdata[0]
tresult["time"] = []
xfield = {"value": dt.strptime(str(elem[0]), "%Y%m%d").isoformat()}
self.time0 = str(elem[0])
if len(elem) == 2:
xfield["label"] = elem[1]
tresult["time"].append(xfield)
if len(tdata) == 2:
elem1 = tdata[0]
xfield1 = {"value": dt.strptime(str(elem1[0]), "%Y%m%d").isoformat()}
if len(elem1) == 2:
xfield1["label"] = elem1[1]

elem2 = tdata[1]
xfield2 = {"value": dt.strptime(str(elem2[0]), "%Y%m%d").isoformat()}
if len(elem2) == 2:
xfield2["label"] = elem2[1]

if xfield1["value"] < xfield2["value"]:
tresult["time"].append(xfield1)
tresult["time"].append(xfield2)
else:
tresult["time"].append(xfield2)
tresult["time"].append(xfield1)
start = TimedataValueLabel.from_list(tdata[0])
self.time0 = start.value
return (
TimedataLegacyFormat([start])
if use_legacy_format
else TimedataFormat(start, None)
)

self.time0 = tresult["time"][0]["value"]
self.time1 = tresult["time"][1]["value"]
if len(tdata) == 2:
start, stop = (
TimedataValueLabel.from_list(tdata[0]),
TimedataValueLabel.from_list(tdata[1]),
)

logger.info("Timedata: time0 is %s while time1 is %s", self.time0, self.time1)
return tresult
if datetime.fromisoformat(start.value) > datetime.fromisoformat(stop.value):
start, stop = stop, start

def _derive_timedata_newformat(self) -> dict[str, Any]:
"""Format input timedata to metadata, new format.
self.time0, self.time1 = start.value, stop.value

When using two dates, input convention is [[newestdate, "monitor"], [oldestdate,
"base"]] but it is possible to turn around. But in the metadata the output t0
shall always be older than t1 so need to check, and by general rule the file
will be some--time1_time0 where time1 is the newest (unless a class variable is
set for those who wants it turned around).
"""
tdata = self.dataio.timedata
tresult: dict[str, Any] = {}

if len(tdata) == 1:
elem = tdata[0]
tresult["t0"] = {}
xfield = {"value": dt.strptime(str(elem[0]), "%Y%m%d").isoformat()}
self.time0 = str(elem[0])
if len(elem) == 2:
xfield["label"] = elem[1]
tresult["t0"] = xfield
if len(tdata) == 2:
elem1 = tdata[0]
xfield1 = {"value": dt.strptime(str(elem1[0]), "%Y%m%d").isoformat()}
if len(elem1) == 2:
xfield1["label"] = elem1[1]

elem2 = tdata[1]
xfield2 = {"value": dt.strptime(str(elem2[0]), "%Y%m%d").isoformat()}
if len(elem2) == 2:
xfield2["label"] = elem2[1]

if xfield1["value"] < xfield2["value"]:
tresult["t0"] = xfield1
tresult["t1"] = xfield2
else:
tresult["t0"] = xfield2
tresult["t1"] = xfield1

self.time0 = tresult["t0"]["value"]
self.time1 = tresult["t1"]["value"]
return (
TimedataLegacyFormat([start, stop])
if use_legacy_format
else TimedataFormat(start, stop)
)

logger.info("Timedata: time0 is %s while time1 is %s", self.time0, self.time1)
return tresult
return (
TimedataLegacyFormat([])
if use_legacy_format
else TimedataFormat(None, None)
)

def _derive_from_existing(self) -> None:
"""Derive from existing metadata."""
Expand Down Expand Up @@ -821,15 +801,15 @@ def derive_metadata(self) -> None:
meta["undef_is_zero"] = self.dataio.undef_is_zero

# timedata:
tresult = self._derive_timedata()
if tresult:
if self.dataio.legacy_time_format:
for key, val in tresult.items():
meta[key] = val
else:
meta["time"] = {}
for key, val in tresult.items():
meta["time"][key] = val
dt = self._derive_timedata()
if isinstance(dt, TimedataLegacyFormat):
meta["time"] = [asdict(v) for v in dt.time]
elif isinstance(dt, TimedataFormat):
meta["time"] = {}
if t0 := dt.t0:
meta["time"]["t0"] = asdict(t0)
if t1 := dt.t1:
meta["time"]["t1"] = asdict(t1)

meta["is_prediction"] = self.dataio.is_prediction
meta["is_observation"] = self.dataio.is_observation
Expand Down

0 comments on commit 5034c83

Please sign in to comment.