
Append Row to Rows (#4466) #4470


Merged
merged 4 commits on Jun 30, 2023
22 changes: 11 additions & 11 deletions arrow-row/src/dictionary.rs
@@ -58,18 +58,19 @@ pub fn compute_dictionary_mapping(

/// Encode dictionary values not preserving the dictionary encoding
pub fn encode_dictionary_values<K: ArrowDictionaryKeyType>(
- out: &mut Rows,
+ data: &mut [u8],
+ offsets: &mut [usize],
column: &DictionaryArray<K>,
values: &Rows,
null: &Row<'_>,
) {
- for (offset, k) in out.offsets.iter_mut().skip(1).zip(column.keys()) {
+ for (offset, k) in offsets.iter_mut().skip(1).zip(column.keys()) {
let row = match k {
Some(k) => values.row(k.as_usize()).data,
None => null.data,
};
let end_offset = *offset + row.len();
- out.buffer[*offset..end_offset].copy_from_slice(row);
+ data[*offset..end_offset].copy_from_slice(row);
*offset = end_offset;
}
}
@@ -79,27 +80,26 @@ pub fn encode_dictionary_values<K: ArrowDictionaryKeyType>(
/// - single `0_u8` if null
/// - the bytes of the corresponding normalized key including the null terminator
pub fn encode_dictionary<K: ArrowDictionaryKeyType>(
- out: &mut Rows,
+ data: &mut [u8],
+ offsets: &mut [usize],
column: &DictionaryArray<K>,
normalized_keys: &[Option<&[u8]>],
opts: SortOptions,
) {
- for (offset, k) in out.offsets.iter_mut().skip(1).zip(column.keys()) {
+ for (offset, k) in offsets.iter_mut().skip(1).zip(column.keys()) {
match k.and_then(|k| normalized_keys[k.as_usize()]) {
Some(normalized_key) => {
let end_offset = *offset + 1 + normalized_key.len();
- out.buffer[*offset] = 1;
- out.buffer[*offset + 1..end_offset].copy_from_slice(normalized_key);
+ data[*offset] = 1;
+ data[*offset + 1..end_offset].copy_from_slice(normalized_key);
// Negate if descending
if opts.descending {
- out.buffer[*offset..end_offset]
-     .iter_mut()
-     .for_each(|v| *v = !*v)
+ data[*offset..end_offset].iter_mut().for_each(|v| *v = !*v)
}
*offset = end_offset;
}
None => {
- out.buffer[*offset] = null_sentinel(opts);
+ data[*offset] = null_sentinel(opts);
*offset += 1;
}
}
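
Both dictionary encoders above now write through a raw `data: &mut [u8]` buffer and a mutable `offsets: &mut [usize]` slice rather than borrowing the whole `Rows` struct. The following sketch is not code from this crate; it is a minimal, self-contained illustration (the `encode_column_values` helper is hypothetical) of that convention: `offsets[i + 1]` starts out as row `i`'s current write position and is advanced past whatever bytes get appended, so the encoder for the next column continues from there.

```rust
/// Hypothetical helper mirroring the `data`/`offsets` convention used above:
/// append each row's bytes at its current offset, then advance that offset.
fn encode_column_values(data: &mut [u8], offsets: &mut [usize], values: &[&[u8]]) {
    for (offset, value) in offsets.iter_mut().skip(1).zip(values) {
        let end = *offset + value.len();
        data[*offset..end].copy_from_slice(value); // write this row's bytes
        *offset = end; // leave the cursor at the end of what was written
    }
}

fn main() {
    // Two rows with 4 bytes of pre-computed capacity each.
    let mut data = vec![0u8; 8];
    // offsets[i + 1] is the current write position for row i; offsets[0] stays 0.
    let mut offsets = vec![0usize, 0, 4];
    encode_column_values(&mut data, &mut offsets, &[b"ab", b"cd"]);
    assert_eq!(data, b"ab\0\0cd\0\0".to_vec());
    assert_eq!(offsets, vec![0, 2, 6]);
}
```
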
20 changes: 11 additions & 9 deletions arrow-row/src/fixed.rs
@@ -16,7 +16,7 @@
// under the License.

use crate::array::PrimitiveArray;
- use crate::{null_sentinel, Rows};
+ use crate::null_sentinel;
use arrow_array::builder::BufferBuilder;
use arrow_array::{ArrowPrimitiveType, BooleanArray, FixedSizeBinaryArray};
use arrow_buffer::{bit_util, i256, ArrowNativeType, Buffer, MutableBuffer};
@@ -177,14 +177,15 @@ where
/// - 1 byte `0` if null or `1` if valid
/// - bytes of [`FixedLengthEncoding`]
pub fn encode<T: FixedLengthEncoding, I: IntoIterator<Item = Option<T>>>(
- out: &mut Rows,
+ data: &mut [u8],
+ offsets: &mut [usize],
i: I,
opts: SortOptions,
) {
- for (offset, maybe_val) in out.offsets.iter_mut().skip(1).zip(i) {
+ for (offset, maybe_val) in offsets.iter_mut().skip(1).zip(i) {
let end_offset = *offset + T::ENCODED_LEN;
if let Some(val) = maybe_val {
- let to_write = &mut out.buffer[*offset..end_offset];
+ let to_write = &mut data[*offset..end_offset];
to_write[0] = 1;
let mut encoded = val.encode();
if opts.descending {
@@ -193,30 +194,31 @@ pub fn encode<T: FixedLengthEncoding, I: IntoIterator<Item = Option<T>>>(
}
to_write[1..].copy_from_slice(encoded.as_ref())
} else {
- out.buffer[*offset] = null_sentinel(opts);
+ data[*offset] = null_sentinel(opts);
}
*offset = end_offset;
}
}

pub fn encode_fixed_size_binary(
- out: &mut Rows,
+ data: &mut [u8],
+ offsets: &mut [usize],
array: &FixedSizeBinaryArray,
opts: SortOptions,
) {
let len = array.value_length() as usize;
- for (offset, maybe_val) in out.offsets.iter_mut().skip(1).zip(array.iter()) {
+ for (offset, maybe_val) in offsets.iter_mut().skip(1).zip(array.iter()) {
let end_offset = *offset + len + 1;
if let Some(val) = maybe_val {
- let to_write = &mut out.buffer[*offset..end_offset];
+ let to_write = &mut data[*offset..end_offset];
to_write[0] = 1;
to_write[1..].copy_from_slice(&val[..len]);
if opts.descending {
// Flip bits to reverse order
to_write[1..1 + len].iter_mut().for_each(|v| *v = !*v)
}
} else {
- out.buffer[*offset] = null_sentinel(opts);
+ data[*offset] = null_sentinel(opts);
}
*offset = end_offset;
}
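
The doc comment above describes the fixed-length layout: one validity byte (`0` for null, `1` for valid) followed by an order-preserving encoding of the value, with the value bytes negated when the column sorts descending. Below is a simplified, self-contained sketch of that idea for an `Option<i32>`; the hypothetical `encode_i32` stands in for the real path through `FixedLengthEncoding` and `null_sentinel`, which handles more types and null orderings than shown here.

```rust
// Simplified sketch of the fixed-length row layout described above; not the
// crate's actual implementation.
fn encode_i32(value: Option<i32>, descending: bool) -> [u8; 5] {
    let mut out = [0u8; 5];
    match value {
        Some(v) => {
            out[0] = 1; // validity byte: 1 = valid
            // XOR-ing the sign bit makes the big-endian bytes compare like the integer.
            let mut encoded = ((v as u32) ^ 0x8000_0000).to_be_bytes();
            if descending {
                // Negate the value bytes so byte-wise comparison reverses the order.
                encoded.iter_mut().for_each(|b| *b = !*b);
            }
            out[1..].copy_from_slice(&encoded);
        }
        None => out[0] = 0, // validity byte: 0 = null (sorts before valid values)
    }
    out
}

fn main() {
    assert!(encode_i32(Some(-1), false) < encode_i32(Some(1), false));
    assert!(encode_i32(Some(-1), true) > encode_i32(Some(1), true));
    // With a 0 sentinel, nulls order before every valid value in ascending order.
    assert!(encode_i32(None, false) < encode_i32(Some(i32::MIN), false));
}
```
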
110 changes: 86 additions & 24 deletions arrow-row/src/lib.rs
@@ -458,7 +458,7 @@ impl Codec {
let nulls = converter.convert_columns(&[null_array])?;

let owned = OwnedRow {
- data: nulls.buffer,
+ data: nulls.buffer.into(),
config: nulls.config,
};
Ok(Self::DictionaryValues(converter, owned))
@@ -496,7 +496,7 @@ impl Codec {

let nulls = converter.convert_columns(&nulls)?;
let owned = OwnedRow {
- data: nulls.buffer,
+ data: nulls.buffer.into(),
config: nulls.config,
};

@@ -715,7 +715,13 @@ impl RowConverter {
columns.iter().zip(self.fields.iter()).zip(encoders)
{
// We encode a column at a time to minimise dispatch overheads
- encode_column(&mut rows, column.as_ref(), field.options, &encoder)
+ encode_column(
+     &mut rows.buffer,
+     &mut rows.offsets,
+     column.as_ref(),
+     field.options,
+     &encoder,
+ )
}

if cfg!(debug_assertions) {
@@ -756,6 +762,48 @@ impl RowConverter {
unsafe { self.convert_raw(&mut rows, validate_utf8) }
}

/// Returns an empty [`Rows`] with capacity for `row_capacity` rows with
/// a total length of `data_capacity`
///
/// This can be used to buffer a selection of [`Row`]
///
/// ```
/// # use std::sync::Arc;
/// # use std::collections::HashSet;
/// # use arrow_array::cast::AsArray;
/// # use arrow_array::StringArray;
/// # use arrow_row::{Row, RowConverter, SortField};
/// # use arrow_schema::DataType;
/// #
/// let mut converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();

Contributor (review comment), suggested change:
/// // This example shows how to buffer only the Row values
/// let mut converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();

Contributor Author (reply):
This is already stated just above the code block; the commented-out imports just make it hard to see.

/// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
///
/// // Convert to row format and deduplicate
/// let converted = converter.convert_columns(&[Arc::new(array)]).unwrap();
/// let mut distinct_rows = converter.empty_rows(3, 100);
/// let mut dedup: HashSet<Row> = HashSet::with_capacity(3);
/// converted.iter().filter(|row| dedup.insert(*row)).for_each(|row| distinct_rows.push(row));
///
/// // Note: we could skip buffering and feed the filtered iterator directly
/// // into convert_rows, this is done for demonstration purposes only
/// let distinct = converter.convert_rows(&distinct_rows).unwrap();
/// let values: Vec<_> = distinct[0].as_string::<i32>().iter().map(Option::unwrap).collect();
/// assert_eq!(&values, &["hello", "world", "a"]);
/// ```
pub fn empty_rows(&self, row_capacity: usize, data_capacity: usize) -> Rows {
let mut offsets = Vec::with_capacity(row_capacity.saturating_add(1));
offsets.push(0);

Rows {
offsets,
buffer: Vec::with_capacity(data_capacity),
config: RowConfig {
fields: self.fields.clone(),
validate_utf8: false,
},
}
}

/// Convert raw bytes into [`ArrayRef`]
///
/// # Safety
@@ -832,14 +880,25 @@ struct RowConfig {
#[derive(Debug)]
pub struct Rows {
/// Underlying row bytes
- buffer: Box<[u8]>,
+ buffer: Vec<u8>,

Contributor Author (review comment):
I need to double-check this doesn't result in a performance regression; it shouldn't, but stranger things have happened.

/// Row `i` has data `&buffer[offsets[i]..offsets[i+1]]`
- offsets: Box<[usize]>,
+ offsets: Vec<usize>,
/// The config for these rows
config: RowConfig,
}

impl Rows {
/// Append a [`Row`] to this [`Rows`]
pub fn push(&mut self, row: Row<'_>) {
assert!(
Arc::ptr_eq(&row.config.fields, &self.config.fields),
"row was not produced by this RowConverter"
);
self.config.validate_utf8 |= row.config.validate_utf8;

Contributor (review comment):
Why doesn't this just assert that the values of validate_utf8 are the same?

Contributor Author (reply):
This is consistent with the logic elsewhere, and is ultimately harmless.

self.buffer.extend_from_slice(row.data);
self.offsets.push(self.buffer.len())
}

pub fn row(&self, row: usize) -> Row<'_> {
let end = self.offsets[row + 1];
let start = self.offsets[row];
@@ -1171,66 +1230,67 @@ fn new_empty_rows(cols: &[ArrayRef], encoders: &[Encoder], config: RowConfig) ->
let buffer = vec![0_u8; cur_offset];

Rows {
- buffer: buffer.into(),
- offsets: offsets.into(),
+ buffer,
+ offsets,
config,
}
}

/// Encodes a column to the provided [`Rows`] incrementing the offsets as it progresses
fn encode_column(
- out: &mut Rows,
+ data: &mut [u8],
+ offsets: &mut [usize],
column: &dyn Array,
opts: SortOptions,
encoder: &Encoder<'_>,
) {
match encoder {
Encoder::Stateless => {
downcast_primitive_array! {
- column => fixed::encode(out, column, opts),
+ column => fixed::encode(data, offsets, column, opts),
DataType::Null => {}
- DataType::Boolean => fixed::encode(out, column.as_boolean(), opts),
+ DataType::Boolean => fixed::encode(data, offsets, column.as_boolean(), opts),
DataType::Binary => {
- variable::encode(out, as_generic_binary_array::<i32>(column).iter(), opts)
+ variable::encode(data, offsets, as_generic_binary_array::<i32>(column).iter(), opts)
}
DataType::LargeBinary => {
- variable::encode(out, as_generic_binary_array::<i64>(column).iter(), opts)
+ variable::encode(data, offsets, as_generic_binary_array::<i64>(column).iter(), opts)
}
DataType::Utf8 => variable::encode(
- out,
+ data, offsets,
column.as_string::<i32>().iter().map(|x| x.map(|x| x.as_bytes())),
opts,
),
DataType::LargeUtf8 => variable::encode(
- out,
+ data, offsets,
column.as_string::<i64>()
.iter()
.map(|x| x.map(|x| x.as_bytes())),
opts,
),
DataType::FixedSizeBinary(_) => {
let array = column.as_any().downcast_ref().unwrap();
- fixed::encode_fixed_size_binary(out, array, opts)
+ fixed::encode_fixed_size_binary(data, offsets, array, opts)
}
_ => unreachable!(),
}
}
Encoder::Dictionary(dict) => {
downcast_dictionary_array! {
- column => encode_dictionary(out, column, dict, opts),
+ column => encode_dictionary(data, offsets, column, dict, opts),
_ => unreachable!()
}
}
Encoder::DictionaryValues(values, nulls) => {
downcast_dictionary_array! {
- column => encode_dictionary_values(out, column, values, nulls),
+ column => encode_dictionary_values(data, offsets, column, values, nulls),
_ => unreachable!()
}
}
Encoder::Struct(rows, null) => {
let array = as_struct_array(column);
let null_sentinel = null_sentinel(opts);
- out.offsets
+ offsets
.iter_mut()
.skip(1)
.enumerate()
@@ -1240,15 +1300,17 @@ fn encode_column(
false => (*null, null_sentinel),
};
let end_offset = *offset + 1 + row.as_ref().len();
- out.buffer[*offset] = sentinel;
- out.buffer[*offset + 1..end_offset].copy_from_slice(row.as_ref());
+ data[*offset] = sentinel;
+ data[*offset + 1..end_offset].copy_from_slice(row.as_ref());
*offset = end_offset;
})
}
Encoder::List(rows) => match column.data_type() {
- DataType::List(_) => list::encode(out, rows, opts, as_list_array(column)),
+ DataType::List(_) => {
+     list::encode(data, offsets, rows, opts, as_list_array(column))
+ }
DataType::LargeList(_) => {
- list::encode(out, rows, opts, as_large_list_array(column))
+ list::encode(data, offsets, rows, opts, as_large_list_array(column))
}
_ => unreachable!(),
},
@@ -1384,9 +1446,9 @@ mod tests {
.unwrap();
let rows = converter.convert_columns(&cols).unwrap();

- assert_eq!(rows.offsets.as_ref(), &[0, 8, 16, 24, 32, 40, 48, 56]);
+ assert_eq!(rows.offsets, &[0, 8, 16, 24, 32, 40, 48, 56]);
assert_eq!(
- rows.buffer.as_ref(),
+ rows.buffer,
&[
1, 128, 1, //
1, 191, 166, 102, 102, //
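
Taken together, the new `empty_rows` and `Rows::push` let a caller buffer an arbitrary selection of `Row` values and convert just that selection back into arrays. A hedged usage sketch, assuming the public API added in this PR together with the existing `convert_columns`, `convert_rows`, and `Rows::iter`; the capacities passed to `empty_rows` are only pre-allocation hints, and the values are illustrative:

```rust
use std::sync::Arc;

use arrow_array::cast::AsArray;
use arrow_array::types::Int32Type;
use arrow_array::Int32Array;
use arrow_row::{RowConverter, SortField};
use arrow_schema::DataType;

fn main() {
    let mut converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
    let rows = converter
        .convert_columns(&[Arc::new(Int32Array::from(vec![3, 1, 2]))])
        .unwrap();

    // Rows compare byte-wise, so sorting the Row handles sorts the underlying values.
    let mut sorted: Vec<_> = rows.iter().collect();
    sorted.sort_unstable();

    // Buffer only the two smallest rows in a fresh Rows.
    let mut smallest = converter.empty_rows(2, 64);
    sorted.iter().take(2).for_each(|row| smallest.push(*row));

    // Convert the buffered selection back into an Arrow array.
    let converted = converter.convert_rows(&smallest).unwrap();
    let values = converted[0].as_primitive::<Int32Type>().values().to_vec();
    assert_eq!(values, vec![1, 2]);
}
```
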
10 changes: 5 additions & 5 deletions arrow-row/src/list.rs
@@ -57,23 +57,23 @@ fn encoded_len(rows: &Rows, range: Option<Range<usize>>) -> usize {
///
/// `rows` should contain the encoded child elements
pub fn encode<O: OffsetSizeTrait>(
- out: &mut Rows,
+ data: &mut [u8],
+ offsets: &mut [usize],
rows: &Rows,
opts: SortOptions,
array: &GenericListArray<O>,
) {
let mut temporary = vec![];
- let offsets = array.value_offsets().windows(2);
- out.offsets
+ offsets
.iter_mut()
.skip(1)
- .zip(offsets)
+ .zip(array.value_offsets().windows(2))
.enumerate()
.for_each(|(idx, (offset, offsets))| {
let start = offsets[0].as_usize();
let end = offsets[1].as_usize();
let range = array.is_valid(idx).then_some(start..end);
- let out = &mut out.buffer[*offset..];
+ let out = &mut data[*offset..];
*offset += encode_one(out, &mut temporary, rows, range, opts)
});
}
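
The loop above walks `array.value_offsets()` in windows of two, so list slot `i` owns the child rows in `value_offsets[i]..value_offsets[i + 1]`, while a null slot owns no range at all. The sketch below isolates just that range computation; `child_ranges` is a hypothetical helper, and the actual per-range encoding performed by `encode_one` is not reproduced:

```rust
// Simplified, self-contained sketch of mapping list slots to child-row ranges.
fn child_ranges(value_offsets: &[i32], validity: &[bool]) -> Vec<Option<std::ops::Range<usize>>> {
    value_offsets
        .windows(2)
        .enumerate()
        .map(|(idx, bounds)| {
            // A valid slot covers bounds[0]..bounds[1]; a null slot covers nothing.
            validity[idx].then_some(bounds[0] as usize..bounds[1] as usize)
        })
        .collect()
}

fn main() {
    // Offsets for the lists [[a, b], null, [c]] over three encoded child rows.
    let ranges = child_ranges(&[0, 2, 2, 3], &[true, false, true]);
    assert_eq!(ranges, vec![Some(0..2), None, Some(2..3)]);
}
```
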