Skip to content

Commit 629ed2c

Browse files
committed
feat: ChunkedArray uses a builder to implement to_canonical
1 parent 1088582 commit 629ed2c

File tree

1 file changed

+7
-247
lines changed

1 file changed

+7
-247
lines changed

vortex-array/src/arrays/chunked/canonical.rs

+7-247
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,14 @@
1-
use arrow_buffer::BooleanBufferBuilder;
2-
use vortex_buffer::BufferMut;
3-
use vortex_dtype::{DType, NativePType, Nullability, PType, StructDType, match_each_native_ptype};
4-
use vortex_error::{VortexExpect, VortexResult, vortex_err};
1+
use vortex_error::VortexResult;
52

6-
use crate::array::ArrayCanonicalImpl;
7-
use crate::arrays::chunked::ChunkedArray;
8-
use crate::arrays::extension::ExtensionArray;
9-
use crate::arrays::null::NullArray;
10-
use crate::arrays::primitive::PrimitiveArray;
11-
use crate::arrays::struct_::StructArray;
12-
use crate::arrays::{BoolArray, ListArray, VarBinViewArray};
13-
use crate::builders::ArrayBuilder;
14-
use crate::compute::{scalar_at, slice, try_cast};
15-
use crate::validity::Validity;
16-
use crate::{Array, ArrayRef, ArrayVariants, Canonical, ToCanonical};
3+
use super::ChunkedArray;
4+
use crate::builders::{ArrayBuilder, builder_with_capacity};
5+
use crate::{Array as _, ArrayCanonicalImpl, Canonical};
176

187
impl ArrayCanonicalImpl for ChunkedArray {
198
fn _to_canonical(&self) -> VortexResult<Canonical> {
20-
let validity = Validity::copy_from_array(self)?;
21-
try_canonicalize_chunks(self.chunks(), validity, self.dtype())
9+
let mut builder = builder_with_capacity(self.dtype(), self.len());
10+
self.append_to_builder(builder.as_mut())?;
11+
builder.finish().to_canonical()
2212
}
2313

2414
fn _append_to_builder(&self, builder: &mut dyn ArrayBuilder) -> VortexResult<()> {
@@ -29,236 +19,6 @@ impl ArrayCanonicalImpl for ChunkedArray {
2919
}
3020
}
3121

32-
pub(crate) fn try_canonicalize_chunks(
33-
chunks: &[ArrayRef],
34-
validity: Validity,
35-
dtype: &DType,
36-
) -> VortexResult<Canonical> {
37-
match dtype {
38-
// Structs can have their internal field pointers swizzled to push the chunking down
39-
// one level internally without copying or decompressing any data.
40-
DType::Struct(struct_dtype, _) => {
41-
let struct_array = swizzle_struct_chunks(chunks, validity, struct_dtype)?;
42-
Ok(Canonical::Struct(struct_array))
43-
}
44-
45-
// Extension arrays are containers that wrap an inner storage array with some metadata.
46-
// We delegate to the canonical format of the internal storage array for every chunk, and
47-
// push the chunking down into the inner storage array.
48-
//
49-
// Input:
50-
// ------
51-
//
52-
// ChunkedArray
53-
// / \
54-
// / \
55-
// ExtensionArray ExtensionArray
56-
// | |
57-
// storage storage
58-
//
59-
//
60-
// Output:
61-
// ------
62-
//
63-
// ExtensionArray
64-
// |
65-
// ChunkedArray
66-
// / \
67-
// storage storage
68-
//
69-
DType::Extension(ext_dtype) => {
70-
// Recursively apply canonicalization and packing to the storage array backing
71-
// each chunk of the extension array.
72-
let storage_chunks: Vec<ArrayRef> = chunks
73-
.iter()
74-
// Extension-typed arrays can be compressed into something that is not an
75-
// ExtensionArray, so we should canonicalize each chunk into ExtensionArray first.
76-
.map(|chunk| {
77-
chunk
78-
.clone()
79-
.as_extension_typed()
80-
.vortex_expect("Chunk could not be downcast to ExtensionArrayTrait")
81-
.storage_data()
82-
})
83-
.collect();
84-
let storage_dtype = ext_dtype.storage_dtype().clone();
85-
let chunked_storage =
86-
ChunkedArray::try_new(storage_chunks, storage_dtype)?.into_array();
87-
88-
Ok(Canonical::Extension(ExtensionArray::new(
89-
ext_dtype.clone(),
90-
chunked_storage,
91-
)))
92-
}
93-
94-
DType::List(..) => {
95-
// TODO(joe): improve performance, use a listview, once it exists
96-
97-
let list = pack_lists(chunks, validity, dtype)?;
98-
Ok(Canonical::List(list))
99-
}
100-
101-
DType::Bool(_) => {
102-
let bool_array = pack_bools(chunks, validity)?;
103-
Ok(Canonical::Bool(bool_array))
104-
}
105-
DType::Primitive(ptype, _) => {
106-
match_each_native_ptype!(ptype, |$P| {
107-
let prim_array = pack_primitives::<$P>(chunks, validity)?;
108-
Ok(Canonical::Primitive(prim_array))
109-
})
110-
}
111-
DType::Utf8(_) => {
112-
let varbin_array = pack_views(chunks, dtype, validity)?;
113-
Ok(Canonical::VarBinView(varbin_array))
114-
}
115-
DType::Binary(_) => {
116-
let varbin_array = pack_views(chunks, dtype, validity)?;
117-
Ok(Canonical::VarBinView(varbin_array))
118-
}
119-
DType::Null => {
120-
let len = chunks.iter().map(|chunk| chunk.len()).sum();
121-
let null_array = NullArray::new(len);
122-
Ok(Canonical::Null(null_array))
123-
}
124-
}
125-
}
126-
127-
fn pack_lists(chunks: &[ArrayRef], validity: Validity, dtype: &DType) -> VortexResult<ListArray> {
128-
let len: usize = chunks.iter().map(|c| c.len()).sum();
129-
let mut offsets = BufferMut::<i64>::with_capacity(len + 1);
130-
offsets.push(0);
131-
let mut elements = Vec::new();
132-
let elem_dtype = dtype
133-
.as_list_element()
134-
.vortex_expect("ListArray must have List dtype");
135-
136-
for chunk in chunks {
137-
let chunk = chunk.to_list()?;
138-
// TODO: handle i32 offsets if they fit.
139-
let offsets_arr = try_cast(
140-
chunk.offsets(),
141-
&DType::Primitive(PType::I64, Nullability::NonNullable),
142-
)?
143-
.to_primitive()?;
144-
145-
let first_offset_value: usize = usize::try_from(&scalar_at(&offsets_arr, 0)?)?;
146-
let last_offset_value: usize =
147-
usize::try_from(&scalar_at(&offsets_arr, offsets_arr.len() - 1)?)?;
148-
elements.push(slice(
149-
chunk.elements(),
150-
first_offset_value,
151-
last_offset_value,
152-
)?);
153-
154-
let adjustment_from_previous = *offsets
155-
.last()
156-
.ok_or_else(|| vortex_err!("List offsets must have at least one element"))?;
157-
offsets.extend(
158-
offsets_arr
159-
.as_slice::<i64>()
160-
.iter()
161-
.skip(1)
162-
.map(|off| off + adjustment_from_previous - first_offset_value as i64),
163-
);
164-
}
165-
let chunked_elements = ChunkedArray::try_new(elements, elem_dtype.clone())?.into_array();
166-
let offsets = PrimitiveArray::new(offsets.freeze(), Validity::NonNullable);
167-
168-
ListArray::try_new(chunked_elements, offsets.into_array(), validity)
169-
}
170-
171-
/// Swizzle the pointers within a ChunkedArray of StructArrays to instead be a single
172-
/// StructArray, where the Array for each Field is a ChunkedArray.
173-
///
174-
/// It is expected this function is only called from [try_canonicalize_chunks], and thus all chunks have
175-
/// been checked to have the same DType already.
176-
fn swizzle_struct_chunks(
177-
chunks: &[ArrayRef],
178-
validity: Validity,
179-
struct_dtype: &StructDType,
180-
) -> VortexResult<StructArray> {
181-
let len = chunks.iter().map(|chunk| chunk.len()).sum();
182-
let mut field_arrays = Vec::new();
183-
184-
for (field_idx, field_dtype) in struct_dtype.fields().enumerate() {
185-
let field_chunks = chunks
186-
.iter()
187-
.map(|c| {
188-
c.as_struct_typed()
189-
.vortex_expect("Chunk was not a StructArray")
190-
.maybe_null_field_by_idx(field_idx)
191-
.vortex_expect("Invalid chunked array")
192-
})
193-
.collect::<Vec<_>>();
194-
let field_array = ChunkedArray::try_new(field_chunks, field_dtype.clone())?;
195-
field_arrays.push(field_array.into_array());
196-
}
197-
198-
StructArray::try_new(struct_dtype.names().clone(), field_arrays, len, validity)
199-
}
200-
201-
/// Builds a new [BoolArray] by repacking the values from the chunks into a single contiguous array.
202-
///
203-
/// It is expected this function is only called from [try_canonicalize_chunks], and thus all chunks have
204-
/// been checked to have the same DType already.
205-
fn pack_bools(chunks: &[ArrayRef], validity: Validity) -> VortexResult<BoolArray> {
206-
let len = chunks.iter().map(|chunk| chunk.len()).sum();
207-
let mut buffer = BooleanBufferBuilder::new(len);
208-
for chunk in chunks {
209-
let chunk = chunk.to_bool()?;
210-
buffer.append_buffer(chunk.boolean_buffer());
211-
}
212-
Ok(BoolArray::new(buffer.finish(), validity))
213-
}
214-
215-
/// Builds a new [PrimitiveArray] by repacking the values from the chunks into a single
216-
/// contiguous array.
217-
///
218-
/// It is expected this function is only called from [try_canonicalize_chunks], and thus all chunks have
219-
/// been checked to have the same DType already.
220-
fn pack_primitives<T: NativePType>(
221-
chunks: &[ArrayRef],
222-
validity: Validity,
223-
) -> VortexResult<PrimitiveArray> {
224-
let total_len = chunks.iter().map(|a| a.len()).sum();
225-
let mut buffer = BufferMut::with_capacity(total_len);
226-
for chunk in chunks {
227-
let chunk = chunk.to_primitive()?;
228-
buffer.extend_from_slice(chunk.as_slice::<T>());
229-
}
230-
Ok(PrimitiveArray::new(buffer.freeze(), validity))
231-
}
232-
233-
/// Builds a new [VarBinViewArray] by repacking the values from the chunks into a single
234-
/// contiguous array.
235-
///
236-
/// It is expected this function is only called from [try_canonicalize_chunks], and thus all chunks have
237-
/// been checked to have the same DType already.
238-
fn pack_views(
239-
chunks: &[ArrayRef],
240-
dtype: &DType,
241-
validity: Validity,
242-
) -> VortexResult<VarBinViewArray> {
243-
let total_len = chunks.iter().map(|a| a.len()).sum();
244-
let mut views = BufferMut::with_capacity(total_len);
245-
let mut buffers = Vec::new();
246-
for chunk in chunks {
247-
let buffers_offset = u32::try_from(buffers.len())?;
248-
let canonical_chunk = chunk.to_varbinview()?;
249-
buffers.extend(canonical_chunk.buffers().iter().cloned());
250-
251-
views.extend(
252-
canonical_chunk
253-
.views()
254-
.iter()
255-
.map(|view| view.offset_view(buffers_offset)),
256-
);
257-
}
258-
259-
VarBinViewArray::try_new(views.freeze(), buffers, dtype.clone(), validity)
260-
}
261-
26222
#[cfg(test)]
26323
mod tests {
26424
use std::sync::Arc;

0 commit comments

Comments
 (0)