From fb6c779b2548897f8c8d39a951e098a571752a60 Mon Sep 17 00:00:00 2001 From: Ingrid Date: Fri, 15 Nov 2024 15:16:08 +0100 Subject: [PATCH 1/2] support auth in frost connector --- met_binary/src/main.rs | 23 ++++++++++++++++++++--- met_connectors/src/frost/fetch.rs | 4 ++++ met_connectors/src/frost/mod.rs | 11 ++++++++++- met_connectors/src/lib.rs | 2 +- src/data_switch.rs | 10 +++++----- src/lib.rs | 8 ++++---- src/scheduler.rs | 2 +- tests/integration_test.rs | 4 ++-- 8 files changed, 47 insertions(+), 17 deletions(-) diff --git a/met_binary/src/main.rs b/met_binary/src/main.rs index 201104d..5c71637 100644 --- a/met_binary/src/main.rs +++ b/met_binary/src/main.rs @@ -1,6 +1,6 @@ use clap::Parser; -use met_connectors::Frost; use met_connectors::LustreNetatmo; +use met_connectors::{frost, Frost}; use rove::{ data_switch::{DataConnector, DataSwitch}, load_pipelines, start_server, @@ -17,6 +17,10 @@ struct Args { max_trace_level: Level, #[arg(short, long, default_value_t = String::from("sample_pipeline/fresh"))] pipeline_dir: String, + #[arg(short = 'u', long, default_value_t = String::from(""))] + frost_username: String, + #[arg(short = 'u', long, default_value_t = String::from(""))] + frost_password: String, } // TODO: use anyhow for error handling? @@ -28,9 +32,22 @@ async fn main() -> Result<(), Box> { .with_max_level(args.max_trace_level) .init(); + let frost_connector = Frost { + credentials: frost::Credentials { + username: args.frost_username, + password: args.frost_password, + }, + }; + let data_switch = DataSwitch::new(HashMap::from([ - ("frost", &Frost as &dyn DataConnector), - ("lustre_netatmo", &LustreNetatmo as &dyn DataConnector), + ( + "frost", + Box::new(frost_connector) as Box, + ), + ( + "lustre_netatmo", + Box::new(LustreNetatmo) as Box, + ), ])); start_server( diff --git a/met_connectors/src/frost/fetch.rs b/met_connectors/src/frost/fetch.rs index 6acefe6..08dd1d4 100644 --- a/met_connectors/src/frost/fetch.rs +++ b/met_connectors/src/frost/fetch.rs @@ -3,6 +3,8 @@ use chrono::{prelude::*, Duration}; use chronoutil::RelativeDuration; use rove::data_switch::{self, DataCache, Polygon, SpaceSpec, TimeSpec, Timeseries, Timestamp}; +use super::Credentials; + #[allow(clippy::type_complexity)] fn extract_data( mut resp: serde_json::Value, @@ -168,6 +170,7 @@ pub async fn fetch_data_inner( num_leading_points: u8, num_trailing_points: u8, extra_spec: Option<&str>, + credentials: &Credentials, ) -> Result { // TODO: figure out how to share the client between rove reqs let client = reqwest::Client::new(); @@ -194,6 +197,7 @@ pub async fn fetch_data_inner( let resp: serde_json::Value = client .get("https://frost-beta.met.no/api/v1/obs/met.no/filter/get") + .basic_auth(&credentials.username, Some(&credentials.password)) .query(&[ extra_query_param, ("elementids", element_id.to_string()), diff --git a/met_connectors/src/frost/mod.rs b/met_connectors/src/frost/mod.rs index 78406dd..40d19b3 100644 --- a/met_connectors/src/frost/mod.rs +++ b/met_connectors/src/frost/mod.rs @@ -39,8 +39,16 @@ pub enum Error { Misalignment(String), } +#[derive(Debug, Clone)] +pub struct Credentials { + pub username: String, + pub password: String, +} + #[derive(Debug)] -pub struct Frost; +pub struct Frost { + pub credentials: Credentials, +} #[derive(Deserialize, Debug)] struct FrostObsBody { @@ -114,6 +122,7 @@ impl DataConnector for Frost { num_leading_points, num_trailing_points, extra_spec, + &self.credentials, ) .await } diff --git a/met_connectors/src/lib.rs b/met_connectors/src/lib.rs index 32cfad9..05ef50c 100644 --- a/met_connectors/src/lib.rs +++ b/met_connectors/src/lib.rs @@ -1,4 +1,4 @@ -mod frost; +pub mod frost; mod lustre_netatmo; pub use frost::Frost; diff --git a/src/data_switch.rs b/src/data_switch.rs index c78b6e7..c5e1ad8 100644 --- a/src/data_switch.rs +++ b/src/data_switch.rs @@ -216,23 +216,23 @@ pub trait DataConnector: Sync + std::fmt::Debug { /// use std::collections::HashMap; /// /// let data_switch = DataSwitch::new(HashMap::from([ -/// ("test", &TestDataSource{ +/// ("test", Box::new(TestDataSource{ /// data_len_single: 3, /// data_len_series: 1000, /// data_len_spatial: 1000, -/// } as &dyn DataConnector), +/// }) as Box), /// ])); /// ``` -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct DataSwitch<'ds> { - sources: HashMap<&'ds str, &'ds dyn DataConnector>, + sources: HashMap<&'ds str, Box>, } impl<'ds> DataSwitch<'ds> { /// Instantiate a new DataSwitch /// /// See the DataSwitch struct documentation for more info - pub fn new(sources: HashMap<&'ds str, &'ds dyn DataConnector>) -> Self { + pub fn new(sources: HashMap<&'ds str, Box>) -> Self { Self { sources } } diff --git a/src/lib.rs b/src/lib.rs index 3e84ab1..22dbc1e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,11 +16,11 @@ //! #[tokio::main] //! async fn main() -> Result<(), Box> { //! let data_switch = DataSwitch::new(HashMap::from([ -//! ("test", &TestDataSource{ +//! ("test", Box::new(TestDataSource{ //! data_len_single: 3, //! data_len_series: 1000, //! data_len_spatial: 1000, -//! } as &dyn DataConnector), +//! }) as Box), //! ])); //! //! start_server( @@ -46,11 +46,11 @@ //! #[tokio::main] //! async fn main() -> Result<(), Box> { //! let data_switch = DataSwitch::new(HashMap::from([ -//! ("test", &TestDataSource{ +//! ("test", Box::new(TestDataSource{ //! data_len_single: 3, //! data_len_series: 1000, //! data_len_spatial: 1000, -//! } as &dyn DataConnector), +//! }) as Box), //! ])); //! //! let rove_scheduler = Scheduler::new(construct_hardcoded_pipeline(), data_switch); diff --git a/src/scheduler.rs b/src/scheduler.rs index bd6ab0e..17f7ab9 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -20,7 +20,7 @@ pub enum Error { /// Receiver type for QC runs /// /// Holds information about test pipelines and data sources -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct Scheduler<'a> { // this is pub so that the server can determine the number of checks in a pipeline to size // its channel with. can be made private if the server functionality is deprecated diff --git a/tests/integration_test.rs b/tests/integration_test.rs index c014f9c..1c45ba1 100644 --- a/tests/integration_test.rs +++ b/tests/integration_test.rs @@ -53,11 +53,11 @@ pub async fn set_up_rove( async fn integration_test_hardcoded_pipeline() { let data_switch = DataSwitch::new(HashMap::from([( "test", - &TestDataSource { + Box::new(TestDataSource { data_len_single: DATA_LEN_SINGLE, data_len_series: 1, data_len_spatial: DATA_LEN_SPATIAL, - } as &dyn DataConnector, + }) as Box, )])); let (coordinator_future, mut client) = From d814755b85140909ea4d7438feba9c50b038bb19 Mon Sep 17 00:00:00 2001 From: Ingrid Date: Fri, 15 Nov 2024 21:21:19 +0100 Subject: [PATCH 2/2] met_binary: fix broken cmdline flag config --- met_binary/src/main.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/met_binary/src/main.rs b/met_binary/src/main.rs index 5c71637..f5d9302 100644 --- a/met_binary/src/main.rs +++ b/met_binary/src/main.rs @@ -15,11 +15,11 @@ struct Args { address: String, #[arg(short = 'l', long, default_value_t = Level::INFO)] max_trace_level: Level, - #[arg(short, long, default_value_t = String::from("sample_pipeline/fresh"))] + #[arg(short, long, default_value_t = String::from("sample_pipelines/fresh"))] pipeline_dir: String, - #[arg(short = 'u', long, default_value_t = String::from(""))] + #[arg(long, default_value_t = String::from(""))] frost_username: String, - #[arg(short = 'u', long, default_value_t = String::from(""))] + #[arg(long, default_value_t = String::from(""))] frost_password: String, }