From 7bd9e58d36cf677a48358d607bd89e7c4aeb8320 Mon Sep 17 00:00:00 2001 From: Ben Sully Date: Thu, 3 Nov 2022 22:03:19 +0000 Subject: [PATCH] Experimental: add compatibility with GATs in `backend::DataService` This means we don't need to clone so many things or connect to the database more than once, and we can organise things in a clearer way. Hooray for GATs! --- Cargo.lock | 6 +-- backend/Cargo.toml | 3 +- backend/src/data.rs | 107 ++++++++++++++++++++------------------------ 3 files changed, 52 insertions(+), 64 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index be10041..0719009 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -457,8 +457,7 @@ dependencies = [ [[package]] name = "grafana-plugin-sdk" version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d4f924fa9a503475ef466653443f8a06a659698e63e89862e7ae8016bdbb51d" +source = "git+https://github.com/grafana/grafana-plugin-sdk-rust?branch=query-data-gat#842a760603307bede2cc3f614524df35bdc2efaa" dependencies = [ "arrow2", "chrono", @@ -488,8 +487,7 @@ dependencies = [ [[package]] name = "grafana-plugin-sdk-macros" version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39d9789e90e10c2459ca467b26ce8c2d5414d404c02a8ce32f4ff79cb85f534e" +source = "git+https://github.com/grafana/grafana-plugin-sdk-rust?branch=query-data-gat#842a760603307bede2cc3f614524df35bdc2efaa" dependencies = [ "proc-macro2", "quote", diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 2d691fb..c253c78 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -11,7 +11,8 @@ authors = [ bytes = "1.1.0" chrono = "0.4.19" futures-util = "0.3.21" -grafana-plugin-sdk = "0.4.2" +# grafana-plugin-sdk = "0.4.2" +grafana-plugin-sdk = { git = "https://github.com/grafana/grafana-plugin-sdk-rust", branch = "query-data-gat" } http = "0.2.6" md5 = "0.7.0" rust_decimal = { version = "1.22.0", features = ["db-tokio-postgres"] } diff --git a/backend/src/data.rs b/backend/src/data.rs index c807ca2..30ecbc7 100644 --- a/backend/src/data.rs +++ b/backend/src/data.rs @@ -1,17 +1,10 @@ -use std::{collections::HashMap, sync::Arc}; +use futures_util::stream::FuturesOrdered; -use futures_util::{ - stream::{FuturesOrdered, FuturesUnordered}, - StreamExt, -}; - -use grafana_plugin_sdk::backend; -use tokio::sync::RwLock; -use tokio_postgres::Client; +use grafana_plugin_sdk::backend::{self, DataSourceInstanceSettings}; use crate::{ - path::{self, PathDisplay, QueryId}, - queries::{Query, SelectStatement, TailTarget}, + path::{PathDisplay, QueryId}, + queries::{Query, TailTarget}, rows_to_frame, Error, MaterializePlugin, }; @@ -34,73 +27,69 @@ impl backend::DataQueryError for QueryError { // Unfortunately this has to take all of its arguments by value until we have // GATs, since the `DataService::Stream` associated type can't contain references. // Ideally we'd just borrow the query/uid etc but it's really not a big deal. -async fn query_data_single( - client: Client, - uid: String, - query: backend::DataQuery, - queries: Arc>>, -) -> Result { - let q = query.query; - let target = q.as_tail()?; - let rows = target.select_all(&client).await?; - let mut frame = rows_to_frame(&rows); +impl MaterializePlugin { + async fn query_data_single( + &self, + datasource_instance_settings: &DataSourceInstanceSettings, + query: &backend::DataQuery, + ) -> Result { + let q = &query.query; + let client = self.get_client(datasource_instance_settings).await?; + let target = q.as_tail()?; + let rows = target.select_all(&client).await?; + let mut frame = rows_to_frame(&rows); - if let TailTarget::Select { statement } = target { - let query_id = QueryId::from_statement(statement); - queries.write().await.insert(query_id, statement.clone()); - } + if let TailTarget::Select { statement } = target { + let query_id = QueryId::from_statement(statement); + self.sql_queries + .write() + .await + .insert(query_id, statement.clone()); + } - let path = q.to_path(); - // Set the channel of the frame, indicating to Grafana that it should switch to - // streaming. - let channel = format!("ds/{}/{}", uid, path) - .parse() - .map_err(Error::CreatingChannel)?; - frame.set_channel(channel); - let frame = frame.check()?; + let path = q.to_path(); + // Set the channel of the frame, indicating to Grafana that it should switch to + // streaming. + let channel = format!("ds/{}/{}", datasource_instance_settings.uid, path) + .parse() + .map_err(Error::CreatingChannel)?; + frame.set_channel(channel); + let frame = frame.check()?; - Ok(backend::DataResponse::new(query.ref_id, vec![frame])) + Ok(backend::DataResponse::new( + query.ref_id.clone(), + vec![frame], + )) + } } #[backend::async_trait] impl backend::DataService for MaterializePlugin { type Query = Query; type QueryError = QueryError; - type Stream = backend::BoxDataResponseStream; + type Stream<'a> = backend::BoxDataResponseStream<'a, Self::QueryError>; - async fn query_data(&self, request: backend::QueryDataRequest) -> Self::Stream { + async fn query_data<'stream, 'req: 'stream, 'slf: 'req>( + &'slf self, + request: &'req backend::QueryDataRequest, + ) -> Self::Stream<'stream> { let datasource_settings = request .plugin_context .datasource_instance_settings - .clone() + .as_ref() .ok_or(Error::MissingDatasource) .unwrap(); - let clients: Vec<_> = request - .queries - .iter() - .map(|_| self.get_client(&datasource_settings)) - .collect::>() - .collect() - .await; - let queries = self.sql_queries.clone(); Box::pin( request .queries - .into_iter() - .zip(clients) - .map(move |(x, client)| { - let queries = queries.clone(); - let ref_id = x.ref_id.clone(); - let uid = datasource_settings.uid.clone(); - async { - let client = client.map_err(|source| QueryError { - ref_id: ref_id.clone(), + .iter() + .map(|x| async move { + self.query_data_single(datasource_settings, x) + .await + .map_err(|source| QueryError { + ref_id: x.ref_id.clone(), source, - })?; - query_data_single(client, uid, x, queries) - .await - .map_err(|source| QueryError { ref_id, source }) - } + }) }) .collect::>(), )