Skip to content

Commit b3e25be

Browse files
committed
Complete integration
1 parent 1118305 commit b3e25be

File tree

13 files changed

+72
-106
lines changed

13 files changed

+72
-106
lines changed

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ ctor = "0.2.0"
6262
datafusion = { path = "datafusion/core" }
6363
datafusion-common = { path = "datafusion/common" }
6464
datafusion-expr = { path = "datafusion/expr" }
65+
datafusion-functions = { path = "datafusion/functions" }
6566
datafusion-sql = { path = "datafusion/sql" }
6667
datafusion-optimizer = { path = "datafusion/optimizer" }
6768
datafusion-physical-expr = { path = "datafusion/physical-expr" }

datafusion/core/Cargo.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ backtrace = ["datafusion-common/backtrace"]
4040
compression = ["xz2", "bzip2", "flate2", "zstd", "async-compression"]
4141
crypto_expressions = ["datafusion-physical-expr/crypto_expressions", "datafusion-optimizer/crypto_expressions"]
4242
default = ["crypto_expressions", "encoding_expressions", "regex_expressions", "unicode_expressions", "compression", "parquet"]
43-
encoding_expressions = ["datafusion-physical-expr/encoding_expressions"]
43+
encoding_expressions = ["datafusion-functions/encoding_expressions"]
4444
# Used for testing ONLY: causes all values to hash to the same value (test for collisions)
4545
force_hash_collisions = []
4646
parquet = ["datafusion-common/parquet", "dep:parquet"]
@@ -65,6 +65,7 @@ dashmap = { workspace = true }
6565
datafusion-common = { path = "../common", version = "32.0.0", features = ["object_store"], default-features = false }
6666
datafusion-execution = { workspace = true }
6767
datafusion-expr = { workspace = true }
68+
datafusion-functions = { workspace = true }
6869
datafusion-optimizer = { path = "../optimizer", version = "32.0.0", default-features = false }
6970
datafusion-physical-expr = { path = "../physical-expr", version = "32.0.0", default-features = false }
7071
datafusion-physical-plan = { workspace = true }

datafusion/core/src/execution/context/mod.rs

+48-59
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ use datafusion_common::{
4040
exec_err, not_impl_err, plan_datafusion_err, plan_err,
4141
tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion},
4242
};
43-
pub use datafusion_execution::registry::MutableFunctionRegistry;
4443
use datafusion_execution::registry::SerializerRegistry;
4544
use datafusion_expr::{
4645
logical_plan::{DdlStatement, Statement},
@@ -796,6 +795,48 @@ impl SessionContext {
796795
.add_var_provider(variable_type, provider);
797796
}
798797

798+
/// Registers a scalar UDF within this context.
799+
///
800+
/// Note in SQL queries, function names are looked up using
801+
/// lowercase unless the query uses quotes. For example,
802+
///
803+
/// - `SELECT MY_FUNC(x)...` will look for a function named `"my_func"`
804+
/// - `SELECT "my_FUNC"(x)` will look for a function named `"my_FUNC"`
805+
pub fn register_udf(&self, f: ScalarUDF) {
806+
self.state
807+
.write()
808+
.scalar_functions
809+
.insert(f.name().to_string(), Arc::new(f));
810+
}
811+
812+
/// Registers an aggregate UDF within this context.
813+
///
814+
/// Note in SQL queries, aggregate names are looked up using
815+
/// lowercase unless the query uses quotes. For example,
816+
///
817+
/// - `SELECT MY_UDAF(x)...` will look for an aggregate named `"my_udaf"`
818+
/// - `SELECT "my_UDAF"(x)` will look for an aggregate named `"my_UDAF"`
819+
pub fn register_udaf(&self, f: AggregateUDF) {
820+
self.state
821+
.write()
822+
.aggregate_functions
823+
.insert(f.name.clone(), Arc::new(f));
824+
}
825+
826+
/// Registers a window UDF within this context.
827+
///
828+
/// Note in SQL queries, window function names are looked up using
829+
/// lowercase unless the query uses quotes. For example,
830+
///
831+
/// - `SELECT MY_UDWF(x)...` will look for a window function named `"my_udwf"`
832+
/// - `SELECT "my_UDWF"(x)` will look for a window function named `"my_UDWF"`
833+
pub fn register_udwf(&self, f: WindowUDF) {
834+
self.state
835+
.write()
836+
.window_functions
837+
.insert(f.name.clone(), Arc::new(f));
838+
}
839+
799840
/// Creates a [`DataFrame`] for reading a data source.
800841
///
801842
/// For more control such as reading multiple files, you can use
@@ -1117,50 +1158,6 @@ impl FunctionRegistry for SessionContext {
11171158
}
11181159
}
11191160

1120-
impl MutableFunctionRegistry for SessionContext {
1121-
/// Registers a scalar UDF within this context.
1122-
///
1123-
/// Note in SQL queries, function names are looked up using
1124-
/// lowercase unless the query uses quotes. For example,
1125-
///
1126-
/// - `SELECT MY_FUNC(x)...` will look for a function named `"my_func"`
1127-
/// - `SELECT "my_FUNC"(x)` will look for a function named `"my_FUNC"`
1128-
fn register_udf(&self, f: ScalarUDF) {
1129-
self.state
1130-
.write()
1131-
.scalar_functions
1132-
.insert(f.name().to_string(), Arc::new(f));
1133-
}
1134-
1135-
/// Registers an aggregate UDF within this context.
1136-
///
1137-
/// Note in SQL queries, aggregate names are looked up using
1138-
/// lowercase unless the query uses quotes. For example,
1139-
///
1140-
/// - `SELECT MY_UDAF(x)...` will look for an aggregate named `"my_udaf"`
1141-
/// - `SELECT "my_UDAF"(x)` will look for an aggregate named `"my_UDAF"`
1142-
fn register_udaf(&self, f: AggregateUDF) {
1143-
self.state
1144-
.write()
1145-
.aggregate_functions
1146-
.insert(f.name.clone(), Arc::new(f));
1147-
}
1148-
1149-
/// Registers a window UDF within this context.
1150-
///
1151-
/// Note in SQL queries, window function names are looked up using
1152-
/// lowercase unless the query uses quotes. For example,
1153-
///
1154-
/// - `SELECT MY_UDWF(x)...` will look for a window function named `"my_udwf"`
1155-
/// - `SELECT "my_UDWF"(x)` will look for a window function named `"my_UDWF"`
1156-
fn register_udwf(&self, f: WindowUDF) {
1157-
self.state
1158-
.write()
1159-
.window_functions
1160-
.insert(f.name.clone(), Arc::new(f));
1161-
}
1162-
}
1163-
11641161
/// A planner used to add extensions to DataFusion logical and physical plans.
11651162
#[async_trait]
11661163
pub trait QueryPlanner {
@@ -1301,14 +1298,18 @@ impl SessionState {
13011298
);
13021299
}
13031300

1301+
// register built in functions
1302+
let mut scalar_functions = HashMap::new();
1303+
datafusion_functions::register_all(&mut scalar_functions);
1304+
13041305
SessionState {
13051306
session_id,
13061307
analyzer: Analyzer::new(),
13071308
optimizer: Optimizer::new(),
13081309
physical_optimizers: PhysicalOptimizer::new(),
13091310
query_planner: Arc::new(DefaultQueryPlanner {}),
13101311
catalog_list,
1311-
scalar_functions: HashMap::new(),
1312+
scalar_functions,
13121313
aggregate_functions: HashMap::new(),
13131314
window_functions: HashMap::new(),
13141315
serializer_registry: Arc::new(EmptySerializerRegistry),
@@ -1318,19 +1319,7 @@ impl SessionState {
13181319
table_factories,
13191320
}
13201321
}
1321-
/// Returns new [`SessionState`] using the provided
1322-
/// [`SessionConfig`] and [`RuntimeEnv`].
1323-
#[deprecated(
1324-
since = "32.0.0",
1325-
note = "Use SessionState::new_with_config_rt_and_catalog_list"
1326-
)]
1327-
pub fn with_config_rt_and_catalog_list(
1328-
config: SessionConfig,
1329-
runtime: Arc<RuntimeEnv>,
1330-
catalog_list: Arc<dyn CatalogList>,
1331-
) -> Self {
1332-
Self::new_with_config_rt_and_catalog_list(config, runtime, catalog_list)
1333-
}
1322+
13341323
fn register_default_schema(
13351324
config: &SessionConfig,
13361325
table_factories: &HashMap<String, Arc<dyn TableProviderFactory>>,

datafusion/core/src/prelude.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,7 @@
2626
//! ```
2727
2828
pub use crate::dataframe::DataFrame;
29-
pub use crate::execution::context::{
30-
MutableFunctionRegistry, SQLOptions, SessionConfig, SessionContext,
31-
};
29+
pub use crate::execution::context::{SQLOptions, SessionConfig, SessionContext};
3230
pub use crate::execution::options::{
3331
AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions,
3432
};

datafusion/core/tests/user_defined/user_defined_aggregates.rs

-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ use datafusion::{
3535
},
3636
assert_batches_eq,
3737
error::Result,
38-
execution::MutableFunctionRegistry,
3938
logical_expr::{
4039
AccumulatorFactoryFunction, AggregateUDF, ReturnTypeFunction, Signature,
4140
StateTypeFunction, TypeSignature, Volatility,

datafusion/core/tests/user_defined/user_defined_window_functions.rs

-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray};
3131
use arrow_schema::DataType;
3232
use datafusion::{assert_batches_eq, prelude::SessionContext};
3333
use datafusion_common::{Result, ScalarValue};
34-
use datafusion_execution::MutableFunctionRegistry;
3534
use datafusion_expr::{
3635
function::PartitionEvaluatorFactory, PartitionEvaluator, ReturnTypeFunction,
3736
Signature, Volatility, WindowUDF,

datafusion/execution/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,6 @@ mod stream;
2828
mod task;
2929

3030
pub use disk_manager::DiskManager;
31-
pub use registry::{FunctionRegistry, MutableFunctionRegistry};
31+
pub use registry::FunctionRegistry;
3232
pub use stream::{RecordBatchStream, SendableRecordBatchStream};
3333
pub use task::TaskContext;

datafusion/execution/src/registry.rs

-30
Original file line numberDiff line numberDiff line change
@@ -36,36 +36,6 @@ pub trait FunctionRegistry {
3636
fn udwf(&self, name: &str) -> Result<Arc<WindowUDF>>;
3737
}
3838

39-
/// A Function registry that can have functions registered
40-
pub trait MutableFunctionRegistry {
41-
/// Registers a scalar UDF within this context.
42-
///
43-
/// Note in SQL queries, function names are looked up using
44-
/// lowercase unless the query uses quotes. For example,
45-
///
46-
/// - `SELECT MY_FUNC(x)...` will look for a function named `"my_func"`
47-
/// - `SELECT "my_FUNC"(x)` will look for a function named `"my_FUNC"`
48-
fn register_udf(&self, f: ScalarUDF);
49-
50-
/// Registers an aggregate UDF within this context.
51-
///
52-
/// Note in SQL queries, aggregate names are looked up using
53-
/// lowercase unless the query uses quotes. For example,
54-
///
55-
/// - `SELECT MY_UDAF(x)...` will look for an aggregate named `"my_udaf"`
56-
/// - `SELECT "my_UDAF"(x)` will look for an aggregate named `"my_UDAF"`
57-
fn register_udaf(&self, f: AggregateUDF);
58-
59-
/// Registers a window UDF within this context.
60-
///
61-
/// Note in SQL queries, window function names are looked up using
62-
/// lowercase unless the query uses quotes. For example,
63-
///
64-
/// - `SELECT MY_UDWF(x)...` will look for a window function named `"my_udwf"`
65-
/// - `SELECT "my_UDWF"(x)` will look for a window function named `"my_UDWF"`
66-
fn register_udwf(&self, f: WindowUDF);
67-
}
68-
6939
/// Serializer and deserializer registry for extensions like [UserDefinedLogicalNode].
7040
pub trait SerializerRegistry: Send + Sync {
7141
/// Serialize this node to a byte array. This serialization should not include

datafusion/functions/src/encoding/mod.rs

+9-7
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@ mod inner;
1919

2020
use datafusion_common::arrow::datatypes::DataType;
2121
use datafusion_common::{plan_err, DataFusionError, Result};
22-
use datafusion_execution::registry::MutableFunctionRegistry;
2322
use datafusion_expr::TypeSignature::*;
2423
use datafusion_expr::{
2524
ColumnarValue, FunctionImplementation, ScalarUDF, Signature, Volatility,
2625
};
26+
use std::collections::HashMap;
2727
use std::sync::{Arc, OnceLock};
2828
use DataType::*;
2929

@@ -39,12 +39,14 @@ pub fn decode_udf() -> ScalarUDF {
3939
ScalarUDF::new_from_impl(Arc::new(DecodeFunc {}))
4040
}
4141

42-
/// Registers the `encode` and `decode` functions with the function registry
43-
pub fn register(registry: &dyn MutableFunctionRegistry) -> Result<()> {
44-
registry.register_udf(encode_udf());
45-
registry.register_udf(decode_udf());
42+
fn insert(registry: &mut HashMap<String, Arc<ScalarUDF>>, udf: ScalarUDF) {
43+
registry.insert(udf.name().to_string(), Arc::new(udf));
44+
}
4645

47-
Ok(())
46+
/// Registers the `encode` and `decode` functions with the function registry
47+
pub fn register(registry: &mut HashMap<String, Arc<ScalarUDF>>) {
48+
insert(registry, encode_udf());
49+
insert(registry, decode_udf());
4850
}
4951

5052
struct EncodeFunc {}
@@ -95,7 +97,7 @@ static DECODE_SIGNATURE: OnceLock<Signature> = OnceLock::new();
9597

9698
impl FunctionImplementation for DecodeFunc {
9799
fn name(&self) -> &str {
98-
"encode"
100+
"decode"
99101
}
100102

101103
fn signature(&self) -> &Signature {

datafusion/functions/src/lib.rs

+9
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,13 @@
1717

1818
//! Several packages of built in functions for DataFusion
1919
20+
use datafusion_expr::ScalarUDF;
21+
use std::collections::HashMap;
22+
use std::sync::Arc;
23+
2024
pub mod encoding;
25+
26+
/// Registers all "built in" functions from this crate with the provided registry
27+
pub fn register_all(registry: &mut HashMap<String, Arc<ScalarUDF>>) {
28+
encoding::register(registry);
29+
}

datafusion/proto/tests/cases/roundtrip_logical_plan.rs

-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ use datafusion::datasource::provider::TableProviderFactory;
3131
use datafusion::datasource::TableProvider;
3232
use datafusion::execution::context::SessionState;
3333
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
34-
use datafusion::execution::MutableFunctionRegistry;
3534
use datafusion::physical_plan::functions::make_scalar_function;
3635
use datafusion::prelude::{create_udf, CsvReadOptions, SessionConfig, SessionContext};
3736
use datafusion::test_util::{TestTableFactory, TestTableProvider};

datafusion/proto/tests/cases/roundtrip_physical_plan.rs

-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ use datafusion::datasource::listing::PartitionedFile;
2525
use datafusion::datasource::object_store::ObjectStoreUrl;
2626
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
2727
use datafusion::execution::context::ExecutionProps;
28-
use datafusion::execution::MutableFunctionRegistry;
2928
use datafusion::logical_expr::{
3029
create_udf, BuiltinScalarFunction, JoinType, Operator, Volatility,
3130
};

datafusion/proto/tests/cases/serialize.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use std::sync::Arc;
2020
use arrow::array::ArrayRef;
2121
use arrow::datatypes::DataType;
2222

23-
use datafusion::execution::{FunctionRegistry, MutableFunctionRegistry};
23+
use datafusion::execution::FunctionRegistry;
2424
use datafusion::physical_plan::functions::make_scalar_function;
2525
use datafusion::prelude::SessionContext;
2626
use datafusion_expr::{col, create_udf, lit};

0 commit comments

Comments
 (0)