feat: teach BitPackedEncoding canonical_into #2324

Merged 23 commits · Feb 14, 2025
Changes from 3 commits

Commits:
3a24da2
feat: teach BitPackedEncoding canonical_into
danking Feb 11, 2025
77b779c
unnecessary import
danking Feb 11, 2025
10da4b4
fix PrimitiveBuilder::patch
danking Feb 11, 2025
1045a96
with_inputs
danking Feb 12, 2025
e4cbde9
better benchmarks: more sizes, more chunks, patches, move tests
danking Feb 12, 2025
409a98c
append_validity_mask
danking Feb 12, 2025
715c7df
fix patch to loop over validity and then separately loop over values
danking Feb 12, 2025
eeee805
extract macro into parameterized method in PrimitiveBuilder::patch
danking Feb 12, 2025
89a4e90
add nullable arrays
danking Feb 12, 2025
fa243bf
patch validity, but correct this time
danking Feb 12, 2025
e063b93
do not trigger validity materialization for non-null validity
danking Feb 12, 2025
348e3c4
Merge remote-tracking branch 'origin/develop' into dk/bitpacking-builder
danking Feb 12, 2025
05ac017
use new append validity mask
danking Feb 12, 2025
8308c6a
extract function out of match
danking Feb 13, 2025
2639bb1
guard on non-nullability not validity
danking Feb 13, 2025
db8bdfe
one loop
danking Feb 13, 2025
801eeeb
fix test code
danking Feb 13, 2025
32ec159
Merge remote-tracking branch 'origin/develop' into dk/bitpacking-builder
danking Feb 13, 2025
e6d3435
safety comment
danking Feb 14, 2025
2fb3a35
special handling of last chunk, no extra allocations
danking Feb 14, 2025
8b1f1c7
constants for bench args
danking Feb 14, 2025
6dd57c0
correct the case of a single chunk which is both offset and short
danking Feb 14, 2025
549f68e
Merge remote-tracking branch 'origin/develop' into dk/bitpacking-builder
danking Feb 14, 2025
6 changes: 4 additions & 2 deletions encodings/fastlanes/benches/canonicalize_bench.rs
Member:

This creates patches:

    let mut rng = StdRng::seed_from_u64(0);
    let values = (0..len)
        .map(|_| {
            if rng.gen_bool(0.99) {
                T::from(rng.gen_range(0..6)).vortex_expect("valid value")
                    + T::from(7).vortex_expect("sdf")
            } else {
                T::from(rng.gen_range(0..100)).vortex_expect("valid value")
                    * T::from(100).expect("valid value")
            }
        })
        .collect::<BufferMut<T>>()
        .into_array()
        .into_primitive()
        .vortex_unwrap();

    let bit = bitpack_encode(values, 12).vortex_unwrap();

@@ -105,8 +105,10 @@ fn canonical_into<T: NativePType>(bencher: Bencher, (arr_len, chunk_count): (usi
.into_array();

bencher.bench(|| {
let mut primitive_builder =
PrimitiveBuilder::<T>::with_capacity(arr.dtype().nullability(), arr_len * chunk_count);
let mut primitive_builder = PrimitiveBuilder::<T>::with_capacity(
arr.dtype().nullability(),
arr_len * chunk_count + 1024,
);
Member:

This + 1024 is really important; we need to make sure that it is actually applied in the ctor of the builders if needed, or maybe always?

Member Author:

Yeah, I'm not sure how best to approach this. Perhaps arrays should have a size_hint? Perhaps every builder should assume it needs a magic 1024 extra elements of space? 1024 is kind of a magic number in Vortex.

chunked
.clone()
.canonicalize_into(&mut primitive_builder)
121 changes: 72 additions & 49 deletions encodings/fastlanes/src/bitpacking/compress.rs
@@ -1,13 +1,17 @@
use std::ops::DerefMut as _;

use arrow_buffer::ArrowNativeType;
use fastlanes::BitPacking;
use vortex_array::array::PrimitiveArray;
use vortex_array::builders::{ArrayBuilder as _, PrimitiveBuilder};
use vortex_array::patches::Patches;
use vortex_array::validity::Validity;
use vortex_array::variants::PrimitiveArrayTrait;
use vortex_array::IntoArray;
use vortex_buffer::{buffer, Buffer, BufferMut, ByteBuffer};
use vortex_buffer::{Buffer, BufferMut, ByteBuffer};
use vortex_dtype::{
match_each_integer_ptype, match_each_unsigned_integer_ptype, NativePType, PType,
match_each_integer_ptype, match_each_integer_ptype_with_unsigned_type,
match_each_unsigned_integer_ptype, NativePType, PType,
};
use vortex_error::{vortex_bail, vortex_err, VortexExpect, VortexResult};
use vortex_scalar::Scalar;
@@ -196,39 +200,52 @@ pub fn gather_patches(
}

pub fn unpack(array: BitPackedArray) -> VortexResult<PrimitiveArray> {
match_each_integer_ptype_with_unsigned_type!(array.ptype(), |$P, $UnsignedT| {
unpack_primitive::<$P, $UnsignedT>(array)
})
}

pub fn unpack_primitive<T: NativePType, UnsignedT: NativePType + BitPacking>(
array: BitPackedArray,
) -> VortexResult<PrimitiveArray> {
let mut builder =
PrimitiveBuilder::with_capacity(array.dtype().nullability(), array.len() + 1024);
assert!(size_of::<T>() == size_of::<UnsignedT>());
unpack_into::<T, UnsignedT, _, _>(
array,
&mut builder,
|x| unsafe { std::mem::transmute(x) },
|x| unsafe { std::mem::transmute(x) },
)?;
builder.finish_into_primitive()
}

pub(crate) fn unpack_into<T: NativePType, UnsignedT: NativePType + BitPacking, F, G>(
array: BitPackedArray,
// TODO(ngates): do we want to use fastlanes alignment for this buffer?
builder: &mut PrimitiveBuilder<T>,
transmute: F,
transmute_mut: G,
) -> VortexResult<()>
where
F: Fn(&[UnsignedT]) -> &[T],
G: Fn(&mut [T]) -> &mut [UnsignedT],
{
let bit_width = array.bit_width() as usize;
let length = array.len();
let offset = array.offset() as usize;
let ptype = array.ptype();
let mut unpacked = match_each_unsigned_integer_ptype!(ptype.to_unsigned(), |$P| {
PrimitiveArray::new(
unpack_primitive::<$P>(array.packed_slice::<$P>(), bit_width, offset, length),
array.validity(),
)
});

// Cast to signed if necessary
if ptype.is_signed_int() {
unpacked = unpacked.reinterpret_cast(ptype);
}

if let Some(patches) = array.patches() {
unpacked.patch(patches)
} else {
Ok(unpacked)
}
}
builder.nulls.append_validity(array.validity(), length)?;
Member:

Do you think this should live in the builder with a method?

Member Author:

Right now, all the builder methods act on both the values and the nulls. I think I'd prefer the interface to be one of: update the values and nulls together, or use the nulls and the values directly and separately. I could add a parts(&mut self) -> (&mut BufferMut<T>, &mut LazyNullBufferBuilder) method.
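A minimal sketch of the parts accessor proposed here. This is hypothetical (no such method exists in the diff), and BufferMut and LazyNullBufferBuilder are simplified stand-ins for the real Vortex types so the sketch is self-contained:

```rust
// Simplified stand-ins for the Vortex buffer and null-builder types.
struct BufferMut<T>(Vec<T>);

struct LazyNullBufferBuilder {
    bits: Vec<bool>,
}

struct PrimitiveBuilder<T> {
    values: BufferMut<T>,
    nulls: LazyNullBufferBuilder,
}

impl<T> PrimitiveBuilder<T> {
    // Hand out the value buffer and null builder together, letting a
    // caller update them independently instead of only through paired
    // append methods.
    fn parts(&mut self) -> (&mut BufferMut<T>, &mut LazyNullBufferBuilder) {
        (&mut self.values, &mut self.nulls)
    }
}

fn main() {
    let mut builder = PrimitiveBuilder {
        values: BufferMut(vec![1i32]),
        nulls: LazyNullBufferBuilder { bits: vec![true] },
    };
    // Borrow both halves at once and mutate them separately.
    let (values, nulls) = builder.parts();
    values.0.push(2);
    nulls.bits.push(false);
    assert_eq!(builder.values.0, vec![1, 2]);
    assert_eq!(builder.nulls.bits, vec![true, false]);
}
```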


pub fn unpack_primitive<T: NativePType + BitPacking>(
packed: &[T],
bit_width: usize,
offset: usize,
length: usize,
) -> Buffer<T> {
if bit_width == 0 {
return buffer![T::zero(); length];
builder.append_zeros(length);
return Ok(());
}

let packed = array.packed_slice::<UnsignedT>();
let builder_current_length = builder.len();
builder.values.reserve((length + 1023) / 1024 * 1024);
Member:

So do you think this is the part that reallocates and affects perf?

Member:

We know the length of the array, so could you instead allocate a 1024 * |T| scratch buffer, decode the last block into that, and then copy values from it to the builder (if there is not already enough space)?

Member Author:

In the benchmarks, we ensure the resulting builder is big enough to hold all the elements plus any scratch space needed by the decoder (in this case, 1024 empty spots, though only 1023 are needed). This operation shouldn't allocate in the benchmarks. I've verified this in the profile (there's no BufferMut::reserve or BytesMut::extend_from_slice, both of which I've seen when it allocates).

Yeah, what you're describing is a reasonable alternative to adding extra space to the builders. I'd be happy to do that, though at this point it feels like we should do that in a follow up PR since this one is fairly large already.
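The reserve call in the diff rounds the requested length up to a whole number of 1024-element fastlanes blocks, which is why at most 1023 spare slots are ever needed. A standalone check of that arithmetic:

```rust
// Round `length` up to the next multiple of the 1024-element
// fastlanes block, as done by the builder's reserve call.
fn round_up_to_block(length: usize) -> usize {
    (length + 1023) / 1024 * 1024
}

fn main() {
    assert_eq!(round_up_to_block(0), 0);
    assert_eq!(round_up_to_block(1), 1024);
    assert_eq!(round_up_to_block(1024), 1024);
    assert_eq!(round_up_to_block(1025), 2048);
    // The scratch overhead is at most 1023 elements for any length.
    for len in [1usize, 500, 1023, 1024, 4097] {
        assert!(round_up_to_block(len) - len <= 1023);
    }
}
```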


// How many fastlanes vectors we will process.
// Packed array might not start at 0 when the array is sliced. Offset is guaranteed to be < 1024.
let num_chunks = (offset + length + 1023) / 1024;
@@ -241,42 +258,48 @@
num_chunks * elems_per_chunk
);

// Allocate a result vector.
// TODO(ngates): do we want to use fastlanes alignment for this buffer?
let mut output = BufferMut::with_capacity(num_chunks * 1024 - offset);

// Handle first chunk if offset is non 0. We have to decode the chunk and skip first offset elements
let first_full_chunk = if offset != 0 {
let chunk: &[T] = &packed[0..elems_per_chunk];
let mut decoded = [T::zero(); 1024];
let chunk = &packed[0..elems_per_chunk];
let mut decoded = [UnsignedT::zero(); 1024];
unsafe { BitPacking::unchecked_unpack(bit_width, chunk, &mut decoded) };
output.extend_from_slice(&decoded[offset..]);
builder
.values
.extend_from_slice(transmute(&decoded[offset..]));
1
} else {
0
};

// Loop over all the chunks.
(first_full_chunk..num_chunks).for_each(|i| {
let chunk: &[T] = &packed[i * elems_per_chunk..][0..elems_per_chunk];
Member (commenting on lines -260 to -261):

OOI (out of interest), why did you move from a for_each to a loop?

Member Author:

I needed a ? and I couldn't quickly sort out how to make for_each take a Result-returning function.
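For reference, the standard-library adapter that handles this case is try_for_each, which takes a Result-returning closure and short-circuits on the first error; a plain for loop, as used in the PR, is equally idiomatic:

```rust
// `try_for_each` accepts a fallible closure and stops at the first
// Err, much like using `?` inside a plain for loop.
fn process(i: usize) -> Result<(), String> {
    if i == 3 {
        Err(format!("failed at chunk {i}"))
    } else {
        Ok(())
    }
}

fn main() {
    // No chunk fails: the whole iteration succeeds.
    assert!((0..3usize).try_for_each(process).is_ok());

    // Chunk 3 fails: iteration stops there and the error propagates.
    assert_eq!(
        (0..5usize).try_for_each(process),
        Err("failed at chunk 3".to_string())
    );
}
```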

for i in first_full_chunk..num_chunks {
let chunk = &packed[i * elems_per_chunk..][0..elems_per_chunk];
Member:

I wonder, if there are lots of chunks (more than fit in L2 cache), whether we want to apply patches after each loop iteration?

Member Author:

Reasonable, but then I need to slice/chunk/partition the patches which is not free.

Member:

True


// SAFETY:
//
// 1. unchecked_unpack only writes into the output and when it is finished, all the outputs
// have been written to.
//
// 2. The output buffer is exactly size 1024.
unsafe {
let output_len = output.len();
output.set_len(output_len + 1024);
BitPacking::unchecked_unpack(bit_width, chunk, &mut output[output_len..][0..1024]);
builder.values.set_len(builder.values.len() + 1024);
BitPacking::unchecked_unpack(
bit_width,
chunk,
&mut transmute_mut(builder.values.deref_mut())
[builder_current_length + i * 1024 - offset..][..1024],
);
}
});
}

// The final chunk may have had padding
output.truncate(length);
builder.truncate(builder_current_length + length);

assert_eq!(
output.len(),
length,
"Expected unpacked array to be of length {} but got {}",
length,
output.len()
);
output.freeze()
if let Some(patches) = array.patches() {
builder.patch(patches, builder_current_length)?;
}

Ok(())
}

pub fn unpack_single(array: &BitPackedArray, index: usize) -> VortexResult<Scalar> {
17 changes: 13 additions & 4 deletions encodings/fastlanes/src/bitpacking/mod.rs
@@ -13,9 +13,9 @@ use vortex_array::vtable::{
CanonicalVTable, StatisticsVTable, ValidateVTable, ValidityVTable, VariantsVTable,
VisitorVTable,
};
use vortex_array::{encoding_ids, impl_encoding, Array, Canonical, IntoArray, RkyvMetadata};
use vortex_array::{encoding_ids, impl_encoding, Array, Canonical, RkyvMetadata};
use vortex_buffer::ByteBuffer;
use vortex_dtype::{DType, NativePType, PType};
use vortex_dtype::{match_each_integer_ptype_with_unsigned_type, DType, NativePType, PType};
use vortex_error::{vortex_bail, vortex_err, VortexExpect as _, VortexResult};
use vortex_mask::Mask;

@@ -264,8 +264,17 @@ impl CanonicalVTable<BitPackedArray> for BitPackedEncoding {
array: BitPackedArray,
builder: &mut dyn ArrayBuilder,
) -> VortexResult<()> {
// TODO(joe): add specialised impl
builder.extend_from_array(array.into_array())
match_each_integer_ptype_with_unsigned_type!(array.ptype(), |$T, $UnsignedT| {
unpack_into::<$T, $UnsignedT, _, _>(
array,
builder
.as_any_mut()
.downcast_mut()
.vortex_expect("bit packed array must canonicalize into a primitive array"),
|x| unsafe { std::mem::transmute(x) },
|x| unsafe { std::mem::transmute(x) },
Member:

A safety comment and some explanation would be good here?

Member Author:

Done.

)
})
}
}

41 changes: 41 additions & 0 deletions vortex-array/src/builders/lazy_validity_builder.rs
@@ -4,6 +4,7 @@ use vortex_dtype::Nullability::{NonNullable, Nullable};
use vortex_error::{vortex_panic, VortexExpect, VortexResult};

use crate::validity::Validity;
use crate::IntoArrayVariant as _;

/// This is borrowed from arrow's null buffer builder, however we expose a `append_buffer`
/// method to append a boolean buffer directly.
@@ -80,11 +81,51 @@ impl LazyNullBufferBuilder {
.append_buffer(&bool_buffer);
}

pub fn append_validity(&mut self, validity: Validity, length: usize) -> VortexResult<()> {
Member:

I had been doing this with a validity mask. I wonder if we should choose one of the two methods and use it consistently:

        match array.validity_mask()?.boolean_buffer() {
            AllOr::All => {
                self.nulls.append_n_non_nulls(array.len());
            }
            AllOr::None => self.nulls.append_n_nulls(array.len()),
            AllOr::Some(validity) => self.nulls.append_buffer(validity.clone()),
        }

Member:

I think we should use validity_mask here; it should perform better than creating a validity and then a boolean buffer below.

Member Author:

Sure, I went with the following and changed append_buffer to take a reference to a BooleanBuffer rather than an owned one.

    pub fn append_validity_mask(&mut self, validity_mask: Mask) {
        match validity_mask {
            Mask::AllTrue(len) => self.append_n_non_nulls(len),
            Mask::AllFalse(len) => self.append_n_nulls(len),
            Mask::Values(is_valid) => self.append_buffer(is_valid.boolean_buffer()),
        }
    }

match validity {
Validity::NonNullable => {}
Validity::AllValid => {
self.append_n_non_nulls(length);
}
Validity::AllInvalid => {
self.append_n_nulls(length);
}
Validity::Array(array) => {
self.append_buffer(array.into_bool()?.boolean_buffer());
}
}

Ok(())
}

pub fn set_bit(&mut self, index: usize, v: bool) {
self.materialize_if_needed();
self.inner
.as_mut()
.vortex_expect("buffer just materialized")
.set_bit(index, v);
}

pub fn len(&self) -> usize {
// self.len is the length of the builder if the inner buffer is not materialized
self.inner.as_ref().map(|i| i.len()).unwrap_or(self.len)
}

pub fn truncate(&mut self, len: usize) {
if let Some(b) = self.inner.as_mut() {
b.truncate(len)
}
self.len = len;
}

pub fn reserve(&mut self, n: usize) {
self.materialize_if_needed();
self.inner
.as_mut()
.vortex_expect("buffer just materialized")
.reserve(n);
}

pub fn finish(&mut self) -> Option<NullBuffer> {
self.len = 0;
Some(NullBuffer::new(self.inner.take()?.finish()))
75 changes: 55 additions & 20 deletions vortex-array/src/builders/primitive.rs
@@ -1,20 +1,21 @@
use std::any::Any;

use vortex_buffer::BufferMut;
use vortex_dtype::{DType, NativePType, Nullability};
use vortex_dtype::{match_each_unsigned_integer_ptype, DType, NativePType, Nullability};
use vortex_error::{vortex_bail, VortexResult};
use vortex_mask::AllOr;

use crate::array::{BoolArray, PrimitiveArray};
use crate::builders::lazy_validity_builder::LazyNullBufferBuilder;
use crate::builders::ArrayBuilder;
use crate::patches::Patches;
use crate::validity::Validity;
use crate::variants::PrimitiveArrayTrait;
use crate::{Array, IntoArray, IntoCanonical};
use crate::{Array, IntoArray, IntoArrayVariant as _, IntoCanonical};

pub struct PrimitiveBuilder<T: NativePType> {
values: BufferMut<T>,
nulls: LazyNullBufferBuilder,
pub values: BufferMut<T>,
pub nulls: LazyNullBufferBuilder,
dtype: DType,
}

@@ -45,6 +46,55 @@ impl<T: NativePType> PrimitiveBuilder<T> {
None => self.append_null(),
}
}

pub fn patch(&mut self, patches: Patches, starting_at: usize) -> VortexResult<()> {
let (array_len, offset, indices, values) = patches.into_parts();
assert!(starting_at + array_len == self.len());
let indices = indices.into_primitive()?;
let values = values.into_primitive()?;
let validity = values.validity_mask()?;
Member:

If the validity is all true, then remove the check from the loop below.

Member Author:

Done.

let values = values.as_slice::<T>();
match_each_unsigned_integer_ptype!(indices.ptype(), |$P| {
for (compressed_index, decompressed_index) in indices.into_primitive()?.as_slice::<$P>().into_iter().enumerate() {
let decompressed_index = *decompressed_index as usize;
let out_index = starting_at + decompressed_index - offset;
if validity.value(compressed_index) {
self.values[out_index] = values[compressed_index]
} else {
self.nulls.set_bit(out_index, false)
}
Member:

Suggested change: replace

    if validity.value(compressed_index) {
        self.values[out_index] = values[compressed_index]
    } else {
        self.nulls.set_bit(out_index, false)
    }

with

    self.values[out_index] = values[compressed_index];
    self.nulls.set_bit(out_index, validity.value(compressed_index));

I think you can remove the branch if you ensure that self.nulls has the correct backing array below.

Member Author (@danking, Feb 12, 2025):

I did something different but indeed there are no branches.

Member Author:

I did this.
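A self-contained sketch of the branch-free patching pattern arrived at above. The slice types here are simplified stand-ins (the real builder writes into BufferMut and LazyNullBufferBuilder):

```rust
// Branch-free patching: write the patch value unconditionally and
// record its validity as a bit, instead of choosing between two code
// paths per patch.
fn patch_branchless(
    values: &mut [i64],
    nulls: &mut [bool],
    indices: &[usize],
    patch_values: &[i64],
    patch_valid: &[bool],
) {
    for (i, &idx) in indices.iter().enumerate() {
        values[idx] = patch_values[i];
        nulls[idx] = patch_valid[i];
    }
}

fn main() {
    let mut values = vec![0i64; 4];
    let mut nulls = vec![true; 4];
    // Patch positions 1 and 3; the patch at 3 is itself null.
    patch_branchless(&mut values, &mut nulls, &[1, 3], &[10, 20], &[true, false]);
    assert_eq!(values, vec![0, 10, 0, 20]);
    assert_eq!(nulls, vec![true, true, true, false]);
}
```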

}
});

Ok(())
Member:

Can this live in its own fn?

Member Author:

Done.

}

pub fn truncate(&mut self, len: usize) {
self.values.truncate(len);
self.nulls.truncate(len);
}

pub fn finish_into_primitive(&mut self) -> VortexResult<PrimitiveArray> {
let validity = match (self.nulls.finish(), self.dtype().nullability()) {
(None, Nullability::NonNullable) => Validity::NonNullable,
(Some(_), Nullability::NonNullable) => {
vortex_bail!("Non-nullable builder has null values")
}
(None, Nullability::Nullable) => Validity::AllValid,
(Some(nulls), Nullability::Nullable) => {
if nulls.null_count() == nulls.len() {
Validity::AllInvalid
} else {
Validity::Array(BoolArray::from(nulls.into_inner()).into_array())
}
}
};

Ok(PrimitiveArray::new(
std::mem::take(&mut self.values).freeze(),
validity,
))
}
}

impl<T: NativePType> ArrayBuilder for PrimitiveBuilder<T> {
@@ -94,21 +144,6 @@ impl<T: NativePType> ArrayBuilder for PrimitiveBuilder<T> {
}

fn finish(&mut self) -> VortexResult<Array> {
let validity = match (self.nulls.finish(), self.dtype().nullability()) {
(None, Nullability::NonNullable) => Validity::NonNullable,
(Some(_), Nullability::NonNullable) => {
vortex_bail!("Non-nullable builder has null values")
}
(None, Nullability::Nullable) => Validity::AllValid,
(Some(nulls), Nullability::Nullable) => {
if nulls.null_count() == nulls.len() {
Validity::AllInvalid
} else {
Validity::Array(BoolArray::from(nulls.into_inner()).into_array())
}
}
};

Ok(PrimitiveArray::new(std::mem::take(&mut self.values).freeze(), validity).into_array())
self.finish_into_primitive().map(IntoArray::into_array)
}
}