1
- use std :: { collections :: HashMap , sync :: Arc } ;
1
+ use futures_util :: stream :: FuturesOrdered ;
2
2
3
- use futures_util:: {
4
- stream:: { FuturesOrdered , FuturesUnordered } ,
5
- StreamExt ,
6
- } ;
7
-
8
- use grafana_plugin_sdk:: backend;
9
- use tokio:: sync:: RwLock ;
10
- use tokio_postgres:: Client ;
3
+ use grafana_plugin_sdk:: backend:: { self , DataSourceInstanceSettings } ;
11
4
12
5
use crate :: {
13
- path:: { self , PathDisplay , QueryId } ,
14
- queries:: { Query , SelectStatement , TailTarget } ,
6
+ path:: { PathDisplay , QueryId } ,
7
+ queries:: { Query , TailTarget } ,
15
8
rows_to_frame, Error , MaterializePlugin ,
16
9
} ;
17
10
@@ -34,73 +27,69 @@ impl backend::DataQueryError for QueryError {
34
27
// Unfortunately this has to take all of its arguments by value until we have
35
28
// GATs, since the `DataService::Stream` associated type can't contain references.
36
29
// Ideally we'd just borrow the query/uid etc but it's really not a big deal.
37
- async fn query_data_single (
38
- client : Client ,
39
- uid : String ,
40
- query : backend:: DataQuery < Query > ,
41
- queries : Arc < RwLock < HashMap < path:: QueryId , SelectStatement > > > ,
42
- ) -> Result < backend:: DataResponse , Error > {
43
- let q = query. query ;
44
- let target = q. as_tail ( ) ?;
45
- let rows = target. select_all ( & client) . await ?;
46
- let mut frame = rows_to_frame ( & rows) ;
30
+ impl MaterializePlugin {
31
+ async fn query_data_single (
32
+ & self ,
33
+ datasource_instance_settings : & DataSourceInstanceSettings ,
34
+ query : & backend:: DataQuery < Query > ,
35
+ ) -> Result < backend:: DataResponse , Error > {
36
+ let q = & query. query ;
37
+ let client = self . get_client ( datasource_instance_settings) . await ?;
38
+ let target = q. as_tail ( ) ?;
39
+ let rows = target. select_all ( & client) . await ?;
40
+ let mut frame = rows_to_frame ( & rows) ;
47
41
48
- if let TailTarget :: Select { statement } = target {
49
- let query_id = QueryId :: from_statement ( statement) ;
50
- queries. write ( ) . await . insert ( query_id, statement. clone ( ) ) ;
51
- }
42
+ if let TailTarget :: Select { statement } = target {
43
+ let query_id = QueryId :: from_statement ( statement) ;
44
+ self . sql_queries
45
+ . write ( )
46
+ . await
47
+ . insert ( query_id, statement. clone ( ) ) ;
48
+ }
52
49
53
- let path = q. to_path ( ) ;
54
- // Set the channel of the frame, indicating to Grafana that it should switch to
55
- // streaming.
56
- let channel = format ! ( "ds/{}/{}" , uid, path)
57
- . parse ( )
58
- . map_err ( Error :: CreatingChannel ) ?;
59
- frame. set_channel ( channel) ;
60
- let frame = frame. check ( ) ?;
50
+ let path = q. to_path ( ) ;
51
+ // Set the channel of the frame, indicating to Grafana that it should switch to
52
+ // streaming.
53
+ let channel = format ! ( "ds/{}/{}" , datasource_instance_settings . uid, path)
54
+ . parse ( )
55
+ . map_err ( Error :: CreatingChannel ) ?;
56
+ frame. set_channel ( channel) ;
57
+ let frame = frame. check ( ) ?;
61
58
62
- Ok ( backend:: DataResponse :: new ( query. ref_id , vec ! [ frame] ) )
59
+ Ok ( backend:: DataResponse :: new (
60
+ query. ref_id . clone ( ) ,
61
+ vec ! [ frame] ,
62
+ ) )
63
+ }
63
64
}
64
65
65
66
#[ backend:: async_trait]
66
67
impl backend:: DataService for MaterializePlugin {
67
68
type Query = Query ;
68
69
type QueryError = QueryError ;
69
- type Stream = backend:: BoxDataResponseStream < Self :: QueryError > ;
70
+ type Stream < ' a > = backend:: BoxDataResponseStream < ' a , Self :: QueryError > ;
70
71
71
- async fn query_data ( & self , request : backend:: QueryDataRequest < Self :: Query > ) -> Self :: Stream {
72
+ async fn query_data < ' stream , ' req : ' stream , ' slf : ' req > (
73
+ & ' slf self ,
74
+ request : & ' req backend:: QueryDataRequest < Self :: Query > ,
75
+ ) -> Self :: Stream < ' stream > {
72
76
let datasource_settings = request
73
77
. plugin_context
74
78
. datasource_instance_settings
75
- . clone ( )
79
+ . as_ref ( )
76
80
. ok_or ( Error :: MissingDatasource )
77
81
. unwrap ( ) ;
78
- let clients: Vec < _ > = request
79
- . queries
80
- . iter ( )
81
- . map ( |_| self . get_client ( & datasource_settings) )
82
- . collect :: < FuturesUnordered < _ > > ( )
83
- . collect ( )
84
- . await ;
85
- let queries = self . sql_queries . clone ( ) ;
86
82
Box :: pin (
87
83
request
88
84
. queries
89
- . into_iter ( )
90
- . zip ( clients)
91
- . map ( move |( x, client) | {
92
- let queries = queries. clone ( ) ;
93
- let ref_id = x. ref_id . clone ( ) ;
94
- let uid = datasource_settings. uid . clone ( ) ;
95
- async {
96
- let client = client. map_err ( |source| QueryError {
97
- ref_id : ref_id. clone ( ) ,
85
+ . iter ( )
86
+ . map ( |x| async move {
87
+ self . query_data_single ( datasource_settings, x)
88
+ . await
89
+ . map_err ( |source| QueryError {
90
+ ref_id : x. ref_id . clone ( ) ,
98
91
source,
99
- } ) ?;
100
- query_data_single ( client, uid, x, queries)
101
- . await
102
- . map_err ( |source| QueryError { ref_id, source } )
103
- }
92
+ } )
104
93
} )
105
94
. collect :: < FuturesOrdered < _ > > ( ) ,
106
95
)
0 commit comments