17
17
18
18
//! "crypto" DataFusion functions
19
19
20
- use arrow:: array:: { Array , ArrayRef , BinaryArray , OffsetSizeTrait } ;
20
+ use arrow:: array:: {
21
+ Array , ArrayRef , BinaryArray , BinaryArrayType , BinaryViewArray , GenericBinaryArray ,
22
+ OffsetSizeTrait ,
23
+ } ;
21
24
use arrow:: array:: { AsArray , GenericStringArray , StringArray , StringViewArray } ;
22
25
use arrow:: datatypes:: DataType ;
23
26
use blake2:: { Blake2b512 , Blake2s256 , Digest } ;
@@ -198,11 +201,13 @@ pub fn utf8_or_binary_to_binary_type(
198
201
arg_type : & DataType ,
199
202
name : & str ,
200
203
) -> Result < DataType > {
204
+ dbg ! ( arg_type) ;
201
205
Ok ( match arg_type {
202
206
DataType :: Utf8View
203
207
| DataType :: LargeUtf8
204
208
| DataType :: Utf8
205
209
| DataType :: Binary
210
+ | DataType :: BinaryView
206
211
| DataType :: LargeBinary => DataType :: Binary ,
207
212
DataType :: Null => DataType :: Null ,
208
213
_ => {
@@ -251,30 +256,44 @@ impl DigestAlgorithm {
251
256
where
252
257
T : OffsetSizeTrait ,
253
258
{
254
- let input_value = as_generic_binary_array :: < T > ( value) ?;
255
- let array: ArrayRef = match self {
256
- Self :: Md5 => digest_to_array ! ( Md5 , input_value) ,
257
- Self :: Sha224 => digest_to_array ! ( Sha224 , input_value) ,
258
- Self :: Sha256 => digest_to_array ! ( Sha256 , input_value) ,
259
- Self :: Sha384 => digest_to_array ! ( Sha384 , input_value) ,
260
- Self :: Sha512 => digest_to_array ! ( Sha512 , input_value) ,
261
- Self :: Blake2b => digest_to_array ! ( Blake2b512 , input_value) ,
262
- Self :: Blake2s => digest_to_array ! ( Blake2s256 , input_value) ,
263
- Self :: Blake3 => {
264
- let binary_array: BinaryArray = input_value
265
- . iter ( )
266
- . map ( |opt| {
267
- opt. map ( |x| {
268
- let mut digest = Blake3 :: default ( ) ;
269
- digest. update ( x) ;
270
- Blake3 :: finalize ( & digest) . as_bytes ( ) . to_vec ( )
271
- } )
272
- } )
273
- . collect ( ) ;
274
- Arc :: new ( binary_array)
259
+ let array = match value. data_type ( ) {
260
+ DataType :: Binary | DataType :: LargeBinary => {
261
+ let v = value. as_binary :: < T > ( ) ;
262
+ self . digest_binary_array_impl :: < & GenericBinaryArray < T > > ( v)
263
+ }
264
+ DataType :: BinaryView => {
265
+ let v = value. as_binary_view ( ) ;
266
+ self . digest_binary_array_impl :: < & BinaryViewArray > ( v)
267
+ }
268
+ other => {
269
+ return exec_err ! ( "unsupported type for digest_utf_array: {other:?}" )
275
270
}
276
271
} ;
277
272
Ok ( ColumnarValue :: Array ( array) )
273
+ // let input_value = as_generic_binary_array::<T>(value)?;
274
+ // let array: ArrayRef = match self {
275
+ // Self::Md5 => digest_to_array!(Md5, input_value),
276
+ // Self::Sha224 => digest_to_array!(Sha224, input_value),
277
+ // Self::Sha256 => digest_to_array!(Sha256, input_value),
278
+ // Self::Sha384 => digest_to_array!(Sha384, input_value),
279
+ // Self::Sha512 => digest_to_array!(Sha512, input_value),
280
+ // Self::Blake2b => digest_to_array!(Blake2b512, input_value),
281
+ // Self::Blake2s => digest_to_array!(Blake2s256, input_value),
282
+ // Self::Blake3 => {
283
+ // let binary_array: BinaryArray = input_value
284
+ // .iter()
285
+ // .map(|opt| {
286
+ // opt.map(|x| {
287
+ // let mut digest = Blake3::default();
288
+ // digest.update(x);
289
+ // Blake3::finalize(&digest).as_bytes().to_vec()
290
+ // })
291
+ // })
292
+ // .collect();
293
+ // Arc::new(binary_array)
294
+ // }
295
+ // };
296
+ // Ok(ColumnarValue::Array(array))
278
297
}
279
298
280
299
/// digest a string array to their hash values
@@ -328,6 +347,37 @@ impl DigestAlgorithm {
328
347
}
329
348
}
330
349
}
350
+
351
+ pub fn digest_binary_array_impl < ' a , BinaryArrType > (
352
+ self ,
353
+ input_value : BinaryArrType ,
354
+ ) -> ArrayRef
355
+ where
356
+ BinaryArrType : BinaryArrayType < ' a > ,
357
+ {
358
+ match self {
359
+ Self :: Md5 => digest_to_array ! ( Md5 , input_value) ,
360
+ Self :: Sha224 => digest_to_array ! ( Sha224 , input_value) ,
361
+ Self :: Sha256 => digest_to_array ! ( Sha256 , input_value) ,
362
+ Self :: Sha384 => digest_to_array ! ( Sha384 , input_value) ,
363
+ Self :: Sha512 => digest_to_array ! ( Sha512 , input_value) ,
364
+ Self :: Blake2b => digest_to_array ! ( Blake2b512 , input_value) ,
365
+ Self :: Blake2s => digest_to_array ! ( Blake2s256 , input_value) ,
366
+ Self :: Blake3 => {
367
+ let binary_array: BinaryArray = input_value
368
+ . iter ( )
369
+ . map ( |opt| {
370
+ opt. map ( |x| {
371
+ let mut digest = Blake3 :: default ( ) ;
372
+ digest. update ( x) ;
373
+ Blake3 :: finalize ( & digest) . as_bytes ( ) . to_vec ( )
374
+ } )
375
+ } )
376
+ . collect ( ) ;
377
+ Arc :: new ( binary_array)
378
+ }
379
+ }
380
+ }
331
381
}
332
382
pub fn digest_process (
333
383
value : & ColumnarValue ,
@@ -342,22 +392,27 @@ pub fn digest_process(
342
392
DataType :: LargeBinary => {
343
393
digest_algorithm. digest_binary_array :: < i64 > ( a. as_ref ( ) )
344
394
}
345
- other => exec_err ! (
346
- "Unsupported data type {other:?} for function {digest_algorithm}"
347
- ) ,
348
- } ,
349
- ColumnarValue :: Scalar ( scalar) => match scalar {
350
- ScalarValue :: Utf8View ( a)
351
- | ScalarValue :: Utf8 ( a)
352
- | ScalarValue :: LargeUtf8 ( a) => {
353
- Ok ( digest_algorithm
354
- . digest_scalar ( a. as_ref ( ) . map ( |s : & String | s. as_bytes ( ) ) ) )
395
+ DataType :: BinaryView => {
396
+ digest_algorithm. digest_binary_array :: < i32 > ( a. as_ref ( ) )
355
397
}
356
- ScalarValue :: Binary ( a) | ScalarValue :: LargeBinary ( a) => Ok ( digest_algorithm
357
- . digest_scalar ( a. as_ref ( ) . map ( |v : & Vec < u8 > | v. as_slice ( ) ) ) ) ,
358
398
other => exec_err ! (
359
399
"Unsupported data type {other:?} for function {digest_algorithm}"
360
400
) ,
361
401
} ,
402
+ ColumnarValue :: Scalar ( scalar) => {
403
+ match scalar {
404
+ ScalarValue :: Utf8View ( a)
405
+ | ScalarValue :: Utf8 ( a)
406
+ | ScalarValue :: LargeUtf8 ( a) => Ok ( digest_algorithm
407
+ . digest_scalar ( a. as_ref ( ) . map ( |s : & String | s. as_bytes ( ) ) ) ) ,
408
+ ScalarValue :: Binary ( a)
409
+ | ScalarValue :: LargeBinary ( a)
410
+ | ScalarValue :: BinaryView ( a) => Ok ( digest_algorithm
411
+ . digest_scalar ( a. as_ref ( ) . map ( |v : & Vec < u8 > | v. as_slice ( ) ) ) ) ,
412
+ other => exec_err ! (
413
+ "Unsupported data type {other:?} for function {digest_algorithm}"
414
+ ) ,
415
+ }
416
+ }
362
417
}
363
418
}
0 commit comments