1
- use std:: { collections :: HashMap , sync:: Arc } ;
1
+ use std:: sync:: Arc ;
2
2
3
3
use futures_util:: stream:: FuturesOrdered ;
4
4
5
- use grafana_plugin_sdk:: backend;
6
- use tokio:: sync:: RwLock ;
7
- use tokio_postgres:: Client ;
5
+ use grafana_plugin_sdk:: backend:: { self , DataSourceInstanceSettings } ;
8
6
9
7
use crate :: {
10
- path:: { self , PathDisplay , QueryId } ,
11
- queries:: { Query , SelectStatement , TailTarget } ,
8
+ path:: { PathDisplay , QueryId } ,
9
+ queries:: { Query , TailTarget } ,
12
10
rows_to_frame, Error , MaterializePlugin ,
13
11
} ;
14
12
@@ -31,32 +29,37 @@ impl backend::DataQueryError for QueryError {
31
29
// Unfortunately this has to take all of its arguments by value until we have
32
30
// GATs, since the `DataService::Stream` associated type can't contain references.
33
31
// Ideally we'd just borrow the query/uid etc but it's really not a big deal.
34
- async fn query_data_single (
35
- client : & Client ,
36
- uid : & str ,
37
- query : backend:: DataQuery < Query > ,
38
- queries : Arc < RwLock < HashMap < path:: QueryId , SelectStatement > > > ,
39
- ) -> Result < backend:: DataResponse , Error > {
40
- let q = query. query ;
41
- let target = q. as_tail ( ) ?;
42
- let rows = target. select_all ( client) . await ?;
43
- let mut frame = rows_to_frame ( & rows) ;
32
+ impl MaterializePlugin {
33
+ async fn query_data_single (
34
+ & self ,
35
+ datasource_instance_settings : Arc < DataSourceInstanceSettings > ,
36
+ query : backend:: DataQuery < Query > ,
37
+ ) -> Result < backend:: DataResponse , Error > {
38
+ let q = query. query ;
39
+ let client = self . get_client ( & datasource_instance_settings) . await ?;
40
+ let target = q. as_tail ( ) ?;
41
+ let rows = target. select_all ( & client) . await ?;
42
+ let mut frame = rows_to_frame ( & rows) ;
44
43
45
- if let TailTarget :: Select { statement } = target {
46
- let query_id = QueryId :: from_statement ( statement) ;
47
- queries. write ( ) . await . insert ( query_id, statement. clone ( ) ) ;
48
- }
44
+ if let TailTarget :: Select { statement } = target {
45
+ let query_id = QueryId :: from_statement ( statement) ;
46
+ self . sql_queries
47
+ . write ( )
48
+ . await
49
+ . insert ( query_id, statement. clone ( ) ) ;
50
+ }
49
51
50
- let path = q. to_path ( ) ;
51
- // Set the channel of the frame, indicating to Grafana that it should switch to
52
- // streaming.
53
- let channel = format ! ( "ds/{}/{}" , uid, path)
54
- . parse ( )
55
- . map_err ( Error :: CreatingChannel ) ?;
56
- frame. set_channel ( channel) ;
57
- let frame = frame. check ( ) ?;
52
+ let path = q. to_path ( ) ;
53
+ // Set the channel of the frame, indicating to Grafana that it should switch to
54
+ // streaming.
55
+ let channel = format ! ( "ds/{}/{}" , datasource_instance_settings . uid, path)
56
+ . parse ( )
57
+ . map_err ( Error :: CreatingChannel ) ?;
58
+ frame. set_channel ( channel) ;
59
+ let frame = frame. check ( ) ?;
58
60
59
- Ok ( backend:: DataResponse :: new ( query. ref_id , vec ! [ frame] ) )
61
+ Ok ( backend:: DataResponse :: new ( query. ref_id , vec ! [ frame] ) )
62
+ }
60
63
}
61
64
62
65
#[ backend:: async_trait]
@@ -69,36 +72,27 @@ impl backend::DataService for MaterializePlugin {
69
72
& self ,
70
73
request : backend:: QueryDataRequest < Self :: Query > ,
71
74
) -> Self :: Stream < ' _ > {
72
- let datasource_settings = request
73
- . plugin_context
74
- . datasource_instance_settings
75
- . clone ( )
76
- . ok_or ( Error :: MissingDatasource )
77
- . unwrap ( ) ;
78
- // let clients: Vec<_> = request
79
- // .queries
80
- // .iter()
81
- // .map(|_| self.get_client(&datasource_settings))
82
- // .collect::<FuturesUnordered<_>>()
83
- // .collect()
84
- // .await;
85
- let client = Arc :: new ( self . get_client ( & datasource_settings) . await . unwrap ( ) ) ;
86
- let queries = self . sql_queries . clone ( ) ;
75
+ let datasource_settings = Arc :: new (
76
+ request
77
+ . plugin_context
78
+ . datasource_instance_settings
79
+ . clone ( )
80
+ . ok_or ( Error :: MissingDatasource )
81
+ . unwrap ( ) ,
82
+ ) ;
87
83
Box :: pin (
88
84
request
89
85
. queries
90
86
. into_iter ( )
91
87
. map ( |x| {
92
- let client = client. clone ( ) ;
93
- let queries = queries. clone ( ) ;
88
+ let datasource_settings = datasource_settings. clone ( ) ;
94
89
let ref_id = x. ref_id . clone ( ) ;
95
- let uid = datasource_settings. uid . clone ( ) ;
96
- async move {
90
+ async {
97
91
// let client = client.map_err(|source| QueryError {
98
92
// ref_id: ref_id.clone(),
99
93
// source,
100
- // })?;
101
- query_data_single ( & client , & uid , x , queries )
94
+ self . query_data_single ( datasource_settings , x )
95
+ // })?;
102
96
. await
103
97
. map_err ( |source| QueryError { ref_id, source } )
104
98
}
0 commit comments