Skip to content

Commit 1634a65

Browse files
authored
Allow constructing ByteViewArray from existing blocks (#5796)
* Allow constructing ByteViewArray from existing blocks * Format * Add tests * More tests
1 parent 7fe01bb commit 1634a65

File tree

2 files changed

+222
-17
lines changed

2 files changed

+222
-17
lines changed

arrow-array/src/builder/generic_bytes_view_builder.rs

+210-17
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,42 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use crate::builder::ArrayBuilder;
19-
use crate::types::{BinaryViewType, ByteViewType, StringViewType};
20-
use crate::{ArrayRef, GenericByteViewArray};
21-
use arrow_buffer::{Buffer, BufferBuilder, NullBufferBuilder, ScalarBuffer};
22-
use arrow_data::ByteView;
23-
2418
use std::any::Any;
2519
use std::marker::PhantomData;
2620
use std::sync::Arc;
2721

22+
use arrow_buffer::{Buffer, BufferBuilder, NullBufferBuilder, ScalarBuffer};
23+
use arrow_data::ByteView;
24+
use arrow_schema::ArrowError;
25+
26+
use crate::builder::ArrayBuilder;
27+
use crate::types::bytes::ByteArrayNativeType;
28+
use crate::types::{BinaryViewType, ByteViewType, StringViewType};
29+
use crate::{ArrayRef, GenericByteViewArray};
30+
2831
const DEFAULT_BLOCK_SIZE: u32 = 8 * 1024;
2932

3033
/// A builder for [`GenericByteViewArray`]
3134
///
32-
/// See [`Self::append_value`] for the allocation strategy
35+
/// A [`GenericByteViewArray`] consists of a list of data blocks containing string data,
36+
/// and a list of views into those buffers.
37+
///
38+
/// This builder can be used in two ways
39+
///
40+
/// # Append Values
41+
///
42+
/// To avoid bump allocating, this builder allocates data in fixed size blocks, configurable
43+
/// using [`GenericByteViewBuilder::with_block_size`]. [`GenericByteViewBuilder::append_value`]
44+
/// writes values larger than 12 bytes to the current in-progress block, with values of 12
45+
/// bytes or fewer inlined into the views. If a value is appended that will not fit in the
46+
/// in-progress block, it will be closed, and a new block of sufficient size allocated
47+
///
48+
/// # Append Views
49+
///
50+
/// Some use-cases may wish to reuse an existing allocation containing string data, for example,
51+
/// when parsing data from a parquet data page. In such a case entire blocks can be appended
52+
/// using [`GenericByteViewBuilder::append_block`] and then views into this block appended
53+
/// using [`GenericByteViewBuilder::try_append_view`]
3354
pub struct GenericByteViewBuilder<T: ByteViewType + ?Sized> {
3455
views_builder: BufferBuilder<u128>,
3556
null_buffer_builder: NullBufferBuilder,
@@ -62,6 +83,98 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
6283
Self { block_size, ..self }
6384
}
6485

86+
/// Append a new data block returning the new block offset
87+
///
88+
/// Note: this will first flush any in-progress block
89+
///
90+
/// Views into the returned block can then be appended using [`Self::try_append_view`]. See
91+
/// [`Self::append_value`] for appending individual values
92+
///
93+
/// ```
94+
/// # use arrow_array::builder::StringViewBuilder;
95+
/// let mut builder = StringViewBuilder::new();
96+
///
97+
/// let block = builder.append_block(b"helloworldbingobongo".into());
98+
///
99+
/// builder.try_append_view(block, 0, 5).unwrap();
100+
/// builder.try_append_view(block, 5, 5).unwrap();
101+
/// builder.try_append_view(block, 10, 5).unwrap();
102+
/// builder.try_append_view(block, 15, 5).unwrap();
103+
/// builder.try_append_view(block, 0, 15).unwrap();
104+
/// let array = builder.finish();
105+
///
106+
/// let actual: Vec<_> = array.iter().flatten().collect();
107+
/// let expected = &["hello", "world", "bingo", "bongo", "helloworldbingo"];
108+
/// assert_eq!(actual, expected);
109+
/// ```
110+
pub fn append_block(&mut self, buffer: Buffer) -> u32 {
111+
assert!(buffer.len() < u32::MAX as usize);
112+
113+
self.flush_in_progress();
114+
let offset = self.completed.len();
115+
self.push_completed(buffer);
116+
offset as u32
117+
}
118+
119+
/// Try to append a view of the given `block`, `offset` and `length`
120+
///
121+
/// See [`Self::append_block`]
122+
pub fn try_append_view(&mut self, block: u32, offset: u32, len: u32) -> Result<(), ArrowError> {
123+
let b = self.completed.get(block as usize).ok_or_else(|| {
124+
ArrowError::InvalidArgumentError(format!("No block found with index {block}"))
125+
})?;
126+
let start = offset as usize;
127+
let end = start.saturating_add(len as usize);
128+
129+
let b = b.get(start..end).ok_or_else(|| {
130+
ArrowError::InvalidArgumentError(format!(
131+
"Range {start}..{end} out of bounds for block of length {}",
132+
b.len()
133+
))
134+
})?;
135+
136+
if T::Native::from_bytes_checked(b).is_none() {
137+
return Err(ArrowError::InvalidArgumentError(
138+
"Invalid view data".to_string(),
139+
));
140+
}
141+
142+
if len <= 12 {
143+
let mut view_buffer = [0; 16];
144+
view_buffer[0..4].copy_from_slice(&len.to_le_bytes());
145+
view_buffer[4..4 + b.len()].copy_from_slice(b);
146+
self.views_builder.append(u128::from_le_bytes(view_buffer));
147+
} else {
148+
let view = ByteView {
149+
length: len,
150+
prefix: u32::from_le_bytes(b[0..4].try_into().unwrap()),
151+
buffer_index: block,
152+
offset,
153+
};
154+
self.views_builder.append(view.into());
155+
}
156+
157+
self.null_buffer_builder.append_non_null();
158+
Ok(())
159+
}
160+
161+
/// Flushes the in progress block if any
162+
#[inline]
163+
fn flush_in_progress(&mut self) {
164+
if !self.in_progress.is_empty() {
165+
let f = Buffer::from_vec(std::mem::take(&mut self.in_progress));
166+
self.push_completed(f)
167+
}
168+
}
169+
170+
/// Append a block to `self.completed`, checking for overflow
171+
#[inline]
172+
fn push_completed(&mut self, block: Buffer) {
173+
assert!(block.len() < u32::MAX as usize, "Block too large");
174+
assert!(self.completed.len() < u32::MAX as usize, "Too many blocks");
175+
self.completed.push(block);
176+
}
177+
65178
/// Appends a value into the builder
66179
///
67180
/// # Panics
@@ -84,12 +197,9 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
84197

85198
let required_cap = self.in_progress.len() + v.len();
86199
if self.in_progress.capacity() < required_cap {
87-
let in_progress = Vec::with_capacity(v.len().max(self.block_size as usize));
88-
let flushed = std::mem::replace(&mut self.in_progress, in_progress);
89-
if !flushed.is_empty() {
90-
assert!(self.completed.len() < u32::MAX as usize);
91-
self.completed.push(flushed.into());
92-
}
200+
self.flush_in_progress();
201+
let to_reserve = v.len().max(self.block_size as usize);
202+
self.in_progress.reserve(to_reserve);
93203
};
94204
let offset = self.in_progress.len() as u32;
95205
self.in_progress.extend_from_slice(v);
@@ -122,10 +232,8 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
122232

123233
/// Builds the [`GenericByteViewArray`] and reset this builder
124234
pub fn finish(&mut self) -> GenericByteViewArray<T> {
125-
let mut completed = std::mem::take(&mut self.completed);
126-
if !self.in_progress.is_empty() {
127-
completed.push(std::mem::take(&mut self.in_progress).into());
128-
}
235+
self.flush_in_progress();
236+
let completed = std::mem::take(&mut self.completed);
129237
let len = self.views_builder.len();
130238
let views = ScalarBuffer::new(self.views_builder.finish(), 0, len);
131239
let nulls = self.null_buffer_builder.finish();
@@ -219,3 +327,88 @@ pub type StringViewBuilder = GenericByteViewBuilder<StringViewType>;
219327
/// Values can be appended using [`GenericByteViewBuilder::append_value`], and nulls with
220328
/// [`GenericByteViewBuilder::append_null`] as normal.
221329
pub type BinaryViewBuilder = GenericByteViewBuilder<BinaryViewType>;
330+
331+
#[cfg(test)]
332+
mod tests {
333+
use super::*;
334+
use crate::Array;
335+
336+
#[test]
337+
fn test_string_view() {
338+
let b1 = Buffer::from(b"world\xFFbananas\xF0\x9F\x98\x81");
339+
let b2 = Buffer::from(b"cupcakes");
340+
let b3 = Buffer::from(b"Many strings are here contained of great length and verbosity");
341+
342+
let mut v = StringViewBuilder::new();
343+
assert_eq!(v.append_block(b1), 0);
344+
345+
v.append_value("This is a very long string that exceeds the inline length");
346+
v.append_value("This is another very long string that exceeds the inline length");
347+
348+
assert_eq!(v.append_block(b2), 2);
349+
assert_eq!(v.append_block(b3), 3);
350+
351+
// Test short strings
352+
v.try_append_view(0, 0, 5).unwrap(); // world
353+
v.try_append_view(0, 6, 7).unwrap(); // bananas
354+
v.try_append_view(2, 3, 5).unwrap(); // cakes
355+
v.try_append_view(2, 0, 3).unwrap(); // cup
356+
v.try_append_view(2, 0, 8).unwrap(); // cupcakes
357+
v.try_append_view(0, 13, 4).unwrap(); // 😁
358+
v.try_append_view(0, 13, 0).unwrap(); //
359+
360+
// Test longer strings
361+
v.try_append_view(3, 0, 16).unwrap(); // Many strings are
362+
v.try_append_view(1, 0, 19).unwrap(); // This is a very long
363+
v.try_append_view(3, 13, 27).unwrap(); // are here contained of great
364+
365+
v.append_value("I do so like long strings");
366+
367+
let array = v.finish_cloned();
368+
array.to_data().validate_full().unwrap();
369+
assert_eq!(array.data_buffers().len(), 5);
370+
let actual: Vec<_> = array.iter().map(Option::unwrap).collect();
371+
assert_eq!(
372+
actual,
373+
&[
374+
"This is a very long string that exceeds the inline length",
375+
"This is another very long string that exceeds the inline length",
376+
"world",
377+
"bananas",
378+
"cakes",
379+
"cup",
380+
"cupcakes",
381+
"😁",
382+
"",
383+
"Many strings are",
384+
"This is a very long",
385+
"are here contained of great",
386+
"I do so like long strings"
387+
]
388+
);
389+
390+
let err = v.try_append_view(0, u32::MAX, 1).unwrap_err();
391+
assert_eq!(err.to_string(), "Invalid argument error: Range 4294967295..4294967296 out of bounds for block of length 17");
392+
393+
let err = v.try_append_view(0, 1, u32::MAX).unwrap_err();
394+
assert_eq!(
395+
err.to_string(),
396+
"Invalid argument error: Range 1..4294967296 out of bounds for block of length 17"
397+
);
398+
399+
let err = v.try_append_view(0, 13, 2).unwrap_err();
400+
assert_eq!(err.to_string(), "Invalid argument error: Invalid view data");
401+
402+
let err = v.try_append_view(0, 40, 0).unwrap_err();
403+
assert_eq!(
404+
err.to_string(),
405+
"Invalid argument error: Range 40..40 out of bounds for block of length 17"
406+
);
407+
408+
let err = v.try_append_view(5, 0, 0).unwrap_err();
409+
assert_eq!(
410+
err.to_string(),
411+
"Invalid argument error: No block found with index 5"
412+
);
413+
}
414+
}

arrow-array/src/types.rs

+12
Original file line numberDiff line numberDiff line change
@@ -1386,20 +1386,32 @@ pub(crate) mod bytes {
13861386
impl<O: OffsetSizeTrait> ByteArrayTypeSealed for GenericBinaryType<O> {}
13871387

13881388
pub trait ByteArrayNativeType: std::fmt::Debug + Send + Sync {
1389+
fn from_bytes_checked(b: &[u8]) -> Option<&Self>;
1390+
13891391
/// # Safety
13901392
///
13911393
/// `b` must be a valid byte sequence for `Self`
13921394
unsafe fn from_bytes_unchecked(b: &[u8]) -> &Self;
13931395
}
13941396

13951397
impl ByteArrayNativeType for [u8] {
1398+
#[inline]
1399+
fn from_bytes_checked(b: &[u8]) -> Option<&Self> {
1400+
Some(b)
1401+
}
1402+
13961403
#[inline]
13971404
unsafe fn from_bytes_unchecked(b: &[u8]) -> &Self {
13981405
b
13991406
}
14001407
}
14011408

14021409
impl ByteArrayNativeType for str {
1410+
#[inline]
1411+
fn from_bytes_checked(b: &[u8]) -> Option<&Self> {
1412+
std::str::from_utf8(b).ok()
1413+
}
1414+
14031415
#[inline]
14041416
unsafe fn from_bytes_unchecked(b: &[u8]) -> &Self {
14051417
std::str::from_utf8_unchecked(b)

0 commit comments

Comments
 (0)