@@ -38,7 +38,7 @@ use arrow_array::builder::BooleanBuilder;
38
38
use async_trait:: async_trait;
39
39
use datafusion_common:: error:: Result ;
40
40
use datafusion_common:: DataFusionError ;
41
- use datafusion_expr:: { AggregateUDF , ScalarUDF , Signature , WindowUDF } ;
41
+ use datafusion_expr:: { AggregateUDF , ScalarUDF , Signature , TypeSignature , WindowUDF } ;
42
42
use std:: collections:: { HashMap , HashSet } ;
43
43
use std:: fmt:: Debug ;
44
44
use std:: { any:: Any , sync:: Arc } ;
@@ -50,10 +50,18 @@ pub(crate) const COLUMNS: &str = "columns";
50
50
pub ( crate ) const DF_SETTINGS : & str = "df_settings" ;
51
51
pub ( crate ) const SCHEMATA : & str = "schemata" ;
52
52
pub ( crate ) const ROUTINES : & str = "routines" ;
53
// Table name under which the function-parameter listing is exposed.
+ pub ( crate ) const PARAMETERS : & str = "parameters" ;
53
54
54
55
/// All information schema tables
///
/// Holds the name of every table constant declared above, including
/// `PARAMETERS`.
55
- pub const INFORMATION_SCHEMA_TABLES : & [ & str ] =
56
- & [ TABLES , VIEWS , COLUMNS , DF_SETTINGS , SCHEMATA , ROUTINES ] ;
56
+ pub const INFORMATION_SCHEMA_TABLES : & [ & str ] = & [
57
+ TABLES ,
58
+ VIEWS ,
59
+ COLUMNS ,
60
+ DF_SETTINGS ,
61
+ SCHEMATA ,
62
+ ROUTINES ,
63
+ PARAMETERS ,
64
+ ] ;
57
65
58
66
/// Implements the `information_schema` virtual schema and tables
59
67
///
@@ -286,6 +294,102 @@ impl InformationSchemaConfig {
286
294
fn is_deterministic ( signature : & Signature ) -> bool {
287
295
signature. volatility == Volatility :: Immutable
288
296
}
297
+ fn make_parameters (
298
+ & self ,
299
+ udfs : & HashMap < String , Arc < ScalarUDF > > ,
300
+ udafs : & HashMap < String , Arc < AggregateUDF > > ,
301
+ udwfs : & HashMap < String , Arc < WindowUDF > > ,
302
+ config_options : & ConfigOptions ,
303
+ builder : & mut InformationSchemaParametersBuilder ,
304
+ ) -> Result < ( ) > {
305
+ let catalog_name = & config_options. catalog . default_catalog ;
306
+ let schema_name = & config_options. catalog . default_schema ;
307
+ let mut add_parameters = |func_name : & str ,
308
+ args : Option < & Vec < ( String , String ) > > ,
309
+ arg_types : Vec < String > ,
310
+ return_type : Option < String > ,
311
+ is_variadic : bool | {
312
+ for ( position, type_name) in arg_types. iter ( ) . enumerate ( ) {
313
+ let param_name =
314
+ args. and_then ( |a| a. get ( position) . map ( |arg| arg. 0 . as_str ( ) ) ) ;
315
+ builder. add_parameter (
316
+ catalog_name,
317
+ schema_name,
318
+ func_name,
319
+ position as u64 + 1 ,
320
+ "IN" ,
321
+ param_name,
322
+ type_name,
323
+ None :: < & str > ,
324
+ is_variadic,
325
+ ) ;
326
+ }
327
+ if let Some ( return_type) = return_type {
328
+ builder. add_parameter (
329
+ catalog_name,
330
+ schema_name,
331
+ func_name,
332
+ 1 ,
333
+ "OUT" ,
334
+ None :: < & str > ,
335
+ return_type. as_str ( ) ,
336
+ None :: < & str > ,
337
+ false ,
338
+ ) ;
339
+ }
340
+ } ;
341
+
342
+ for ( func_name, udf) in udfs {
343
+ let args = udf. documentation ( ) . and_then ( |d| d. arguments . clone ( ) ) ;
344
+ let combinations = get_udf_args_and_return_types ( udf) ?;
345
+ for ( arg_types, return_type) in combinations {
346
+ add_parameters (
347
+ func_name,
348
+ args. as_ref ( ) ,
349
+ arg_types,
350
+ return_type,
351
+ Self :: is_variadic ( udf. signature ( ) ) ,
352
+ ) ;
353
+ }
354
+ }
355
+
356
+ for ( func_name, udaf) in udafs {
357
+ let args = udaf. documentation ( ) . and_then ( |d| d. arguments . clone ( ) ) ;
358
+ let combinations = get_udaf_args_and_return_types ( udaf) ?;
359
+ for ( arg_types, return_type) in combinations {
360
+ add_parameters (
361
+ func_name,
362
+ args. as_ref ( ) ,
363
+ arg_types,
364
+ return_type,
365
+ Self :: is_variadic ( udaf. signature ( ) ) ,
366
+ ) ;
367
+ }
368
+ }
369
+
370
+ for ( func_name, udwf) in udwfs {
371
+ let args = udwf. documentation ( ) . and_then ( |d| d. arguments . clone ( ) ) ;
372
+ let combinations = get_udwf_args_and_return_types ( udwf) ?;
373
+ for ( arg_types, return_type) in combinations {
374
+ add_parameters (
375
+ func_name,
376
+ args. as_ref ( ) ,
377
+ arg_types,
378
+ return_type,
379
+ Self :: is_variadic ( udwf. signature ( ) ) ,
380
+ ) ;
381
+ }
382
+ }
383
+
384
+ Ok ( ( ) )
385
+ }
386
+
387
+ fn is_variadic ( signature : & Signature ) -> bool {
388
+ matches ! (
389
+ signature. type_signature,
390
+ TypeSignature :: Variadic ( _) | TypeSignature :: VariadicAny
391
+ )
392
+ }
289
393
}
290
394
291
395
/// get the arguments and return types of a UDF
@@ -384,6 +488,7 @@ impl SchemaProvider for InformationSchemaProvider {
384
488
DF_SETTINGS => Arc :: new ( InformationSchemaDfSettings :: new ( config) ) ,
385
489
SCHEMATA => Arc :: new ( InformationSchemata :: new ( config) ) ,
386
490
ROUTINES => Arc :: new ( InformationSchemaRoutines :: new ( config) ) ,
491
+ PARAMETERS => Arc :: new ( InformationSchemaParameters :: new ( config) ) ,
387
492
_ => return Ok ( None ) ,
388
493
} ;
389
494
@@ -1098,3 +1203,135 @@ impl PartitionStream for InformationSchemaRoutines {
1098
1203
) )
1099
1204
}
1100
1205
}
1206
+
1207
+ #[ derive( Debug ) ]
1208
+ struct InformationSchemaParameters {
1209
+ schema : SchemaRef ,
1210
+ config : InformationSchemaConfig ,
1211
+ }
1212
+
1213
+ impl InformationSchemaParameters {
1214
+ fn new ( config : InformationSchemaConfig ) -> Self {
1215
+ let schema = Arc :: new ( Schema :: new ( vec ! [
1216
+ Field :: new( "specific_catalog" , DataType :: Utf8 , false ) ,
1217
+ Field :: new( "specific_schema" , DataType :: Utf8 , false ) ,
1218
+ Field :: new( "specific_name" , DataType :: Utf8 , false ) ,
1219
+ Field :: new( "ordinal_position" , DataType :: UInt64 , false ) ,
1220
+ Field :: new( "parameter_mode" , DataType :: Utf8 , false ) ,
1221
+ Field :: new( "parameter_name" , DataType :: Utf8 , true ) ,
1222
+ Field :: new( "data_type" , DataType :: Utf8 , false ) ,
1223
+ Field :: new( "parameter_default" , DataType :: Utf8 , true ) ,
1224
+ Field :: new( "is_variadic" , DataType :: Boolean , false ) ,
1225
+ ] ) ) ;
1226
+
1227
+ Self { schema, config }
1228
+ }
1229
+
1230
+ fn builder ( & self ) -> InformationSchemaParametersBuilder {
1231
+ InformationSchemaParametersBuilder {
1232
+ schema : self . schema . clone ( ) ,
1233
+ specific_catalog : StringBuilder :: new ( ) ,
1234
+ specific_schema : StringBuilder :: new ( ) ,
1235
+ specific_name : StringBuilder :: new ( ) ,
1236
+ ordinal_position : UInt64Builder :: new ( ) ,
1237
+ parameter_mode : StringBuilder :: new ( ) ,
1238
+ parameter_name : StringBuilder :: new ( ) ,
1239
+ data_type : StringBuilder :: new ( ) ,
1240
+ parameter_default : StringBuilder :: new ( ) ,
1241
+ is_variadic : BooleanBuilder :: new ( ) ,
1242
+ inserted : HashSet :: new ( ) ,
1243
+ }
1244
+ }
1245
+ }
1246
+
1247
+ struct InformationSchemaParametersBuilder {
1248
+ schema : SchemaRef ,
1249
+ specific_catalog : StringBuilder ,
1250
+ specific_schema : StringBuilder ,
1251
+ specific_name : StringBuilder ,
1252
+ ordinal_position : UInt64Builder ,
1253
+ parameter_mode : StringBuilder ,
1254
+ parameter_name : StringBuilder ,
1255
+ data_type : StringBuilder ,
1256
+ parameter_default : StringBuilder ,
1257
+ is_variadic : BooleanBuilder ,
1258
+ // use HashSet to avoid duplicate rows. The key is (specific_name, ordinal_position, parameter_mode, data_type)
1259
+ inserted : HashSet < ( String , u64 , String , String ) > ,
1260
+ }
1261
+
1262
+ impl InformationSchemaParametersBuilder {
1263
+ #[ allow( clippy:: too_many_arguments) ]
1264
+ fn add_parameter (
1265
+ & mut self ,
1266
+ specific_catalog : impl AsRef < str > ,
1267
+ specific_schema : impl AsRef < str > ,
1268
+ specific_name : impl AsRef < str > ,
1269
+ ordinal_position : u64 ,
1270
+ parameter_mode : impl AsRef < str > ,
1271
+ parameter_name : Option < impl AsRef < str > > ,
1272
+ data_type : impl AsRef < str > ,
1273
+ parameter_default : Option < impl AsRef < str > > ,
1274
+ is_variadic : bool ,
1275
+ ) {
1276
+ let key = (
1277
+ specific_name. as_ref ( ) . to_string ( ) ,
1278
+ ordinal_position,
1279
+ parameter_mode. as_ref ( ) . to_string ( ) ,
1280
+ data_type. as_ref ( ) . to_string ( ) ,
1281
+ ) ;
1282
+ if self . inserted . insert ( key) {
1283
+ self . specific_catalog
1284
+ . append_value ( specific_catalog. as_ref ( ) ) ;
1285
+ self . specific_schema . append_value ( specific_schema. as_ref ( ) ) ;
1286
+ self . specific_name . append_value ( specific_name. as_ref ( ) ) ;
1287
+ self . ordinal_position . append_value ( ordinal_position) ;
1288
+ self . parameter_mode . append_value ( parameter_mode. as_ref ( ) ) ;
1289
+ self . parameter_name . append_option ( parameter_name. as_ref ( ) ) ;
1290
+ self . data_type . append_value ( data_type. as_ref ( ) ) ;
1291
+ self . parameter_default . append_option ( parameter_default) ;
1292
+ self . is_variadic . append_value ( is_variadic) ;
1293
+ }
1294
+ }
1295
+
1296
+ fn finish ( & mut self ) -> RecordBatch {
1297
+ RecordBatch :: try_new (
1298
+ self . schema . clone ( ) ,
1299
+ vec ! [
1300
+ Arc :: new( self . specific_catalog. finish( ) ) ,
1301
+ Arc :: new( self . specific_schema. finish( ) ) ,
1302
+ Arc :: new( self . specific_name. finish( ) ) ,
1303
+ Arc :: new( self . ordinal_position. finish( ) ) ,
1304
+ Arc :: new( self . parameter_mode. finish( ) ) ,
1305
+ Arc :: new( self . parameter_name. finish( ) ) ,
1306
+ Arc :: new( self . data_type. finish( ) ) ,
1307
+ Arc :: new( self . parameter_default. finish( ) ) ,
1308
+ Arc :: new( self . is_variadic. finish( ) ) ,
1309
+ ] ,
1310
+ )
1311
+ . unwrap ( )
1312
+ }
1313
+ }
1314
+
1315
+ impl PartitionStream for InformationSchemaParameters {
1316
+ fn schema ( & self ) -> & SchemaRef {
1317
+ & self . schema
1318
+ }
1319
+
1320
+ fn execute ( & self , ctx : Arc < TaskContext > ) -> SendableRecordBatchStream {
1321
+ let config = self . config . clone ( ) ;
1322
+ let mut builder = self . builder ( ) ;
1323
+ Box :: pin ( RecordBatchStreamAdapter :: new (
1324
+ self . schema . clone ( ) ,
1325
+ futures:: stream:: once ( async move {
1326
+ config. make_parameters (
1327
+ ctx. scalar_functions ( ) ,
1328
+ ctx. aggregate_functions ( ) ,
1329
+ ctx. window_functions ( ) ,
1330
+ ctx. session_config ( ) . options ( ) ,
1331
+ & mut builder,
1332
+ ) ?;
1333
+ Ok ( builder. finish ( ) )
1334
+ } ) ,
1335
+ ) )
1336
+ }
1337
+ }
0 commit comments