Commit d7fa775

Append Row to Rows (#4466) (#4470)
* Append Row to Rows (#4466)
* Tweak docs
* Pass slices to encode
* Clippy

1 parent 9468905 commit d7fa775

5 files changed (+118, -53 lines)

arrow-row/src/dictionary.rs

Lines changed: 11 additions & 11 deletions
@@ -58,18 +58,19 @@ pub fn compute_dictionary_mapping(
 
 /// Encode dictionary values not preserving the dictionary encoding
 pub fn encode_dictionary_values<K: ArrowDictionaryKeyType>(
-    out: &mut Rows,
+    data: &mut [u8],
+    offsets: &mut [usize],
     column: &DictionaryArray<K>,
     values: &Rows,
     null: &Row<'_>,
 ) {
-    for (offset, k) in out.offsets.iter_mut().skip(1).zip(column.keys()) {
+    for (offset, k) in offsets.iter_mut().skip(1).zip(column.keys()) {
         let row = match k {
             Some(k) => values.row(k.as_usize()).data,
             None => null.data,
         };
         let end_offset = *offset + row.len();
-        out.buffer[*offset..end_offset].copy_from_slice(row);
+        data[*offset..end_offset].copy_from_slice(row);
         *offset = end_offset;
     }
 }

@@ -79,27 +80,26 @@ pub fn encode_dictionary_values<K: ArrowDictionaryKeyType>(
 /// - single `0_u8` if null
 /// - the bytes of the corresponding normalized key including the null terminator
 pub fn encode_dictionary<K: ArrowDictionaryKeyType>(
-    out: &mut Rows,
+    data: &mut [u8],
+    offsets: &mut [usize],
     column: &DictionaryArray<K>,
     normalized_keys: &[Option<&[u8]>],
     opts: SortOptions,
 ) {
-    for (offset, k) in out.offsets.iter_mut().skip(1).zip(column.keys()) {
+    for (offset, k) in offsets.iter_mut().skip(1).zip(column.keys()) {
         match k.and_then(|k| normalized_keys[k.as_usize()]) {
             Some(normalized_key) => {
                 let end_offset = *offset + 1 + normalized_key.len();
-                out.buffer[*offset] = 1;
-                out.buffer[*offset + 1..end_offset].copy_from_slice(normalized_key);
+                data[*offset] = 1;
+                data[*offset + 1..end_offset].copy_from_slice(normalized_key);
                 // Negate if descending
                 if opts.descending {
-                    out.buffer[*offset..end_offset]
-                        .iter_mut()
-                        .for_each(|v| *v = !*v)
+                    data[*offset..end_offset].iter_mut().for_each(|v| *v = !*v)
                 }
                 *offset = end_offset;
             }
             None => {
-                out.buffer[*offset] = null_sentinel(opts);
+                data[*offset] = null_sentinel(opts);
                 *offset += 1;
             }
         }
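
The mechanical change running through this commit is visible above: encoders now take the row buffer and the offsets as two separate slices instead of a whole `&mut Rows`. A minimal standalone sketch of why that shape helps (hypothetical types for illustration, not the arrow-row internals): Rust permits simultaneous mutable borrows of distinct fields, so the caller can lend both parts of `Rows` without giving the encoder access to the struct itself.

    struct Rows {
        buffer: Vec<u8>,
        offsets: Vec<usize>,
    }

    // An encoder needs only the raw parts, not the whole struct
    fn encode(data: &mut [u8], offsets: &mut [usize]) {
        let _ = (data, offsets); // ... write into `data`, advance `offsets` ...
    }

    fn main() {
        let mut rows = Rows { buffer: vec![0; 16], offsets: vec![0; 4] };
        // Disjoint field borrows: both mutable at once, which also keeps
        // `Rows` free to change representation (e.g. Box<[u8]> -> Vec<u8>)
        encode(&mut rows.buffer, &mut rows.offsets);
    }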

arrow-row/src/fixed.rs

Lines changed: 11 additions & 9 deletions
@@ -16,7 +16,7 @@
 // under the License.
 
 use crate::array::PrimitiveArray;
-use crate::{null_sentinel, Rows};
+use crate::null_sentinel;
 use arrow_array::builder::BufferBuilder;
 use arrow_array::{ArrowPrimitiveType, BooleanArray, FixedSizeBinaryArray};
 use arrow_buffer::{bit_util, i256, ArrowNativeType, Buffer, MutableBuffer};

@@ -177,14 +177,15 @@ where
 /// - 1 byte `0` if null or `1` if valid
 /// - bytes of [`FixedLengthEncoding`]
 pub fn encode<T: FixedLengthEncoding, I: IntoIterator<Item = Option<T>>>(
-    out: &mut Rows,
+    data: &mut [u8],
+    offsets: &mut [usize],
     i: I,
     opts: SortOptions,
 ) {
-    for (offset, maybe_val) in out.offsets.iter_mut().skip(1).zip(i) {
+    for (offset, maybe_val) in offsets.iter_mut().skip(1).zip(i) {
         let end_offset = *offset + T::ENCODED_LEN;
         if let Some(val) = maybe_val {
-            let to_write = &mut out.buffer[*offset..end_offset];
+            let to_write = &mut data[*offset..end_offset];
             to_write[0] = 1;
             let mut encoded = val.encode();
             if opts.descending {

@@ -193,30 +194,31 @@ pub fn encode<T: FixedLengthEncoding, I: IntoIterator<Item = Option<T>>>(
             }
             to_write[1..].copy_from_slice(encoded.as_ref())
         } else {
-            out.buffer[*offset] = null_sentinel(opts);
+            data[*offset] = null_sentinel(opts);
         }
         *offset = end_offset;
     }
 }
 
 pub fn encode_fixed_size_binary(
-    out: &mut Rows,
+    data: &mut [u8],
+    offsets: &mut [usize],
     array: &FixedSizeBinaryArray,
     opts: SortOptions,
 ) {
     let len = array.value_length() as usize;
-    for (offset, maybe_val) in out.offsets.iter_mut().skip(1).zip(array.iter()) {
+    for (offset, maybe_val) in offsets.iter_mut().skip(1).zip(array.iter()) {
         let end_offset = *offset + len + 1;
         if let Some(val) = maybe_val {
-            let to_write = &mut out.buffer[*offset..end_offset];
+            let to_write = &mut data[*offset..end_offset];
             to_write[0] = 1;
             to_write[1..].copy_from_slice(&val[..len]);
             if opts.descending {
                 // Flip bits to reverse order
                 to_write[1..1 + len].iter_mut().for_each(|v| *v = !*v)
             }
         } else {
-            out.buffer[*offset] = null_sentinel(opts);
+            data[*offset] = null_sentinel(opts);
         }
         *offset = end_offset;
     }
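
The doc comment in the first hunk spells out the fixed-length layout: one validity byte (`0` if null, `1` if valid) followed by the value's [`FixedLengthEncoding`] bytes, with the value bytes negated when descending. A standalone sketch of that layout for a `u32` (illustrative assumptions: big-endian value bytes stand in for the real `FixedLengthEncoding`, and `0` stands in for `null_sentinel(opts)`):

    fn encode_u32(val: Option<u32>, descending: bool) -> [u8; 5] {
        let mut out = [0u8; 5]; // byte 0 stays 0 for null (ascending sentinel)
        if let Some(v) = val {
            out[0] = 1; // validity byte
            out[1..].copy_from_slice(&v.to_be_bytes());
            if descending {
                // Negating the value bytes reverses the byte-wise order
                for b in &mut out[1..] {
                    *b = !*b;
                }
            }
        }
        out
    }

    fn main() {
        // Big-endian bytes compare like the unsigned values themselves
        assert!(encode_u32(Some(1), false) < encode_u32(Some(2), false));
        assert!(encode_u32(Some(1), true) > encode_u32(Some(2), true));
        // Nulls sort first ascending: validity byte 0 < 1
        assert!(encode_u32(None, false) < encode_u32(Some(0), false));
    }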

arrow-row/src/lib.rs

Lines changed: 86 additions & 24 deletions
@@ -458,7 +458,7 @@ impl Codec {
         let nulls = converter.convert_columns(&[null_array])?;
 
         let owned = OwnedRow {
-            data: nulls.buffer,
+            data: nulls.buffer.into(),
             config: nulls.config,
         };
         Ok(Self::DictionaryValues(converter, owned))

@@ -496,7 +496,7 @@ impl Codec {
 
         let nulls = converter.convert_columns(&nulls)?;
         let owned = OwnedRow {
-            data: nulls.buffer,
+            data: nulls.buffer.into(),
             config: nulls.config,
         };
 

@@ -715,7 +715,13 @@ impl RowConverter {
             columns.iter().zip(self.fields.iter()).zip(encoders)
         {
             // We encode a column at a time to minimise dispatch overheads
-            encode_column(&mut rows, column.as_ref(), field.options, &encoder)
+            encode_column(
+                &mut rows.buffer,
+                &mut rows.offsets,
+                column.as_ref(),
+                field.options,
+                &encoder,
+            )
         }
 
         if cfg!(debug_assertions) {

@@ -756,6 +762,48 @@ impl RowConverter {
         unsafe { self.convert_raw(&mut rows, validate_utf8) }
     }
 
+    /// Returns an empty [`Rows`] with capacity for `row_capacity` rows with
+    /// a total length of `data_capacity`
+    ///
+    /// This can be used to buffer a selection of [`Row`]
+    ///
+    /// ```
+    /// # use std::sync::Arc;
+    /// # use std::collections::HashSet;
+    /// # use arrow_array::cast::AsArray;
+    /// # use arrow_array::StringArray;
+    /// # use arrow_row::{Row, RowConverter, SortField};
+    /// # use arrow_schema::DataType;
+    /// #
+    /// let mut converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
+    /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
+    ///
+    /// // Convert to row format and deduplicate
+    /// let converted = converter.convert_columns(&[Arc::new(array)]).unwrap();
+    /// let mut distinct_rows = converter.empty_rows(3, 100);
+    /// let mut dedup: HashSet<Row> = HashSet::with_capacity(3);
+    /// converted.iter().filter(|row| dedup.insert(*row)).for_each(|row| distinct_rows.push(row));
+    ///
+    /// // Note: we could skip buffering and feed the filtered iterator directly
+    /// // into convert_rows, this is done for demonstration purposes only
+    /// let distinct = converter.convert_rows(&distinct_rows).unwrap();
+    /// let values: Vec<_> = distinct[0].as_string::<i32>().iter().map(Option::unwrap).collect();
+    /// assert_eq!(&values, &["hello", "world", "a"]);
+    /// ```
+    pub fn empty_rows(&self, row_capacity: usize, data_capacity: usize) -> Rows {
+        let mut offsets = Vec::with_capacity(row_capacity.saturating_add(1));
+        offsets.push(0);
+
+        Rows {
+            offsets,
+            buffer: Vec::with_capacity(data_capacity),
+            config: RowConfig {
+                fields: self.fields.clone(),
+                validate_utf8: false,
+            },
+        }
+    }
+
     /// Convert raw bytes into [`ArrayRef`]
     ///
     /// # Safety

@@ -832,14 +880,25 @@ struct RowConfig {
 #[derive(Debug)]
 pub struct Rows {
     /// Underlying row bytes
-    buffer: Box<[u8]>,
+    buffer: Vec<u8>,
     /// Row `i` has data `&buffer[offsets[i]..offsets[i+1]]`
-    offsets: Box<[usize]>,
+    offsets: Vec<usize>,
     /// The config for these rows
     config: RowConfig,
 }
 
 impl Rows {
+    /// Append a [`Row`] to this [`Rows`]
+    pub fn push(&mut self, row: Row<'_>) {
+        assert!(
+            Arc::ptr_eq(&row.config.fields, &self.config.fields),
+            "row was not produced by this RowConverter"
+        );
+        self.config.validate_utf8 |= row.config.validate_utf8;
+        self.buffer.extend_from_slice(row.data);
+        self.offsets.push(self.buffer.len())
+    }
+
     pub fn row(&self, row: usize) -> Row<'_> {
         let end = self.offsets[row + 1];
         let start = self.offsets[row];

@@ -1171,66 +1230,67 @@ fn new_empty_rows(cols: &[ArrayRef], encoders: &[Encoder], config: RowConfig) ->
     let buffer = vec![0_u8; cur_offset];
 
     Rows {
-        buffer: buffer.into(),
-        offsets: offsets.into(),
+        buffer,
+        offsets,
         config,
     }
 }
 
 /// Encodes a column to the provided [`Rows`] incrementing the offsets as it progresses
 fn encode_column(
-    out: &mut Rows,
+    data: &mut [u8],
+    offsets: &mut [usize],
     column: &dyn Array,
     opts: SortOptions,
     encoder: &Encoder<'_>,
 ) {
     match encoder {
         Encoder::Stateless => {
             downcast_primitive_array! {
-                column => fixed::encode(out, column, opts),
+                column => fixed::encode(data, offsets, column, opts),
                 DataType::Null => {}
-                DataType::Boolean => fixed::encode(out, column.as_boolean(), opts),
+                DataType::Boolean => fixed::encode(data, offsets, column.as_boolean(), opts),
                 DataType::Binary => {
-                    variable::encode(out, as_generic_binary_array::<i32>(column).iter(), opts)
+                    variable::encode(data, offsets, as_generic_binary_array::<i32>(column).iter(), opts)
                 }
                 DataType::LargeBinary => {
-                    variable::encode(out, as_generic_binary_array::<i64>(column).iter(), opts)
+                    variable::encode(data, offsets, as_generic_binary_array::<i64>(column).iter(), opts)
                 }
                 DataType::Utf8 => variable::encode(
-                    out,
+                    data, offsets,
                     column.as_string::<i32>().iter().map(|x| x.map(|x| x.as_bytes())),
                     opts,
                 ),
                 DataType::LargeUtf8 => variable::encode(
-                    out,
+                    data, offsets,
                     column.as_string::<i64>()
                         .iter()
                         .map(|x| x.map(|x| x.as_bytes())),
                     opts,
                 ),
                 DataType::FixedSizeBinary(_) => {
                     let array = column.as_any().downcast_ref().unwrap();
-                    fixed::encode_fixed_size_binary(out, array, opts)
+                    fixed::encode_fixed_size_binary(data, offsets, array, opts)
                 }
                 _ => unreachable!(),
             }
         }
         Encoder::Dictionary(dict) => {
             downcast_dictionary_array! {
-                column => encode_dictionary(out, column, dict, opts),
+                column => encode_dictionary(data, offsets, column, dict, opts),
                 _ => unreachable!()
             }
         }
         Encoder::DictionaryValues(values, nulls) => {
             downcast_dictionary_array! {
-                column => encode_dictionary_values(out, column, values, nulls),
+                column => encode_dictionary_values(data, offsets, column, values, nulls),
                 _ => unreachable!()
             }
         }
         Encoder::Struct(rows, null) => {
             let array = as_struct_array(column);
             let null_sentinel = null_sentinel(opts);
-            out.offsets
+            offsets
                 .iter_mut()
                 .skip(1)
                 .enumerate()

@@ -1240,15 +1300,17 @@ fn encode_column(
                     false => (*null, null_sentinel),
                 };
                 let end_offset = *offset + 1 + row.as_ref().len();
-                out.buffer[*offset] = sentinel;
-                out.buffer[*offset + 1..end_offset].copy_from_slice(row.as_ref());
+                data[*offset] = sentinel;
+                data[*offset + 1..end_offset].copy_from_slice(row.as_ref());
                 *offset = end_offset;
             })
         }
         Encoder::List(rows) => match column.data_type() {
-            DataType::List(_) => list::encode(out, rows, opts, as_list_array(column)),
+            DataType::List(_) => {
+                list::encode(data, offsets, rows, opts, as_list_array(column))
+            }
             DataType::LargeList(_) => {
-                list::encode(out, rows, opts, as_large_list_array(column))
+                list::encode(data, offsets, rows, opts, as_large_list_array(column))
             }
             _ => unreachable!(),
         },

@@ -1384,9 +1446,9 @@ mod tests {
             .unwrap();
         let rows = converter.convert_columns(&cols).unwrap();
 
-        assert_eq!(rows.offsets.as_ref(), &[0, 8, 16, 24, 32, 40, 48, 56]);
+        assert_eq!(rows.offsets, &[0, 8, 16, 24, 32, 40, 48, 56]);
         assert_eq!(
-            rows.buffer.as_ref(),
+            rows.buffer,
             &[
                 1, 128, 1, //
                 1, 191, 166, 102, 102, //
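
The headline additions are `RowConverter::empty_rows` and `Rows::push` above, which turn `Rows` into a growable buffer for an arbitrary selection of rows. A short usage sketch in the spirit of the `empty_rows` doc example, assuming a build of arrow-row that includes this commit:

    use std::sync::Arc;
    use arrow_array::Int32Array;
    use arrow_row::{RowConverter, SortField};
    use arrow_schema::DataType;

    fn main() {
        let mut converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
        let rows = converter
            .convert_columns(&[Arc::new(Int32Array::from(vec![3, 1, 2]))])
            .unwrap();

        // Pre-size the buffer: capacity for 2 rows and 64 bytes of row data
        let mut selected = converter.empty_rows(2, 64);
        selected.push(rows.row(0));
        selected.push(rows.row(2));

        // `push` asserts the row came from a converter with the same fields,
        // so rows from an unrelated converter panic instead of silently
        // corrupting the buffer
        let back = converter.convert_rows(&selected).unwrap();
        assert_eq!(back[0].len(), 2);
    }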

arrow-row/src/list.rs

Lines changed: 5 additions & 5 deletions
@@ -57,23 +57,23 @@ fn encoded_len(rows: &Rows, range: Option<Range<usize>>) -> usize {
 ///
 /// `rows` should contain the encoded child elements
 pub fn encode<O: OffsetSizeTrait>(
-    out: &mut Rows,
+    data: &mut [u8],
+    offsets: &mut [usize],
     rows: &Rows,
     opts: SortOptions,
     array: &GenericListArray<O>,
 ) {
     let mut temporary = vec![];
-    let offsets = array.value_offsets().windows(2);
-    out.offsets
+    offsets
         .iter_mut()
         .skip(1)
-        .zip(offsets)
+        .zip(array.value_offsets().windows(2))
         .enumerate()
         .for_each(|(idx, (offset, offsets))| {
             let start = offsets[0].as_usize();
             let end = offsets[1].as_usize();
             let range = array.is_valid(idx).then_some(start..end);
-            let out = &mut out.buffer[*offset..];
+            let out = &mut data[*offset..];
             *offset += encode_one(out, &mut temporary, rows, range, opts)
         });
 }
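
The reworked iterator chain zips each output offset with `array.value_offsets().windows(2)`, where consecutive offset pairs delimit each list element's children. A tiny standalone illustration of that windows-of-offsets pattern (not the arrow-row code itself):

    fn child_ranges(value_offsets: &[i32]) -> Vec<std::ops::Range<usize>> {
        // Consecutive offset pairs [start, end) delimit each element's children
        value_offsets
            .windows(2)
            .map(|w| w[0] as usize..w[1] as usize)
            .collect()
    }

    fn main() {
        // Offsets [0, 2, 2, 5] describe three lists of lengths 2, 0 and 3
        assert_eq!(child_ranges(&[0, 2, 2, 5]), vec![0..2, 2..2, 2..5]);
    }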
