Skip to content

Commit

Permalink
Centralize string column decoding into decoder/string.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
Jefffrey committed Mar 4, 2024
1 parent 574886d commit dd536d6
Show file tree
Hide file tree
Showing 4 changed files with 237 additions and 250 deletions.
5 changes: 2 additions & 3 deletions src/arrow_reader/column/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ pub mod boolean;
pub mod float;
pub mod int;
pub mod present;
pub mod string;
pub mod timestamp;
pub mod tinyint;

Expand Down Expand Up @@ -159,8 +158,8 @@ impl Column {
}

pub struct NullableIterator<T> {
present: Box<dyn Iterator<Item = bool> + Send>,
iter: Box<dyn Iterator<Item = Result<T>> + Send>,
pub(crate) present: Box<dyn Iterator<Item = bool> + Send>,
pub(crate) iter: Box<dyn Iterator<Item = Result<T>> + Send>,
}

impl<T> Iterator for NullableIterator<T> {
Expand Down
114 changes: 0 additions & 114 deletions src/arrow_reader/column/string.rs

This file was deleted.

141 changes: 8 additions & 133 deletions src/arrow_reader/decoder/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
use std::sync::Arc;

use arrow::array::{
Array, ArrayRef, BinaryBuilder, BooleanBuilder, PrimitiveBuilder, StringArray, StringBuilder,
StringDictionaryBuilder,
};
use arrow::array::{Array, ArrayRef, BinaryBuilder, BooleanBuilder, PrimitiveBuilder};
use arrow::datatypes::ArrowPrimitiveType;
use arrow::datatypes::{
Date32Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, SchemaRef,
TimestampNanosecondType, UInt64Type,
TimestampNanosecondType,
};
use arrow::record_batch::RecordBatch;
use snafu::ResultExt;
Expand All @@ -16,7 +13,6 @@ use crate::arrow_reader::column::binary::new_binary_iterator;
use crate::arrow_reader::column::boolean::new_boolean_iter;
use crate::arrow_reader::column::float::new_float_iter;
use crate::arrow_reader::column::int::new_int_iter;
use crate::arrow_reader::column::string::StringDecoder;
use crate::arrow_reader::column::timestamp::new_timestamp_iter;
use crate::arrow_reader::column::NullableIterator;
use crate::error::{self, Result};
Expand All @@ -25,14 +21,16 @@ use crate::stripe::Stripe;

use self::list::ListArrayDecoder;
use self::map::MapArrayDecoder;
use self::string::new_string_decoder;
use self::struct_decoder::StructArrayDecoder;

use super::column::tinyint::new_i8_iter;
use super::column::Column;

pub mod list;
pub mod map;
pub mod struct_decoder;
mod list;
mod map;
mod string;
mod struct_decoder;

struct PrimitiveArrayDecoder<T: ArrowPrimitiveType> {
inner: NullableIterator<T::Native>,
Expand Down Expand Up @@ -151,123 +149,6 @@ impl ArrayBatchDecoder for BooleanArrayDecoder {
}
}

struct DirectStringArrayDecoder {
inner: NullableIterator<String>,
}

impl DirectStringArrayDecoder {
pub fn new(inner: NullableIterator<String>) -> Self {
Self { inner }
}
}

impl ArrayBatchDecoder for DirectStringArrayDecoder {
fn next_batch(
&mut self,
batch_size: usize,
parent_present: Option<&[bool]>,
) -> Result<Option<ArrayRef>> {
let mut builder = StringBuilder::new();

let mut iter = self.inner.by_ref().take(batch_size);
if let Some(parent_present) = parent_present {
debug_assert_eq!(
parent_present.len(),
batch_size,
"when provided, parent_present length must equal batch_size"
);

for &is_present in parent_present {
if is_present {
// TODO: return as error instead
let opt = iter
.next()
.transpose()?
.expect("array less than expected length");
builder.append_option(opt);
} else {
builder.append_null();
}
}
} else {
for opt in iter {
let opt = opt?;
builder.append_option(opt);
}
};

let array = Arc::new(builder.finish());
if array.is_empty() {
Ok(None)
} else {
Ok(Some(array))
}
}
}

struct DictionaryStringArrayDecoder {
indexes: NullableIterator<u64>,
dictionary: Arc<StringArray>,
}

impl DictionaryStringArrayDecoder {
pub fn new(indexes: NullableIterator<u64>, dictionary: Arc<StringArray>) -> Self {
Self {
indexes,
dictionary,
}
}
}

impl ArrayBatchDecoder for DictionaryStringArrayDecoder {
fn next_batch(
&mut self,
batch_size: usize,
parent_present: Option<&[bool]>,
) -> Result<Option<ArrayRef>> {
// Safety: keys won't overflow
let mut builder = StringDictionaryBuilder::<UInt64Type>::new_with_dictionary(
batch_size,
&self.dictionary,
)
.unwrap();

let mut indexes = self.indexes.by_ref().take(batch_size);
if let Some(parent_present) = parent_present {
debug_assert_eq!(
parent_present.len(),
batch_size,
"when provided, parent_present length must equal batch_size"
);

for &is_present in parent_present {
if is_present {
// TODO: return as error instead
let index = indexes
.next()
.transpose()?
.expect("array less than expected length");
builder.append_option(index.map(|idx| self.dictionary.value(idx as usize)));
} else {
builder.append_null();
}
}
} else {
for index in indexes {
let index = index?;
builder.append_option(index.map(|idx| self.dictionary.value(idx as usize)));
}
};

let array = Arc::new(builder.finish());
if array.is_empty() {
Ok(None)
} else {
Ok(Some(array))
}
}
}

struct BinaryArrayDecoder {
inner: NullableIterator<Vec<u8>>,
}
Expand Down Expand Up @@ -416,13 +297,7 @@ pub fn array_decoder_factory(
Box::new(Float64ArrayDecoder::new(inner))
}
DataType::String { .. } | DataType::Varchar { .. } | DataType::Char { .. } => {
let inner = StringDecoder::new(column, stripe)?;
match inner {
StringDecoder::Direct(inner) => Box::new(DirectStringArrayDecoder::new(inner)),
StringDecoder::Dictionary((indexes, dictionary)) => {
Box::new(DictionaryStringArrayDecoder::new(indexes, dictionary))
}
}
new_string_decoder(column, stripe)?
}
DataType::Binary { .. } => {
let inner = new_binary_iterator(column, stripe)?;
Expand Down
Loading

0 comments on commit dd536d6

Please sign in to comment.