Skip to content

Commit

Permalink
working impl of rove_connector for the single timeseries case
Browse files Browse the repository at this point in the history
  • Loading branch information
intarga committed Nov 26, 2024
1 parent 50b84b4 commit 6d53dc2
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 12 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
target/*
target/*
.DS_Store
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ bb8 = "0.8.6"
bb8-postgres = "0.8.1"
bytes = "1.8.0"
chrono = { version = "0.4.31", features = ["serde"] }
chronoutil = "0.2.7"
csv = "1.3.0"
futures = "0.3.28"
kafka = "0.10.0"
Expand Down
5 changes: 5 additions & 0 deletions rove_connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ edition.workspace = true

[dependencies]
async-trait.workspace = true
bb8.workspace = true
bb8-postgres.workspace = true
chrono.workspace = true
chronoutil.workspace = true
rove.workspace = true
thiserror.workspace = true
tokio-postgres.workspace = true

102 changes: 91 additions & 11 deletions rove_connector/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,107 @@
use async_trait::async_trait;
use rove::{
data_switch,
data_switch::{DataCache, DataConnector, SpaceSpec, TimeSpec},
};
use bb8_postgres::PostgresConnectionManager;
use chrono::{TimeZone, Utc};
use chronoutil::RelativeDuration;
use rove::data_switch::{self, DataCache, DataConnector, SpaceSpec, TimeSpec, Timeseries};
use thiserror::Error;
use tokio_postgres::NoTls;

#[derive(Error, Debug)]
#[non_exhaustive]
pub enum Error {}
pub enum Error {
#[error("the connector does not know how to handle this time resolution: {0:?}")]
UnhandledTimeResolution(RelativeDuration),
}

type PgConnectionPool = bb8::Pool<PostgresConnectionManager<NoTls>>;

#[derive(Debug)]
pub struct Connector;
pub struct Connector {
pool: PgConnectionPool,
}

#[async_trait]
impl DataConnector for Connector {
    /// Fetch one timeseries from postgres, aligned onto a regular time axis.
    ///
    /// The query RIGHT JOINs the data against `generate_series` so that
    /// missing observations appear as gaps (NULL rows) rather than being
    /// silently dropped — the cache's time axis stays regular.
    ///
    /// Only `SpaceSpec::One` is handled for now; `All` and `Polygon` hit
    /// `todo!()`. Errors from the pool, the query, or an unsupported time
    /// resolution are wrapped in `data_switch::Error::Other`.
    async fn fetch_data(
        &self,
        space_spec: &SpaceSpec,
        time_spec: &TimeSpec,
        num_leading_points: u8,
        num_trailing_points: u8,
        _extra_spec: Option<&str>,
    ) -> Result<DataCache, data_switch::Error> {
        // TODO: matching intervals like this is a hack, but currently necessary to avoid
        // SQL injection. Ideally we could pass an interval type as a query param, which would
        // also save us the query_string allocation, but no ToSql implementations for intervals
        // currently exist in tokio_postgres, so we need to implement it ourselves.
        let interval = match time_spec.time_resolution {
            x if x == RelativeDuration::minutes(1) => "1 minute",
            x if x == RelativeDuration::hours(1) => "1 hour",
            x if x == RelativeDuration::days(1) => "1 day",
            _ => {
                return Err(data_switch::Error::Other(Box::new(
                    Error::UnhandledTimeResolution(time_spec.time_resolution),
                )))
            }
        };

        let ts_id = match space_spec {
            SpaceSpec::One(ts_id) => ts_id,
            // TODO: We should handle at least the All case, Polygon can be left unimplemented for
            // now
            _ => todo!(),
        };

        // TODO: should time_spec just use chrono timestamps instead of unix?
        // IIRC the reason for unix timestamps was easy compatibility with protobuf, but that's
        // less of a priority now
        // Widen the window by the leading/trailing context points the QC tests need.
        let start_time = Utc.timestamp_opt(time_spec.timerange.start.0, 0).unwrap()
            - (time_spec.time_resolution * num_leading_points.into());
        // TODO: figure out whether the range in postgres is inclusive on the range here or
        // we need to add 1 second
        // FIX: this was previously derived from `timerange.start`, which capped the
        // window at start + trailing points and ignored the requested end entirely.
        let end_time = Utc.timestamp_opt(time_spec.timerange.end.0, 0).unwrap()
            + (time_spec.time_resolution * num_trailing_points.into());

        let query_string = format!(
            "SELECT data.obsvalue, ts_rule.timestamp \
                FROM (SELECT data.obsvalue, data.obstime FROM data WHERE data.timeseries = $1) as data
                RIGHT JOIN generate_series($2::timestamptz, $3::timestamptz, interval '{}') AS ts_rule(timestamp) \
                ON data.obstime = ts_rule.timestamp",
            interval
        );

        let conn = self
            .pool
            .get()
            .await
            .map_err(|e| data_switch::Error::Other(Box::new(e)))?;

        let data_results = conn
            .query(query_string.as_str(), &[&ts_id, &start_time, &end_time])
            .await
            .map_err(|e| data_switch::Error::Other(Box::new(e)))?;

        let cache = {
            let mut values = Vec::with_capacity(data_results.len());

            for row in data_results {
                // Column 0 is obsvalue; NULL rows (from the RIGHT JOIN) become
                // gaps in the series via the column's Option-compatible type.
                values.push(row.get(0));
            }

            DataCache::new(
                vec![Timeseries {
                    tag: ts_id.to_string(),
                    values,
                }],
                // TODO: we need to either query to get the lat, lon, elev, or change olympian to
                // accept not having them
                vec![],
                vec![],
                vec![],
                time_spec.timerange.start,
                time_spec.time_resolution,
                num_leading_points,
                num_trailing_points,
            )
        };

        Ok(cache)
    }
}

0 comments on commit 6d53dc2

Please sign in to comment.