Skip to content

Commit

Permalink
Use PrimitiveValueDecoder::decode_spaced in string/list/map lengths d…
Browse files Browse the repository at this point in the history
…ecoding
  • Loading branch information
Jefffrey committed Sep 24, 2024
1 parent bb6ccf9 commit f225acb
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 69 deletions.
25 changes: 7 additions & 18 deletions src/array_decoder/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use arrow::buffer::{NullBuffer, OffsetBuffer};
use arrow::datatypes::{Field, FieldRef};
use snafu::ResultExt;

use crate::array_decoder::{derive_present_vec, populate_lengths_with_nulls};
use crate::array_decoder::derive_present_vec;
use crate::column::{get_present_vec, Column};
use crate::encoding::{get_unsigned_rle_reader, PrimitiveValueDecoder};
use crate::proto::stream::Kind;
Expand Down Expand Up @@ -67,27 +67,16 @@ impl ArrayBatchDecoder for ListArrayDecoder {
) -> Result<ArrayRef> {
let present = derive_present_vec(&mut self.present, parent_present, batch_size);

// How many lengths we need to fetch
let elements_to_fetch = if let Some(present) = &present {
present.iter().filter(|&&is_present| is_present).count()
let mut lengths = vec![0; batch_size];
if let Some(present) = &present {
self.lengths.decode_spaced(&mut lengths, present)?;
} else {
batch_size
};
let lengths = self
.lengths
.by_ref()
.take(elements_to_fetch)
.collect::<Result<Vec<_>>>()?;
debug_assert_eq!(
lengths.len(),
elements_to_fetch,
"less lengths than expected in ListArray"
);
self.lengths.decode(&mut lengths)?;
}
let total_length: i64 = lengths.iter().sum();
// Fetch child array as one Array with total_length elements
let child_array = self.inner.next_batch(total_length as usize, None)?;
let lengths = populate_lengths_with_nulls(lengths, batch_size, &present);
let offsets = OffsetBuffer::from_lengths(lengths);
let offsets = OffsetBuffer::from_lengths(lengths.into_iter().map(|l| l as usize));
let null_buffer = present.map(NullBuffer::from);

let array = ListArray::try_new(self.field.clone(), offsets, child_array, null_buffer)
Expand Down
25 changes: 7 additions & 18 deletions src/array_decoder/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use arrow::buffer::{NullBuffer, OffsetBuffer};
use arrow::datatypes::{Field, Fields};
use snafu::ResultExt;

use crate::array_decoder::{derive_present_vec, populate_lengths_with_nulls};
use crate::array_decoder::derive_present_vec;
use crate::column::{get_present_vec, Column};
use crate::encoding::{get_unsigned_rle_reader, PrimitiveValueDecoder};
use crate::error::{ArrowSnafu, Result};
Expand Down Expand Up @@ -78,22 +78,12 @@ impl ArrayBatchDecoder for MapArrayDecoder {
) -> Result<ArrayRef> {
let present = derive_present_vec(&mut self.present, parent_present, batch_size);

// How many lengths we need to fetch
let elements_to_fetch = if let Some(present) = &present {
present.iter().filter(|&&is_present| is_present).count()
let mut lengths = vec![0; batch_size];
if let Some(present) = &present {
self.lengths.decode_spaced(&mut lengths, present)?;
} else {
batch_size
};
let lengths = self
.lengths
.by_ref()
.take(elements_to_fetch)
.collect::<Result<Vec<_>>>()?;
debug_assert_eq!(
lengths.len(),
elements_to_fetch,
"less lengths than expected in MapArray"
);
self.lengths.decode(&mut lengths)?;
}
let total_length: i64 = lengths.iter().sum();
// Fetch key and value arrays, each with total_length elements
// Fetch child array as one Array with total_length elements
Expand All @@ -103,8 +93,7 @@ impl ArrayBatchDecoder for MapArrayDecoder {
let entries =
StructArray::try_new(self.fields.clone(), vec![keys_array, values_array], None)
.context(ArrowSnafu)?;
let lengths = populate_lengths_with_nulls(lengths, batch_size, &present);
let offsets = OffsetBuffer::from_lengths(lengths);
let offsets = OffsetBuffer::from_lengths(lengths.into_iter().map(|l| l as usize));
let null_buffer = present.map(NullBuffer::from);

let field = Arc::new(Field::new_struct("entries", self.fields.clone(), false));
Expand Down
23 changes: 0 additions & 23 deletions src/array_decoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,29 +241,6 @@ fn derive_present_vec(
}
}

/// Fix the lengths to account for nulls (represented as 0 length)
fn populate_lengths_with_nulls(
lengths: Vec<i64>,
batch_size: usize,
present: &Option<Vec<bool>>,
) -> Vec<usize> {
if let Some(present) = present {
let mut lengths_with_nulls = Vec::with_capacity(batch_size);
let mut lengths = lengths.iter();
for &is_present in present {
if is_present {
let length = *lengths.next().unwrap();
lengths_with_nulls.push(length as usize);
} else {
lengths_with_nulls.push(0);
}
}
lengths_with_nulls
} else {
lengths.into_iter().map(|l| l as usize).collect()
}
}

fn create_null_buffer(present: Option<Vec<bool>>) -> Option<NullBuffer> {
match present {
// Edge case where keys of map cannot have a null buffer
Expand Down
18 changes: 8 additions & 10 deletions src/array_decoder/string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use arrow::compute::kernels::cast;
use arrow::datatypes::{ByteArrayType, DataType, GenericBinaryType, GenericStringType};
use snafu::ResultExt;

use crate::array_decoder::{create_null_buffer, derive_present_vec, populate_lengths_with_nulls};
use crate::array_decoder::{create_null_buffer, derive_present_vec};
use crate::column::{get_present_vec, Column};
use crate::compression::Decompressor;
use crate::encoding::{get_unsigned_rle_reader, PrimitiveValueDecoder};
Expand Down Expand Up @@ -116,14 +116,12 @@ impl<T: ByteArrayType> GenericByteArrayDecoder<T> {
) -> Result<GenericByteArray<T>> {
let present = derive_present_vec(&mut self.present, parent_present, batch_size);

// How many lengths we need to fetch
let elements_to_fetch = if let Some(present) = &present {
present.iter().filter(|&&is_present| is_present).count()
let mut lengths = vec![0; batch_size];
if let Some(present) = &present {
self.lengths.decode_spaced(&mut lengths, present)?;
} else {
batch_size
};
let mut lengths = vec![0; elements_to_fetch];
self.lengths.decode(&mut lengths)?;
self.lengths.decode(&mut lengths)?;
}
let total_length: i64 = lengths.iter().sum();
// Fetch all data bytes at once
let mut bytes = Vec::with_capacity(total_length as usize);
Expand All @@ -133,8 +131,8 @@ impl<T: ByteArrayType> GenericByteArrayDecoder<T> {
.read_to_end(&mut bytes)
.context(IoSnafu)?;
let bytes = Buffer::from(bytes);
let lengths = populate_lengths_with_nulls(lengths, batch_size, &present);
let offsets = OffsetBuffer::<T::Offset>::from_lengths(lengths);
let offsets =
OffsetBuffer::<T::Offset>::from_lengths(lengths.into_iter().map(|l| l as usize));
let null_buffer = create_null_buffer(present);

let array =
Expand Down

0 comments on commit f225acb

Please sign in to comment.