fix Dag->Pipeline breakage in integration and doc tests
intarga committed Oct 2, 2024
1 parent c111dee commit a683c9e
Showing 5 changed files with 91 additions and 127 deletions.
14 changes: 7 additions & 7 deletions sample_pipelines/fresh/TA_PT1H.toml
@@ -7,38 +7,38 @@
# sensor: 0 # do we run the QC non default sensors? Maybe not at first?
[[steps]]
name = "special_value_check"
-[steps.test.special_value_check]
+[steps.check.special_value_check]
special_values = [-999999, -6999, -99.9, -99.8, 999, 6999, 9999]

[[steps]]
name = "range_check"
-[steps.test.range_check]
+[steps.check.range_check]
min = -55
max = 50

[[steps]]
name = "climate_range_check"
-[steps.test.range_check_dynamic]
+[steps.check.range_check_dynamic]
source = "netcdf" # TODO: define a neat spec for this?

[[steps]]
name = "step_check"
-[steps.test.step_check]
+[steps.check.step_check]
max = 18.6

[[steps]]
name = "flatline_check"
-[steps.test.flatline_check]
+[steps.check.flatline_check]
max = 10

[[steps]]
name = "spike_check"
-[steps.test.spike_check]
+[steps.check.spike_check]
max = 18.6

[[steps]]
name = "model_consistency_check"
-[steps.test.model_consistency_check]
+[steps.check.model_consistency_check]
model_source = "lustre"
model_args = "arome/air_temperature" # TODO: verify if we need more args than this for the model
threshold = 3.0 # FIXME: made up value by Ingrid
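
A note on the rename above: the `steps.test.*` to `steps.check.*` change has to mirror the field name on `PipelineStep` in src/pipeline.rs (see the import changes in src/lib.rs below). A minimal sketch of how this TOML plausibly deserializes, assuming `CheckConf` is an externally tagged serde enum with snake_case variant names — the `rename_all` attribute and the exact conf struct fields here are assumptions, not code from this commit:

use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct Pipeline {
    steps: Vec<PipelineStep>,
}

#[derive(Debug, Deserialize)]
struct PipelineStep {
    name: String,
    // this field name is what appears as [steps.check.<variant>] in the TOML
    check: CheckConf,
}

// Externally tagged: the variant name becomes the sub-table key.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
enum CheckConf {
    RangeCheck(RangeCheckConf),
}

#[derive(Debug, Deserialize)]
struct RangeCheckConf {
    min: f32,
    max: f32,
}

fn main() {
    let pipeline: Pipeline = toml::from_str(
        r#"
        [[steps]]
        name = "range_check"
        [steps.check.range_check]
        min = -55
        max = 50
        "#,
    )
    .unwrap();
    assert_eq!(pipeline.steps[0].name, "range_check");
}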
12 changes: 7 additions & 5 deletions src/harness.rs
@@ -12,7 +12,7 @@ use thiserror::Error;
pub enum Error {
#[error("test name {0} not found in runner")]
InvalidTestName(String),
#[error("failed to run test")]
#[error("failed to run test: {0}")]
FailedTest(#[from] olympian::Error),
#[error("unknown olympian flag: {0}")]
UnknownFlag(String),
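
Worth noting on the `{0}` addition above: with thiserror, the positional placeholder interpolates the wrapped error's Display output, so the underlying olympian error is no longer swallowed by the generic message. A self-contained sketch of the pattern, using a hypothetical inner error type:

use thiserror::Error;

#[derive(Error, Debug)]
#[error("bad data in series")]
struct InnerError;

#[derive(Error, Debug)]
enum HarnessError {
    // #[from] derives From<InnerError>; {0} interpolates its Display output
    #[error("failed to run test: {0}")]
    FailedTest(#[from] InnerError),
}

fn main() {
    let err = HarnessError::from(InnerError);
    println!("{err}"); // prints: failed to run test: bad data in series
}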
@@ -119,7 +119,7 @@ pub fn run_test(step: &PipelineStep, cache: &DataCache) -> Result<ValidateRespon
// if the checks accept single values (which they should) then we don't need this.
// anyway I think if we have dynamic values for these we can match them to the data
// when fetching them.
-// let _n = cache.data.len();
+let n = cache.data.len();

let series_len = cache.data[0].1.len();

@@ -147,9 +147,11 @@ pub fn run_test(step: &PipelineStep, cache: &DataCache) -> Result<ValidateRespon
conf.min_elev_diff, // 200.,
conf.min_horizontal_scale, // 10000.,
conf.vertical_scale, // 200.,
&conf.pos, // &vec![4.; n],
&conf.neg, // &vec![8.; n],
&conf.eps2, // &vec![0.5; n],
// TODO: we shouldn't need to extend these vectors, it should be handled
// better in olympian
&vec![conf.pos[0]; n], // &vec![4.; n],
&vec![conf.neg[0]; n], // &vec![8.; n],
&vec![conf.eps2[0]; n], // &vec![0.5; n],
None,
)?;
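
The `vec![conf.pos[0]; n]` change above is a broadcast: the pipeline TOML stores a single pos/neg/eps2 value per check, while (per the TODO) olympian's sct currently expects one value per observation. A trivial standalone illustration — `broadcast` is a made-up helper, not something in this repo:

// Repeat the single configured value once per observation in the cache.
fn broadcast(values: &[f32], n: usize) -> Vec<f32> {
    vec![values[0]; n]
}

fn main() {
    // e.g. conf.pos = [4.0] with a cache of 3 series:
    assert_eq!(broadcast(&[4.0], 3), vec![4.0; 3]);
}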

94 changes: 55 additions & 39 deletions src/lib.rs
@@ -9,7 +9,7 @@
//! use rove::{
//! start_server,
//! data_switch::{DataSwitch, DataConnector},
-//! dev_utils::{TestDataSource, construct_hardcoded_dag},
+//! dev_utils::{TestDataSource, construct_hardcoded_pipeline},
//! };
//! use std::collections::HashMap;
//!
@@ -26,7 +26,7 @@
//! start_server(
//! "[::1]:1337".parse()?,
//! data_switch,
-//! construct_hardcoded_dag(),
+//! construct_hardcoded_pipeline(),
//! )
//! .await
//! }
@@ -37,7 +37,7 @@
//! use rove::{
//! Scheduler,
//! data_switch::{DataSwitch, DataConnector, Timestamp, Timerange, TimeSpec, SpaceSpec},
-//! dev_utils::{TestDataSource, construct_hardcoded_dag},
+//! dev_utils::{TestDataSource, construct_hardcoded_pipeline},
//! };
//! use std::collections::HashMap;
//! use chrono::{Utc, TimeZone};
@@ -53,7 +53,7 @@
//! } as &dyn DataConnector),
//! ]));
//!
-//! let rove_scheduler = Scheduler::new(construct_hardcoded_dag(), data_switch);
+//! let rove_scheduler = Scheduler::new(construct_hardcoded_pipeline(), data_switch);
//!
//! let mut rx = rove_scheduler.validate_direct(
//! "my_data_source",
@@ -72,7 +72,7 @@
//! RelativeDuration::minutes(5),
//! ),
//! &SpaceSpec::One(String::from("station_id")),
//! &["dip_check", "step_check"],
//! "TA_PT1H",
//! None,
//! ).await?;
//!
@@ -101,7 +101,7 @@ mod pipeline;
mod scheduler;
mod server;

-pub use pipeline::load_pipelines;
+pub use pipeline::{load_pipelines, Pipeline};

pub use scheduler::Scheduler;

@@ -135,11 +135,11 @@ pub(crate) mod pb {
pub mod dev_utils {
use crate::{
data_switch::{self, DataCache, DataConnector, SpaceSpec, TimeSpec, Timestamp},
-pipeline::{CheckConf, Pipeline, PipelineStep},
+pipeline::Pipeline,
};
use async_trait::async_trait;
use chronoutil::RelativeDuration;
-use std::hint::black_box;
+use std::{collections::HashMap, hint::black_box};

#[derive(Debug)]
pub struct TestDataSource {
@@ -212,38 +212,54 @@ pub mod dev_utils {
}
}

-pub fn construct_fake_pipeline() -> Pipeline {
-Pipeline {
-steps: vec![
-PipelineStep {
-name: "test1".to_string(),
-check: CheckConf::Dummy,
-},
-PipelineStep {
-name: "test2".to_string(),
-check: CheckConf::Dummy,
-},
-PipelineStep {
-name: "test3".to_string(),
-check: CheckConf::Dummy,
-},
-PipelineStep {
-name: "test4".to_string(),
-check: CheckConf::Dummy,
-},
-PipelineStep {
-name: "test5".to_string(),
-check: CheckConf::Dummy,
-},
-PipelineStep {
-name: "test6".to_string(),
-check: CheckConf::Dummy,
-},
-],
-}
-}

+// TODO: replace this by just loading a sample pipeline toml?
+pub fn construct_hardcoded_pipeline() -> HashMap<String, Pipeline> {
+HashMap::from([(
+String::from("hardcoded"),
+toml::from_str(
+r#"
+[[steps]]
+name = "step_check"
+[steps.check.step_check]
+max = 3.0
+[[steps]]
+name = "spike_check"
+[steps.check.spike_check]
+max = 3.0
+[[steps]]
+name = "buddy_check"
+[steps.check.buddy_check]
+max = 3
+radii = [5000.0]
+nums_min = [2]
+threshold = 2.0
+max_elev_diff = 200.0
+elev_gradient = 0.0
+min_std = 1.0
+num_iterations = 2
+[[steps]]
+name = "sct"
+[steps.check.sct]
+num_min = 5
+num_max = 100
+inner_radius = 50000.0
+outer_radius = 150000.0
+num_iterations = 5
+num_min_prof = 20
+min_elev_diff = 200.0
+min_horizontal_scale = 10000.0
+vertical_scale = 200.0
+pos = [4.0]
+neg = [8.0]
+eps2 = [0.5]
+"#,
+)
+.unwrap(),
+)])
+}
// pub fn construct_hardcoded_dag() -> Dag<&'static str> {
// let mut dag: Dag<&'static str> = Dag::new();

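On the TODO above construct_hardcoded_pipeline: once the sample TOMLs are loaded from disk, the dev helper could lean on `load_pipelines` (newly re-exported from src/lib.rs above). A sketch under stated assumptions — the signature (directory path in, HashMap<String, Pipeline> out, keyed by file stem) is a guess based on the doc comment in src/pipeline.rs below:

use rove::load_pipelines;
use std::path::Path;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Assumed behaviour: deserialize every TOML file in the directory into a
    // Pipeline, keyed by file stem (so TA_PT1H.toml becomes "TA_PT1H").
    let pipelines = load_pipelines(Path::new("sample_pipelines/fresh"))?;
    assert!(pipelines.contains_key("TA_PT1H"));
    Ok(())
}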
5 changes: 5 additions & 0 deletions src/pipeline.rs
@@ -2,8 +2,13 @@ use serde::Deserialize;
use std::{collections::HashMap, path::Path};
use thiserror::Error;

+/// Data structure defining a pipeline of checks, with parameters built in
+///
+/// Rather than constructing these manually, a convenience function `load_pipelines` is provided
+/// to deserialize a set of pipelines from a directory containing TOML files defining them.
#[derive(Debug, Deserialize, PartialEq, Clone)]
pub struct Pipeline {
+/// Sequence of steps in the pipeline
pub steps: Vec<PipelineStep>,
}

(diff for the fifth changed file did not load)
