diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index e320b2ffc835..b6439f13f7fd 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1431,6 +1431,7 @@ dependencies = [ "rand", "regex", "sha2", + "twox-hash", "unicode-segmentation", "uuid", ] @@ -3407,9 +3408,9 @@ checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" [[package]] name = "rustc-hash" -version = "2.1.0" +version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7fb8039b3032c191086b10f11f319a6e99e1e82889c5cc6046f515c9db1d497" +checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" [[package]] name = "rustc_version" @@ -4190,6 +4191,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ "cfg-if", + "rand", "static_assertions", ] diff --git a/datafusion/functions/Cargo.toml b/datafusion/functions/Cargo.toml index db3e6838f6a5..f6336cd18af8 100644 --- a/datafusion/functions/Cargo.toml +++ b/datafusion/functions/Cargo.toml @@ -35,6 +35,7 @@ workspace = true # enable core functions core_expressions = [] crypto_expressions = ["md-5", "sha2", "blake2", "blake3"] +hash_expressions = ["twox-hash"] # enable datetime functions datetime_expressions = [] # Enable encoding by default so the doctests work. In general don't automatically enable all packages. @@ -46,6 +47,7 @@ default = [ "regex_expressions", "string_expressions", "unicode_expressions", + "hash_expressions", ] # enable encode/decode functions encoding_expressions = ["base64", "hex"] @@ -85,6 +87,7 @@ md-5 = { version = "^0.10.0", optional = true } rand = { workspace = true } regex = { workspace = true, optional = true } sha2 = { version = "^0.10.1", optional = true } +twox-hash = { version = "1.6.3", optional = true } unicode-segmentation = { version = "^1.7.1", optional = true } uuid = { version = "1.7", features = ["v4"], optional = true } diff --git a/datafusion/functions/src/hash/mod.rs b/datafusion/functions/src/hash/mod.rs new file mode 100644 index 000000000000..1268b463767c --- /dev/null +++ b/datafusion/functions/src/hash/mod.rs @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! "xxhash" DataFusion functions + +use datafusion_expr::ScalarUDF; +use std::sync::Arc; + +pub mod xxhash; +make_udf_function!(xxhash::XxHash32Func, xxhash32); +make_udf_function!(xxhash::XxHash64Func, xxhash64); + +pub mod expr_fn { + export_functions!( + ( + xxhash32, + "Computes the XXHash32 hash of a binary string.", + input + ), + ( + xxhash64, + "Computes the XXHash64 hash of a binary string.", + input + ) + ); +} + +/// Returns all DataFusion functions defined in this package +pub fn functions() -> Vec> { + vec![xxhash32(), xxhash64()] +} diff --git a/datafusion/functions/src/hash/xxhash.rs b/datafusion/functions/src/hash/xxhash.rs new file mode 100644 index 000000000000..c01646b62db8 --- /dev/null +++ b/datafusion/functions/src/hash/xxhash.rs @@ -0,0 +1,477 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{ + Array, AsArray, BinaryArray, LargeBinaryArray, LargeStringArray, StringArray, + StringBuilder, +}; +use arrow::datatypes::DataType; +use arrow::datatypes::DataType::{Binary, Int64, LargeBinary, LargeUtf8, Utf8, Utf8View}; +use datafusion_common::{exec_err, internal_err, plan_err, Result, ScalarValue}; +use datafusion_expr::{ + ColumnarValue, Documentation, ScalarUDFImpl, Signature, TypeSignature, Volatility, +}; +use datafusion_macros::user_doc; +use std::any::Any; +use std::hash::Hasher; +use std::sync::Arc; +use twox_hash::{XxHash32, XxHash64}; + +#[user_doc( + doc_section(label = "Hashing Functions"), + description = "Computes the XXHash32 hash of a binary string.", + syntax_example = "xxhash32(expression [,seed])", + sql_example = r#"```sql +> select xxhash32('foo'); ++-------------------------------------------+ +| xxhash32(Utf8("foo")) | ++-------------------------------------------+ +| | ++-------------------------------------------+ +```"#, + standard_argument(name = "expression", prefix = "String") +)] +#[derive(Debug)] +pub struct XxHash32Func { + signature: Signature, +} + +impl Default for XxHash32Func { + fn default() -> Self { + Self::new() + } +} + +impl XxHash32Func { + pub fn new() -> Self { + Self { + signature: Signature::one_of( + vec![ + TypeSignature::Exact(vec![Utf8View]), + TypeSignature::Exact(vec![Utf8]), + TypeSignature::Exact(vec![LargeUtf8]), + TypeSignature::Exact(vec![Binary]), + TypeSignature::Exact(vec![LargeBinary]), + TypeSignature::Exact(vec![Utf8View, Int64]), + TypeSignature::Exact(vec![Utf8, Int64]), + TypeSignature::Exact(vec![LargeUtf8, Int64]), + TypeSignature::Exact(vec![Binary, Int64]), + TypeSignature::Exact(vec![LargeBinary, Int64]), + ], + Volatility::Immutable, + ), + } + } + + pub fn hash_scalar(&self, value: &[u8]) -> Result { + // let value_str = to_string_from_scalar(value)?; + hash_value(value, XxHash32::default(), HashType::U32) + } +} + +impl ScalarUDFImpl for XxHash32Func { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "xxhash32" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> Result { + use DataType::*; + Ok(match &arg_types[0] { + LargeUtf8 | LargeBinary => Utf8, + Utf8View | Utf8 | Binary => Utf8, + Null => Null, + Dictionary(_, t) => match **t { + LargeUtf8 | LargeBinary => Utf8, + Utf8 | Binary => Utf8, + Null => Null, + _ => { + return plan_err!( + "The xxhash32 can only accept Utf8, Utf8View, LargeUtf8, Binary and LargeBinary but got {:?}", + **t + ); + } + }, + other => { + return plan_err!( + "The xxhash32 can only accept Utf8, Utf8View, LargeUtf8, Binary and LargeBinary but got {other}" + ); + } + }) + } + + fn invoke_batch( + &self, + args: &[ColumnarValue], + _number_rows: usize, + ) -> Result { + let input_data = &args[0]; + + let seed = if args.len() > 1 { + if let ColumnarValue::Scalar(ScalarValue::Int64(Some(seed))) = &args[1] { + if *seed >= 0 && *seed <= u32::MAX as i64 { + *seed as u32 + } else { + return exec_err!("Seed value out of range for UInt32: {}", seed); + } + } else { + let actual_type = format!("{:?}", &args[1]); + return exec_err!("Expected a Int64 seed value, but got {}", actual_type); + } + } else { + 0 // Default seed value + }; + + let result = match input_data { + ColumnarValue::Array(array) => { + let hash_results = + process_array(array, XxHash32::with_seed(seed), HashType::U32)?; + let hash_array = StringArray::from(hash_results); + Arc::new(hash_array) as Arc + } + ColumnarValue::Scalar(scalar) => match scalar { + ScalarValue::Utf8(None) + | ScalarValue::Utf8View(None) + | ScalarValue::LargeUtf8(None) => { + let hash_array = StringArray::from(vec![String::new()]); + Arc::new(hash_array) as Arc + } + ScalarValue::Utf8(Some(ref v)) + | ScalarValue::Utf8View(Some(ref v)) + | ScalarValue::LargeUtf8(Some(ref v)) => { + let hash_result = hash_value( + v.as_bytes(), + XxHash32::with_seed(seed), + HashType::U32, + )?; + let hash_array = StringArray::from(vec![hash_result]); + Arc::new(hash_array) as Arc + } + ScalarValue::Binary(Some(ref v)) + | ScalarValue::LargeBinary(Some(ref v)) => { + let hash_result = + hash_value(v, XxHash32::with_seed(seed), HashType::U32)?; + let hash_array = StringArray::from(vec![hash_result]); + Arc::new(hash_array) as Arc + } + _ => { + return internal_err!("Unsupported scalar type: {:?}", scalar); + } + }, + }; + + Ok(ColumnarValue::Array(result)) + } + + fn documentation(&self) -> Option<&Documentation> { + self.doc() + } +} + +#[user_doc( + doc_section(label = "Hashing Functions"), + description = "Computes the XXHash64 hash of a binary string.", + syntax_example = "xxhash64(expression [,seed])", + sql_example = r#"```sql +> select xxhash64('foo'); ++-------------------------------------------+ +| xxhash64(Utf8("foo")) | ++-------------------------------------------+ +| | ++-------------------------------------------+ +```"#, + standard_argument(name = "expression", prefix = "String") +)] +#[derive(Debug)] +pub struct XxHash64Func { + signature: Signature, +} + +impl Default for XxHash64Func { + fn default() -> Self { + Self::new() + } +} + +impl XxHash64Func { + pub fn new() -> Self { + Self { + signature: Signature::one_of( + vec![ + TypeSignature::Exact(vec![Utf8View]), + TypeSignature::Exact(vec![Utf8]), + TypeSignature::Exact(vec![LargeUtf8]), + TypeSignature::Exact(vec![Binary]), + TypeSignature::Exact(vec![LargeBinary]), + TypeSignature::Exact(vec![Utf8View, Int64]), + TypeSignature::Exact(vec![Utf8, Int64]), + TypeSignature::Exact(vec![LargeUtf8, Int64]), + TypeSignature::Exact(vec![Binary, Int64]), + TypeSignature::Exact(vec![LargeBinary, Int64]), + ], + Volatility::Immutable, + ), + } + } + + pub fn hash_scalar(&self, value: &[u8]) -> Result { + // let value_str = to_string_from_scalar(value)?; + hash_value(value, XxHash64::default(), HashType::U64) + } +} + +impl ScalarUDFImpl for XxHash64Func { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "xxhash64" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> Result { + use DataType::*; + Ok(match &arg_types[0] { + LargeUtf8 | LargeBinary => Utf8, + Utf8View | Utf8 | Binary => Utf8, + Null => Null, + Dictionary(_, t) => match **t { + LargeUtf8 | LargeBinary => Utf8, + Utf8 | Binary => Utf8, + Null => Null, + _ => { + return plan_err!( + "The xxhash64 can only accept Utf8, Utf8View, LargeUtf8, Binary and LargeBinary but got {:?}", + **t + ); + } + }, + other => { + return plan_err!( + "The xxhash64 can only accept Utf8, Utf8View, LargeUtf8, Binary and LargeBinary but {other}" + ); + } + }) + } + + fn invoke_batch( + &self, + args: &[ColumnarValue], + _number_rows: usize, + ) -> Result { + let input_data = &args[0]; + + let seed = if args.len() > 1 { + if let ColumnarValue::Scalar(ScalarValue::Int64(Some(seed))) = &args[1] { + if *seed >= 0 { + *seed as u64 + } else { + return exec_err!("Seed value out of range for UInt64: {}", seed); + } + } else { + let actual_type = format!("{:?}", &args[1]); + return exec_err!("Expected a Int64 seed value, but got {}", actual_type); + } + } else { + 0 // Default seed value + }; + + let result = match input_data { + ColumnarValue::Array(array) => { + let hash_results = + process_array(array, XxHash64::with_seed(seed), HashType::U64)?; + let hash_array = StringArray::from(hash_results); + Arc::new(hash_array) as Arc + } + ColumnarValue::Scalar(scalar) => match scalar { + ScalarValue::Utf8(None) + | ScalarValue::Utf8View(None) + | ScalarValue::LargeUtf8(None) => { + let hash_array = StringArray::from(vec![String::new()]); + Arc::new(hash_array) as Arc + } + ScalarValue::Utf8(Some(ref v)) + | ScalarValue::Utf8View(Some(ref v)) + | ScalarValue::LargeUtf8(Some(ref v)) => { + let hash_result = hash_value( + v.as_bytes(), + XxHash64::with_seed(seed), + HashType::U64, + )?; + let hash_array = StringArray::from(vec![hash_result]); + Arc::new(hash_array) as Arc + } + ScalarValue::Binary(Some(ref v)) + | ScalarValue::LargeBinary(Some(ref v)) => { + let hash_result = + hash_value(v, XxHash64::with_seed(seed), HashType::U64)?; + let hash_array = StringArray::from(vec![hash_result]); + Arc::new(hash_array) as Arc + } + _ => { + let actual_type = format!("{:?}", scalar); + return exec_err!("Unsupported scalar type: {}", actual_type); + } + }, + }; + + Ok(ColumnarValue::Array(result)) + } + + fn documentation(&self) -> Option<&Documentation> { + self.doc() + } +} + +// Helper functions + +#[derive(Clone)] +pub enum HashType { + U32, + U64, +} + +fn hash_value( + value_bytes: &[u8], + mut hasher: T, + hash_type: HashType, +) -> Result { + hasher.write(value_bytes); + let hash = hasher.finish(); + match hash_type { + HashType::U32 => { + let hash_u32 = hash as u32; + Ok(hex::encode(hash_u32.to_be_bytes())) + } + HashType::U64 => { + let hash_u64 = hash; + Ok(hex::encode(hash_u64.to_be_bytes())) + } + } +} + +fn process_array( + array: &dyn Array, + mut hasher: T, + hash_type: HashType, +) -> Result { + let mut hash_results = StringBuilder::new(); + + match array.data_type() { + Utf8View => { + let string_view_array = array.as_string_view(); + for i in 0..array.len() { + if array.is_null(i) { + hash_results.append_value(String::new()); + continue; + } + let value = string_view_array.value(i); + hash_results.append_value(hash_value( + value.as_bytes(), + &mut hasher, + hash_type.clone(), + )?); + } + } + + Utf8 => { + let string_array = array.as_any().downcast_ref::().unwrap(); + for i in 0..array.len() { + if array.is_null(i) { + hash_results.append_value(String::new()); + continue; + } + let value = string_array.value(i); + hash_results.append_value(hash_value( + value.as_bytes(), + &mut hasher, + hash_type.clone(), + )?); + } + } + + LargeUtf8 => { + let large_string_array = + array.as_any().downcast_ref::().unwrap(); + for i in 0..array.len() { + if array.is_null(i) { + hash_results.append_value(String::new()); + continue; + } + let value = large_string_array.value(i); + hash_results.append_value(hash_value( + value.as_bytes(), + &mut hasher, + hash_type.clone(), + )?); + } + } + + Binary | LargeBinary => { + let binary_array: &dyn Array = if array.data_type() == &Binary { + array.as_any().downcast_ref::().unwrap() + } else { + array.as_any().downcast_ref::().unwrap() + }; + for i in 0..array.len() { + if array.is_null(i) { + hash_results.append_value(String::new()); + continue; + } + let value = if let Some(binary_array) = + binary_array.as_any().downcast_ref::() + { + binary_array.value(i) + } else { + binary_array + .as_any() + .downcast_ref::() + .unwrap() + .value(i) + }; + hash_results.append_value(hash_value( + value, + &mut hasher, + hash_type.clone(), + )?); + } + } + + DataType::Null => { + for _ in 0..array.len() { + hash_results.append_value(String::new()); + } + } + _ => { + let actual_type = format!("{:?}", array.data_type()); + return exec_err!("Unsupported array type: {}", actual_type); + } + } + + Ok(hash_results.finish()) +} diff --git a/datafusion/functions/src/lib.rs b/datafusion/functions/src/lib.rs index ffb0ab5f51ec..51c295cb8b70 100644 --- a/datafusion/functions/src/lib.rs +++ b/datafusion/functions/src/lib.rs @@ -133,6 +133,10 @@ make_stub_package!(crypto, "crypto_expressions"); pub mod unicode; make_stub_package!(unicode, "unicode_expressions"); +#[cfg(feature = "hash_expressions")] +pub mod hash; +make_stub_package!(hash, "hash_expressions"); + #[cfg(any(feature = "datetime_expressions", feature = "unicode_expressions"))] pub mod planner; @@ -150,6 +154,8 @@ pub mod expr_fn { pub use super::datetime::expr_fn::*; #[cfg(feature = "encoding_expressions")] pub use super::encoding::expr_fn::*; + #[cfg(feature = "hash_expressions")] + pub use super::hash::expr_fn::*; #[cfg(feature = "math_expressions")] pub use super::math::expr_fn::*; #[cfg(feature = "regex_expressions")] @@ -171,6 +177,7 @@ pub fn all_default_functions() -> Vec> { .chain(crypto::functions()) .chain(unicode::functions()) .chain(string::functions()) + .chain(hash::functions()) .collect::>() } diff --git a/datafusion/sqllogictest/test_files/hash.slt b/datafusion/sqllogictest/test_files/hash.slt new file mode 100644 index 000000000000..bc9048565045 --- /dev/null +++ b/datafusion/sqllogictest/test_files/hash.slt @@ -0,0 +1,129 @@ +# Test xxhash32 with string input +query T +SELECT xxhash32('foo') AS hash_value; +---- +e20f0dd9 + +# Test xxhash32 with array input +query T +SELECT xxhash32(column1) AS xxhash32_result FROM ( SELECT UNNEST(ARRAY[1, 2, 3, 4, 5]) AS column1 ) AS subquery; +---- +b6ecc8b2 +d43589af +b6855437 +01543429 +b30d56b4 + +# Test xxhash32 with Utf8View array input +query T +WITH input_data AS (SELECT arrow_cast(column1, 'Utf8View') as utf8view_value FROM (VALUES ('foobar1'),('foobar2'),('foobar3')) AS t(column1)) SELECT xxhash32(utf8view_value) as hash_value FROM input_data; +---- +2b0d1874 +9925f907 +df748f36 + +query T +SELECT xxhash32(NULL) AS hash_value; +---- +(empty) + +# Test xxhash32 with string input and seed of 1 +query T +SELECT xxhash32('foo', 1) AS hash_value; +---- +1742761f + +# Test xxhash32 with array input and seed of 1 +query T +SELECT xxhash32(column1, 1) AS xxhash32_result FROM ( SELECT UNNEST(ARRAY[1, 2, 3, 4, 5]) AS column1 ) AS subquery; +---- +642684c5 +df0e3329 +99280b78 +e17e2fa9 +97a348b6 + +# Test xxhash32 with null input and seed of 1 +query T +SELECT xxhash32(NULL, 1) AS hash_value; +---- +(empty) + +# Test xxhash32 with binary input +query T +SELECT xxhash32(X'1') AS hash_value; +---- +3892f731 + + +# Test xxhash32 with binary input and seed of 1 +query T +SELECT xxhash32('foo'::BYTEA, 1) AS hash_value; +---- +1742761f + +# Tests for xxhash64 + +# Test xxhash64 with string input +query T +SELECT xxhash64('foo') AS hash_value; +---- +33bf00a859c4ba3f + +# Test xxhash64 with array input +query T +SELECT xxhash64(column1) AS xxhash64_result FROM ( SELECT UNNEST(ARRAY[1, 2, 3, 4, 5]) AS column1 ) AS subquery; +---- +b7b41276360564d4 +5460f49adbe7aba2 +3c697d223fa7e885 +d8316e61d84f6ba4 +c6f2d2dd0ad64fb6 + +# Test xxhash64 with Utf8View array input +query T +WITH input_data AS (SELECT arrow_cast(column1, 'Utf8View') as utf8view_value FROM (VALUES ('foobar1'),('foobar2'),('foobar3')) AS t(column1)) SELECT xxhash64(utf8view_value) as hash_value FROM input_data; +---- +36425528f43b829c +b24f52e2956da1a9 +5e75bd6e3aac89a9 + +query T +SELECT xxhash64(NULL) AS hash_value; +---- +(empty) + +# Test xxhash64 with string input and seed of 1 +query T +SELECT xxhash64('foo', 1) AS hash_value; +---- +c34823c5bf4f2cbd + +# Test xxhash64 with array input and seed of 1 +query T +SELECT xxhash64(column1, 1) AS xxhash64_result FROM ( SELECT UNNEST(ARRAY[1, 2, 3, 4, 5]) AS column1 ) AS subquery; +---- +192aba5fd13fb67d +75b53fdb7dce12fa +4b805d862c3b7497 +e9feb3476d8788cb +8b4dc636e784c7e5 + +# Test xxhash64 with null input and seed of 1 +query T +SELECT xxhash64(NULL, 1) AS hash_value; +---- +(empty) + +# Test xxhash64 with binary input +query T +SELECT xxhash64(X'1') AS hash_value; +---- +8a4127811b21e730 + + +# Test xxhash64 with binary input and seed of 1 +query T +SELECT xxhash64('foo'::BYTEA, 1) AS hash_value; +---- +c34823c5bf4f2cbd \ No newline at end of file diff --git a/docs/source/user-guide/sql/scalar_functions.md b/docs/source/user-guide/sql/scalar_functions.md index b769b8b7bdb0..3e57b60eb6f2 100644 --- a/docs/source/user-guide/sql/scalar_functions.md +++ b/docs/source/user-guide/sql/scalar_functions.md @@ -4191,6 +4191,8 @@ SELECT map_values(map([100, 5], [42, 43])); - [sha256](#sha256) - [sha384](#sha384) - [sha512](#sha512) +- [xxhash32](#xxhash32) +- [xxhash64](#xxhash64) ### `digest` @@ -4339,6 +4341,52 @@ sha512(expression) +-------------------------------------------+ ``` +### `xxhash32` + +Computes the XXHash32 hash of a binary string. + +``` +xxhash32(expression [,seed]) +``` + +#### Arguments + +- **expression**: String expression to operate on. Can be a constant, column, or function, and any combination of operators. + +#### Example + +```sql +> select xxhash32('foo'); ++-------------------------------------------+ +| xxhash32(Utf8("foo")) | ++-------------------------------------------+ +| | ++-------------------------------------------+ +``` + +### `xxhash64` + +Computes the XXHash64 hash of a binary string. + +``` +xxhash64(expression [,seed]) +``` + +#### Arguments + +- **expression**: String expression to operate on. Can be a constant, column, or function, and any combination of operators. + +#### Example + +```sql +> select xxhash64('foo'); ++-------------------------------------------+ +| xxhash64(Utf8("foo")) | ++-------------------------------------------+ +| | ++-------------------------------------------+ +``` + ## Other Functions - [arrow_cast](#arrow_cast)