Skip to content

Commit e53c9a0

Browse files
committed
port digest
1 parent b7f4772 commit e53c9a0

File tree

5 files changed

+436
-3
lines changed

5 files changed

+436
-3
lines changed

datafusion/functions/Cargo.toml

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,14 @@ core_expressions = []
3434
# enable datetime functions
3535
datetime_expressions = []
3636
# Enable encoding by default so the doctests work. In general don't automatically enable all packages.
37-
default = ["core_expressions", "datetime_expressions", "encoding_expressions", "math_expressions", "regex_expressions"]
37+
default = ["core_expressions", "datetime_expressions", "encoding_expressions", "math_expressions", "regex_expressions", "crypto_expressions"]
3838
# enable encode/decode functions
3939
encoding_expressions = ["base64", "hex"]
4040
# enable math functions
4141
math_expressions = []
4242
# enable regular expressions
4343
regex_expressions = ["regex"]
44-
44+
crypto_expressions = ["md-5", "sha2", "blake2", "blake3"]
4545
[lib]
4646
name = "datafusion_functions"
4747
path = "src/lib.rs"
@@ -61,7 +61,10 @@ hex = { version = "0.4", optional = true }
6161
itertools = { workspace = true }
6262
log = { workspace = true }
6363
regex = { version = "1.8", optional = true }
64-
64+
blake2 = { version = "^0.10.2", optional = true }
65+
blake3 = { version = "1.0", optional = true }
66+
md-5 = { version = "^0.10.0", optional = true }
67+
sha2 = { version = "^0.10.1", optional = true }
6568
[dev-dependencies]
6669
criterion = "0.5"
6770
rand = { workspace = true }
Lines changed: 331 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,331 @@
1+
use arrow::array::{Array, ArrayRef, BinaryArray, OffsetSizeTrait};
2+
use arrow::datatypes::DataType;
3+
use blake2::{Blake2b512, Blake2s256, Digest};
4+
use blake3::Hasher as Blake3;
5+
use datafusion_common::plan_err;
6+
use datafusion_common::{
7+
cast::{as_generic_binary_array, as_generic_string_array},
8+
exec_err, internal_err, DataFusionError, Result, ScalarValue,
9+
};
10+
use datafusion_expr::ColumnarValue;
11+
use md5::Md5;
12+
use sha2::{Sha224, Sha256, Sha384, Sha512};
13+
use std::fmt::{self, Write};
14+
use std::str::FromStr;
15+
use std::sync::Arc;
16+
17+
/// Defines a public one-argument digest wrapper function.
///
/// `$NAME` is the generated function's name, `$METHOD` is the
/// [`DigestAlgorithm`] variant it dispatches to, and `$DOC` becomes the
/// generated function's doc string. The generated function validates the
/// argument count and then delegates to [`digest_process`].
macro_rules! define_digest_function {
    ($NAME: ident, $METHOD: ident, $DOC: expr) => {
        #[doc = $DOC]
        pub fn $NAME(args: &[ColumnarValue]) -> Result<ColumnarValue> {
            if args.len() != 1 {
                return exec_err!(
                    "{:?} args were supplied but {} takes exactly one argument",
                    args.len(),
                    DigestAlgorithm::$METHOD.to_string()
                );
            }
            digest_process(&args[0], DigestAlgorithm::$METHOD)
        }
    };
}
32+
// Generate the single-argument hash functions exposed by this module.
// (md5 is not generated here — it needs special utf8 handling; see the
// commented-out implementation further down.)
define_digest_function!(
    sha224,
    Sha224,
    "computes sha224 hash digest of the given input"
);
define_digest_function!(
    sha256,
    Sha256,
    "computes sha256 hash digest of the given input"
);
define_digest_function!(
    sha384,
    Sha384,
    "computes sha384 hash digest of the given input"
);
define_digest_function!(
    sha512,
    Sha512,
    "computes sha512 hash digest of the given input"
);
define_digest_function!(
    blake2b,
    Blake2b,
    "computes blake2b hash digest of the given input"
);
define_digest_function!(
    blake2s,
    Blake2s,
    "computes blake2s hash digest of the given input"
);
define_digest_function!(
    blake3,
    Blake3,
    "computes blake3 hash digest of the given input"
);
67+
68+
/// Hashes an optional byte slice with the digest type `$METHOD`, producing
/// a `ScalarValue::Binary`. A `None` input (SQL NULL) stays `None`.
macro_rules! digest_to_scalar {
    ($METHOD: ident, $INPUT:expr) => {{
        ScalarValue::Binary($INPUT.as_ref().map(|v| {
            let mut digest = $METHOD::default();
            digest.update(v);
            digest.finalize().as_slice().to_vec()
        }))
    }};
}
77+
78+
/// Hash algorithms supported by the `digest` function and its
/// single-algorithm wrapper functions.
///
/// The `Display` impl renders the lowercase variant name (e.g. `Sha256`
/// displays as "sha256"), which is also the spelling accepted by the
/// `FromStr` impl.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum DigestAlgorithm {
    Md5,
    Sha224,
    Sha256,
    Sha384,
    Sha512,
    Blake2s,
    Blake2b,
    Blake3,
}
89+
90+
/// Digest computes a binary hash of the given data, accepts Utf8 or LargeUtf8 and returns a [`ColumnarValue`].
91+
/// Second argument is the algorithm to use.
92+
/// Standard algorithms are md5, sha1, sha224, sha256, sha384 and sha512.
93+
pub fn digest(args: &[ColumnarValue]) -> Result<ColumnarValue> {
94+
if args.len() != 2 {
95+
return exec_err!(
96+
"{:?} args were supplied but digest takes exactly two arguments",
97+
args.len()
98+
);
99+
}
100+
let digest_algorithm = match &args[1] {
101+
ColumnarValue::Scalar(scalar) => match scalar {
102+
ScalarValue::Utf8(Some(method)) | ScalarValue::LargeUtf8(Some(method)) => {
103+
method.parse::<DigestAlgorithm>()
104+
}
105+
other => exec_err!("Unsupported data type {other:?} for function digest"),
106+
},
107+
ColumnarValue::Array(_) => {
108+
internal_err!("Digest using dynamically decided method is not yet supported")
109+
}
110+
}?;
111+
digest_process(&args[0], digest_algorithm)
112+
}
113+
impl FromStr for DigestAlgorithm {
    type Err = DataFusionError;

    /// Parses a lowercase algorithm name (e.g. "sha256") into a
    /// [`DigestAlgorithm`].
    ///
    /// Unknown names produce a plan error listing the supported algorithms.
    /// Note that matching is case-sensitive, so e.g. "SHA256" is rejected.
    fn from_str(name: &str) -> Result<DigestAlgorithm> {
        Ok(match name {
            "md5" => Self::Md5,
            "sha224" => Self::Sha224,
            "sha256" => Self::Sha256,
            "sha384" => Self::Sha384,
            "sha512" => Self::Sha512,
            "blake2b" => Self::Blake2b,
            "blake2s" => Self::Blake2s,
            "blake3" => Self::Blake3,
            _ => {
                // NOTE(review): this list duplicates both the match arms above
                // and the enum variants; keep all three in sync when adding an
                // algorithm.
                let options = [
                    Self::Md5,
                    Self::Sha224,
                    Self::Sha256,
                    Self::Sha384,
                    Self::Sha512,
                    Self::Blake2s,
                    Self::Blake2b,
                    Self::Blake3,
                ]
                .iter()
                .map(|i| i.to_string())
                .collect::<Vec<_>>()
                .join(", ");
                return plan_err!(
                    "There is no built-in digest algorithm named '{name}', currently supported algorithms are: {options}"
                );
            }
        })
    }
}
147+
impl fmt::Display for DigestAlgorithm {
148+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
149+
write!(f, "{}", format!("{self:?}").to_lowercase())
150+
}
151+
}
152+
// /// computes md5 hash digest of the given input
153+
// pub fn md5(args: &[ColumnarValue]) -> Result<ColumnarValue> {
154+
// if args.len() != 1 {
155+
// return exec_err!(
156+
// "{:?} args were supplied but {} takes exactly one argument",
157+
// args.len(),
158+
// DigestAlgorithm::Md5
159+
// );
160+
// }
161+
// let value = digest_process(&args[0], DigestAlgorithm::Md5)?;
162+
// // md5 requires special handling because of its unique utf8 return type
163+
// Ok(match value {
164+
// ColumnarValue::Array(array) => {
165+
// let binary_array = as_binary_array(&array)?;
166+
// let string_array: StringArray = binary_array
167+
// .iter()
168+
// .map(|opt| opt.map(hex_encode::<_>))
169+
// .collect();
170+
// ColumnarValue::Array(Arc::new(string_array))
171+
// }
172+
// ColumnarValue::Scalar(ScalarValue::Binary(opt)) => {
173+
// ColumnarValue::Scalar(ScalarValue::Utf8(opt.map(hex_encode::<_>)))
174+
// }
175+
// _ => return exec_err!("Impossibly got invalid results from digest"),
176+
// })
177+
// }
178+
179+
/// Hex-encodes a byte sequence so that we do not need to pull in the `hex`
/// crate. It is only used by the md5 function (currently commented out
/// above, pending its special utf8 return handling).
#[inline]
fn hex_encode<T: AsRef<[u8]>>(data: T) -> String {
    let bytes = data.as_ref();
    bytes
        .iter()
        .fold(String::with_capacity(bytes.len() * 2), |mut out, byte| {
            // Formatting into a String never fails, so unwrap is safe.
            write!(out, "{byte:02x}").unwrap();
            out
        })
}
190+
pub fn utf8_or_binary_to_binary_type(
191+
arg_type: &DataType,
192+
name: &str,
193+
) -> Result<DataType> {
194+
Ok(match arg_type {
195+
DataType::LargeUtf8
196+
| DataType::Utf8
197+
| DataType::Binary
198+
| DataType::LargeBinary => DataType::Binary,
199+
DataType::Null => DataType::Null,
200+
_ => {
201+
return plan_err!(
202+
"The {name:?} function can only accept strings or binary arrays."
203+
);
204+
}
205+
})
206+
}
207+
/// Hashes every element of an iterable array with the digest type
/// `$METHOD`, collecting the digests into a `BinaryArray` wrapped in an
/// `Arc`. NULL elements stay NULL.
macro_rules! digest_to_array {
    ($METHOD:ident, $INPUT:expr) => {{
        let binary_array: BinaryArray = $INPUT
            .iter()
            .map(|x| {
                x.map(|x| {
                    let mut digest = $METHOD::default();
                    digest.update(x);
                    digest.finalize()
                })
            })
            .collect();
        Arc::new(binary_array)
    }};
}
222+
impl DigestAlgorithm {
    /// digest an optional string to its hash value, null values are returned as is
    pub fn digest_scalar(self, value: Option<&[u8]>) -> ColumnarValue {
        ColumnarValue::Scalar(match self {
            Self::Md5 => digest_to_scalar!(Md5, value),
            Self::Sha224 => digest_to_scalar!(Sha224, value),
            Self::Sha256 => digest_to_scalar!(Sha256, value),
            Self::Sha384 => digest_to_scalar!(Sha384, value),
            Self::Sha512 => digest_to_scalar!(Sha512, value),
            Self::Blake2b => digest_to_scalar!(Blake2b512, value),
            Self::Blake2s => digest_to_scalar!(Blake2s256, value),
            // blake3 is hashed explicitly rather than through
            // `digest_to_scalar!` (its hasher API differs from the other
            // digest types used by the macro).
            Self::Blake3 => ScalarValue::Binary(value.map(|v| {
                let mut digest = Blake3::default();
                digest.update(v);
                Blake3::finalize(&digest).as_bytes().to_vec()
            })),
        })
    }

    /// digest a binary array to their hash values
    ///
    /// `T` is the offset type of the input (`i32` for `Binary`, `i64` for
    /// `LargeBinary`); the result is always a plain `BinaryArray`.
    pub fn digest_binary_array<T>(self, value: &dyn Array) -> Result<ColumnarValue>
    where
        T: OffsetSizeTrait,
    {
        let input_value = as_generic_binary_array::<T>(value)?;
        let array: ArrayRef = match self {
            Self::Md5 => digest_to_array!(Md5, input_value),
            Self::Sha224 => digest_to_array!(Sha224, input_value),
            Self::Sha256 => digest_to_array!(Sha256, input_value),
            Self::Sha384 => digest_to_array!(Sha384, input_value),
            Self::Sha512 => digest_to_array!(Sha512, input_value),
            Self::Blake2b => digest_to_array!(Blake2b512, input_value),
            Self::Blake2s => digest_to_array!(Blake2s256, input_value),
            // blake3 handled explicitly, as in digest_scalar above.
            Self::Blake3 => {
                let binary_array: BinaryArray = input_value
                    .iter()
                    .map(|opt| {
                        opt.map(|x| {
                            let mut digest = Blake3::default();
                            digest.update(x);
                            Blake3::finalize(&digest).as_bytes().to_vec()
                        })
                    })
                    .collect();
                Arc::new(binary_array)
            }
        };
        Ok(ColumnarValue::Array(array))
    }

    /// digest a string array to their hash values
    ///
    /// Mirrors `digest_binary_array` but reads a utf8 array and hashes each
    /// string's bytes. NOTE(review): the two methods are near-duplicates —
    /// keep them in sync when adding an algorithm.
    pub fn digest_utf8_array<T>(self, value: &dyn Array) -> Result<ColumnarValue>
    where
        T: OffsetSizeTrait,
    {
        let input_value = as_generic_string_array::<T>(value)?;
        let array: ArrayRef = match self {
            Self::Md5 => digest_to_array!(Md5, input_value),
            Self::Sha224 => digest_to_array!(Sha224, input_value),
            Self::Sha256 => digest_to_array!(Sha256, input_value),
            Self::Sha384 => digest_to_array!(Sha384, input_value),
            Self::Sha512 => digest_to_array!(Sha512, input_value),
            Self::Blake2b => digest_to_array!(Blake2b512, input_value),
            Self::Blake2s => digest_to_array!(Blake2s256, input_value),
            // blake3 handled explicitly, as in digest_scalar above.
            Self::Blake3 => {
                let binary_array: BinaryArray = input_value
                    .iter()
                    .map(|opt| {
                        opt.map(|x| {
                            let mut digest = Blake3::default();
                            digest.update(x.as_bytes());
                            Blake3::finalize(&digest).as_bytes().to_vec()
                        })
                    })
                    .collect();
                Arc::new(binary_array)
            }
        };
        Ok(ColumnarValue::Array(array))
    }
}
303+
/// Dispatches `value` to the hashing routine matching its data type.
///
/// Supported inputs are arrays of `Utf8`/`LargeUtf8`/`Binary`/`LargeBinary`
/// and the corresponding scalar variants; any other type yields an
/// execution error naming the algorithm. NULL scalars hash to NULL.
pub fn digest_process(
    value: &ColumnarValue,
    digest_algorithm: DigestAlgorithm,
) -> Result<ColumnarValue> {
    match value {
        ColumnarValue::Array(a) => match a.data_type() {
            DataType::Utf8 => digest_algorithm.digest_utf8_array::<i32>(a.as_ref()),
            DataType::LargeUtf8 => digest_algorithm.digest_utf8_array::<i64>(a.as_ref()),
            DataType::Binary => digest_algorithm.digest_binary_array::<i32>(a.as_ref()),
            DataType::LargeBinary => {
                digest_algorithm.digest_binary_array::<i64>(a.as_ref())
            }
            other => exec_err!(
                "Unsupported data type {other:?} for function {digest_algorithm}"
            ),
        },
        ColumnarValue::Scalar(scalar) => match scalar {
            ScalarValue::Utf8(a) | ScalarValue::LargeUtf8(a) => {
                Ok(digest_algorithm
                    .digest_scalar(a.as_ref().map(|s: &String| s.as_bytes())))
            }
            ScalarValue::Binary(a) | ScalarValue::LargeBinary(a) => Ok(digest_algorithm
                .digest_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))),
            other => exec_err!(
                "Unsupported data type {other:?} for function {digest_algorithm}"
            ),
        },
    }
}

0 commit comments

Comments
 (0)