feat: teach BitPackedEncoding canonical_into #2324
@@ -105,8 +105,10 @@ fn canonical_into<T: NativePType>(bencher: Bencher, (arr_len, chunk_count): (usi
         .into_array();

     bencher.bench(|| {
-        let mut primitive_builder =
-            PrimitiveBuilder::<T>::with_capacity(arr.dtype().nullability(), arr_len * chunk_count);
+        let mut primitive_builder = PrimitiveBuilder::<T>::with_capacity(
+            arr.dtype().nullability(),
+            arr_len * chunk_count + 1024,
+        );
         chunked
             .clone()
             .canonicalize_into(&mut primitive_builder)

Review thread (on the `arr_len * chunk_count + 1024` capacity):

Comment: This `+ 1024` is really important; we need to make sure that it is actually applied in the constructors of the builders if needed, or maybe always?

Reply: Yeah, I'm not sure how best to approach this. Perhaps arrays should have a `size_hint`? Perhaps every builder should assume it needs a magic 1024 extra elements of space? 1024 is kind of a magic number in Vortex.
@@ -1,13 +1,17 @@
 use std::ops::DerefMut as _;

 use arrow_buffer::ArrowNativeType;
 use fastlanes::BitPacking;
 use vortex_array::array::PrimitiveArray;
 use vortex_array::builders::{ArrayBuilder as _, PrimitiveBuilder};
 use vortex_array::patches::Patches;
 use vortex_array::validity::Validity;
 use vortex_array::variants::PrimitiveArrayTrait;
 use vortex_array::IntoArray;
-use vortex_buffer::{buffer, Buffer, BufferMut, ByteBuffer};
+use vortex_buffer::{Buffer, BufferMut, ByteBuffer};
 use vortex_dtype::{
-    match_each_integer_ptype, match_each_unsigned_integer_ptype, NativePType, PType,
+    match_each_integer_ptype, match_each_integer_ptype_with_unsigned_type,
+    match_each_unsigned_integer_ptype, NativePType, PType,
 };
 use vortex_error::{vortex_bail, vortex_err, VortexExpect, VortexResult};
 use vortex_scalar::Scalar;
@@ -196,39 +200,52 @@ pub fn gather_patches(
 }

 pub fn unpack(array: BitPackedArray) -> VortexResult<PrimitiveArray> {
+    match_each_integer_ptype_with_unsigned_type!(array.ptype(), |$P, $UnsignedT| {
+        unpack_primitive::<$P, $UnsignedT>(array)
+    })
+}
+
+pub fn unpack_primitive<T: NativePType, UnsignedT: NativePType + BitPacking>(
+    array: BitPackedArray,
+) -> VortexResult<PrimitiveArray> {
+    let mut builder =
+        PrimitiveBuilder::with_capacity(array.dtype().nullability(), array.len() + 1024);
+    assert!(size_of::<T>() == size_of::<UnsignedT>());
+    unpack_into::<T, UnsignedT, _, _>(
+        array,
+        &mut builder,
+        |x| unsafe { std::mem::transmute(x) },
+        |x| unsafe { std::mem::transmute(x) },
+    )?;
+    builder.finish_into_primitive()
+}
+
+pub(crate) fn unpack_into<T: NativePType, UnsignedT: NativePType + BitPacking, F, G>(
+    array: BitPackedArray,
+    // TODO(ngates): do we want to use fastlanes alignment for this buffer?
+    builder: &mut PrimitiveBuilder<T>,
+    transmute: F,
+    transmute_mut: G,
+) -> VortexResult<()>
+where
+    F: Fn(&[UnsignedT]) -> &[T],
+    G: Fn(&mut [T]) -> &mut [UnsignedT],
+{
     let bit_width = array.bit_width() as usize;
     let length = array.len();
     let offset = array.offset() as usize;
-    let ptype = array.ptype();
-    let mut unpacked = match_each_unsigned_integer_ptype!(ptype.to_unsigned(), |$P| {
-        PrimitiveArray::new(
-            unpack_primitive::<$P>(array.packed_slice::<$P>(), bit_width, offset, length),
-            array.validity(),
-        )
-    });
-
-    // Cast to signed if necessary
-    if ptype.is_signed_int() {
-        unpacked = unpacked.reinterpret_cast(ptype);
-    }
-
-    if let Some(patches) = array.patches() {
-        unpacked.patch(patches)
-    } else {
-        Ok(unpacked)
-    }
-}
+
+    builder.nulls.append_validity(array.validity(), length)?;
Review thread (on `builder.nulls.append_validity(...)`):

Comment: Do you think this should live in the builder, behind a method?

Reply: Right now, all the builder methods act on both the values and the nulls. I think I'd prefer the interface to be one of: update values and nulls together, or use the nulls and the values directly and separately. I could add a …
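For reference, the smallest version of the "values and nulls together" option mentioned in the reply might look like the sketch below. The method is hypothetical, and the zero-fill is exactly the trade-off being debated: the decoder above avoids it by appending nulls up front and filling values chunk by chunk.

```rust
// Hypothetical builder method, not part of this PR: keep values and nulls in
// lockstep by growing both in one call, instead of reaching into
// `builder.nulls` from the decoder.
impl<T: NativePType> PrimitiveBuilder<T> {
    pub fn append_validity(&mut self, validity: Validity, len: usize) -> VortexResult<()> {
        self.nulls.append_validity(validity, len)?; // null bits grow by `len`
        self.append_zeros(len); // value slots grow by `len`, so the lengths stay equal
        Ok(())
    }
}
```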

-pub fn unpack_primitive<T: NativePType + BitPacking>(
-    packed: &[T],
-    bit_width: usize,
-    offset: usize,
-    length: usize,
-) -> Buffer<T> {
     if bit_width == 0 {
-        return buffer![T::zero(); length];
+        builder.append_zeros(length);
+        return Ok(());
     }

+    let packed = array.packed_slice::<UnsignedT>();
+    let builder_current_length = builder.len();
+    builder.values.reserve((length + 1023) / 1024 * 1024);
Review thread (on `builder.values.reserve((length + 1023) / 1024 * 1024)`):

Comment: So do you think this is the part that reallocates and affects perf?

Comment: We know the length of the array, so could you instead allocate a 1024 * |T| scratch buffer, decode the last block into that, and then copy values from it into the builder (if there is not already enough space)?

Reply: In the benchmarks, we ensure the resulting builder is big enough to hold all the elements plus any scratch space needed by the decoder (in this case, 1024 empty slots, though only 1023 are needed). This operation shouldn't allocate in the benchmarks; I've verified this in the profile (there's no …). Yeah, what you're describing is a reasonable alternative to adding extra space to the builders. I'd be happy to do that, though at this point it feels like we should do it in a follow-up PR since this one is fairly large already.
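To make the alternative in the thread concrete: a sketch of routing only the final, partial block through a stack scratch buffer, so the builder never needs capacity beyond `length`. It reuses names already in this diff (`packed`, `elems_per_chunk`, `num_chunks`, `first_full_chunk`, `transmute`, `builder`); the changed loop bound and the `live` arithmetic are mine, not the PR's.

```rust
// Sketch only (not the PR's code). The main loop would stop one block early
// (`first_full_chunk..num_chunks - 1`) and the tail would be handled here,
// assuming the last block was not already consumed by the offset path.
let last = num_chunks - 1;
if last >= first_full_chunk {
    let chunk = &packed[last * elems_per_chunk..][..elems_per_chunk];
    let mut scratch = [UnsignedT::zero(); 1024];
    // SAFETY: as in the loop below, `scratch` is exactly 1024 elements long.
    unsafe { BitPacking::unchecked_unpack(bit_width, chunk, &mut scratch) };
    // Only the first `live` decoded values fall inside [offset, offset + length).
    let live = (offset + length) - last * 1024;
    builder
        .values
        .extend_from_slice(transmute(&scratch[..live]));
}
```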

     // How many fastlanes vectors we will process.
     // Packed array might not start at 0 when the array is sliced. Offset is guaranteed to be < 1024.
     let num_chunks = (offset + length + 1023) / 1024;
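To make the offset and rounding arithmetic concrete, a small worked example (illustrative numbers, not from the PR):

```rust
// For a sliced array with offset = 100 and length = 2000:
//   num_chunks = (100 + 2000 + 1023) / 1024 = 3 fastlanes blocks touched.
// Block 0 is decoded to a scratch array and positions 100..1024 are copied
// (the `first_full_chunk` path below); blocks 1 and 2 are unpacked straight
// into the builder; the final `truncate` trims the up-to-1023 padding values
// decoded past the end of the array in block 2.
fn main() {
    let (offset, length) = (100usize, 2000usize);
    let num_chunks = (offset + length + 1023) / 1024;
    assert_eq!(num_chunks, 3);
    // Live values in the last block: (offset + length) - (num_chunks - 1) * 1024
    assert_eq!(offset + length - (num_chunks - 1) * 1024, 52);
}
```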
@@ -241,42 +258,48 @@ pub fn unpack_primitive<T: NativePType + BitPacking>(
         num_chunks * elems_per_chunk
     );

-    // Allocate a result vector.
-    // TODO(ngates): do we want to use fastlanes alignment for this buffer?
-    let mut output = BufferMut::with_capacity(num_chunks * 1024 - offset);
-
     // Handle first chunk if offset is non 0. We have to decode the chunk and skip first offset elements
     let first_full_chunk = if offset != 0 {
-        let chunk: &[T] = &packed[0..elems_per_chunk];
-        let mut decoded = [T::zero(); 1024];
+        let chunk = &packed[0..elems_per_chunk];
+        let mut decoded = [UnsignedT::zero(); 1024];
         unsafe { BitPacking::unchecked_unpack(bit_width, chunk, &mut decoded) };
-        output.extend_from_slice(&decoded[offset..]);
+        builder
+            .values
+            .extend_from_slice(transmute(&decoded[offset..]));
         1
     } else {
         0
     };

     // Loop over all the chunks.
-    (first_full_chunk..num_chunks).for_each(|i| {
-        let chunk: &[T] = &packed[i * elems_per_chunk..][0..elems_per_chunk];
Review thread (on lines -260 to -261, the old `for_each` closure):

Comment: Out of interest, why did you move from a `for_each` to a loop?

Reply: I needed a …
+    for i in first_full_chunk..num_chunks {
+        let chunk = &packed[i * elems_per_chunk..][0..elems_per_chunk];
Review thread (on the chunk loop):

Comment: I wonder, if there are lots of chunks (more than fit in the L2 cache), whether we want to apply the patches after each loop iteration?

Reply: Reasonable, but then I need to slice/chunk/partition the patches, which is not free.

Comment: True.

         // SAFETY:
         //
         // 1. unchecked_unpack only writes into the output and when it is finished, all the outputs
         //    have been written to.
         //
         // 2. The output buffer is exactly size 1024.
         unsafe {
-            let output_len = output.len();
-            output.set_len(output_len + 1024);
-            BitPacking::unchecked_unpack(bit_width, chunk, &mut output[output_len..][0..1024]);
+            builder.values.set_len(builder.values.len() + 1024);
+            BitPacking::unchecked_unpack(
+                bit_width,
+                chunk,
+                &mut transmute_mut(builder.values.deref_mut())
+                    [builder_current_length + i * 1024 - offset..][..1024],
+            );
         }
-    });
+    }

     // The final chunk may have had padding
-    output.truncate(length);
+    builder.truncate(builder_current_length + length);

-    assert_eq!(
-        output.len(),
-        length,
-        "Expected unpacked array to be of length {} but got {}",
-        length,
-        output.len()
-    );
-    output.freeze()
+    if let Some(patches) = array.patches() {
+        builder.patch(patches, builder_current_length)?;
+    }
+
+    Ok(())
 }

 pub fn unpack_single(array: &BitPackedArray, index: usize) -> VortexResult<Scalar> {
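On the earlier cache question: applying patches block by block would first require splitting the (sorted) patch indices into per-block ranges, which is the cost the reply calls "not free". A self-contained illustration over plain index slices; the real `Patches` type is not used here and the helper is hypothetical.

```rust
// Hypothetical sketch: split a sorted list of patch indices into per-block
// ranges so patches could be applied right after each 1024-value block is
// decoded. Each block costs a binary search (`partition_point`), which is the
// overhead noted in the review thread.
fn per_block_ranges(indices: &[u64], num_chunks: usize) -> Vec<std::ops::Range<usize>> {
    let mut ranges = Vec::with_capacity(num_chunks);
    let mut start = 0;
    for block in 0..num_chunks {
        let end_value = ((block + 1) * 1024) as u64;
        let end = start + indices[start..].partition_point(|&i| i < end_value);
        ranges.push(start..end);
        start = end;
    }
    ranges
}

fn main() {
    // Patch indices 5, 700 and 1500 fall into blocks 0, 0 and 1 respectively.
    let ranges = per_block_ranges(&[5, 700, 1500], 2);
    assert_eq!(ranges, vec![0..2, 2..3]);
}
```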
@@ -13,9 +13,9 @@ use vortex_array::vtable::{
     CanonicalVTable, StatisticsVTable, ValidateVTable, ValidityVTable, VariantsVTable,
     VisitorVTable,
 };
-use vortex_array::{encoding_ids, impl_encoding, Array, Canonical, IntoArray, RkyvMetadata};
+use vortex_array::{encoding_ids, impl_encoding, Array, Canonical, RkyvMetadata};
 use vortex_buffer::ByteBuffer;
-use vortex_dtype::{DType, NativePType, PType};
+use vortex_dtype::{match_each_integer_ptype_with_unsigned_type, DType, NativePType, PType};
 use vortex_error::{vortex_bail, vortex_err, VortexExpect as _, VortexResult};
 use vortex_mask::Mask;

@@ -264,8 +264,17 @@ impl CanonicalVTable<BitPackedArray> for BitPackedEncoding {
         array: BitPackedArray,
         builder: &mut dyn ArrayBuilder,
     ) -> VortexResult<()> {
-        // TODO(joe): add specialised impl
-        builder.extend_from_array(array.into_array())
+        match_each_integer_ptype_with_unsigned_type!(array.ptype(), |$T, $UnsignedT| {
+            unpack_into::<$T, $UnsignedT, _, _>(
+                array,
+                builder
+                    .as_any_mut()
+                    .downcast_mut()
+                    .vortex_expect("bit packed array must canonicalize into a primitive array"),
+                |x| unsafe { std::mem::transmute(x) },
+                |x| unsafe { std::mem::transmute(x) },
+            )
+        })
     }
 }

Review thread (on the `std::mem::transmute` closures):

Comment: Some safety and explanation comment would be good here?

Reply: Done.
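The safety comment the thread asks for is not visible in this capture of the diff; the argument it needs is the one `unpack_primitive` already asserts, namely that `$T` and `$UnsignedT` are integer types of the same width. A standalone illustration for one concrete pair (the wording and code are mine, not the PR's):

```rust
// Illustrative only (not the PR's comment): the closures rely on the macro
// pairing each signed ptype with its same-width unsigned counterpart, so a
// slice transmute is a pure reinterpretation of the bits. A concrete instance
// for u32 -> i32:
fn as_signed(x: &[u32]) -> &[i32] {
    // SAFETY: u32 and i32 have identical size and alignment, and every bit
    // pattern is a valid value of both types, so reinterpreting the slice's
    // element type cannot produce an invalid value or reference.
    unsafe { std::mem::transmute(x) }
}

fn main() {
    let packed: [u32; 3] = [0, 1, u32::MAX];
    let signed = as_signed(&packed);
    assert_eq!(signed.len(), 3);
    assert_eq!(signed[2], -1); // u32::MAX reinterpreted as i32
}
```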
@@ -4,6 +4,7 @@ use vortex_dtype::Nullability::{NonNullable, Nullable};
 use vortex_error::{vortex_panic, VortexExpect, VortexResult};

 use crate::validity::Validity;
 use crate::IntoArrayVariant as _;

 /// This is borrowed from arrow's null buffer builder, however we expose a `append_buffer`
 /// method to append a boolean buffer directly.

@@ -80,11 +81,51 @@ impl LazyNullBufferBuilder {
             .append_buffer(&bool_buffer);
     }

+    pub fn append_validity(&mut self, validity: Validity, length: usize) -> VortexResult<()> {
+        match validity {
+            Validity::NonNullable => {}
+            Validity::AllValid => {
+                self.append_n_non_nulls(length);
+            }
+            Validity::AllInvalid => {
+                self.append_n_nulls(length);
+            }
+            Validity::Array(array) => {
+                self.append_buffer(array.into_bool()?.boolean_buffer());
+            }
+        }
+
+        Ok(())
+    }

Review thread (on `append_validity`):

Comment: I had been doing this with a validity mask. I wonder if we should choose one of the two methods and use it consistently.

Comment: I think we should use `validity_mask` here; it should perform better than creating a `Validity` and then a boolean buffer below.

Reply: Sure, I went with the following and changed …
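A sketch of the `validity_mask`-based variant argued for in the thread. The exact shape of `vortex_mask::Mask` (all-true, all-false and materialized-values cases, plus a `boolean_buffer()` accessor) is assumed here and not confirmed by this diff.

```rust
// Hypothetical method on LazyNullBufferBuilder (assumed Mask API, not this
// PR's code). The all-true / all-false cases never build a BoolArray or a
// BooleanBuffer, which is the performance argument made above; the method is
// also infallible, unlike the Validity-based version.
impl LazyNullBufferBuilder {
    pub fn append_validity_mask(&mut self, mask: Mask) {
        match mask {
            Mask::AllTrue(len) => self.append_n_non_nulls(len),
            Mask::AllFalse(len) => self.append_n_nulls(len),
            Mask::Values(values) => self.append_buffer(values.boolean_buffer().clone()),
        }
    }
}
```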
+    pub fn set_bit(&mut self, index: usize, v: bool) {
+        self.materialize_if_needed();
+        self.inner
+            .as_mut()
+            .vortex_expect("buffer just materialized")
+            .set_bit(index, v);
+    }
+
+    pub fn len(&self) -> usize {
+        // self.len is the length of the builder if the inner buffer is not materialized
+        self.inner.as_ref().map(|i| i.len()).unwrap_or(self.len)
+    }
+
+    pub fn truncate(&mut self, len: usize) {
+        if let Some(b) = self.inner.as_mut() {
+            b.truncate(len)
+        }
+        self.len = len;
+    }
+
+    pub fn reserve(&mut self, n: usize) {
+        self.materialize_if_needed();
+        self.inner
+            .as_mut()
+            .vortex_expect("buffer just materialized")
+            .reserve(n);
+    }
+
     pub fn finish(&mut self) -> Option<NullBuffer> {
         self.len = 0;
         Some(NullBuffer::new(self.inner.take()?.finish()))
@@ -1,20 +1,21 @@
 use std::any::Any;

 use vortex_buffer::BufferMut;
-use vortex_dtype::{DType, NativePType, Nullability};
+use vortex_dtype::{match_each_unsigned_integer_ptype, DType, NativePType, Nullability};
 use vortex_error::{vortex_bail, VortexResult};
 use vortex_mask::AllOr;

 use crate::array::{BoolArray, PrimitiveArray};
 use crate::builders::lazy_validity_builder::LazyNullBufferBuilder;
 use crate::builders::ArrayBuilder;
+use crate::patches::Patches;
 use crate::validity::Validity;
 use crate::variants::PrimitiveArrayTrait;
-use crate::{Array, IntoArray, IntoCanonical};
+use crate::{Array, IntoArray, IntoArrayVariant as _, IntoCanonical};

 pub struct PrimitiveBuilder<T: NativePType> {
-    values: BufferMut<T>,
-    nulls: LazyNullBufferBuilder,
+    pub values: BufferMut<T>,
+    pub nulls: LazyNullBufferBuilder,
     dtype: DType,
 }
@@ -45,6 +46,55 @@ impl<T: NativePType> PrimitiveBuilder<T> {
             None => self.append_null(),
         }
     }
+
+    pub fn patch(&mut self, patches: Patches, starting_at: usize) -> VortexResult<()> {
+        let (array_len, offset, indices, values) = patches.into_parts();
+        assert!(starting_at + array_len == self.len());
+        let indices = indices.into_primitive()?;
+        let values = values.into_primitive()?;
+        let validity = values.validity_mask()?;
Review thread (on `values.validity_mask()`):

Comment: If the validity is all true, then remove the check from the loop below.

Reply: Done.
+
+        let values = values.as_slice::<T>();
+        match_each_unsigned_integer_ptype!(indices.ptype(), |$P| {
+            for (compressed_index, decompressed_index) in indices.into_primitive()?.as_slice::<$P>().into_iter().enumerate() {
+                let decompressed_index = *decompressed_index as usize;
+                let out_index = starting_at + decompressed_index - offset;
+                if validity.value(compressed_index) {
+                    self.values[out_index] = values[compressed_index]
+                } else {
+                    self.nulls.set_bit(out_index, false)
+                }
Review thread (on the patch loop; the original suggested change is not preserved in this capture):

Comment: I think you can remove the branch if you ensure that …

Reply: I did this.
+            }
+        });
+
+        Ok(())
+    }

Review thread (on `patch`):

Comment: Can this live in its own fn?

Reply: Done.
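Putting the two suggestions above together, a sketch of how the loop body could look with an all-valid fast path hoisted out and the remaining branch removed. `all_valid` stands in for however the validity mask exposes an all-true check, and `indices` is simplified to a plain decoded index slice rather than the macro dispatch in the diff.

```rust
// Sketch only, not the PR's code. `values` and `validity` are as in `patch`.
if all_valid {
    // Fast path: no per-element validity lookups at all.
    for (compressed_index, decompressed_index) in indices.iter().enumerate() {
        let out_index = starting_at + *decompressed_index as usize - offset;
        self.values[out_index] = values[compressed_index];
    }
} else {
    for (compressed_index, decompressed_index) in indices.iter().enumerate() {
        let out_index = starting_at + *decompressed_index as usize - offset;
        // Branch-free body: always write the value (a null slot's value is
        // never read) and always write the validity bit. This assumes the null
        // buffer is materialized so `set_bit` is valid for every index.
        self.values[out_index] = values[compressed_index];
        self.nulls.set_bit(out_index, validity.value(compressed_index));
    }
}
```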
+
+    pub fn truncate(&mut self, len: usize) {
+        self.values.truncate(len);
+        self.nulls.truncate(len);
+    }
+
+    pub fn finish_into_primitive(&mut self) -> VortexResult<PrimitiveArray> {
+        let validity = match (self.nulls.finish(), self.dtype().nullability()) {
+            (None, Nullability::NonNullable) => Validity::NonNullable,
+            (Some(_), Nullability::NonNullable) => {
+                vortex_bail!("Non-nullable builder has null values")
+            }
+            (None, Nullability::Nullable) => Validity::AllValid,
+            (Some(nulls), Nullability::Nullable) => {
+                if nulls.null_count() == nulls.len() {
+                    Validity::AllInvalid
+                } else {
+                    Validity::Array(BoolArray::from(nulls.into_inner()).into_array())
+                }
+            }
+        };
+
+        Ok(PrimitiveArray::new(
+            std::mem::take(&mut self.values).freeze(),
+            validity,
+        ))
+    }
 }
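A small usage sketch of the new typed finisher, using only constructors and methods shown in this diff; the element type and lengths are illustrative, and the fragment assumes a surrounding function that returns `VortexResult`.

```rust
// Hedged usage sketch, not from the PR: build a nullable PrimitiveArray
// directly, without downcasting the dyn ArrayBuilder `finish()` result.
let mut builder = PrimitiveBuilder::<i32>::with_capacity(Nullability::Nullable, 8);
builder.append_zeros(8); // eight valid zero values
let array: PrimitiveArray = builder.finish_into_primitive()?;
assert_eq!(array.len(), 8);
```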

 impl<T: NativePType> ArrayBuilder for PrimitiveBuilder<T> {

@@ -94,21 +144,6 @@ impl<T: NativePType> ArrayBuilder for PrimitiveBuilder<T> {
     }

     fn finish(&mut self) -> VortexResult<Array> {
-        let validity = match (self.nulls.finish(), self.dtype().nullability()) {
-            (None, Nullability::NonNullable) => Validity::NonNullable,
-            (Some(_), Nullability::NonNullable) => {
-                vortex_bail!("Non-nullable builder has null values")
-            }
-            (None, Nullability::Nullable) => Validity::AllValid,
-            (Some(nulls), Nullability::Nullable) => {
-                if nulls.null_count() == nulls.len() {
-                    Validity::AllInvalid
-                } else {
-                    Validity::Array(BoolArray::from(nulls.into_inner()).into_array())
-                }
-            }
-        };
-
-        Ok(PrimitiveArray::new(std::mem::take(&mut self.values).freeze(), validity).into_array())
+        self.finish_into_primitive().map(IntoArray::into_array)
     }
 }

Review comment (context lost in this capture): This creates patches …