Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ChunkedArray uses a builder to implement to_canonical #2511

Merged
merged 5 commits into from
Mar 7, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
277 changes: 52 additions & 225 deletions vortex-array/src/arrays/chunked/canonical.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,36 @@
use arrow_buffer::BooleanBufferBuilder;
use vortex_buffer::BufferMut;
use vortex_dtype::{DType, NativePType, Nullability, PType, StructDType, match_each_native_ptype};
use vortex_dtype::{DType, Nullability, PType, StructDType};
use vortex_error::{VortexExpect, VortexResult, vortex_err};

use crate::array::ArrayCanonicalImpl;
use crate::arrays::chunked::ChunkedArray;
use crate::arrays::extension::ExtensionArray;
use crate::arrays::null::NullArray;
use crate::arrays::primitive::PrimitiveArray;
use crate::arrays::struct_::StructArray;
use crate::arrays::{BoolArray, ListArray, VarBinViewArray};
use crate::builders::ArrayBuilder;
use super::ChunkedArray;
use crate::arrays::{ListArray, PrimitiveArray, StructArray};
use crate::builders::{ArrayBuilder, builder_with_capacity};
use crate::compute::{scalar_at, slice, try_cast};
use crate::validity::Validity;
use crate::{Array, ArrayRef, ArrayVariants, Canonical, ToCanonical};
use crate::{Array as _, ArrayCanonicalImpl, ArrayRef, Canonical, ToCanonical};

impl ArrayCanonicalImpl for ChunkedArray {
fn _to_canonical(&self) -> VortexResult<Canonical> {
let validity = Validity::copy_from_array(self)?;
try_canonicalize_chunks(self.chunks(), validity, self.dtype())
match self.dtype() {
DType::Struct(struct_dtype, _) => {
let struct_array = swizzle_struct_chunks(
self.chunks(),
Validity::copy_from_array(self)?,
struct_dtype,
)?;
Ok(Canonical::Struct(struct_array))
}
DType::List(elem, _) => Ok(Canonical::List(pack_lists(
self.chunks(),
Validity::copy_from_array(self)?,
elem,
)?)),
_ => {
let mut builder = builder_with_capacity(self.dtype(), self.len());
self.append_to_builder(builder.as_mut())?;
builder.finish().to_canonical()
}
}
}

fn _append_to_builder(&self, builder: &mut dyn ArrayBuilder) -> VortexResult<()> {
Expand All @@ -29,109 +41,42 @@ impl ArrayCanonicalImpl for ChunkedArray {
}
}

pub(crate) fn try_canonicalize_chunks(
/// Swizzle the pointers within a ChunkedArray of StructArrays to instead be a single
/// StructArray, where the Array for each Field is a ChunkedArray.
fn swizzle_struct_chunks(
chunks: &[ArrayRef],
validity: Validity,
dtype: &DType,
) -> VortexResult<Canonical> {
match dtype {
// Structs can have their internal field pointers swizzled to push the chunking down
// one level internally without copying or decompressing any data.
DType::Struct(struct_dtype, _) => {
let struct_array = swizzle_struct_chunks(chunks, validity, struct_dtype)?;
Ok(Canonical::Struct(struct_array))
}

// Extension arrays are containers that wraps an inner storage array with some metadata.
// We delegate to the canonical format of the internal storage array for every chunk, and
// push the chunking down into the inner storage array.
//
// Input:
// ------
//
// ChunkedArray
// / \
// / \
// ExtensionArray ExtensionArray
// | |
// storage storage
//
//
// Output:
// ------
//
// ExtensionArray
// |
// ChunkedArray
// / \
// storage storage
//
DType::Extension(ext_dtype) => {
// Recursively apply canonicalization and packing to the storage array backing
// each chunk of the extension array.
let storage_chunks: Vec<ArrayRef> = chunks
.iter()
// Extension-typed arrays can be compressed into something that is not an
// ExtensionArray, so we should canonicalize each chunk into ExtensionArray first.
.map(|chunk| {
chunk
.clone()
.as_extension_typed()
.vortex_expect("Chunk could not be downcast to ExtensionArrayTrait")
.storage_data()
})
.collect();
let storage_dtype = ext_dtype.storage_dtype().clone();
let chunked_storage =
ChunkedArray::try_new(storage_chunks, storage_dtype)?.into_array();

Ok(Canonical::Extension(ExtensionArray::new(
ext_dtype.clone(),
chunked_storage,
)))
}

DType::List(..) => {
// TODO(joe): improve performance, use a listview, once it exists

let list = pack_lists(chunks, validity, dtype)?;
Ok(Canonical::List(list))
}
struct_dtype: &StructDType,
) -> VortexResult<StructArray> {
let len = chunks.iter().map(|chunk| chunk.len()).sum();
let mut field_arrays = Vec::new();

DType::Bool(_) => {
let bool_array = pack_bools(chunks, validity)?;
Ok(Canonical::Bool(bool_array))
}
DType::Primitive(ptype, _) => {
match_each_native_ptype!(ptype, |$P| {
let prim_array = pack_primitives::<$P>(chunks, validity)?;
Ok(Canonical::Primitive(prim_array))
for (field_idx, field_dtype) in struct_dtype.fields().enumerate() {
let field_chunks = chunks
.iter()
.map(|c| {
c.as_struct_typed()
.vortex_expect("Chunk was not a StructArray")
.maybe_null_field_by_idx(field_idx)
.vortex_expect("Invalid chunked array")
})
}
DType::Utf8(_) => {
let varbin_array = pack_views(chunks, dtype, validity)?;
Ok(Canonical::VarBinView(varbin_array))
}
DType::Binary(_) => {
let varbin_array = pack_views(chunks, dtype, validity)?;
Ok(Canonical::VarBinView(varbin_array))
}
DType::Null => {
let len = chunks.iter().map(|chunk| chunk.len()).sum();
let null_array = NullArray::new(len);
Ok(Canonical::Null(null_array))
}
.collect::<Vec<_>>();
let field_array = ChunkedArray::try_new(field_chunks, field_dtype.clone())?;
field_arrays.push(field_array.into_array());
}

StructArray::try_new(struct_dtype.names().clone(), field_arrays, len, validity)
}

fn pack_lists(chunks: &[ArrayRef], validity: Validity, dtype: &DType) -> VortexResult<ListArray> {
fn pack_lists(
chunks: &[ArrayRef],
validity: Validity,
elem_dtype: &DType,
) -> VortexResult<ListArray> {
let len: usize = chunks.iter().map(|c| c.len()).sum();
let mut offsets = BufferMut::<i64>::with_capacity(len + 1);
offsets.push(0);
let mut elements = Vec::new();
let elem_dtype = dtype
.as_list_element()
.vortex_expect("ListArray must have List dtype");

for chunk in chunks {
let chunk = chunk.to_list()?;
Expand Down Expand Up @@ -168,145 +113,27 @@ fn pack_lists(chunks: &[ArrayRef], validity: Validity, dtype: &DType) -> VortexR
ListArray::try_new(chunked_elements, offsets.into_array(), validity)
}

/// Repack a sequence of struct-typed chunks into one [StructArray] whose fields are
/// themselves [ChunkedArray]s — a pure pointer swizzle, no data is copied or decompressed.
///
/// All `chunks` are expected to share the same struct [DType]; callers are responsible
/// for having validated this before calling.
fn swizzle_struct_chunks(
    chunks: &[ArrayRef],
    validity: Validity,
    struct_dtype: &StructDType,
) -> VortexResult<StructArray> {
    // Total row count of the resulting struct array.
    let total_len = chunks.iter().map(|chunk| chunk.len()).sum();

    // For each field, gather that field's column out of every chunk and re-chunk it.
    let field_arrays = struct_dtype
        .fields()
        .enumerate()
        .map(|(field_idx, field_dtype)| {
            let per_chunk: Vec<_> = chunks
                .iter()
                .map(|chunk| {
                    chunk
                        .as_struct_typed()
                        .vortex_expect("Chunk was not a StructArray")
                        .maybe_null_field_by_idx(field_idx)
                        .vortex_expect("Invalid chunked array")
                })
                .collect();
            ChunkedArray::try_new(per_chunk, field_dtype.clone()).map(|arr| arr.into_array())
        })
        .collect::<VortexResult<Vec<_>>>()?;

    StructArray::try_new(struct_dtype.names().clone(), field_arrays, total_len, validity)
}

/// Concatenate the boolean buffers of every chunk into one contiguous [BoolArray].
///
/// All `chunks` are expected to already share the same (bool) DType; callers are
/// responsible for having validated this.
fn pack_bools(chunks: &[ArrayRef], validity: Validity) -> VortexResult<BoolArray> {
    // Pre-size the builder to the combined length so appends never reallocate.
    let total_len = chunks.iter().map(|chunk| chunk.len()).sum();
    let mut bits = BooleanBufferBuilder::new(total_len);

    for chunk in chunks {
        bits.append_buffer(chunk.to_bool()?.boolean_buffer());
    }

    Ok(BoolArray::new(bits.finish(), validity))
}

/// Copy the values of every chunk into a single contiguous [PrimitiveArray] of `T`.
///
/// All `chunks` are expected to already share the same primitive DType matching `T`;
/// callers are responsible for having validated this.
fn pack_primitives<T: NativePType>(
    chunks: &[ArrayRef],
    validity: Validity,
) -> VortexResult<PrimitiveArray> {
    // Reserve the full combined capacity up front to avoid grow-and-copy cycles.
    let total_len = chunks.iter().map(|chunk| chunk.len()).sum();
    let mut values = BufferMut::with_capacity(total_len);

    for chunk in chunks {
        values.extend_from_slice(chunk.to_primitive()?.as_slice::<T>());
    }

    Ok(PrimitiveArray::new(values.freeze(), validity))
}

/// Merge the view/buffer pairs of every chunk into a single contiguous [VarBinViewArray].
///
/// All `chunks` are expected to already share `dtype` (Utf8 or Binary); callers are
/// responsible for having validated this.
fn pack_views(
    chunks: &[ArrayRef],
    dtype: &DType,
    validity: Validity,
) -> VortexResult<VarBinViewArray> {
    let total_len = chunks.iter().map(|chunk| chunk.len()).sum();
    let mut all_views = BufferMut::with_capacity(total_len);
    let mut all_buffers = Vec::new();

    for chunk in chunks {
        // Each chunk's views index into that chunk's own buffer list, so every view
        // must be shifted by the number of buffers accumulated before this chunk.
        let buffer_base = u32::try_from(all_buffers.len())?;
        let canonical = chunk.to_varbinview()?;
        all_buffers.extend(canonical.buffers().iter().cloned());
        all_views.extend(
            canonical
                .views()
                .iter()
                .map(|view| view.offset_view(buffer_base)),
        );
    }

    VarBinViewArray::try_new(all_views.freeze(), all_buffers, dtype.clone(), validity)
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use vortex_dtype::DType;
use vortex_dtype::DType::{List, Primitive};
use vortex_dtype::Nullability::NonNullable;
use vortex_dtype::PType::I32;

use crate::ToCanonical;
use crate::accessor::ArrayAccessor;
use crate::array::Array;
use crate::arrays::chunked::canonical::pack_views;
use crate::arrays::{ChunkedArray, ListArray, PrimitiveArray, StructArray, VarBinViewArray};
use crate::compute::{scalar_at, slice};
use crate::compute::scalar_at;
use crate::validity::Validity;
use crate::variants::StructArrayTrait;

// Four-element Utf8 fixture shared by the tests below.
fn stringview_array() -> VarBinViewArray {
    let strings = ["foo", "bar", "baz", "quak"];
    VarBinViewArray::from_iter_str(strings)
}

#[test]
pub fn pack_sliced_varbin() {
    // Two overlapping slices of the same fixture: ["bar","baz"] and ["baz","quak"].
    let left = slice(&stringview_array(), 1, 3).unwrap();
    let right = slice(&stringview_array(), 2, 4).unwrap();

    let packed = pack_views(
        &[left, right],
        &DType::Utf8(NonNullable),
        Validity::NonNullable,
    )
    .unwrap();
    assert_eq!(packed.len(), 4);

    let collected = packed
        .with_iterator(|iter| {
            iter.flatten()
                // SAFETY: the fixture only contains ASCII strings, so the bytes
                // are guaranteed to be valid UTF-8.
                .map(|bytes| unsafe { String::from_utf8_unchecked(bytes.to_vec()) })
                .collect::<Vec<_>>()
        })
        .unwrap();
    assert_eq!(collected, &["bar", "baz", "baz", "quak"]);
}

#[test]
pub fn pack_nested_structs() {
let struct_array = StructArray::try_new(
vec!["a".into()].into(),
vec![stringview_array().into_array()],
vec![VarBinViewArray::from_iter_str(["foo", "bar", "baz", "quak"]).into_array()],
4,
Validity::NonNullable,
)
Expand Down
Loading