Commit 54efb65

Revisit List Row Encoding (#5807) (#5811)
* Revisit List Row Encoding (#5807)
* Remove println
* Fix row decode
1 parent 1ebd2a4 commit 54efb65

3 files changed: +115 -96 lines

arrow-row/src/lib.rs

+26 -22
@@ -295,16 +295,9 @@ mod variable;
 ///
 /// Lists are encoded by first encoding all child elements to the row format.
 ///
-/// A "canonical byte array" is then constructed by concatenating the row
-/// encodings of all their elements into a single binary array, followed
-/// by the lengths of each encoded row, and the number of elements, encoded
-/// as big endian `u32`.
-///
-/// This canonical byte array is then encoded using the variable length byte
-/// encoding described above.
-///
-/// _The lengths are not strictly necessary but greatly simplify decode, they
-/// may be removed in a future iteration_.
+/// A list value is then encoded as the concatenation of each of the child elements,
+/// separately encoded using the variable length encoding described above, followed
+/// by the variable length encoding of an empty byte array.
 ///
 /// For example given:
 ///
@@ -323,24 +316,23 @@ mod variable;
 /// └──┴──┘ └──┴──┘ └──┴──┘ └──┴──┘ └──┴──┘
 ///```
 ///
-/// Which would be grouped into the following canonical byte arrays:
+/// Which would be encoded as
 ///
 /// ```text
-/// ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
-/// [1_u8, 2_u8, 3_u8] │01│01│01│02│01│03│00│00│00│02│00│00│00│02│00│00│00│02│00│00│00│03│
-/// └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
-/// └──── rows ────┘ └───────── row lengths ─────────┘ └─ count ─┘
-///
-/// ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
-/// [1_u8, null] │01│01│00│00│00│00│00│02│00│00│00│02│00│00│00│02│
-/// └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
+/// ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
+/// [1_u8, 2_u8, 3_u8] │02│01│01│00│00│02│02│01│02│00│00│02│02│01│03│00│00│02│01│
+/// └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
+/// └──── 1_u8 ────┘ └──── 2_u8 ────┘ └──── 3_u8 ────┘
+///
+/// ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
+/// [1_u8, null] │02│01│01│00│00│02│02│00│00│00│00│02│01│
+/// └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
+/// └──── 1_u8 ────┘ └──── null ────┘
+///
 ///```
 ///
 /// With `[]` represented by an empty byte array, and `null` a null byte array.
 ///
-/// These byte arrays will then be encoded using the variable length byte encoding
-/// described above.
-///
 /// # Ordering
 ///
 /// ## Float Ordering
@@ -2271,4 +2263,16 @@ mod tests {
 
         dictionary_eq(&back[0], &array);
     }
+
+    #[test]
+    fn test_list_prefix() {
+        let mut a = ListBuilder::new(Int8Builder::new());
+        a.append_value([None]);
+        a.append_value([None, None]);
+        let a = a.finish();
+
+        let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
+        let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
+        assert_eq!(rows.row(0).cmp(&rows.row(1)), Ordering::Less);
+    }
 }
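
As a usage-level illustration of the ordering this encoding is meant to guarantee, the sketch below builds on the same public API used by `test_list_prefix` above (`ListBuilder`, `RowConverter`, `SortField`); the concrete values are arbitrary and the snippet is a sketch against roughly this version of `arrow-row`, not part of the commit:

```rust
use std::cmp::Ordering;
use std::sync::Arc;

use arrow_array::builder::{Int8Builder, ListBuilder};
use arrow_array::Array;
use arrow_row::{RowConverter, SortField};

fn main() {
    // Build a ListArray of Int8: [1, 2, 3], [1, 2], [1], [] (arbitrary example values)
    let mut builder = ListBuilder::new(Int8Builder::new());
    builder.append_value([Some(1), Some(2), Some(3)]);
    builder.append_value([Some(1), Some(2)]);
    builder.append_value([Some(1)]);
    builder.append(true); // empty list
    let list = builder.finish();

    let converter = RowConverter::new(vec![SortField::new(list.data_type().clone())]).unwrap();
    let rows = converter.convert_columns(&[Arc::new(list) as _]).unwrap();

    // A list sorts before any list it is a strict prefix of:
    // [] < [1] < [1, 2] < [1, 2, 3]
    assert_eq!(rows.row(3).cmp(&rows.row(2)), Ordering::Less);
    assert_eq!(rows.row(2).cmp(&rows.row(1)), Ordering::Less);
    assert_eq!(rows.row(1).cmp(&rows.row(0)), Ordering::Less);
}
```

The prefix ordering falls out of the terminator: in the examples above the terminating empty encoding is a single `01` byte, which sorts below the `02` that starts any further element of a longer list.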

arrow-row/src/list.rs

+73 -62
@@ -15,9 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::{RowConverter, Rows, SortField};
-use arrow_array::builder::BufferBuilder;
+use crate::{null_sentinel, RowConverter, Rows, SortField};
 use arrow_array::{Array, GenericListArray, OffsetSizeTrait};
+use arrow_buffer::{Buffer, MutableBuffer};
 use arrow_data::ArrayDataBuilder;
 use arrow_schema::{ArrowError, SortOptions};
 use std::ops::Range;
@@ -43,12 +43,10 @@ pub fn compute_lengths<O: OffsetSizeTrait>(
 fn encoded_len(rows: &Rows, range: Option<Range<usize>>) -> usize {
     match range {
         None => 1,
-        Some(range) if range.start == range.end => 1,
         Some(range) => {
-            let element_count = range.end - range.start;
-            let row_bytes = range.map(|i| rows.row(i).as_ref().len()).sum::<usize>();
-            let total = (1 + element_count) * std::mem::size_of::<u32>() + row_bytes;
-            super::variable::padded_length(Some(total))
+            1 + range
+                .map(|i| super::variable::padded_length(Some(rows.row(i).as_ref().len())))
+                .sum::<usize>()
         }
     }
 }
@@ -63,7 +61,6 @@ pub fn encode<O: OffsetSizeTrait>(
     opts: SortOptions,
     array: &GenericListArray<O>,
 ) {
-    let mut temporary = vec![];
     offsets
         .iter_mut()
         .skip(1)
@@ -74,42 +71,28 @@ pub fn encode<O: OffsetSizeTrait>(
             let end = offsets[1].as_usize();
            let range = array.is_valid(idx).then_some(start..end);
             let out = &mut data[*offset..];
-            *offset += encode_one(out, &mut temporary, rows, range, opts)
+            *offset += encode_one(out, rows, range, opts)
         });
 }
 
 #[inline]
 fn encode_one(
     out: &mut [u8],
-    temporary: &mut Vec<u8>,
     rows: &Rows,
     range: Option<Range<usize>>,
     opts: SortOptions,
 ) -> usize {
-    temporary.clear();
-
     match range {
-        None => super::variable::encode_one(out, None, opts),
-        Some(range) if range.start == range.end => {
-            super::variable::encode_one(out, Some(&[]), opts)
-        }
+        None => super::variable::encode_null(out, opts),
+        Some(range) if range.start == range.end => super::variable::encode_empty(out, opts),
         Some(range) => {
-            for row in range.clone().map(|i| rows.row(i)) {
-                temporary.extend_from_slice(row.as_ref());
-            }
-            for row in range.clone().map(|i| rows.row(i)) {
-                let len: u32 = row
-                    .as_ref()
-                    .len()
-                    .try_into()
-                    .expect("ListArray or LargeListArray containing a list of more than u32::MAX items is not supported");
-                temporary.extend_from_slice(&len.to_be_bytes());
+            let mut offset = 0;
+            for i in range {
+                let row = rows.row(i);
+                offset += super::variable::encode_one(&mut out[offset..], Some(row.data), opts);
             }
-            let row_count: u32 = (range.end - range.start)
-                .try_into()
-                .expect("lists containing more than u32::MAX elements not supported");
-            temporary.extend_from_slice(&row_count.to_be_bytes());
-            super::variable::encode_one(out, Some(temporary), opts)
+            offset += super::variable::encode_empty(&mut out[offset..], opts);
+            offset
         }
     }
 }
@@ -125,50 +108,78 @@ pub unsafe fn decode<O: OffsetSizeTrait>(
     field: &SortField,
     validate_utf8: bool,
 ) -> Result<GenericListArray<O>, ArrowError> {
-    let canonical = super::variable::decode_binary::<i64>(rows, field.options);
-
-    let mut offsets = BufferBuilder::<O>::new(rows.len() + 1);
-    offsets.append(O::from_usize(0).unwrap());
-    let mut current_offset = 0;
-
-    let mut child_rows = Vec::with_capacity(rows.len());
-    canonical.value_offsets().windows(2).for_each(|w| {
-        let start = w[0] as usize;
-        let end = w[1] as usize;
-        if start == end {
-            // Null or empty list
-            offsets.append(O::from_usize(current_offset).unwrap());
-            return;
-        }
+    let opts = field.options;
+
+    let mut values_bytes = 0;
 
-        let row = &canonical.value_data()[start..end];
-        let element_count_start = row.len() - 4;
-        let element_count =
-            u32::from_be_bytes((&row[element_count_start..]).try_into().unwrap()) as usize;
+    let mut offset = 0;
+    let mut offsets = Vec::with_capacity(rows.len() + 1);
+    offsets.push(O::usize_as(0));
 
-        let lengths_start = element_count_start - (element_count * 4);
+    for row in rows.iter_mut() {
         let mut row_offset = 0;
-        row[lengths_start..element_count_start]
-            .chunks_exact(4)
-            .for_each(|chunk| {
-                let len = u32::from_be_bytes(chunk.try_into().unwrap());
-                let next_row_offset = row_offset + len as usize;
-                child_rows.push(&row[row_offset..next_row_offset]);
-                row_offset = next_row_offset;
+        loop {
+            let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| {
+                values_bytes += x.len();
             });
+            if decoded <= 1 {
+                offsets.push(O::usize_as(offset));
+                break;
+            }
+            row_offset += decoded;
+            offset += 1;
+        }
+    }
+    O::from_usize(offset).expect("overflow");
 
-        current_offset += element_count;
-        offsets.append(O::from_usize(current_offset).unwrap());
+    let mut null_count = 0;
+    let nulls = MutableBuffer::collect_bool(rows.len(), |x| {
+        let valid = rows[x][0] != null_sentinel(opts);
+        null_count += !valid as usize;
+        valid
     });
 
+    let mut values_offsets = Vec::with_capacity(offset);
+    let mut values_bytes = Vec::with_capacity(values_bytes);
+    for row in rows.iter_mut() {
+        let mut row_offset = 0;
+        loop {
+            let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| {
+                values_bytes.extend_from_slice(x)
+            });
+            row_offset += decoded;
+            if decoded <= 1 {
+                break;
+            }
+            values_offsets.push(values_bytes.len());
+        }
+        *row = &row[row_offset..];
+    }
+
+    if opts.descending {
+        values_bytes.iter_mut().for_each(|o| *o = !*o);
+    }
+
+    let mut last_value_offset = 0;
+    let mut child_rows: Vec<_> = values_offsets
+        .into_iter()
+        .map(|offset| {
+            let v = &values_bytes[last_value_offset..offset];
+            last_value_offset = offset;
+            v
+        })
+        .collect();
+
     let child = converter.convert_raw(&mut child_rows, validate_utf8)?;
     assert_eq!(child.len(), 1);
+
     let child_data = child[0].to_data();
 
     let builder = ArrayDataBuilder::new(field.data_type.clone())
         .len(rows.len())
-        .nulls(canonical.nulls().cloned())
-        .add_buffer(offsets.finish())
+        .null_count(null_count)
+        .null_bit_buffer(Some(nulls.into()))
+        .add_buffer(Buffer::from_vec(offsets))
        .add_child_data(child_data);
 
     Ok(GenericListArray::from(unsafe { builder.build_unchecked() }))
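
The rewritten decoder no longer relies on a stored length/count suffix: it walks the variable-length blocks of each child until it reaches a 1-byte sentinel (`encode_null` and `encode_empty` both write exactly one byte, which is why `decoded <= 1` marks the end of a list), making one pass to size the buffers and a second to fill them. The sketch below is a deliberately simplified, hypothetical model of that layout, using 1-byte length prefixes instead of Arrow's sentinel-and-block encoding, to show how a terminator lets list offsets be rebuilt without stored lengths:

```rust
/// Toy model: each child value is a 1-byte length followed by its bytes, and
/// every list ends with a 0x00 "empty" marker. Arrow's real variable-length
/// encoding uses sentinels and padded blocks, but the decode shape is the
/// same: walk children until the terminator, accumulating list offsets into
/// a flat collection of child values.
fn decode_toy_lists(mut input: &[u8], num_lists: usize) -> (Vec<u32>, Vec<Vec<u8>>) {
    let mut offsets = vec![0u32]; // list offsets into the decoded children
    let mut children = Vec::new(); // decoded child values, across all lists

    for _ in 0..num_lists {
        loop {
            let len = input[0] as usize;
            input = &input[1..];
            if len == 0 {
                break; // empty terminator: this list is complete
            }
            children.push(input[..len].to_vec());
            input = &input[len..];
        }
        offsets.push(children.len() as u32);
    }
    (offsets, children)
}

fn main() {
    // Two lists in the toy layout: ["ab", "c"] and ["de"].
    let encoded = [2, b'a', b'b', 1, b'c', 0, 2, b'd', b'e', 0];
    let (offsets, children) = decode_toy_lists(&encoded, 2);
    assert_eq!(offsets, vec![0, 2, 3]);
    assert_eq!(children, vec![b"ab".to_vec(), b"c".to_vec(), b"de".to_vec()]);
}
```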

arrow-row/src/variable.rs

+16 -12
@@ -83,15 +83,23 @@ pub fn encode<'a, I: Iterator<Item = Option<&'a [u8]>>>(
     }
 }
 
+pub fn encode_null(out: &mut [u8], opts: SortOptions) -> usize {
+    out[0] = null_sentinel(opts);
+    1
+}
+
+pub fn encode_empty(out: &mut [u8], opts: SortOptions) -> usize {
+    out[0] = match opts.descending {
+        true => !EMPTY_SENTINEL,
+        false => EMPTY_SENTINEL,
+    };
+    1
+}
+
 pub fn encode_one(out: &mut [u8], val: Option<&[u8]>, opts: SortOptions) -> usize {
     match val {
-        Some([]) => {
-            out[0] = match opts.descending {
-                true => !EMPTY_SENTINEL,
-                false => EMPTY_SENTINEL,
-            };
-            1
-        }
+        None => encode_null(out, opts),
+        Some([]) => encode_empty(out, opts),
         Some(val) => {
             // Write `2_u8` to demarcate as non-empty, non-null string
             out[0] = NON_EMPTY_SENTINEL;
@@ -111,10 +119,6 @@ pub fn encode_one(out: &mut [u8], val: Option<&[u8]>, opts: SortOptions) -> usiz
             }
             len
         }
-        None => {
-            out[0] = null_sentinel(opts);
-            1
-        }
     }
 }
 
@@ -148,7 +152,7 @@ fn encode_blocks<const SIZE: usize>(out: &mut [u8], val: &[u8]) -> usize {
     end_offset
 }
 
-fn decode_blocks(row: &[u8], options: SortOptions, mut f: impl FnMut(&[u8])) -> usize {
+pub fn decode_blocks(row: &[u8], options: SortOptions, mut f: impl FnMut(&[u8])) -> usize {
     let (non_empty_sentinel, continuation) = match options.descending {
         true => (!NON_EMPTY_SENTINEL, !BLOCK_CONTINUATION),
         false => (NON_EMPTY_SENTINEL, BLOCK_CONTINUATION),
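
Splitting out `encode_null` and `encode_empty` makes the sentinel scheme easier to follow: the first byte alone determines how null, empty, and non-empty values order, and descending sorts invert the written bytes. The sketch below illustrates that ordering; the concrete values (0 for a null under nulls-first ordering, 1 for `EMPTY_SENTINEL`, 2 for `NON_EMPTY_SENTINEL`) follow the comments and byte examples in this diff, but treat them as illustrative rather than a normative statement of the format:

```rust
// Illustrative sentinel values; the null-with-nulls-first value of 0 is an
// assumption consistent with the examples above, not taken from this diff.
const NULL_SENTINEL_NULLS_FIRST: u8 = 0;
const EMPTY_SENTINEL: u8 = 1;
const NON_EMPTY_SENTINEL: u8 = 2;

fn main() {
    // Ascending with nulls first: null < empty < non-empty, decided by the first byte.
    assert!(NULL_SENTINEL_NULLS_FIRST < EMPTY_SENTINEL);
    assert!(EMPTY_SENTINEL < NON_EMPTY_SENTINEL);

    // For descending sorts the encoder writes the bitwise inverse (`!byte`),
    // which flips the comparison result while keeping a plain byte-wise compare.
    assert!(!EMPTY_SENTINEL > !NON_EMPTY_SENTINEL);
}
```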
