Skip to content

Commit e4a9424

Browse files
authored
Update the CONCAT scalar function to support Utf8View (#12224)
* wip
* feat: Update the CONCAT scalar function to support Utf8View
* fmt
* fmt and add default return type for concat
* fix clippy lint
  Signed-off-by: Devan <[email protected]>
* fmt
  Signed-off-by: Devan <[email protected]>
* add more tests for sqllogic
  Signed-off-by: Devan <[email protected]>
* make sure no casting with LargeUtf8
* fixing utf8large
* fix large utf8
  Signed-off-by: Devan <[email protected]>
* fix large utf8
  Signed-off-by: Devan <[email protected]>
* add test
  Signed-off-by: Devan <[email protected]>
* fmt
  Signed-off-by: Devan <[email protected]>
* make it so Utf8View just returns Utf8
  Signed-off-by: Devan <[email protected]>
* wip -- trying to build a stringview with columnar refs
  Signed-off-by: Devan <[email protected]>
* built stringview builder but it does allocate a new String each iter :(
  Signed-off-by: Devan <[email protected]>
* add some testing
  Signed-off-by: Devan <[email protected]>
* clippy
  Signed-off-by: Devan <[email protected]>
---------
Signed-off-by: Devan <[email protected]>
1 parent bf6c82f commit e4a9424

File tree

3 files changed

+416
-34
lines changed

3 files changed

+416
-34
lines changed

datafusion/functions/src/string/common.rs

+190-5
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,11 @@ use std::sync::Arc;
2222

2323
use arrow::array::{
2424
new_null_array, Array, ArrayAccessor, ArrayDataBuilder, ArrayIter, ArrayRef,
25-
GenericStringArray, GenericStringBuilder, OffsetSizeTrait, StringArray,
26-
StringBuilder, StringViewArray,
25+
GenericStringArray, GenericStringBuilder, LargeStringArray, OffsetSizeTrait,
26+
StringArray, StringBuilder, StringViewArray, StringViewBuilder,
2727
};
2828
use arrow::buffer::{Buffer, MutableBuffer, NullBuffer};
2929
use arrow::datatypes::DataType;
30-
3130
use datafusion_common::cast::{as_generic_string_array, as_string_view_array};
3231
use datafusion_common::Result;
3332
use datafusion_common::{exec_err, ScalarValue};
@@ -249,26 +248,41 @@ where
249248
}
250249
}
251250

251+
#[derive(Debug)]
252252
pub(crate) enum ColumnarValueRef<'a> {
253253
Scalar(&'a [u8]),
254254
NullableArray(&'a StringArray),
255255
NonNullableArray(&'a StringArray),
256+
NullableLargeStringArray(&'a LargeStringArray),
257+
NonNullableLargeStringArray(&'a LargeStringArray),
258+
NullableStringViewArray(&'a StringViewArray),
259+
NonNullableStringViewArray(&'a StringViewArray),
256260
}
257261

258262
impl<'a> ColumnarValueRef<'a> {
259263
#[inline]
260264
pub fn is_valid(&self, i: usize) -> bool {
261265
match &self {
262-
Self::Scalar(_) | Self::NonNullableArray(_) => true,
266+
Self::Scalar(_)
267+
| Self::NonNullableArray(_)
268+
| Self::NonNullableLargeStringArray(_)
269+
| Self::NonNullableStringViewArray(_) => true,
263270
Self::NullableArray(array) => array.is_valid(i),
271+
Self::NullableStringViewArray(array) => array.is_valid(i),
272+
Self::NullableLargeStringArray(array) => array.is_valid(i),
264273
}
265274
}
266275

267276
#[inline]
268277
pub fn nulls(&self) -> Option<NullBuffer> {
269278
match &self {
270-
Self::Scalar(_) | Self::NonNullableArray(_) => None,
279+
Self::Scalar(_)
280+
| Self::NonNullableArray(_)
281+
| Self::NonNullableStringViewArray(_)
282+
| Self::NonNullableLargeStringArray(_) => None,
271283
Self::NullableArray(array) => array.nulls().cloned(),
284+
Self::NullableStringViewArray(array) => array.nulls().cloned(),
285+
Self::NullableLargeStringArray(array) => array.nulls().cloned(),
272286
}
273287
}
274288
}
@@ -387,10 +401,30 @@ impl StringArrayBuilder {
387401
.extend_from_slice(array.value(i).as_bytes());
388402
}
389403
}
404+
ColumnarValueRef::NullableLargeStringArray(array) => {
405+
if !CHECK_VALID || array.is_valid(i) {
406+
self.value_buffer
407+
.extend_from_slice(array.value(i).as_bytes());
408+
}
409+
}
410+
ColumnarValueRef::NullableStringViewArray(array) => {
411+
if !CHECK_VALID || array.is_valid(i) {
412+
self.value_buffer
413+
.extend_from_slice(array.value(i).as_bytes());
414+
}
415+
}
390416
ColumnarValueRef::NonNullableArray(array) => {
391417
self.value_buffer
392418
.extend_from_slice(array.value(i).as_bytes());
393419
}
420+
ColumnarValueRef::NonNullableLargeStringArray(array) => {
421+
self.value_buffer
422+
.extend_from_slice(array.value(i).as_bytes());
423+
}
424+
ColumnarValueRef::NonNullableStringViewArray(array) => {
425+
self.value_buffer
426+
.extend_from_slice(array.value(i).as_bytes());
427+
}
394428
}
395429
}
396430

@@ -416,6 +450,157 @@ impl StringArrayBuilder {
416450
}
417451
}
418452

453+
pub(crate) struct StringViewArrayBuilder {
454+
builder: StringViewBuilder,
455+
block: String,
456+
}
457+
458+
impl StringViewArrayBuilder {
459+
pub fn with_capacity(_item_capacity: usize, data_capacity: usize) -> Self {
460+
let builder = StringViewBuilder::with_capacity(data_capacity);
461+
Self {
462+
builder,
463+
block: String::new(),
464+
}
465+
}
466+
467+
pub fn write<const CHECK_VALID: bool>(
468+
&mut self,
469+
column: &ColumnarValueRef,
470+
i: usize,
471+
) {
472+
match column {
473+
ColumnarValueRef::Scalar(s) => {
474+
self.block.push_str(std::str::from_utf8(s).unwrap());
475+
}
476+
ColumnarValueRef::NullableArray(array) => {
477+
if !CHECK_VALID || array.is_valid(i) {
478+
self.block.push_str(
479+
std::str::from_utf8(array.value(i).as_bytes()).unwrap(),
480+
);
481+
}
482+
}
483+
ColumnarValueRef::NullableLargeStringArray(array) => {
484+
if !CHECK_VALID || array.is_valid(i) {
485+
self.block.push_str(
486+
std::str::from_utf8(array.value(i).as_bytes()).unwrap(),
487+
);
488+
}
489+
}
490+
ColumnarValueRef::NullableStringViewArray(array) => {
491+
if !CHECK_VALID || array.is_valid(i) {
492+
self.block.push_str(
493+
std::str::from_utf8(array.value(i).as_bytes()).unwrap(),
494+
);
495+
}
496+
}
497+
ColumnarValueRef::NonNullableArray(array) => {
498+
self.block
499+
.push_str(std::str::from_utf8(array.value(i).as_bytes()).unwrap());
500+
}
501+
ColumnarValueRef::NonNullableLargeStringArray(array) => {
502+
self.block
503+
.push_str(std::str::from_utf8(array.value(i).as_bytes()).unwrap());
504+
}
505+
ColumnarValueRef::NonNullableStringViewArray(array) => {
506+
self.block
507+
.push_str(std::str::from_utf8(array.value(i).as_bytes()).unwrap());
508+
}
509+
}
510+
}
511+
512+
pub fn append_offset(&mut self) {
513+
self.builder.append_value(&self.block);
514+
self.block = String::new();
515+
}
516+
517+
pub fn finish(mut self) -> StringViewArray {
518+
self.builder.finish()
519+
}
520+
}
521+
522+
pub(crate) struct LargeStringArrayBuilder {
523+
offsets_buffer: MutableBuffer,
524+
value_buffer: MutableBuffer,
525+
}
526+
527+
impl LargeStringArrayBuilder {
528+
pub fn with_capacity(item_capacity: usize, data_capacity: usize) -> Self {
529+
let mut offsets_buffer = MutableBuffer::with_capacity(
530+
(item_capacity + 1) * std::mem::size_of::<i64>(),
531+
);
532+
// SAFETY: the first offset value is definitely not going to exceed the bounds.
533+
unsafe { offsets_buffer.push_unchecked(0_i64) };
534+
Self {
535+
offsets_buffer,
536+
value_buffer: MutableBuffer::with_capacity(data_capacity),
537+
}
538+
}
539+
540+
pub fn write<const CHECK_VALID: bool>(
541+
&mut self,
542+
column: &ColumnarValueRef,
543+
i: usize,
544+
) {
545+
match column {
546+
ColumnarValueRef::Scalar(s) => {
547+
self.value_buffer.extend_from_slice(s);
548+
}
549+
ColumnarValueRef::NullableArray(array) => {
550+
if !CHECK_VALID || array.is_valid(i) {
551+
self.value_buffer
552+
.extend_from_slice(array.value(i).as_bytes());
553+
}
554+
}
555+
ColumnarValueRef::NullableLargeStringArray(array) => {
556+
if !CHECK_VALID || array.is_valid(i) {
557+
self.value_buffer
558+
.extend_from_slice(array.value(i).as_bytes());
559+
}
560+
}
561+
ColumnarValueRef::NullableStringViewArray(array) => {
562+
if !CHECK_VALID || array.is_valid(i) {
563+
self.value_buffer
564+
.extend_from_slice(array.value(i).as_bytes());
565+
}
566+
}
567+
ColumnarValueRef::NonNullableArray(array) => {
568+
self.value_buffer
569+
.extend_from_slice(array.value(i).as_bytes());
570+
}
571+
ColumnarValueRef::NonNullableLargeStringArray(array) => {
572+
self.value_buffer
573+
.extend_from_slice(array.value(i).as_bytes());
574+
}
575+
ColumnarValueRef::NonNullableStringViewArray(array) => {
576+
self.value_buffer
577+
.extend_from_slice(array.value(i).as_bytes());
578+
}
579+
}
580+
}
581+
582+
pub fn append_offset(&mut self) {
583+
let next_offset: i64 = self
584+
.value_buffer
585+
.len()
586+
.try_into()
587+
.expect("byte array offset overflow");
588+
unsafe { self.offsets_buffer.push_unchecked(next_offset) };
589+
}
590+
591+
pub fn finish(self, null_buffer: Option<NullBuffer>) -> LargeStringArray {
592+
let array_builder = ArrayDataBuilder::new(DataType::LargeUtf8)
593+
.len(self.offsets_buffer.len() / std::mem::size_of::<i64>() - 1)
594+
.add_buffer(self.offsets_buffer.into())
595+
.add_buffer(self.value_buffer.into())
596+
.nulls(null_buffer);
597+
// SAFETY: all data that was appended was valid Large UTF8 and the values
598+
// and offsets were created correctly
599+
let array_data = unsafe { array_builder.build_unchecked() };
600+
LargeStringArray::from(array_data)
601+
}
602+
}
603+
419604
fn case_conversion_array<'a, O, F>(array: &'a ArrayRef, op: F) -> Result<ArrayRef>
420605
where
421606
O: OffsetSizeTrait,

0 commit comments

Comments
 (0)