1
1
use std:: { collections:: HashMap , sync:: Arc } ;
2
2
3
- use futures_util:: {
4
- stream:: { FuturesOrdered , FuturesUnordered } ,
5
- StreamExt ,
6
- } ;
3
+ use futures_util:: stream:: FuturesOrdered ;
7
4
8
5
use grafana_plugin_sdk:: backend;
9
6
use tokio:: sync:: RwLock ;
@@ -35,14 +32,14 @@ impl backend::DataQueryError for QueryError {
35
32
// GATs, since the `DataService::Stream` associated type can't contain references.
36
33
// Ideally we'd just borrow the query/uid etc but it's really not a big deal.
37
34
async fn query_data_single (
38
- client : Client ,
39
- uid : String ,
35
+ client : & Client ,
36
+ uid : & str ,
40
37
query : backend:: DataQuery < Query > ,
41
38
queries : Arc < RwLock < HashMap < path:: QueryId , SelectStatement > > > ,
42
39
) -> Result < backend:: DataResponse , Error > {
43
40
let q = query. query ;
44
41
let target = q. as_tail ( ) ?;
45
- let rows = target. select_all ( & client) . await ?;
42
+ let rows = target. select_all ( client) . await ?;
46
43
let mut frame = rows_to_frame ( & rows) ;
47
44
48
45
if let TailTarget :: Select { statement } = target {
@@ -66,38 +63,42 @@ async fn query_data_single(
66
63
impl backend:: DataService for MaterializePlugin {
67
64
type Query = Query ;
68
65
type QueryError = QueryError ;
69
- type Stream = backend:: BoxDataResponseStream < Self :: QueryError > ;
66
+ type Stream < ' a > = backend:: BoxDataResponseStream < ' a , Self :: QueryError > ;
70
67
71
- async fn query_data ( & self , request : backend:: QueryDataRequest < Self :: Query > ) -> Self :: Stream {
68
+ async fn query_data (
69
+ & self ,
70
+ request : backend:: QueryDataRequest < Self :: Query > ,
71
+ ) -> Self :: Stream < ' _ > {
72
72
let datasource_settings = request
73
73
. plugin_context
74
74
. datasource_instance_settings
75
75
. clone ( )
76
76
. ok_or ( Error :: MissingDatasource )
77
77
. unwrap ( ) ;
78
- let clients: Vec < _ > = request
79
- . queries
80
- . iter ( )
81
- . map ( |_| self . get_client ( & datasource_settings) )
82
- . collect :: < FuturesUnordered < _ > > ( )
83
- . collect ( )
84
- . await ;
78
+ // let clients: Vec<_> = request
79
+ // .queries
80
+ // .iter()
81
+ // .map(|_| self.get_client(&datasource_settings))
82
+ // .collect::<FuturesUnordered<_>>()
83
+ // .collect()
84
+ // .await;
85
+ let client = Arc :: new ( self . get_client ( & datasource_settings) . await . unwrap ( ) ) ;
85
86
let queries = self . sql_queries . clone ( ) ;
86
87
Box :: pin (
87
88
request
88
89
. queries
89
90
. into_iter ( )
90
- . zip ( clients )
91
- . map ( move | ( x , client) | {
91
+ . map ( |x| {
92
+ let client = client . clone ( ) ;
92
93
let queries = queries. clone ( ) ;
93
94
let ref_id = x. ref_id . clone ( ) ;
94
95
let uid = datasource_settings. uid . clone ( ) ;
95
- async {
96
- let client = client. map_err ( |source| QueryError {
97
- ref_id : ref_id. clone ( ) ,
98
- source,
99
- } ) ?;
100
- query_data_single ( client, uid, x, queries)
96
+ async move {
97
+ // let client = client.map_err(|source| QueryError {
98
+ // ref_id: ref_id.clone(),
99
+ // source,
100
+ // })?;
101
+ query_data_single ( & client, & uid, x, queries)
101
102
. await
102
103
. map_err ( |source| QueryError { ref_id, source } )
103
104
}
0 commit comments