Skip to content

Commit

Permalink
fixes in PR #76
Browse files Browse the repository at this point in the history
  • Loading branch information
intarga committed Sep 30, 2024
1 parent b30e075 commit 2c7ba91
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 100 deletions.
54 changes: 22 additions & 32 deletions met_connectors/src/frost/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,6 @@ fn extract_data(
"couldn't find header field on tseries".to_string(),
))?;

let station_id = util::extract_station_id(header)?;

// TODO: Should there be a location for each observation?
let location = util::extract_location(header, time)?;

// TODO: differentiate actual parse errors from missing duration?
let ts_time_resolution_result = util::extract_duration(header);
if ts_time_resolution_result.is_err()
Expand All @@ -41,6 +36,11 @@ fn extract_data(
return Ok(None);
}

let station_id = util::extract_station_id(header)?;

// TODO: Should there be a location for each observation?
let location = util::extract_location(header, time)?;

let obs: Vec<FrostObs> = serde_json::from_value(
ts.get_mut("observations")
.ok_or(Error::FindObs(
Expand All @@ -51,12 +51,8 @@ fn extract_data(

Ok(Some(((station_id, obs), location)))
})
// Is there some smart way to avoid a double collect without making the error handling
// messy?
.collect::<Result<Vec<Option<((String, Vec<FrostObs>), FrostLatLonElev)>>, Error>>()?
.into_iter()
.flatten()
.collect();
.filter_map(Result::transpose)
.collect::<Result<Vec<((String, Vec<FrostObs>), FrostLatLonElev)>, Error>>()?;

Ok(data)
}
Expand Down Expand Up @@ -127,22 +123,17 @@ fn json_to_data_cache(

// insert obses into data, with Nones for gaps in the series
for obs in obses {
while curr_obs_time < obs.time {
data.push(None);
curr_obs_time = curr_obs_time + period;
}
if curr_obs_time == obs.time {
data.push(Some(obs.body.value));
curr_obs_time = curr_obs_time + period;
} else {
while curr_obs_time < obs.time {
data.push(None);
curr_obs_time = curr_obs_time + period;
}
if curr_obs_time == obs.time {
data.push(Some(obs.body.value));
curr_obs_time = curr_obs_time + period;
} else {
return Err(Error::Misalignment(
"obs misaligned with series".to_string(),
));
}
return Err(Error::Misalignment(
"obs misaligned with series".to_string(),
));
}
}

Expand Down Expand Up @@ -191,13 +182,12 @@ pub async fn fetch_data_inner(
let interval_end = Utc.timestamp_opt(time_spec.timerange.end.0, 0).unwrap();

let extra_query_param = match space_spec {
SpaceSpec::One(station_id) => Some(("stationids", station_id.to_string())),
SpaceSpec::Polygon(polygon) => Some(("polygon", parse_polygon(polygon))),
SpaceSpec::All => None,
}
.ok_or(data_switch::Error::Other(Box::new(
Error::InvalidSpaceSpec("space_spec for frost cannot be `All`, as frost will time out"),
)))?;
SpaceSpec::One(station_id) => Ok(("stationids", station_id.to_string())),
SpaceSpec::Polygon(polygon) => Ok(("polygon", parse_polygon(polygon))),
SpaceSpec::All => Err(data_switch::Error::Other(Box::new(
Error::InvalidSpaceSpec("space_spec for frost cannot be `All`, as frost will time out"),
))),
}?;

let resp: serde_json::Value = client
.get("https://frost-beta.met.no/api/v1/obs/met.no/filter/get")
Expand Down Expand Up @@ -487,7 +477,7 @@ mod tests {
]
},
"timeoffset": "PT0H",
"timeresolution": "PT1M"
"timeresolution": "PT1H"
}
},
"available": {
Expand Down Expand Up @@ -631,6 +621,6 @@ mod tests {

// This test is a lot less useful since we made spatial queries only return timeseries with
// the requested timeresolution
assert_eq!(spatial_cache.data.len(), 1);
assert_eq!(spatial_cache.data.len(), 2);
}
}
27 changes: 26 additions & 1 deletion src/data_switch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,32 @@ pub struct TimeSpec {
/// The range in time of data to fetch
pub timerange: Timerange,
/// The time resolution of data that should be fetched
pub time_resolution: chronoutil::RelativeDuration,
pub time_resolution: RelativeDuration,
}

impl TimeSpec {
/// Construct a new `TimeSpec` with specified start and end timestamps, and
/// a time resolution.
pub fn new(start: Timestamp, end: Timestamp, time_resolution: RelativeDuration) -> Self {
TimeSpec {
timerange: Timerange { start, end },
time_resolution,
}
}

/// Alternative constructor for `TimeSpec` with time resolution specified
/// using an ISO 8601 duration stamp, to avoid a dependency on chronoutil.
pub fn new_time_resolution_string(
start: Timestamp,
end: Timestamp,
time_resolution: &str,
) -> Result<Self, String> {
Ok(TimeSpec {
timerange: Timerange { start, end },
time_resolution: RelativeDuration::parse_from_iso8601(time_resolution)
.map_err(|e| e.to_string())?,
})
}
}

/// Specifier of geographic position, by latitude and longitude
Expand Down
52 changes: 4 additions & 48 deletions src/harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,30 +103,8 @@ pub async fn run_test(test: &str, cache: &DataCache) -> Result<ValidateResponse,
&vec![true; n],
)?;

if spatial_result
.iter()
.any(|flag| Flag::try_from(*flag).is_err())
{
return Err(Error::UnknownFlag(
// this is messy, but at least it's not on the critical path
// and it lets the critical path code be more efficient
Flag::try_from(
*spatial_result
.iter()
.find(|flag| Flag::try_from(**flag).is_err())
.unwrap(),
)
.err()
.unwrap(),
));
}

for (i, flag) in spatial_result
.into_iter()
.map(|flag| flag.try_into().unwrap())
.enumerate()
{
result_vec[i].1.push(flag);
for (i, flag) in spatial_result.into_iter().map(Flag::try_from).enumerate() {
result_vec[i].1.push(flag.map_err(Error::UnknownFlag)?);
}
}
result_vec
Expand Down Expand Up @@ -165,30 +143,8 @@ pub async fn run_test(test: &str, cache: &DataCache) -> Result<ValidateResponse,
None,
)?;

if spatial_result
.iter()
.any(|flag| Flag::try_from(*flag).is_err())
{
return Err(Error::UnknownFlag(
// this is messy, but at least it's not on the critical path
// and it lets the critical path code be more efficient
Flag::try_from(
*spatial_result
.iter()
.find(|flag| Flag::try_from(**flag).is_err())
.unwrap(),
)
.err()
.unwrap(),
));
}

for (i, flag) in spatial_result
.into_iter()
.map(|flag| flag.try_into().unwrap())
.enumerate()
{
result_vec[i].1.push(flag);
for (i, flag) in spatial_result.into_iter().map(Flag::try_from).enumerate() {
result_vec[i].1.push(flag.map_err(Error::UnknownFlag)?);
}
}
result_vec
Expand Down
30 changes: 14 additions & 16 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,21 +58,19 @@
//! let mut rx = rove_scheduler.validate_direct(
//! "my_data_source",
//! &vec!["my_backing_source"],
//! &TimeSpec{
//! timerange: Timerange{
//! start: Timestamp(
//! Utc.with_ymd_and_hms(2023, 6, 26, 12, 0, 0)
//! .unwrap()
//! .timestamp(),
//! ),
//! end: Timestamp(
//! Utc.with_ymd_and_hms(2023, 6, 26, 14, 0, 0)
//! .unwrap()
//! .timestamp(),
//! ),
//! },
//! time_resolution: RelativeDuration::minutes(5),
//! },
//! &TimeSpec::new(
//! Timestamp(
//! Utc.with_ymd_and_hms(2023, 6, 26, 12, 0, 0)
//! .unwrap()
//! .timestamp(),
//! ),
//! Timestamp(
//! Utc.with_ymd_and_hms(2023, 6, 26, 14, 0, 0)
//! .unwrap()
//! .timestamp(),
//! ),
//! RelativeDuration::minutes(5),
//! ),
//! &SpaceSpec::One(String::from("station_id")),
//! &["dip_check", "step_check"],
//! None,
Expand Down Expand Up @@ -182,7 +180,7 @@ pub mod dev_utils {
RelativeDuration::minutes(5),
num_leading_points,
num_trailing_points,
vec![("test".to_string(), vec![Some(1.); self.data_len_spatial]); 1],
vec![("test".to_string(), vec![Some(1.); self.data_len_series]); 1],
))),
_ => panic!("unknown data_id"),
},
Expand Down
4 changes: 1 addition & 3 deletions src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ impl<'a> Scheduler<'a> {
rx
}

/// Run a set of spatial QC tests on some data
/// Run a set of QC tests on some data
///
/// `data_source` is the key identifying a connector in the
/// [`DataSwitch`](data_switch::DataSwitch).
Expand Down Expand Up @@ -183,8 +183,6 @@ impl<'a> Scheduler<'a> {
data_source: impl AsRef<str>,
// TODO: we should actually use these
_backing_sources: &[impl AsRef<str>],
// TODO: should we allow a way to call this without a dependency on chronoutil?
// adding a constructor for timespec that can take a string would achieve this
time_spec: &TimeSpec,
space_spec: &SpaceSpec,
tests: &[impl AsRef<str>],
Expand Down

0 comments on commit 2c7ba91

Please sign in to comment.