Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experimental: add compatibility with GATs in backend::DataService #19

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ authors = [
bytes = "1.1.0"
chrono = "0.4.19"
futures-util = "0.3.21"
grafana-plugin-sdk = "0.4.2"
# grafana-plugin-sdk = "0.4.2"
grafana-plugin-sdk = { git = "https://github.com/grafana/grafana-plugin-sdk-rust", branch = "query-data-gat" }
http = "0.2.6"
md5 = "0.7.0"
rust_decimal = { version = "1.22.0", features = ["db-tokio-postgres"] }
Expand Down
107 changes: 48 additions & 59 deletions backend/src/data.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,10 @@
use std::{collections::HashMap, sync::Arc};
use futures_util::stream::FuturesOrdered;

use futures_util::{
stream::{FuturesOrdered, FuturesUnordered},
StreamExt,
};

use grafana_plugin_sdk::backend;
use tokio::sync::RwLock;
use tokio_postgres::Client;
use grafana_plugin_sdk::backend::{self, DataSourceInstanceSettings};

use crate::{
path::{self, PathDisplay, QueryId},
queries::{Query, SelectStatement, TailTarget},
path::{PathDisplay, QueryId},
queries::{Query, TailTarget},
rows_to_frame, Error, MaterializePlugin,
};

Expand All @@ -34,73 +27,69 @@ impl backend::DataQueryError for QueryError {
// Unfortunately this has to take all of its arguments by value until we have
// GATs, since the `DataService::Stream` associated type can't contain references.
// Ideally we'd just borrow the query/uid etc but it's really not a big deal.
async fn query_data_single(
client: Client,
uid: String,
query: backend::DataQuery<Query>,
queries: Arc<RwLock<HashMap<path::QueryId, SelectStatement>>>,
) -> Result<backend::DataResponse, Error> {
let q = query.query;
let target = q.as_tail()?;
let rows = target.select_all(&client).await?;
let mut frame = rows_to_frame(&rows);
impl MaterializePlugin {
async fn query_data_single(
&self,
datasource_instance_settings: &DataSourceInstanceSettings,
query: &backend::DataQuery<Query>,
) -> Result<backend::DataResponse, Error> {
let q = &query.query;
let client = self.get_client(datasource_instance_settings).await?;
let target = q.as_tail()?;
let rows = target.select_all(&client).await?;
let mut frame = rows_to_frame(&rows);

if let TailTarget::Select { statement } = target {
let query_id = QueryId::from_statement(statement);
queries.write().await.insert(query_id, statement.clone());
}
if let TailTarget::Select { statement } = target {
let query_id = QueryId::from_statement(statement);
self.sql_queries
.write()
.await
.insert(query_id, statement.clone());
}

let path = q.to_path();
// Set the channel of the frame, indicating to Grafana that it should switch to
// streaming.
let channel = format!("ds/{}/{}", uid, path)
.parse()
.map_err(Error::CreatingChannel)?;
frame.set_channel(channel);
let frame = frame.check()?;
let path = q.to_path();
// Set the channel of the frame, indicating to Grafana that it should switch to
// streaming.
let channel = format!("ds/{}/{}", datasource_instance_settings.uid, path)
.parse()
.map_err(Error::CreatingChannel)?;
frame.set_channel(channel);
let frame = frame.check()?;

Ok(backend::DataResponse::new(query.ref_id, vec![frame]))
Ok(backend::DataResponse::new(
query.ref_id.clone(),
vec![frame],
))
}
}

#[backend::async_trait]
impl backend::DataService for MaterializePlugin {
type Query = Query;
type QueryError = QueryError;
type Stream = backend::BoxDataResponseStream<Self::QueryError>;
type Stream<'a> = backend::BoxDataResponseStream<'a, Self::QueryError>;

async fn query_data(&self, request: backend::QueryDataRequest<Self::Query>) -> Self::Stream {
async fn query_data<'stream, 'req: 'stream, 'slf: 'req>(
&'slf self,
request: &'req backend::QueryDataRequest<Self::Query>,
) -> Self::Stream<'stream> {
let datasource_settings = request
.plugin_context
.datasource_instance_settings
.clone()
.as_ref()
.ok_or(Error::MissingDatasource)
.unwrap();
let clients: Vec<_> = request
.queries
.iter()
.map(|_| self.get_client(&datasource_settings))
.collect::<FuturesUnordered<_>>()
.collect()
.await;
let queries = self.sql_queries.clone();
Box::pin(
request
.queries
.into_iter()
.zip(clients)
.map(move |(x, client)| {
let queries = queries.clone();
let ref_id = x.ref_id.clone();
let uid = datasource_settings.uid.clone();
async {
let client = client.map_err(|source| QueryError {
ref_id: ref_id.clone(),
.iter()
.map(|x| async move {
self.query_data_single(datasource_settings, x)
.await
.map_err(|source| QueryError {
ref_id: x.ref_id.clone(),
source,
})?;
query_data_single(client, uid, x, queries)
.await
.map_err(|source| QueryError { ref_id, source })
}
})
})
.collect::<FuturesOrdered<_>>(),
)
Expand Down