Skip to content

Commit 3f61d21

Browse files
authored
Use buffers for FSST (#2656)
TODO(ngates): compress uncompressed_lengths
1 parent 44b4019 commit 3f61d21

File tree

7 files changed

+48
-87
lines changed

7 files changed

+48
-87
lines changed
-84 Bytes
Binary file not shown.

encodings/fsst/src/array.rs

+12-39
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@ use vortex_array::variants::{BinaryArrayTrait, Utf8ArrayTrait};
55
use vortex_array::vtable::{EncodingVTable, VTableRef};
66
use vortex_array::{
77
Array, ArrayImpl, ArrayRef, ArrayStatisticsImpl, ArrayValidityImpl, ArrayVariantsImpl,
8-
Encoding, EncodingId, SerdeMetadata, ToCanonical,
8+
Encoding, EncodingId, SerdeMetadata,
99
};
10-
use vortex_dtype::{DType, Nullability, PType};
10+
use vortex_buffer::Buffer;
11+
use vortex_dtype::DType;
1112
use vortex_error::{VortexResult, vortex_bail};
1213
use vortex_mask::Mask;
1314

@@ -16,8 +17,8 @@ use crate::serde::FSSTMetadata;
1617
#[derive(Clone, Debug)]
1718
pub struct FSSTArray {
1819
dtype: DType,
19-
symbols: ArrayRef,
20-
symbol_lengths: ArrayRef,
20+
symbols: Buffer<Symbol>,
21+
symbol_lengths: Buffer<u8>,
2122
codes: ArrayRef,
2223
uncompressed_lengths: ArrayRef,
2324
stats_set: ArrayStats,
@@ -35,9 +36,6 @@ impl EncodingVTable for FSSTEncoding {
3536
}
3637
}
3738

38-
pub(crate) static SYMBOLS_DTYPE: DType = DType::Primitive(PType::U64, Nullability::NonNullable);
39-
pub(crate) static SYMBOL_LENS_DTYPE: DType = DType::Primitive(PType::U8, Nullability::NonNullable);
40-
4139
impl FSSTArray {
4240
/// Build an FSST array from a set of `symbols` and `codes`.
4341
///
@@ -49,20 +47,11 @@ impl FSSTArray {
4947
/// which tells the decoder to emit the following byte without doing a table lookup.
5048
pub fn try_new(
5149
dtype: DType,
52-
symbols: ArrayRef,
53-
symbol_lengths: ArrayRef,
50+
symbols: Buffer<Symbol>,
51+
symbol_lengths: Buffer<u8>,
5452
codes: ArrayRef,
5553
uncompressed_lengths: ArrayRef,
5654
) -> VortexResult<Self> {
57-
// Check: symbols must be a u64 array
58-
if symbols.dtype() != &SYMBOLS_DTYPE {
59-
vortex_bail!(InvalidArgument: "symbols array must be of type u64")
60-
}
61-
62-
if symbol_lengths.dtype() != &SYMBOL_LENS_DTYPE {
63-
vortex_bail!(InvalidArgument: "symbol_lengths array must be of type u8")
64-
}
65-
6655
// Check: symbols must not have length > MAX_CODE
6756
if symbols.len() > 255 {
6857
vortex_bail!(InvalidArgument: "symbols array must have length <= 255");
@@ -102,12 +91,12 @@ impl FSSTArray {
10291
}
10392

10493
/// Access the symbol table array
105-
pub fn symbols(&self) -> &ArrayRef {
94+
pub fn symbols(&self) -> &Buffer<Symbol> {
10695
&self.symbols
10796
}
10897

10998
/// Access the symbol table array
110-
pub fn symbol_lengths(&self) -> &ArrayRef {
99+
pub fn symbol_lengths(&self) -> &Buffer<u8> {
111100
&self.symbol_lengths
112101
}
113102

@@ -134,27 +123,11 @@ impl FSSTArray {
134123
}
135124

136125
/// Build a [`Decompressor`][fsst::Decompressor] that can be used to decompress values from
137-
/// this array, and pass it to the given function.
126+
/// this array.
138127
///
139128
/// This is private to the crate to avoid leaking `fsst-rs` types as part of the public API.
140-
pub(crate) fn with_decompressor<F, R>(&self, apply: F) -> VortexResult<R>
141-
where
142-
F: FnOnce(Decompressor) -> VortexResult<R>,
143-
{
144-
// canonicalize the symbols child array, so we can view it contiguously
145-
let symbols_array = self.symbols().to_primitive()?;
146-
let symbols = symbols_array.as_slice::<u64>();
147-
148-
let symbol_lengths_array = self.symbol_lengths().to_primitive()?;
149-
let symbol_lengths = symbol_lengths_array.as_slice::<u8>();
150-
151-
// Transmute the 64-bit symbol values into fsst `Symbol`s.
152-
// SAFETY: Symbol is guaranteed to be 8 bytes, guaranteed by the compiler.
153-
let symbols = unsafe { std::mem::transmute::<&[u64], &[Symbol]>(symbols) };
154-
155-
// Build a new decompressor that uses these symbols.
156-
let decompressor = Decompressor::new(symbols, symbol_lengths);
157-
apply(decompressor)
129+
pub(crate) fn decompressor(&self) -> Decompressor {
130+
Decompressor::new(self.symbols().as_slice(), self.symbol_lengths().as_slice())
158131
}
159132
}
160133

encodings/fsst/src/canonical.rs

+3-6
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,15 @@ use crate::FSSTArray;
1313

1414
impl ArrayCanonicalImpl for FSSTArray {
1515
fn _to_canonical(&self) -> VortexResult<Canonical> {
16-
self.with_decompressor(|decompressor| {
17-
fsst_into_varbin_view(decompressor, self, 0).map(Canonical::VarBinView)
18-
})
16+
fsst_into_varbin_view(self.decompressor(), self, 0).map(Canonical::VarBinView)
1917
}
2018

2119
fn _append_to_builder(&self, builder: &mut dyn ArrayBuilder) -> VortexResult<()> {
2220
let Some(builder) = builder.as_any_mut().downcast_mut::<VarBinViewBuilder>() else {
2321
return builder.extend_from_array(&self.to_canonical()?.into_array());
2422
};
25-
let view = self.with_decompressor(|decompressor| {
26-
fsst_into_varbin_view(decompressor, self, builder.completed_block_count())
27-
})?;
23+
let view =
24+
fsst_into_varbin_view(self.decompressor(), self, builder.completed_block_count())?;
2825

2926
builder.push_buffer_and_adjusted_views(
3027
view.buffers().iter().cloned(),

encodings/fsst/src/compress.rs

+3-7
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use vortex_array::accessor::ArrayAccessor;
55
use vortex_array::arrays::builder::VarBinBuilder;
66
use vortex_array::arrays::{VarBinArray, VarBinViewArray};
77
use vortex_array::{Array, IntoArray};
8-
use vortex_buffer::{Buffer, BufferMut, ByteBuffer};
8+
use vortex_buffer::{Buffer, BufferMut};
99
use vortex_dtype::DType;
1010
use vortex_error::{VortexExpect, VortexResult, VortexUnwrap, vortex_bail};
1111

@@ -113,13 +113,9 @@ where
113113
let codes = builder
114114
.finish(DType::Binary(dtype.nullability()))
115115
.into_array();
116-
let symbols_vec: Buffer<Symbol> = Buffer::copy_from(compressor.symbol_table());
117-
// SAFETY: Symbol and u64 are same size
118-
let symbols_u64: Buffer<u64> = unsafe { std::mem::transmute(symbols_vec) };
119-
let symbols = symbols_u64.into_array();
116+
let symbols: Buffer<Symbol> = Buffer::copy_from(compressor.symbol_table());
117+
let symbol_lengths: Buffer<u8> = Buffer::<u8>::copy_from(compressor.symbol_lengths());
120118

121-
let symbol_lengths_vec: ByteBuffer = ByteBuffer::copy_from(compressor.symbol_lengths());
122-
let symbol_lengths = symbol_lengths_vec.into_array();
123119
let uncompressed_lengths = uncompressed_lengths.into_array();
124120

125121
FSSTArray::try_new(dtype, symbols, symbol_lengths, codes, uncompressed_lengths)

encodings/fsst/src/compute/compare.rs

+4-8
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use fsst::Symbol;
21
use vortex_array::arrays::{BoolArray, BooleanBuffer, ConstantArray};
32
use vortex_array::compute::{CompareFn, Operator, compare, compare_lengths_to_empty};
43
use vortex_array::validity::Validity;
@@ -69,15 +68,12 @@ fn compare_fsst_constant(
6968
return Ok(None);
7069
}
7170

72-
let symbols = left.symbols().to_primitive()?;
73-
let symbols_u64 = symbols.as_slice::<u64>();
74-
75-
let symbol_lens = left.symbol_lengths().to_primitive()?;
76-
let symbol_lens_u8 = symbol_lens.as_slice::<u8>();
71+
let symbols = left.symbols();
72+
let symbol_lens = left.symbol_lengths();
7773

7874
let mut compressor = fsst::CompressorBuilder::new();
79-
for (symbol, symbol_len) in symbols_u64.iter().zip(symbol_lens_u8.iter()) {
80-
compressor.insert(Symbol::from_slice(&symbol.to_le_bytes()), *symbol_len as _);
75+
for (symbol, symbol_len) in symbols.iter().zip(symbol_lens.iter()) {
76+
compressor.insert(*symbol, *symbol_len as usize);
8177
}
8278
let compressor = compressor.build();
8379

encodings/fsst/src/compute/mod.rs

+3-4
Original file line numberDiff line numberDiff line change
@@ -82,10 +82,9 @@ impl ScalarAtFn<&FSSTArray> for FSSTEncoding {
8282
.value()
8383
.ok_or_else(|| vortex_err!("expected null to already be handled"))?;
8484

85-
array.with_decompressor(|decompressor| {
86-
let decoded_buffer = ByteBuffer::from(decompressor.decompress(binary_datum.as_slice()));
87-
Ok(varbin_scalar(decoded_buffer, array.dtype()))
88-
})
85+
let decoded_buffer =
86+
ByteBuffer::from(array.decompressor().decompress(binary_datum.as_slice()));
87+
Ok(varbin_scalar(decoded_buffer, array.dtype()))
8988
}
9089
}
9190

encodings/fsst/src/serde.rs

+23-23
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,35 @@
1+
use fsst::Symbol;
12
use serde::{Deserialize, Serialize};
23
use vortex_array::serde::ArrayParts;
34
use vortex_array::vtable::SerdeVTable;
45
use vortex_array::{
5-
Array, ArrayChildVisitor, ArrayContext, ArrayRef, ArrayVisitorImpl, DeserializeMetadata,
6-
SerdeMetadata,
6+
Array, ArrayBufferVisitor, ArrayChildVisitor, ArrayContext, ArrayRef, ArrayVisitorImpl,
7+
DeserializeMetadata, SerdeMetadata,
78
};
9+
use vortex_buffer::Buffer;
810
use vortex_dtype::{DType, Nullability, PType};
9-
use vortex_error::{VortexExpect, VortexResult};
11+
use vortex_error::{VortexExpect, VortexResult, vortex_bail};
1012

11-
use crate::array::{SYMBOL_LENS_DTYPE, SYMBOLS_DTYPE};
1213
use crate::{FSSTArray, FSSTEncoding};
1314

1415
#[derive(Debug, Clone, Serialize, Deserialize)]
1516
pub struct FSSTMetadata {
16-
symbols_len: usize,
17-
codes_nullability: Nullability,
1817
uncompressed_lengths_ptype: PType,
1918
}
2019

2120
impl ArrayVisitorImpl<SerdeMetadata<FSSTMetadata>> for FSSTArray {
21+
fn _buffers(&self, visitor: &mut dyn ArrayBufferVisitor) {
22+
visitor.visit_buffer(&self.symbols().clone().into_byte_buffer());
23+
visitor.visit_buffer(&self.symbol_lengths().clone().into_byte_buffer());
24+
}
25+
2226
fn _children(&self, visitor: &mut dyn ArrayChildVisitor) {
23-
visitor.visit_child("symbols", self.symbols());
24-
visitor.visit_child("symbol_lengths", self.symbol_lengths());
2527
visitor.visit_child("codes", self.codes());
2628
visitor.visit_child("uncompressed_lengths", self.uncompressed_lengths());
2729
}
2830

2931
fn _metadata(&self) -> SerdeMetadata<FSSTMetadata> {
3032
SerdeMetadata(FSSTMetadata {
31-
symbols_len: self.symbols().len(),
32-
codes_nullability: self.codes().dtype().nullability(),
3333
uncompressed_lengths_ptype: PType::try_from(self.uncompressed_lengths().dtype())
3434
.vortex_expect("Must be a valid PType"),
3535
})
@@ -46,17 +46,19 @@ impl SerdeVTable<&FSSTArray> for FSSTEncoding {
4646
) -> VortexResult<ArrayRef> {
4747
let metadata = SerdeMetadata::<FSSTMetadata>::deserialize(parts.metadata())?;
4848

49-
let symbols = parts
50-
.child(0)
51-
.decode(ctx, SYMBOLS_DTYPE.clone(), metadata.symbols_len)?;
52-
let symbol_lengths =
53-
parts
54-
.child(1)
55-
.decode(ctx, SYMBOL_LENS_DTYPE.clone(), metadata.symbols_len)?;
49+
if parts.nbuffers() != 2 {
50+
vortex_bail!(InvalidArgument: "Expected 2 buffers, got {}", parts.nbuffers());
51+
}
52+
let symbols = Buffer::<Symbol>::from_byte_buffer(parts.buffer(0)?);
53+
let symbol_lengths = Buffer::<u8>::from_byte_buffer(parts.buffer(1)?);
54+
55+
if parts.nchildren() != 2 {
56+
vortex_bail!(InvalidArgument: "Expected 2 children, got {}", parts.nchildren());
57+
}
5658
let codes = parts
57-
.child(2)
58-
.decode(ctx, DType::Binary(metadata.codes_nullability), len)?;
59-
let uncompressed_lengths = parts.child(3).decode(
59+
.child(0)
60+
.decode(ctx, DType::Binary(dtype.nullability()), len)?;
61+
let uncompressed_lengths = parts.child(1).decode(
6062
ctx,
6163
DType::Primitive(
6264
metadata.uncompressed_lengths_ptype,
@@ -76,7 +78,7 @@ impl SerdeVTable<&FSSTArray> for FSSTEncoding {
7678
mod test {
7779
use vortex_array::SerdeMetadata;
7880
use vortex_array::test_harness::check_metadata;
79-
use vortex_dtype::{Nullability, PType};
81+
use vortex_dtype::PType;
8082

8183
use crate::serde::FSSTMetadata;
8284

@@ -86,8 +88,6 @@ mod test {
8688
check_metadata(
8789
"fsst.metadata",
8890
SerdeMetadata(FSSTMetadata {
89-
symbols_len: usize::MAX,
90-
codes_nullability: Nullability::Nullable,
9191
uncompressed_lengths_ptype: PType::U64,
9292
}),
9393
);

0 commit comments

Comments
 (0)