From 6d53dc2a6e5afb81d6d36af35b3df6a4c8212ca6 Mon Sep 17 00:00:00 2001 From: Ingrid Date: Tue, 26 Nov 2024 20:49:57 +0100 Subject: [PATCH] working impl of rove_connector for the single timeseries case --- .gitignore | 3 +- Cargo.lock | 5 ++ Cargo.toml | 1 + rove_connector/Cargo.toml | 5 ++ rove_connector/src/lib.rs | 102 ++++++++++++++++++++++++++++++++++---- 5 files changed, 104 insertions(+), 12 deletions(-) diff --git a/.gitignore b/.gitignore index c8b241f2..89a3216c 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ -target/* \ No newline at end of file +target/* +.DS_Store diff --git a/Cargo.lock b/Cargo.lock index 27da1271..22114c53 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2374,8 +2374,13 @@ name = "rove_connector" version = "0.1.0" dependencies = [ "async-trait", + "bb8", + "bb8-postgres", + "chrono", + "chronoutil", "rove", "thiserror", + "tokio-postgres", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index ed2f9fe7..c0afde02 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ bb8 = "0.8.6" bb8-postgres = "0.8.1" bytes = "1.8.0" chrono = { version = "0.4.31", features = ["serde"] } +chronoutil = "0.2.7" csv = "1.3.0" futures = "0.3.28" kafka = "0.10.0" diff --git a/rove_connector/Cargo.toml b/rove_connector/Cargo.toml index 4031d1ce..3ef0bd29 100644 --- a/rove_connector/Cargo.toml +++ b/rove_connector/Cargo.toml @@ -5,6 +5,11 @@ edition.workspace = true [dependencies] async-trait.workspace = true +bb8.workspace = true +bb8-postgres.workspace = true +chrono.workspace = true +chronoutil.workspace = true rove.workspace = true thiserror.workspace = true +tokio-postgres.workspace = true diff --git a/rove_connector/src/lib.rs b/rove_connector/src/lib.rs index 18adfbb7..4ab3e5ac 100644 --- a/rove_connector/src/lib.rs +++ b/rove_connector/src/lib.rs @@ -1,27 +1,107 @@ use async_trait::async_trait; -use rove::{ - data_switch, - data_switch::{DataCache, DataConnector, SpaceSpec, TimeSpec}, -}; +use bb8_postgres::PostgresConnectionManager; +use chrono::{TimeZone, Utc}; +use chronoutil::RelativeDuration; +use rove::data_switch::{self, DataCache, DataConnector, SpaceSpec, TimeSpec, Timeseries}; use thiserror::Error; +use tokio_postgres::NoTls; #[derive(Error, Debug)] #[non_exhaustive] -pub enum Error {} +pub enum Error { + #[error("the connector does not know how to handle this time resolution: {0:?}")] + UnhandledTimeResolution(RelativeDuration), +} + +type PgConnectionPool = bb8::Pool>; #[derive(Debug)] -pub struct Connector; +pub struct Connector { + pool: PgConnectionPool, +} #[async_trait] impl DataConnector for Connector { async fn fetch_data( &self, - _space_spec: &SpaceSpec, - _time_spec: &TimeSpec, - _num_leading_points: u8, - _num_trailing_points: u8, + space_spec: &SpaceSpec, + time_spec: &TimeSpec, + num_leading_points: u8, + num_trailing_points: u8, _extra_spec: Option<&str>, ) -> Result { - todo!(); + // TODO: matching intervals like this is a hack, but currently necessary to avoid + // SQL injection. Ideally we could pass an interval type as a query param, which would + // also save us the query_string allocation, but no ToSql implementations for intervals + // currently exist in tokio_postgres, so we need to implement it ourselves. + let interval = match time_spec.time_resolution { + x if x == RelativeDuration::minutes(1) => "1 minute", + x if x == RelativeDuration::hours(1) => "1 hour", + x if x == RelativeDuration::days(1) => "1 day", + _ => { + return Err(data_switch::Error::Other(Box::new( + Error::UnhandledTimeResolution(time_spec.time_resolution), + ))) + } + }; + + let ts_id = match space_spec { + SpaceSpec::One(ts_id) => ts_id, + // TODO: We should handle at least the All case, Polygon can be left unimplemented for + // now + _ => todo!(), + }; + + // TODO: should time_spec just use chrono timestamps instead of unix? + // IIRC the reason for unix timestamps was easy compatibility with protobuf, but that's + // less of a priority now + let start_time = Utc.timestamp_opt(time_spec.timerange.start.0, 0).unwrap() + - (time_spec.time_resolution * num_leading_points.into()); + // TODO: figure out whether the range in postgres is inclusive on the range here or + // we need to add 1 second + let end_time = Utc.timestamp_opt(time_spec.timerange.start.0, 0).unwrap() + + (time_spec.time_resolution * num_trailing_points.into()); + + let query_string = format!("SELECT data.obsvalue, ts_rule.timestamp \ + FROM (SELECT data.obsvalue, data.obstime FROM data WHERE data.timeseries = $1) as data + RIGHT JOIN generate_series($2::timestamptz, $3::timestamptz, interval '{}') AS ts_rule(timestamp) \ + ON data.obstime = ts_rule.timestamp", interval); + + let conn = self + .pool + .get() + .await + .map_err(|e| data_switch::Error::Other(Box::new(e)))?; + + let data_results = conn + .query(query_string.as_str(), &[&ts_id, &start_time, &end_time]) + .await + .map_err(|e| data_switch::Error::Other(Box::new(e)))?; + + let cache = { + let mut values = Vec::with_capacity(data_results.len()); + + for row in data_results { + values.push(row.get(0)); + } + + DataCache::new( + vec![Timeseries { + tag: ts_id.to_string(), + values, + }], + // TODO: we need to either query to get the lat, lon, elev, or change olympian to + // accept not having them + vec![], + vec![], + vec![], + time_spec.timerange.start, + time_spec.time_resolution, + num_leading_points, + num_trailing_points, + ) + }; + + Ok(cache) } }