diff --git a/vortex-array/src/arrays/chunked/canonical.rs b/vortex-array/src/arrays/chunked/canonical.rs
index c9526fb700..82c6c9e96c 100644
--- a/vortex-array/src/arrays/chunked/canonical.rs
+++ b/vortex-array/src/arrays/chunked/canonical.rs
@@ -1,24 +1,36 @@
-use arrow_buffer::BooleanBufferBuilder;
 use vortex_buffer::BufferMut;
-use vortex_dtype::{DType, NativePType, Nullability, PType, StructDType, match_each_native_ptype};
+use vortex_dtype::{DType, Nullability, PType, StructDType};
 use vortex_error::{VortexExpect, VortexResult, vortex_err};
 
-use crate::array::ArrayCanonicalImpl;
-use crate::arrays::chunked::ChunkedArray;
-use crate::arrays::extension::ExtensionArray;
-use crate::arrays::null::NullArray;
-use crate::arrays::primitive::PrimitiveArray;
-use crate::arrays::struct_::StructArray;
-use crate::arrays::{BoolArray, ListArray, VarBinViewArray};
-use crate::builders::ArrayBuilder;
+use super::ChunkedArray;
+use crate::arrays::{ListArray, PrimitiveArray, StructArray};
+use crate::builders::{ArrayBuilder, builder_with_capacity};
 use crate::compute::{scalar_at, slice, try_cast};
 use crate::validity::Validity;
-use crate::{Array, ArrayRef, ArrayVariants, Canonical, ToCanonical};
+use crate::{Array as _, ArrayCanonicalImpl, ArrayRef, Canonical, ToCanonical};
 
 impl ArrayCanonicalImpl for ChunkedArray {
     fn _to_canonical(&self) -> VortexResult<Canonical> {
-        let validity = Validity::copy_from_array(self)?;
-        try_canonicalize_chunks(self.chunks(), validity, self.dtype())
+        match self.dtype() {
+            DType::Struct(struct_dtype, _) => {
+                let struct_array = swizzle_struct_chunks(
+                    self.chunks(),
+                    Validity::copy_from_array(self)?,
+                    struct_dtype,
+                )?;
+                Ok(Canonical::Struct(struct_array))
+            }
+            DType::List(elem, _) => Ok(Canonical::List(pack_lists(
+                self.chunks(),
+                Validity::copy_from_array(self)?,
+                elem,
+            )?)),
+            _ => {
+                let mut builder = builder_with_capacity(self.dtype(), self.len());
+                self.append_to_builder(builder.as_mut())?;
+                builder.finish().to_canonical()
+            }
+        }
     }
 
     fn _append_to_builder(&self, builder: &mut dyn ArrayBuilder) -> VortexResult<()> {
@@ -29,109 +41,42 @@ impl ArrayCanonicalImpl for ChunkedArray {
     }
 }
 
-pub(crate) fn try_canonicalize_chunks(
+/// Swizzle the pointers within a ChunkedArray of StructArrays to instead be a single
+/// StructArray, where the Array for each Field is a ChunkedArray.
+fn swizzle_struct_chunks(
     chunks: &[ArrayRef],
     validity: Validity,
-    dtype: &DType,
-) -> VortexResult<Canonical> {
-    match dtype {
-        // Structs can have their internal field pointers swizzled to push the chunking down
-        // one level internally without copying or decompressing any data.
-        DType::Struct(struct_dtype, _) => {
-            let struct_array = swizzle_struct_chunks(chunks, validity, struct_dtype)?;
-            Ok(Canonical::Struct(struct_array))
-        }
-
-        // Extension arrays are containers that wraps an inner storage array with some metadata.
-        // We delegate to the canonical format of the internal storage array for every chunk, and
-        // push the chunking down into the inner storage array.
-        //
-        // Input:
-        // ------
-        //
-        //                  ChunkedArray
-        //                 /            \
-        //                /              \
-        //      ExtensionArray     ExtensionArray
-        //            |                  |
-        //         storage            storage
-        //
-        //
-        // Output:
-        // ------
-        //
-        //                 ExtensionArray
-        //                       |
-        //                  ChunkedArray
-        //                 /            \
-        //              storage        storage
-        //
-        DType::Extension(ext_dtype) => {
-            // Recursively apply canonicalization and packing to the storage array backing
-            // each chunk of the extension array.
-            let storage_chunks: Vec<ArrayRef> = chunks
-                .iter()
-                // Extension-typed arrays can be compressed into something that is not an
-                // ExtensionArray, so we should canonicalize each chunk into ExtensionArray first.
-                .map(|chunk| {
-                    chunk
-                        .clone()
-                        .as_extension_typed()
-                        .vortex_expect("Chunk could not be downcast to ExtensionArrayTrait")
-                        .storage_data()
-                })
-                .collect();
-            let storage_dtype = ext_dtype.storage_dtype().clone();
-            let chunked_storage =
-                ChunkedArray::try_new(storage_chunks, storage_dtype)?.into_array();
-
-            Ok(Canonical::Extension(ExtensionArray::new(
-                ext_dtype.clone(),
-                chunked_storage,
-            )))
-        }
-
-        DType::List(..) => {
-            // TODO(joe): improve performance, use a listview, once it exists
-
-            let list = pack_lists(chunks, validity, dtype)?;
-            Ok(Canonical::List(list))
-        }
+    struct_dtype: &StructDType,
+) -> VortexResult<StructArray> {
+    let len = chunks.iter().map(|chunk| chunk.len()).sum();
+    let mut field_arrays = Vec::new();
 
-        DType::Bool(_) => {
-            let bool_array = pack_bools(chunks, validity)?;
-            Ok(Canonical::Bool(bool_array))
-        }
-        DType::Primitive(ptype, _) => {
-            match_each_native_ptype!(ptype, |$P| {
-                let prim_array = pack_primitives::<$P>(chunks, validity)?;
-                Ok(Canonical::Primitive(prim_array))
+    for (field_idx, field_dtype) in struct_dtype.fields().enumerate() {
+        let field_chunks = chunks
+            .iter()
+            .map(|c| {
+                c.as_struct_typed()
+                    .vortex_expect("Chunk was not a StructArray")
+                    .maybe_null_field_by_idx(field_idx)
+                    .vortex_expect("Invalid chunked array")
             })
-        }
-        DType::Utf8(_) => {
-            let varbin_array = pack_views(chunks, dtype, validity)?;
-            Ok(Canonical::VarBinView(varbin_array))
-        }
-        DType::Binary(_) => {
-            let varbin_array = pack_views(chunks, dtype, validity)?;
-            Ok(Canonical::VarBinView(varbin_array))
-        }
-        DType::Null => {
-            let len = chunks.iter().map(|chunk| chunk.len()).sum();
-            let null_array = NullArray::new(len);
-            Ok(Canonical::Null(null_array))
-        }
+            .collect::<Vec<_>>();
+        let field_array = ChunkedArray::try_new(field_chunks, field_dtype.clone())?;
+        field_arrays.push(field_array.into_array());
     }
+
+    StructArray::try_new(struct_dtype.names().clone(), field_arrays, len, validity)
 }
 
-fn pack_lists(chunks: &[ArrayRef], validity: Validity, dtype: &DType) -> VortexResult<ListArray> {
+fn pack_lists(
+    chunks: &[ArrayRef],
+    validity: Validity,
+    elem_dtype: &DType,
+) -> VortexResult<ListArray> {
     let len: usize = chunks.iter().map(|c| c.len()).sum();
     let mut offsets = BufferMut::<u64>::with_capacity(len + 1);
     offsets.push(0);
     let mut elements = Vec::new();
-    let elem_dtype = dtype
-        .as_list_element()
-        .vortex_expect("ListArray must have List dtype");
 
     for chunk in chunks {
         let chunk = chunk.to_list()?;
@@ -168,102 +113,10 @@ fn pack_lists(chunks: &[ArrayRef], validity: Validity, dtype: &DType) -> VortexR
     ListArray::try_new(chunked_elements, offsets.into_array(), validity)
 }
 
-/// Swizzle the pointers within a ChunkedArray of StructArrays to instead be a single
-/// StructArray, where the Array for each Field is a ChunkedArray.
-///
-/// It is expected this function is only called from [try_canonicalize_chunks], and thus all chunks have
-/// been checked to have the same DType already.
-fn swizzle_struct_chunks(
-    chunks: &[ArrayRef],
-    validity: Validity,
-    struct_dtype: &StructDType,
-) -> VortexResult<StructArray> {
-    let len = chunks.iter().map(|chunk| chunk.len()).sum();
-    let mut field_arrays = Vec::new();
-
-    for (field_idx, field_dtype) in struct_dtype.fields().enumerate() {
-        let field_chunks = chunks
-            .iter()
-            .map(|c| {
-                c.as_struct_typed()
-                    .vortex_expect("Chunk was not a StructArray")
-                    .maybe_null_field_by_idx(field_idx)
-                    .vortex_expect("Invalid chunked array")
-            })
-            .collect::<Vec<_>>();
-        let field_array = ChunkedArray::try_new(field_chunks, field_dtype.clone())?;
-        field_arrays.push(field_array.into_array());
-    }
-
-    StructArray::try_new(struct_dtype.names().clone(), field_arrays, len, validity)
-}
-
-/// Builds a new [BoolArray] by repacking the values from the chunks in a single contiguous array.
-///
-/// It is expected this function is only called from [try_canonicalize_chunks], and thus all chunks have
-/// been checked to have the same DType already.
-fn pack_bools(chunks: &[ArrayRef], validity: Validity) -> VortexResult<BoolArray> {
-    let len = chunks.iter().map(|chunk| chunk.len()).sum();
-    let mut buffer = BooleanBufferBuilder::new(len);
-    for chunk in chunks {
-        let chunk = chunk.to_bool()?;
-        buffer.append_buffer(chunk.boolean_buffer());
-    }
-    Ok(BoolArray::new(buffer.finish(), validity))
-}
-
-/// Builds a new [PrimitiveArray] by repacking the values from the chunks into a single
-/// contiguous array.
-///
-/// It is expected this function is only called from [try_canonicalize_chunks], and thus all chunks have
-/// been checked to have the same DType already.
-fn pack_primitives<T: NativePType>(
-    chunks: &[ArrayRef],
-    validity: Validity,
-) -> VortexResult<PrimitiveArray> {
-    let total_len = chunks.iter().map(|a| a.len()).sum();
-    let mut buffer = BufferMut::with_capacity(total_len);
-    for chunk in chunks {
-        let chunk = chunk.to_primitive()?;
-        buffer.extend_from_slice(chunk.as_slice::<T>());
-    }
-    Ok(PrimitiveArray::new(buffer.freeze(), validity))
-}
-
-/// Builds a new [VarBinViewArray] by repacking the values from the chunks into a single
-/// contiguous array.
-///
-/// It is expected this function is only called from [try_canonicalize_chunks], and thus all chunks have
-/// been checked to have the same DType already.
-fn pack_views(
-    chunks: &[ArrayRef],
-    dtype: &DType,
-    validity: Validity,
-) -> VortexResult<VarBinViewArray> {
-    let total_len = chunks.iter().map(|a| a.len()).sum();
-    let mut views = BufferMut::with_capacity(total_len);
-    let mut buffers = Vec::new();
-    for chunk in chunks {
-        let buffers_offset = u32::try_from(buffers.len())?;
-        let canonical_chunk = chunk.to_varbinview()?;
-        buffers.extend(canonical_chunk.buffers().iter().cloned());
-
-        views.extend(
-            canonical_chunk
-                .views()
-                .iter()
-                .map(|view| view.offset_view(buffers_offset)),
-        );
-    }
-
-    VarBinViewArray::try_new(views.freeze(), buffers, dtype.clone(), validity)
-}
-
 #[cfg(test)]
 mod tests {
     use std::sync::Arc;
 
-    use vortex_dtype::DType;
     use vortex_dtype::DType::{List, Primitive};
     use vortex_dtype::Nullability::NonNullable;
     use vortex_dtype::PType::I32;
@@ -271,42 +124,16 @@ mod tests {
     use crate::ToCanonical;
     use crate::accessor::ArrayAccessor;
     use crate::array::Array;
-    use crate::arrays::chunked::canonical::pack_views;
     use crate::arrays::{ChunkedArray, ListArray, PrimitiveArray, StructArray, VarBinViewArray};
-    use crate::compute::{scalar_at, slice};
+    use crate::compute::scalar_at;
     use crate::validity::Validity;
     use crate::variants::StructArrayTrait;
 
-    fn stringview_array() -> VarBinViewArray {
-        VarBinViewArray::from_iter_str(["foo", "bar", "baz", "quak"])
-    }
-
-    #[test]
-    pub fn pack_sliced_varbin() {
-        let array1 = slice(&stringview_array(), 1, 3).unwrap();
-        let array2 = slice(&stringview_array(), 2, 4).unwrap();
-        let packed = pack_views(
-            &[array1, array2],
-            &DType::Utf8(NonNullable),
-            Validity::NonNullable,
-        )
-        .unwrap();
-        assert_eq!(packed.len(), 4);
-        let values = packed
-            .with_iterator(|iter| {
-                iter.flatten()
-                    .map(|v| unsafe { String::from_utf8_unchecked(v.to_vec()) })
-                    .collect::<Vec<_>>()
-            })
-            .unwrap();
-        assert_eq!(values, &["bar", "baz", "baz", "quak"]);
-    }
-
     #[test]
     pub fn pack_nested_structs() {
         let struct_array = StructArray::try_new(
             vec!["a".into()].into(),
-            vec![stringview_array().into_array()],
+            vec![VarBinViewArray::from_iter_str(["foo", "bar", "baz", "quak"]).into_array()],
             4,
             Validity::NonNullable,
         )