1
1
//! SDK types and traits relevant to plugins that query data.
2
2
use std:: { collections:: HashMap , pin:: Pin , time:: Duration } ;
3
3
4
+ use bytes:: Bytes ;
4
5
use futures_core:: Stream ;
5
6
use futures_util:: StreamExt ;
6
- use serde:: de:: DeserializeOwned ;
7
+ use serde:: { de:: DeserializeOwned , Serialize } ;
8
+ use tonic:: transport:: Endpoint ;
7
9
8
10
use crate :: {
9
11
backend:: { self , ConvertFromError , TimeRange } ,
10
12
data, pluginv2,
11
13
} ;
12
14
15
+ use super :: ConvertToError ;
16
+
17
+ /// Transport metadata sent alongside a query.
18
+ ///
19
+ /// This should be passed into [`DataClient::query_data`] when forwarding
20
+ /// a query to the same Grafana instance as the query originated.
21
+ #[ derive( Clone , Debug ) ]
22
+ pub struct TransportMetadata ( tonic:: metadata:: MetadataMap ) ;
23
+
13
24
/// A request for data made by Grafana.
14
25
///
15
26
/// Details of the request source can be found in `plugin_context`,
16
27
/// while the actual plugins themselves are in `queries`.
17
28
#[ derive( Debug ) ]
18
29
#[ non_exhaustive]
19
- pub struct QueryDataRequest < Q >
20
- where
21
- Q : DeserializeOwned ,
22
- {
30
+ pub struct QueryDataRequest < Q > {
23
31
/// Details of the plugin instance from which the request originated.
24
32
///
25
33
/// If the request originates from a datasource instance, this will
@@ -34,38 +42,51 @@ where
34
42
/// the query to the frontend; this should be included in the corresponding
35
43
/// `DataResponse` for each query.
36
44
pub queries : Vec < DataQuery < Q > > ,
45
+
46
+ grpc_meta : TransportMetadata ,
47
+ }
48
+
49
+ impl < Q > QueryDataRequest < Q > {
50
+ /// Get the transport metadata in this request.
51
+ ///
52
+ /// This is required when using the [`DataClient`] to query
53
+ /// other Grafana datasources.
54
+ pub fn transport_metadata ( & self ) -> & TransportMetadata {
55
+ & self . grpc_meta
56
+ }
37
57
}
38
58
39
- impl < Q > TryFrom < pluginv2:: QueryDataRequest > for QueryDataRequest < Q >
59
+ impl < Q > TryFrom < tonic :: Request < pluginv2:: QueryDataRequest > > for QueryDataRequest < Q >
40
60
where
41
61
Q : DeserializeOwned ,
42
62
{
43
63
type Error = ConvertFromError ;
44
- fn try_from ( other : pluginv2:: QueryDataRequest ) -> Result < Self , Self :: Error > {
64
+ fn try_from ( other : tonic:: Request < pluginv2:: QueryDataRequest > ) -> Result < Self , Self :: Error > {
65
+ // Clone is required until https://github.com/hyperium/tonic/pull/1118 is released.
66
+ let grpc_meta = other. metadata ( ) . clone ( ) ;
67
+ let request = other. into_inner ( ) ;
45
68
Ok ( Self {
46
- plugin_context : other
69
+ plugin_context : request
47
70
. plugin_context
48
71
. ok_or ( ConvertFromError :: MissingPluginContext )
49
72
. and_then ( TryInto :: try_into) ?,
50
- headers : other . headers ,
51
- queries : other
73
+ headers : request . headers ,
74
+ queries : request
52
75
. queries
53
76
. into_iter ( )
54
77
. map ( DataQuery :: try_from)
55
78
. collect :: < Result < Vec < _ > , _ > > ( ) ?,
79
+ grpc_meta : TransportMetadata ( grpc_meta) ,
56
80
} )
57
81
}
58
82
}
59
83
60
84
/// A query made by Grafana to the plugin as part of a [`QueryDataRequest`].
61
85
///
62
86
/// The `query` field contains any fields set by the plugin's UI.
63
- #[ derive( Debug ) ]
64
87
#[ non_exhaustive]
65
- pub struct DataQuery < Q >
66
- where
67
- Q : DeserializeOwned ,
68
- {
88
+ #[ derive( Clone , Debug ) ]
89
+ pub struct DataQuery < Q > {
69
90
/// The unique identifier of the query, set by the frontend call.
70
91
///
71
92
/// This should be included in the corresponding [`DataResponse`].
@@ -111,6 +132,46 @@ where
111
132
}
112
133
}
113
134
135
+ impl < Q > TryFrom < DataQuery < Q > > for pluginv2:: DataQuery
136
+ where
137
+ Q : Serialize ,
138
+ {
139
+ type Error = ConvertToError ;
140
+
141
+ fn try_from ( other : DataQuery < Q > ) -> Result < Self , Self :: Error > {
142
+ Ok ( Self {
143
+ ref_id : other. ref_id ,
144
+ max_data_points : other. max_data_points ,
145
+ interval_ms : other. interval . as_millis ( ) as i64 ,
146
+ time_range : Some ( other. time_range . into ( ) ) ,
147
+ json : serde_json:: to_vec ( & other. query )
148
+ . map_err ( |err| ConvertToError :: InvalidJson { err } ) ?,
149
+ query_type : other. query_type ,
150
+ } )
151
+ }
152
+ }
153
+
154
+ #[ derive( Debug ) ]
155
+ enum DataResponseFrames {
156
+ Deserialized ( Vec < data:: Frame > ) ,
157
+ Serialized ( Result < Vec < Vec < u8 > > , data:: Error > ) ,
158
+ }
159
+
160
+ impl DataResponseFrames {
161
+ fn into_serialized ( self ) -> Result < Vec < Vec < u8 > > , data:: Error > {
162
+ match self {
163
+ Self :: Deserialized ( frames) => to_arrow (
164
+ frames
165
+ . iter ( )
166
+ . map ( |x| x. check ( ) )
167
+ . collect :: < Result < Vec < _ > , _ > > ( ) ?,
168
+ & None ,
169
+ ) ,
170
+ Self :: Serialized ( x) => x,
171
+ }
172
+ }
173
+ }
174
+
114
175
/// The results from a [`DataQuery`].
115
176
#[ derive( Debug ) ]
116
177
pub struct DataResponse {
@@ -121,7 +182,7 @@ pub struct DataResponse {
121
182
ref_id : String ,
122
183
123
184
/// The data returned from the query.
124
- frames : Result < Vec < Vec < u8 > > , data :: Error > ,
185
+ frames : DataResponseFrames ,
125
186
}
126
187
127
188
impl DataResponse {
@@ -130,7 +191,7 @@ impl DataResponse {
130
191
pub fn new ( ref_id : String , frames : Vec < data:: CheckedFrame < ' _ > > ) -> Self {
131
192
Self {
132
193
ref_id : ref_id. clone ( ) ,
133
- frames : to_arrow ( frames, & Some ( ref_id) ) ,
194
+ frames : DataResponseFrames :: Serialized ( to_arrow ( frames, & Some ( ref_id) ) ) ,
134
195
}
135
196
}
136
197
}
@@ -286,15 +347,15 @@ where
286
347
let responses = DataService :: query_data (
287
348
self ,
288
349
request
289
- . into_inner ( )
350
+ // .into_inner()
290
351
. try_into ( )
291
352
. map_err ( ConvertFromError :: into_tonic_status) ?,
292
353
)
293
354
. await
294
355
. map ( |resp| match resp {
295
356
Ok ( x) => {
296
357
let ref_id = x. ref_id ;
297
- x. frames . map_or_else (
358
+ x. frames . into_serialized ( ) . map_or_else (
298
359
|e| {
299
360
(
300
361
ref_id. clone ( ) ,
@@ -336,3 +397,89 @@ where
336
397
} ) )
337
398
}
338
399
}
400
+
401
+ /// A client for querying data from Grafana.
402
+ ///
403
+ /// This can be used by plugins which need to query data from
404
+ /// a different datasource of the same Grafana instance.
405
+ #[ derive( Debug , Clone ) ]
406
+ pub struct DataClient < T = tonic:: transport:: Channel > {
407
+ inner : pluginv2:: data_client:: DataClient < T > ,
408
+ }
409
+
410
+ impl DataClient < tonic:: transport:: Channel > {
411
+ /// Create a new DataClient to connect to the given endpoint.
412
+ ///
413
+ /// This constructor uses the default [`tonic::transport::Channel`] as the
414
+ /// transport. Use [`DataClient::with_channel`] to provide your own channel.
415
+ pub fn new ( url : impl Into < Bytes > ) -> Result < Self , tonic:: transport:: Error > {
416
+ let endpoint = Endpoint :: from_shared ( url) ?;
417
+ let channel = endpoint. connect_lazy ( ) ;
418
+ Ok ( Self {
419
+ inner : pluginv2:: data_client:: DataClient :: new ( channel) ,
420
+ } )
421
+ }
422
+ }
423
+
424
+ /// Errors which can occur when querying data.
425
+ #[ derive( Clone , Debug , thiserror:: Error ) ]
426
+ #[ error( "Error querying data" ) ]
427
+ pub struct QueryDataError ;
428
+
429
+ impl < T > DataClient < T > {
430
+ /// Query for data from a Grafana datasource.
431
+ pub async fn query_data < Q > (
432
+ & mut self ,
433
+ queries : Vec < DataQuery < Q > > ,
434
+ upstream_metadata : & TransportMetadata ,
435
+ ) -> Result < HashMap < String , DataResponse > , QueryDataError >
436
+ where
437
+ Q : Serialize ,
438
+ T : tonic:: client:: GrpcService < tonic:: body:: BoxBody > + Clone + Send + Sync + ' static ,
439
+ T :: ResponseBody : tonic:: codegen:: Body < Data = tonic:: codegen:: Bytes > + Send + Sync + ' static ,
440
+ <T :: ResponseBody as tonic:: codegen:: Body >:: Error : Into < tonic:: codegen:: StdError > + Send ,
441
+ {
442
+ let queries: Vec < _ > = queries
443
+ . into_iter ( )
444
+ // TODO: add enum variant
445
+ . map ( |q| q. try_into ( ) . map_err ( |_| QueryDataError ) )
446
+ . collect :: < Result < _ , _ > > ( ) ?;
447
+ let query_data_request = pluginv2:: QueryDataRequest {
448
+ plugin_context : None ,
449
+ headers : Default :: default ( ) ,
450
+ queries,
451
+ } ;
452
+ let mut request = tonic:: Request :: new ( query_data_request) ;
453
+ for kv in upstream_metadata. 0 . iter ( ) {
454
+ match kv {
455
+ tonic:: metadata:: KeyAndValueRef :: Ascii ( k, v) => {
456
+ request. metadata_mut ( ) . insert ( k, v. clone ( ) ) ;
457
+ }
458
+ tonic:: metadata:: KeyAndValueRef :: Binary ( k, v) => {
459
+ request. metadata_mut ( ) . insert_bin ( k, v. clone ( ) ) ;
460
+ }
461
+ }
462
+ }
463
+ let responses = self
464
+ . inner
465
+ . query_data ( request)
466
+ . await
467
+ // TODO: add enum variant
468
+ . map_err ( |_| QueryDataError ) ?
469
+ . into_inner ( )
470
+ . responses
471
+ . into_iter ( )
472
+ . map ( |( k, v) | {
473
+ let frames = v
474
+ . frames
475
+ . into_iter ( )
476
+ . map ( |f| data:: Frame :: from_arrow ( f) . map_err ( |_| QueryDataError ) )
477
+ . collect :: < Result < Vec < _ > , _ > > ( )
478
+ . map ( DataResponseFrames :: Deserialized )
479
+ . unwrap ( ) ;
480
+ Ok ( ( k. clone ( ) , DataResponse { ref_id : k, frames } ) )
481
+ } )
482
+ . collect :: < Result < _ , _ > > ( ) ?;
483
+ Ok ( responses)
484
+ }
485
+ }
0 commit comments