diff --git a/Cargo.toml b/Cargo.toml index 522e6c4b..d82191ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ rust-version = "1.70" arrow = { version = "47.0", features = ["prettyprint"] } bytes = "1.4" chrono = "0.4" + fallible-streaming-iterator = { version = "0.1" } flate2 = "1" futures = { version = "0.3", default-features = false, features = ["std"] } diff --git a/src/arrow_reader.rs b/src/arrow_reader.rs index 49ab601f..99c246e9 100644 --- a/src/arrow_reader.rs +++ b/src/arrow_reader.rs @@ -5,17 +5,24 @@ use std::io::{Read, Seek}; use std::sync::Arc; use arrow::array::{ - ArrayRef, BinaryArray, BooleanArray, DictionaryArray, PrimitiveArray, StringArray, + Array, ArrayBuilder, ArrayRef, BinaryArray, BinaryBuilder, BooleanArray, BooleanBuilder, + Date32Array, Date32Builder, Float32Array, Float32Builder, Float64Builder, Int64Array, + Int64Builder, Int8Array, Int8Builder, PrimitiveBuilder, StringArray, StringBuilder, + StringDictionaryBuilder, StructBuilder, TimestampNanosecondBuilder, }; +use arrow::array::{Float64Array, TimestampNanosecondArray}; use arrow::datatypes::{ - Date32Type, Int16Type, Int32Type, Int64Type, Schema, SchemaRef, TimestampNanosecondType, - UInt64Type, + Date32Type, Float32Type, Float64Type, Int64Type, Int8Type, Schema, SchemaRef, + TimestampNanosecondType, UInt64Type, }; +use arrow::datatypes::{Field, TimeUnit}; use arrow::error::ArrowError; use arrow::record_batch::{RecordBatch, RecordBatchReader}; +use bytes::Bytes; use chrono::{Datelike, NaiveDate, NaiveDateTime}; use snafu::{OptionExt, ResultExt}; +use self::column::struct_column::new_struct_iter; use self::column::tinyint::new_i8_iter; use self::column::Column; use crate::arrow_reader::column::binary::new_binary_iterator; @@ -24,11 +31,13 @@ use crate::arrow_reader::column::date::{new_date_iter, UNIX_EPOCH_FROM_CE}; use crate::arrow_reader::column::float::{new_f32_iter, new_f64_iter}; use crate::arrow_reader::column::int::new_i64_iter; use crate::arrow_reader::column::string::StringDecoder; +use crate::arrow_reader::column::struct_column::StructDecoder; use crate::arrow_reader::column::timestamp::new_timestamp_iter; use crate::arrow_reader::column::NullableIterator; use crate::error::{self, Result}; +use crate::proto::stream::Kind; use crate::proto::{StripeFooter, StripeInformation}; -use crate::reader::decompress::Compression; +use crate::reader::decompress::{Compression, Decompressor}; use crate::reader::schema::{create_field, TypeDescription}; use crate::reader::Reader; @@ -143,6 +152,479 @@ pub enum Decoder { Date(NullableIterator), String(StringDecoder), Binary(NullableIterator>), + Struct(StructDecoder), +} + +macro_rules! impl_append_struct_value { + ($typ:ident) => { + paste::item! { + + fn []( + idx: usize, + column: &ArrayRef, + builder: &mut StructBuilder, + ) { + type Array = [<$typ Array>]; + type Builder = [<$typ Builder>]; + + let values = column.as_any().downcast_ref::().unwrap(); + for value in values { + builder + .field_builder::(idx) + .unwrap() + .append_option(value); + } + println!("--values: {:?}, builder len: {}--", values, builder.len()); + } + } + }; +} + +impl_append_struct_value!(Boolean); +impl_append_struct_value!(Int8); +impl_append_struct_value!(Int64); +impl_append_struct_value!(Float32); +impl_append_struct_value!(Float64); +impl_append_struct_value!(Date32); +impl_append_struct_value!(Binary); +impl_append_struct_value!(TimestampNanosecond); + +macro_rules! impl_append_struct_null { + ($typ:ident) => { + paste::item! 
{ + + fn []( + idx: usize, + builder: &mut StructBuilder, + ) { + type Builder = [<$typ Builder>]; + + builder + .field_builder::(idx) + .unwrap() + .append_null(); + println!("--append null, builder len: {}--", builder.len()); + } + } + }; +} + +impl_append_struct_null!(Boolean); +impl_append_struct_null!(Int8); +impl_append_struct_null!(Int64); +impl_append_struct_null!(Float32); +impl_append_struct_null!(Float64); +impl_append_struct_null!(Date32); +impl_append_struct_null!(Binary); +impl_append_struct_null!(TimestampNanosecond); + +pub fn append_struct_value( + idx: usize, + column: &ArrayRef, + builder: &mut StructBuilder, + decoder: &Decoder, +) -> Result<()> { + match column.data_type() { + arrow::datatypes::DataType::Boolean => { + append_struct_boolean_value(idx, column, builder); + } + arrow::datatypes::DataType::Int8 => append_struct_int8_value(idx, column, builder), + arrow::datatypes::DataType::Int64 => append_struct_int64_value(idx, column, builder), + arrow::datatypes::DataType::Float32 => append_struct_float32_value(idx, column, builder), + arrow::datatypes::DataType::Float64 => append_struct_float64_value(idx, column, builder), + arrow::datatypes::DataType::Timestamp(TimeUnit::Nanosecond, _) => { + append_struct_timestampnanosecond_value(idx, column, builder) + } + &arrow::datatypes::DataType::Binary => append_struct_binary_value(idx, column, builder), + arrow::datatypes::DataType::Utf8 => { + let values = column.as_any().downcast_ref::().unwrap(); + + match decoder { + Decoder::String(decoder) => match decoder { + StringDecoder::Direct(_) => { + for value in values { + builder + .field_builder::(idx) + .unwrap() + .append_option(value); + } + } + StringDecoder::Dictionary(_) => { + for value in values { + builder + .field_builder::>(idx) + .unwrap() + .append_option(value); + } + } + }, + _ => unreachable!(), + } + } + arrow::datatypes::DataType::Date32 => append_struct_date32_value(idx, column, builder), + + _ => unreachable!(), + } + + Ok(()) +} + +pub fn append_struct_null( + idx: usize, + field: &Field, + builder: &mut StructBuilder, + decoder: &Decoder, +) -> Result<()> { + match field.data_type() { + arrow::datatypes::DataType::Boolean => { + append_struct_boolean_null(idx, builder); + } + arrow::datatypes::DataType::Int8 => append_struct_int8_null(idx, builder), + arrow::datatypes::DataType::Int64 => append_struct_int64_null(idx, builder), + arrow::datatypes::DataType::Float32 => append_struct_float32_null(idx, builder), + arrow::datatypes::DataType::Float64 => append_struct_float64_null(idx, builder), + arrow::datatypes::DataType::Timestamp(TimeUnit::Nanosecond, _) => { + append_struct_timestampnanosecond_null(idx, builder) + } + &arrow::datatypes::DataType::Binary => append_struct_binary_null(idx, builder), + arrow::datatypes::DataType::Utf8 => match decoder { + Decoder::String(decoder) => match decoder { + StringDecoder::Direct(_) => { + builder + .field_builder::(idx) + .unwrap() + .append_null(); + } + StringDecoder::Dictionary(_) => { + builder + .field_builder::>(idx) + .unwrap() + .append_null(); + } + }, + _ => unreachable!(), + }, + arrow::datatypes::DataType::Date32 => append_struct_date32_null(idx, builder), + + _ => unreachable!(), + } + + Ok(()) +} + +impl Decoder { + pub fn new_array_builder(&self, capacity: usize) -> Box { + match self { + Decoder::Int64(_) => Box::new(PrimitiveBuilder::::with_capacity(capacity)), + Decoder::Int32(_) => Box::new(PrimitiveBuilder::::with_capacity(capacity)), + Decoder::Int16(_) => 
Box::new(PrimitiveBuilder::::with_capacity(capacity)), + Decoder::Int8(_) => Box::new(PrimitiveBuilder::::with_capacity(capacity)), + Decoder::Boolean(_) => Box::new(BooleanBuilder::with_capacity(capacity)), + Decoder::Float32(_) => { + Box::new(PrimitiveBuilder::::with_capacity(capacity)) + } + Decoder::Float64(_) => { + Box::new(PrimitiveBuilder::::with_capacity(capacity)) + } + Decoder::Timestamp(_) => Box::new( + PrimitiveBuilder::::with_capacity(capacity), + ), + Decoder::Date(_) => Box::new(PrimitiveBuilder::::with_capacity(capacity)), + Decoder::String(decoder) => match decoder { + StringDecoder::Direct(_) => Box::new(StringBuilder::new()), + StringDecoder::Dictionary((_, dictionary)) => { + // Safety: keys won't overflow + let builder = StringDictionaryBuilder::::new_with_dictionary( + capacity, dictionary, + ) + .unwrap(); + + Box::new(builder) + } + }, + Decoder::Binary(_) => Box::new(BinaryBuilder::new()), + Decoder::Struct(decoder) => decoder.new_builder(capacity), + } + } + + // returns true if has more. + pub fn append_value(&mut self, builder: &mut Box) -> Result { + let mut has_more = false; + match self { + Decoder::Int64(iter) => { + let value = iter.next().transpose()?; + if let Some(value) = value { + has_more = true; + builder + .as_any_mut() + .downcast_mut::() + .unwrap() + .append_option(value); + } + } + Decoder::Int32(iter) => { + let value = iter.next().transpose()?; + if let Some(value) = value { + has_more = true; + builder + .as_any_mut() + .downcast_mut::() + .unwrap() + .append_option(value); + } + } + Decoder::Int16(iter) => { + let value = iter.next().transpose()?; + if let Some(value) = value { + has_more = true; + builder + .as_any_mut() + .downcast_mut::() + .unwrap() + .append_option(value); + } + } + Decoder::Int8(iter) => { + let value = iter.next().transpose()?; + if let Some(value) = value { + has_more = true; + builder + .as_any_mut() + .downcast_mut::() + .unwrap() + .append_option(value); + } + } + Decoder::Boolean(iter) => { + let value = iter.next().transpose()?; + if let Some(value) = value { + has_more = true; + builder + .as_any_mut() + .downcast_mut::() + .unwrap() + .append_option(value); + } + } + Decoder::Float32(iter) => { + let value = iter.next().transpose()?; + if let Some(value) = value { + has_more = true; + builder + .as_any_mut() + .downcast_mut::() + .unwrap() + .append_option(value); + } + } + Decoder::Float64(iter) => { + let value = iter.next().transpose()?; + if let Some(value) = value { + has_more = true; + builder + .as_any_mut() + .downcast_mut::() + .unwrap() + .append_option(value); + } + } + Decoder::Timestamp(iter) => { + let value = iter.next().transpose()?; + if let Some(value) = value { + has_more = true; + builder + .as_any_mut() + .downcast_mut::() + .unwrap() + .append_option(value.map(|value| value.timestamp_nanos_opt().unwrap())); + } + } + Decoder::Date(iter) => { + let value = iter.next().transpose()?; + if let Some(value) = value { + has_more = true; + builder + .as_any_mut() + .downcast_mut::() + .unwrap() + .append_option( + value.map(|value| value.num_days_from_ce() - UNIX_EPOCH_FROM_CE), + ); + } + } + Decoder::String(decoder) => match decoder { + StringDecoder::Direct(iter) => { + let value = iter.next().transpose()?; + if let Some(value) = value { + has_more = true; + builder + .as_any_mut() + .downcast_mut::() + .unwrap() + .append_option(value); + } + } + StringDecoder::Dictionary((indexes, dictionary)) => { + let value = indexes.next().transpose()?; + if let Some(index) = value { + has_more = 
true; + builder + .as_any_mut() + .downcast_mut::>() + .unwrap() + .append_option(index.map(|idx| dictionary.value(idx as usize))) + } + } + }, + Decoder::Binary(iter) => { + let value = iter.next().transpose()?; + if let Some(value) = value { + has_more = true; + builder + .as_any_mut() + .downcast_mut::() + .unwrap() + .append_option(value); + } + } + Decoder::Struct(iter) => { + let values = iter.next().transpose()?; + let builder = builder + .as_any_mut() + .downcast_mut::() + .unwrap(); + + if let Some(values) = values { + has_more = true; + + if let Some(values) = values { + builder.append(true); + + for (idx, column) in values.iter().enumerate() { + append_struct_value(idx, column, builder, &iter.decoders[idx])?; + } + } else { + builder.append_null(); + + for (idx, filed) in iter.fields.iter().enumerate() { + append_struct_null(idx, filed, builder, &iter.decoders[idx])?; + } + } + } + } + } + + Ok(has_more) + } + + pub fn append_null(&self, builder: &mut Box) { + match self { + Decoder::Int64(_) => { + builder + .as_any_mut() + .downcast_mut::() + .unwrap() + .append_null(); + } + Decoder::Int32(_) => { + builder + .as_any_mut() + .downcast_mut::() + .unwrap() + .append_null(); + } + Decoder::Int16(_) => { + builder + .as_any_mut() + .downcast_mut::() + .unwrap() + .append_null(); + } + Decoder::Int8(_) => { + builder + .as_any_mut() + .downcast_mut::() + .unwrap() + .append_null(); + } + Decoder::Boolean(_) => { + builder + .as_any_mut() + .downcast_mut::() + .unwrap() + .append_null(); + } + Decoder::Float32(_) => { + builder + .as_any_mut() + .downcast_mut::() + .unwrap() + .append_null(); + } + Decoder::Float64(_) => { + builder + .as_any_mut() + .downcast_mut::() + .unwrap() + .append_null(); + } + Decoder::Timestamp(_) => { + builder + .as_any_mut() + .downcast_mut::() + .unwrap() + .append_null(); + } + Decoder::Date(_) => { + builder + .as_any_mut() + .downcast_mut::() + .unwrap() + .append_null(); + } + Decoder::String(_) => { + builder + .as_any_mut() + .downcast_mut::() + .unwrap() + .append_null(); + } + Decoder::Binary(_) => { + builder + .as_any_mut() + .downcast_mut::() + .unwrap() + .append_null(); + } + Decoder::Struct(_) => { + builder + .as_any_mut() + .downcast_mut::() + .unwrap() + .append_null(); + } + } + } +} + +impl BatchDecoder for Decoder { + fn next_batch(&mut self, chunk: usize) -> Result> { + let mut builder = self.new_array_builder(chunk); + + for _ in 0..chunk { + if !self.append_value(&mut builder)? { + break; + } + } + + let output = builder.finish(); + + if output.is_empty() { + Ok(None) + } else { + Ok(Some(output)) + } + } } pub struct NaiveStripeDecoder { @@ -168,162 +650,57 @@ impl Iterator for NaiveStripeDecoder { } } -impl NullableIterator { - fn collect_chunk(&mut self, chunk: usize) -> Option>>> { - let mut buf = Vec::with_capacity(chunk); - for _ in 0..chunk { - match self.next() { - Some(Ok(value)) => { - buf.push(value); - } - Some(Err(err)) => return Some(Err(err)), - None => break, - } - } - - Some(Ok(buf)) - } +pub trait BatchDecoder: Send { + fn next_batch(&mut self, chunk: usize) -> Result>; } -macro_rules! impl_decode_next_batch { - ($name:ident) => { - paste::item! { - fn []( - decoder: &mut NullableIterator<$name>, - chunk: usize, - ) -> Result> { - Ok(match decoder.collect_chunk(chunk).transpose()? 
{ - Some(values) => Some(Arc::new(PrimitiveArray::from(values)) as ArrayRef), - None => None, - }) - } +pub fn reader_factory(col: &Column, stripe: &Stripe) -> Result { + let reader = match col.kind() { + crate::proto::r#type::Kind::Boolean => Decoder::Boolean(new_boolean_iter(col, stripe)?), + crate::proto::r#type::Kind::Byte => Decoder::Int8(new_i8_iter(col, stripe)?), + crate::proto::r#type::Kind::Short => Decoder::Int16(new_i64_iter(col, stripe)?), + crate::proto::r#type::Kind::Int => Decoder::Int32(new_i64_iter(col, stripe)?), + crate::proto::r#type::Kind::Long => Decoder::Int64(new_i64_iter(col, stripe)?), + crate::proto::r#type::Kind::Float => Decoder::Float32(new_f32_iter(col, stripe)?), + crate::proto::r#type::Kind::Double => Decoder::Float64(new_f64_iter(col, stripe)?), + crate::proto::r#type::Kind::String => Decoder::String(StringDecoder::new(col, stripe)?), + crate::proto::r#type::Kind::Binary => Decoder::Binary(new_binary_iterator(col, stripe)?), + crate::proto::r#type::Kind::Timestamp => { + Decoder::Timestamp(new_timestamp_iter(col, stripe)?) } + crate::proto::r#type::Kind::List => todo!(), + crate::proto::r#type::Kind::Map => todo!(), + crate::proto::r#type::Kind::Struct => Decoder::Struct(new_struct_iter(col, stripe)?), + crate::proto::r#type::Kind::Union => todo!(), + crate::proto::r#type::Kind::Decimal => todo!(), + crate::proto::r#type::Kind::Date => Decoder::Date(new_date_iter(col, stripe)?), + crate::proto::r#type::Kind::Varchar => Decoder::String(StringDecoder::new(col, stripe)?), + crate::proto::r#type::Kind::Char => Decoder::String(StringDecoder::new(col, stripe)?), + crate::proto::r#type::Kind::TimestampInstant => todo!(), }; -} -macro_rules! impl_decode_next_batch_cast { - ($target:ident,$tp:ident) => { - paste::item! { - fn []( - decoder: &mut NullableIterator, - chunk: usize, - ) -> Result> { - Ok(match decoder.collect_chunk(chunk).transpose()? { - Some(values) => { - let values = values - .into_iter() - .map(|v| v.map(|v| v as $target)) - .collect::>(); - Some(Arc::new(PrimitiveArray::<$tp>::from(values)) as ArrayRef) - } - None => None, - }) - } - } - }; + Ok(reader) } -impl_decode_next_batch_cast!(i64, Int64Type); -impl_decode_next_batch_cast!(i32, Int32Type); -impl_decode_next_batch_cast!(i16, Int16Type); -impl_decode_next_batch!(i8); -impl_decode_next_batch!(f32); -impl_decode_next_batch!(f64); - impl NaiveStripeDecoder { - fn decode_next_batch(&mut self) -> Result> { + fn inner_decode_next_batch(&mut self) -> Result> { let chunk = self.batch_size; let mut fields = Vec::with_capacity(self.stripe.columns.len()); for decoder in &mut self.decoders { - match decoder { - Decoder::Boolean(decoder) => { - match decoder.collect_chunk(chunk).transpose()? { - Some(values) => { - fields.push(Arc::new(BooleanArray::from(values)) as ArrayRef) - } - None => break, - }; - } - Decoder::Int64(decoder) => match decode_next_batch_i64(decoder, chunk)? { - Some(array) => fields.push(array), - None => break, - }, - Decoder::Int32(decoder) => match decode_next_batch_i32(decoder, chunk)? { - Some(array) => fields.push(array), - None => break, - }, - Decoder::Int16(decoder) => match decode_next_batch_i16(decoder, chunk)? { - Some(array) => fields.push(array), - None => break, - }, - Decoder::Int8(decoder) => match decode_next_batch_i8(decoder, chunk)? { - Some(array) => fields.push(array), - None => break, - }, - Decoder::Float32(decoder) => match decode_next_batch_f32(decoder, chunk)? 
{ - Some(array) => fields.push(array), - None => break, - }, - Decoder::Float64(decoder) => match decode_next_batch_f64(decoder, chunk)? { - Some(array) => fields.push(array), - None => break, - }, - Decoder::Timestamp(decoder) => match decoder.collect_chunk(chunk).transpose()? { - Some(values) => { - let iter = values - .into_iter() - .filter_map(|value| value.map(|value| value.timestamp_nanos_opt())); - fields.push( - Arc::new(PrimitiveArray::::from_iter(iter)) - as ArrayRef, - ); - } - None => break, - }, - Decoder::Date(decoder) => match decoder.collect_chunk(chunk).transpose()? { - Some(values) => { - let iter = values.into_iter().map(|value| { - value.map(|value| value.num_days_from_ce() - UNIX_EPOCH_FROM_CE) - }); - fields.push( - Arc::new(PrimitiveArray::::from_iter(iter)) as ArrayRef - ); - } - None => break, - }, - Decoder::String(decoder) => match decoder { - StringDecoder::Direct(decoder) => { - match decoder.collect_chunk(chunk).transpose()? { - Some(values) => { - fields.push(Arc::new(StringArray::from(values)) as ArrayRef); - } - None => break, - } - } - StringDecoder::Dictionary((indexes, dictionary)) => { - match indexes.collect_chunk(chunk).transpose()? { - Some(indexes) => { - fields.push(Arc::new(DictionaryArray::::new( - indexes.into(), - dictionary.clone(), - ))); - } - None => break, - } - } - }, - Decoder::Binary(binary) => match binary.collect_chunk(chunk).transpose()? { - Some(values) => { - let ref_vec = values.iter().map(|opt| opt.as_deref()).collect::>(); - fields.push(Arc::new(BinaryArray::from_opt_vec(ref_vec)) as ArrayRef); - } - None => break, - }, + match decoder.next_batch(chunk)? { + Some(array) => fields.push(array), + None => break, } } + Ok(fields) + } + + fn decode_next_batch(&mut self) -> Result> { + let fields = self.inner_decode_next_batch()?; + if fields.is_empty() { Ok(None) } else { @@ -349,30 +726,9 @@ impl NaiveStripeDecoder { .first() .map(|c| c.number_of_rows()) .unwrap_or_default(); + for col in &stripe.columns { - let decoder = match col.kind() { - crate::proto::r#type::Kind::Boolean => Decoder::Boolean(new_boolean_iter(col)?), - crate::proto::r#type::Kind::Byte => Decoder::Int8(new_i8_iter(col)?), - crate::proto::r#type::Kind::Short => Decoder::Int16(new_i64_iter(col)?), - crate::proto::r#type::Kind::Int => Decoder::Int32(new_i64_iter(col)?), - crate::proto::r#type::Kind::Long => Decoder::Int64(new_i64_iter(col)?), - crate::proto::r#type::Kind::Float => Decoder::Float32(new_f32_iter(col)?), - crate::proto::r#type::Kind::Double => Decoder::Float64(new_f64_iter(col)?), - crate::proto::r#type::Kind::String => Decoder::String(StringDecoder::new(col)?), - crate::proto::r#type::Kind::Binary => Decoder::Binary(new_binary_iterator(col)?), - crate::proto::r#type::Kind::Timestamp => { - Decoder::Timestamp(new_timestamp_iter(col)?) 
- } - crate::proto::r#type::Kind::List => todo!(), - crate::proto::r#type::Kind::Map => todo!(), - crate::proto::r#type::Kind::Struct => todo!(), - crate::proto::r#type::Kind::Union => todo!(), - crate::proto::r#type::Kind::Decimal => todo!(), - crate::proto::r#type::Kind::Date => Decoder::Date(new_date_iter(col)?), - crate::proto::r#type::Kind::Varchar => Decoder::String(StringDecoder::new(col)?), - crate::proto::r#type::Kind::Char => Decoder::String(StringDecoder::new(col)?), - crate::proto::r#type::Kind::TimestampInstant => todo!(), - }; + let decoder = reader_factory(col, &stripe)?; decoders.push(decoder); } @@ -441,12 +797,31 @@ pub struct Stripe { pub(crate) columns: Vec, pub(crate) stripe_offset: usize, pub(crate) info: StripeInformation, + /// <(ColumnId, Kind), Bytes> + pub(crate) stream_map: Arc, +} + +#[derive(Debug)] +pub struct StreamMap { + pub inner: HashMap<(u32, Kind), Bytes>, + pub compression: Option, +} + +impl StreamMap { + pub fn get(&self, column: &Column, kind: Kind) -> Option { + let column_id = column.column_id(); + + self.inner + .get(&(column_id, kind)) + .cloned() + .map(|data| Decompressor::new(data, self.compression, vec![])) + } } impl Stripe { pub fn new( r: &mut Reader, - columns: &[(String, Arc)], + column_defs: &[(String, Arc)], stripe: usize, info: StripeInformation, ) -> Result { @@ -457,16 +832,34 @@ impl Stripe { r.metadata().postscript.compression_block_size, ); //TODO(weny): add tz - let columns = columns - .iter() - .map(|(name, typ)| Column::new(r, compression, name, typ, &footer, &info)) - .collect::>>()?; + let mut columns = Vec::with_capacity(column_defs.len()); + for (name, typ) in column_defs.iter() { + columns.push(Column::new(name, typ, &footer, &info)); + } + + let mut stream_map = HashMap::new(); + let mut stream_offset = info.offset(); + for stream in &footer.streams { + let length = stream.length(); + let column_id = stream.column(); + let kind = stream.kind(); + let data = Column::read_stream(r, stream_offset, length as usize)?; + + // TODO(weny): filter out unused streams. + stream_map.insert((column_id, kind), data); + + stream_offset += length; + } Ok(Self { footer, columns, stripe_offset: stripe, info, + stream_map: Arc::new(StreamMap { + inner: stream_map, + compression, + }), }) } diff --git a/src/arrow_reader/column.rs b/src/arrow_reader/column.rs index 5dd7cc8e..4e5446f1 100644 --- a/src/arrow_reader/column.rs +++ b/src/arrow_reader/column.rs @@ -1,6 +1,7 @@ use std::io::{Read, Seek, SeekFrom}; use std::sync::Arc; +use arrow::datatypes::Field; use bytes::Bytes; use snafu::{OptionExt, ResultExt}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; @@ -8,8 +9,8 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; use crate::error::{self, Result}; use crate::proto::stream::Kind; use crate::proto::{ColumnEncoding, StripeFooter, StripeInformation}; -use crate::reader::decompress::{Compression, Decompressor}; -use crate::reader::schema::TypeDescription; + +use crate::reader::schema::{create_field, TypeDescription}; use crate::reader::Reader; pub mod binary; @@ -19,19 +20,24 @@ pub mod float; pub mod int; pub mod present; pub mod string; +pub mod struct_column; pub mod timestamp; pub mod tinyint; #[derive(Debug)] pub struct Column { - data: Bytes, number_of_rows: u64, - compression: Option, footer: Arc, name: String, column: Arc, } +impl From for Field { + fn from(value: Column) -> Self { + create_field((&value.name, &value.column)) + } +} + macro_rules! 
impl_read_stream { ($reader:ident,$start:ident,$length:ident $($_await:tt)*) => {{ $reader @@ -98,67 +104,18 @@ impl Column { Ok((start, length)) } - pub fn new( - reader: &mut Reader, - compression: Option, + pub fn new( name: &str, column: &Arc, footer: &Arc, stripe: &StripeInformation, - ) -> Result { - let (start, length) = Column::get_stream_info(name, column, footer, stripe)?; - let data = Column::read_stream(reader, start, length)?; - - Ok(Self { - data, + ) -> Self { + Self { number_of_rows: stripe.number_of_rows(), - compression, footer: footer.clone(), column: column.clone(), name: name.to_string(), - }) - } - - pub async fn new_async( - reader: &mut Reader, - compression: Option, - name: &str, - column: &Arc, - footer: &Arc, - stripe: &StripeInformation, - ) -> Result { - let (start, length) = Column::get_stream_info(name, column, footer, stripe)?; - let data = Column::read_stream_async(reader, start, length).await?; - - Ok(Self { - data, - number_of_rows: stripe.number_of_rows(), - compression, - footer: footer.clone(), - column: column.clone(), - name: name.to_string(), - }) - } - - pub fn stream(&self, kind: Kind) -> Option> { - let mut start = 0; // the start of the stream - - let column_id = self.column.column_id() as u32; - self.footer - .streams - .iter() - .filter(|stream| stream.column() == column_id && stream.kind() != Kind::RowIndex) - .map(|stream| { - start += stream.length() as usize; - stream - }) - .find(|stream| stream.kind() == kind) - .map(|stream| { - let length = stream.length() as usize; - let data = self.data.slice((start - length)..start); - Decompressor::new(data, self.compression, vec![]) - }) - .map(Ok) + } } pub fn dictionary_size(&self) -> usize { @@ -184,6 +141,27 @@ impl Column { pub fn name(&self) -> &str { &self.name } + + pub fn column_id(&self) -> u32 { + self.column.column_id() as u32 + } + + pub fn children(&self) -> Vec { + let children = self.column.children(); + + let mut columns = Vec::with_capacity(children.len()); + + for (name, column) in children { + columns.push(Column { + number_of_rows: self.number_of_rows, + footer: self.footer.clone(), + name, + column, + }); + } + + columns + } } pub struct NullableIterator { diff --git a/src/arrow_reader/column/binary.rs b/src/arrow_reader/column/binary.rs index 672ab5c0..2c486870 100644 --- a/src/arrow_reader/column/binary.rs +++ b/src/arrow_reader/column/binary.rs @@ -16,23 +16,28 @@ use snafu::OptionExt; use crate::arrow_reader::column::present::new_present_iter; use crate::arrow_reader::column::{Column, NullableIterator}; +use crate::arrow_reader::Stripe; use crate::error; use crate::proto::stream::Kind; use crate::reader::decode::get_direct_unsigned_rle_reader; use crate::reader::decode::variable_length::Values; use crate::reader::decompress::Decompressor; -pub fn new_binary_iterator(column: &Column) -> error::Result>> { - let null_mask = new_present_iter(column)?.collect::>>()?; +pub fn new_binary_iterator( + column: &Column, + stripe: &Stripe, +) -> error::Result>> { + let null_mask = new_present_iter(column, stripe)?.collect::>>()?; - let values = column - .stream(Kind::Data) - .transpose()? + let values = stripe + .stream_map + .get(column, Kind::Data) .map(|reader| Box::new(Values::new(reader, vec![]))) .context(error::InvalidColumnSnafu { name: &column.name })?; - let lengths = column - .stream(Kind::Length) - .transpose()? 
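+    // Data and Length bytes are now looked up in the stripe-level stream map, keyed by (column id, stream kind).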
+ + let lengths = stripe + .stream_map + .get(column, Kind::Length) .map(|reader| get_direct_unsigned_rle_reader(column, reader)) .context(error::InvalidColumnSnafu { name: &column.name })??; diff --git a/src/arrow_reader/column/boolean.rs b/src/arrow_reader/column/boolean.rs index 59433d7f..18df13f0 100644 --- a/src/arrow_reader/column/boolean.rs +++ b/src/arrow_reader/column/boolean.rs @@ -2,17 +2,18 @@ use snafu::OptionExt; use crate::arrow_reader::column::present::new_present_iter; use crate::arrow_reader::column::{Column, NullableIterator}; +use crate::arrow_reader::Stripe; use crate::error::{InvalidColumnSnafu, Result}; use crate::proto::stream::Kind; use crate::reader::decode::boolean_rle::BooleanIter; -pub fn new_boolean_iter(column: &Column) -> Result> { - let present = new_present_iter(column)?.collect::>>()?; +pub fn new_boolean_iter(column: &Column, stripe: &Stripe) -> Result> { + let present = new_present_iter(column, stripe)?.collect::>>()?; let rows: usize = present.iter().filter(|&p| *p).count(); - let iter = column - .stream(Kind::Data) - .transpose()? + let iter = stripe + .stream_map + .get(column, Kind::Data) .map(|reader| { Box::new(BooleanIter::new(reader, rows)) as Box> + Send> diff --git a/src/arrow_reader/column/date.rs b/src/arrow_reader/column/date.rs index 8f98a025..6313acca 100644 --- a/src/arrow_reader/column/date.rs +++ b/src/arrow_reader/column/date.rs @@ -3,6 +3,7 @@ use snafu::OptionExt; use crate::arrow_reader::column::present::new_present_iter; use crate::arrow_reader::column::{Column, NullableIterator}; +use crate::arrow_reader::Stripe; use crate::error::{self, Result}; use crate::proto::stream::Kind; use crate::reader::decode::get_direct_signed_rle_reader; @@ -37,12 +38,12 @@ impl Iterator for DateIterator { } } -pub fn new_date_iter(column: &Column) -> Result> { - let present = new_present_iter(column)?.collect::>>()?; +pub fn new_date_iter(column: &Column, stripe: &Stripe) -> Result> { + let present = new_present_iter(column, stripe)?.collect::>>()?; - let data = column - .stream(Kind::Data) - .transpose()? + let data = stripe + .stream_map + .get(column, Kind::Data) .map(|reader| get_direct_signed_rle_reader(column, reader)) .context(error::InvalidColumnSnafu { name: &column.name })??; diff --git a/src/arrow_reader/column/float.rs b/src/arrow_reader/column/float.rs index 694580e9..67d65b16 100644 --- a/src/arrow_reader/column/float.rs +++ b/src/arrow_reader/column/float.rs @@ -2,6 +2,7 @@ use snafu::OptionExt; use crate::arrow_reader::column::present::new_present_iter; use crate::arrow_reader::column::{Column, NullableIterator}; +use crate::arrow_reader::Stripe; use crate::error::{InvalidColumnSnafu, Result}; use crate::proto::stream::Kind; use crate::reader::decode::float::FloatIter; @@ -9,12 +10,12 @@ use crate::reader::decode::float::FloatIter; macro_rules! impl_float_iter { ($tp:ident) => { paste::item! { - pub fn [] (column: &Column) -> Result> { - let present = new_present_iter(column)?.collect::>>()?; + pub fn [] (column: &Column, stripe: &Stripe) -> Result> { + let present = new_present_iter(column, stripe)?.collect::>>()?; let rows: usize = present.iter().filter(|&p| *p).count(); - let iter = column - .stream(Kind::Data) - .transpose()? 
+ let iter = stripe + .stream_map + .get(column, Kind::Data) .map(|reader| Box::new(FloatIter::<$tp, _>::new(reader, rows))) .context(InvalidColumnSnafu { name: &column.name })?; diff --git a/src/arrow_reader/column/int.rs b/src/arrow_reader/column/int.rs index b01d8834..c24a6fe8 100644 --- a/src/arrow_reader/column/int.rs +++ b/src/arrow_reader/column/int.rs @@ -2,16 +2,17 @@ use snafu::OptionExt; use crate::arrow_reader::column::present::new_present_iter; use crate::arrow_reader::column::{Column, NullableIterator}; +use crate::arrow_reader::Stripe; use crate::error::{InvalidColumnSnafu, Result}; use crate::proto::stream::Kind; use crate::reader::decode::get_direct_signed_rle_reader; -pub fn new_i64_iter(column: &Column) -> Result> { - let present = new_present_iter(column)?.collect::>>()?; +pub fn new_i64_iter(column: &Column, stripe: &Stripe) -> Result> { + let present = new_present_iter(column, stripe)?.collect::>>()?; - let iter = column - .stream(Kind::Data) - .transpose()? + let iter = stripe + .stream_map + .get(column, Kind::Data) .map(|reader| get_direct_signed_rle_reader(column, reader)) .context(InvalidColumnSnafu { name: &column.name })??; diff --git a/src/arrow_reader/column/present.rs b/src/arrow_reader/column/present.rs index 4d236e39..5864714c 100644 --- a/src/arrow_reader/column/present.rs +++ b/src/arrow_reader/column/present.rs @@ -1,13 +1,17 @@ use crate::arrow_reader::column::Column; +use crate::arrow_reader::Stripe; use crate::error::Result; use crate::proto::stream::Kind; use crate::reader::decode::boolean_rle::BooleanIter; -pub fn new_present_iter(column: &Column) -> Result>>> { +pub fn new_present_iter( + column: &Column, + stripe: &Stripe, +) -> Result>>> { let rows = column.number_of_rows as usize; - let iter = column - .stream(Kind::Present) - .transpose()? + let iter = stripe + .stream_map + .get(column, Kind::Present) .map(|reader| { Box::new(BooleanIter::new(reader, rows)) as Box>> }) diff --git a/src/arrow_reader/column/string.rs b/src/arrow_reader/column/string.rs index 51b5744a..2e730498 100644 --- a/src/arrow_reader/column/string.rs +++ b/src/arrow_reader/column/string.rs @@ -1,10 +1,11 @@ use std::sync::Arc; -use arrow::array::{ArrayRef, StringArray}; +use arrow::array::StringArray; use snafu::{OptionExt, ResultExt}; use crate::arrow_reader::column::present::new_present_iter; use crate::arrow_reader::column::{Column, NullableIterator}; +use crate::arrow_reader::Stripe; use crate::error::{self, Result}; use crate::proto::column_encoding::Kind as ColumnEncodingKind; use crate::proto::stream::Kind; @@ -38,18 +39,19 @@ impl Iterator for DirectStringIterator { pub fn new_direct_string_iter( column: &Column, rle_version: RleVersion, + stripe: &Stripe, ) -> Result> { - let present = new_present_iter(column)?.collect::>>()?; + let present = new_present_iter(column, stripe)?.collect::>>()?; - let values = column - .stream(Kind::Data) - .transpose()? + let values = stripe + .stream_map + .get(column, Kind::Data) .map(|reader| Box::new(Values::new(reader, vec![]))) .context(error::InvalidColumnSnafu { name: &column.name })?; - let lengths = column - .stream(Kind::Length) - .transpose()? 
+ let lengths = stripe + .stream_map + .get(column, Kind::Length) .map(|reader| rle_version.get_unsigned_rle_reader(reader)) .context(error::InvalidColumnSnafu { name: &column.name })?; @@ -62,28 +64,29 @@ pub fn new_direct_string_iter( pub fn new_arrow_dict_string_decoder( column: &Column, rle_version: RleVersion, -) -> Result<(NullableIterator, ArrayRef)> { - let present = new_present_iter(column)?.collect::>>()?; + stripe: &Stripe, +) -> Result<(NullableIterator, Arc)> { + let present = new_present_iter(column, stripe)?.collect::>>()?; // DictionaryData - let values = column - .stream(Kind::DictionaryData) - .transpose()? + let values = stripe + .stream_map + .get(column, Kind::DictionaryData) .map(|reader| Box::new(Values::new(reader, vec![]))) .context(error::InvalidColumnSnafu { name: &column.name })?; - let lengths = column - .stream(Kind::Length) - .transpose()? + let lengths = stripe + .stream_map + .get(column, Kind::Length) .map(|reader| rle_version.get_unsigned_rle_reader(reader)) .context(error::InvalidColumnSnafu { name: &column.name })?; let iter = DirectStringIterator { values, lengths }; let values = iter.collect::>>()?; - let indexes = column - .stream(Kind::Data) - .transpose()? + let indexes = stripe + .stream_map + .get(column, Kind::Data) .map(|reader| rle_version.get_unsigned_rle_reader(reader)) .context(error::InvalidColumnSnafu { name: &column.name })?; @@ -100,19 +103,24 @@ pub fn new_arrow_dict_string_decoder( pub enum StringDecoder { Direct(NullableIterator), - Dictionary((NullableIterator, ArrayRef)), + Dictionary((NullableIterator, Arc)), } impl StringDecoder { - pub fn new(column: &Column) -> Result { + pub fn new(column: &Column, stripe: &Stripe) -> Result { let kind = column.encoding().kind(); + match kind { ColumnEncodingKind::Direct | ColumnEncodingKind::DirectV2 => Ok(StringDecoder::Direct( - new_direct_string_iter(column, kind.into())?, + new_direct_string_iter(column, kind.into(), stripe)?, )), - ColumnEncodingKind::Dictionary | ColumnEncodingKind::DictionaryV2 => Ok( - StringDecoder::Dictionary(new_arrow_dict_string_decoder(column, kind.into())?), - ), + ColumnEncodingKind::Dictionary | ColumnEncodingKind::DictionaryV2 => { + Ok(StringDecoder::Dictionary(new_arrow_dict_string_decoder( + column, + kind.into(), + stripe, + )?)) + } } } } diff --git a/src/arrow_reader/column/struct_column.rs b/src/arrow_reader/column/struct_column.rs new file mode 100644 index 00000000..4d268695 --- /dev/null +++ b/src/arrow_reader/column/struct_column.rs @@ -0,0 +1,85 @@ +use crate::{ + arrow_reader::{reader_factory, Decoder, Stripe}, + error::Result, +}; +use std::sync::Arc; + +use arrow::{ + array::{ArrayBuilder, ArrayRef, StructBuilder}, + datatypes::{Field, Fields}, +}; + +use super::{present::new_present_iter, Column}; + +pub struct StructDecoder { + pub(crate) fields: Fields, + pub(crate) decoders: Vec, + present: Box + Send>, +} + +impl Iterator for StructDecoder { + type Item = Result>>; + + fn next(&mut self) -> Option { + match self.present.next() { + Some(present) => { + if present { + let mut builders = Vec::with_capacity(self.decoders.len()); + for decoder in &self.decoders { + let builder = decoder.new_array_builder(1); + builders.push(builder); + } + for (idx, decoder) in &mut self.decoders.iter_mut().enumerate() { + if let Err(err) = decoder.append_value(&mut builders[idx]) { + return Some(Err(err)); + } + } + + let output = builders + .into_iter() + .map(|mut b| b.finish()) + .collect::>(); + + Some(Ok(Some(output))) + } else { + Some(Ok(None)) + } + } 
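+            // The present stream is exhausted: the struct column has no more rows.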
+ None => None, + } + } +} + +impl StructDecoder { + pub fn new_builder(&self, capacity: usize) -> Box { + let mut builders = Vec::with_capacity(self.decoders.len()); + for decoder in &self.decoders { + let builder = decoder.new_array_builder(capacity); + builders.push(builder); + } + + Box::new(StructBuilder::new(self.fields.clone(), builders)) + } +} + +pub fn new_struct_iter(column: &Column, stripe: &Stripe) -> Result { + let present = new_present_iter(column, stripe)?.collect::>>()?; + + let children = column.children(); + let mut decoders = Vec::with_capacity(children.len()); + for column in &children { + decoders.push(reader_factory(column, stripe)?); + } + + let fields = children + .into_iter() + .map(Field::from) + .map(Arc::new) + .collect::>(); + + Ok(StructDecoder { + fields: Fields::from_iter(fields), + decoders, + present: Box::new(present.into_iter()), + }) +} diff --git a/src/arrow_reader/column/timestamp.rs b/src/arrow_reader/column/timestamp.rs index add43ea3..8f6cd804 100644 --- a/src/arrow_reader/column/timestamp.rs +++ b/src/arrow_reader/column/timestamp.rs @@ -3,6 +3,7 @@ use snafu::OptionExt; use crate::arrow_reader::column::present::new_present_iter; use crate::arrow_reader::column::{Column, NullableIterator}; +use crate::arrow_reader::Stripe; use crate::error::{self, Result}; use crate::proto::stream::Kind; use crate::reader::decode::{get_direct_signed_rle_reader, get_direct_unsigned_rle_reader}; @@ -44,18 +45,21 @@ impl Iterator for TimestampIterator { } } -pub fn new_timestamp_iter(column: &Column) -> Result> { - let present = new_present_iter(column)?.collect::>>()?; +pub fn new_timestamp_iter( + column: &Column, + stripe: &Stripe, +) -> Result> { + let present = new_present_iter(column, stripe)?.collect::>>()?; - let data = column - .stream(Kind::Data) - .transpose()? + let data = stripe + .stream_map + .get(column, Kind::Data) .map(|reader| get_direct_signed_rle_reader(column, reader)) .context(error::InvalidColumnSnafu { name: &column.name })??; - let secondary = column - .stream(Kind::Secondary) - .transpose()? + let secondary = stripe + .stream_map + .get(column, Kind::Secondary) .map(|reader| get_direct_unsigned_rle_reader(column, reader)) .context(error::InvalidColumnSnafu { name: &column.name })??; diff --git a/src/arrow_reader/column/tinyint.rs b/src/arrow_reader/column/tinyint.rs index 03915dcc..c948ca36 100644 --- a/src/arrow_reader/column/tinyint.rs +++ b/src/arrow_reader/column/tinyint.rs @@ -2,17 +2,18 @@ use snafu::OptionExt; use crate::arrow_reader::column::present::new_present_iter; use crate::arrow_reader::column::{Column, NullableIterator}; +use crate::arrow_reader::Stripe; use crate::error::{InvalidColumnSnafu, Result}; use crate::proto::stream::Kind; use crate::reader::decode::byte_rle::ByteRleIter; -pub fn new_i8_iter(column: &Column) -> Result> { - let present = new_present_iter(column)?.collect::>>()?; +pub fn new_i8_iter(column: &Column, stripe: &Stripe) -> Result> { + let present = new_present_iter(column, stripe)?.collect::>>()?; let rows: usize = present.iter().filter(|&p| *p).count(); - let iter = column - .stream(Kind::Data) - .transpose()? 
+ let iter = stripe + .stream_map + .get(column, Kind::Data) .map(|reader| { Box::new(ByteRleIter::new(reader, rows).map(|value| value.map(|value| value as i8))) as _ diff --git a/src/async_arrow_reader.rs b/src/async_arrow_reader.rs index 6186f5d5..920dd9db 100644 --- a/src/async_arrow_reader.rs +++ b/src/async_arrow_reader.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::fmt::Formatter; use std::pin::Pin; use std::sync::Arc; @@ -13,7 +14,7 @@ use tokio::io::{AsyncRead, AsyncSeek}; use crate::arrow_reader::column::Column; use crate::arrow_reader::{ - create_arrow_schema, Cursor, NaiveStripeDecoder, Stripe, DEFAULT_BATCH_SIZE, + create_arrow_schema, Cursor, NaiveStripeDecoder, StreamMap, Stripe, DEFAULT_BATCH_SIZE, }; use crate::error::Result; use crate::proto::StripeInformation; @@ -196,7 +197,21 @@ impl Stripe { //TODO(weny): add tz let mut columns = Vec::with_capacity(column_defs.len()); for (name, typ) in column_defs.iter() { - columns.push(Column::new_async(r, compression, name, typ, &footer, &info).await?); + columns.push(Column::new(name, typ, &footer, &info)); + } + + let mut stream_map = HashMap::new(); + let mut stream_offset = info.offset(); + for stream in &footer.streams { + let length = stream.length(); + let column_id = stream.column(); + let kind = stream.kind(); + let data = Column::read_stream_async(r, stream_offset, length as usize).await?; + + // TODO(weny): filter out unused streams. + stream_map.insert((column_id, kind), data); + + stream_offset += length; } Ok(Stripe { @@ -204,6 +219,10 @@ impl Stripe { columns, stripe_offset: stripe, info, + stream_map: Arc::new(StreamMap { + inner: stream_map, + compression, + }), }) } } diff --git a/src/error.rs b/src/error.rs index 8bb37f56..a5d90fd5 100644 --- a/src/error.rs +++ b/src/error.rs @@ -29,6 +29,12 @@ pub enum Error { #[snafu(display("Out of sepc, message: {}", msg))] OutOfSpec { msg: String, location: Location }, + #[snafu(display("failed to new string builder: {}", source))] + StringBuilder { + source: arrow::error::ArrowError, + location: Location, + }, + #[snafu(display("Failed to decode float, source: {}", source))] DecodeFloat { location: Location, diff --git a/src/reader/schema.rs b/src/reader/schema.rs index 859d0d6e..988ed4f1 100644 --- a/src/reader/schema.rs +++ b/src/reader/schema.rs @@ -1,7 +1,8 @@ use std::sync::{Arc, Mutex, Weak}; -use arrow::datatypes::{DataType, Field}; +use arrow::datatypes::{DataType, Field, Fields, UnionFields, UnionMode}; use lazy_static::lazy_static; +use snafu::ensure; use crate::error::{self, Result}; use crate::proto::r#type::Kind; @@ -33,7 +34,8 @@ impl Category { } pub fn create_field((name, typ): (&str, &Arc)) -> Field { - match typ.inner.lock().unwrap().category.kind { + let kind = typ.kind(); + match kind { Kind::Boolean => Field::new(name, DataType::Boolean, true), Kind::Byte => Field::new(name, DataType::Int8, true), Kind::Short => Field::new(name, DataType::Int16, true), @@ -49,10 +51,46 @@ pub fn create_field((name, typ): (&str, &Arc)) -> Field { DataType::Timestamp(arrow::datatypes::TimeUnit::Nanosecond, None), true, ), - Kind::List => todo!(), - Kind::Map => todo!(), - Kind::Struct => todo!(), - Kind::Union => todo!(), + Kind::List => { + let children = typ.children(); + assert_eq!(children.len(), 1); + + let (name, typ) = &children[0]; + let value = create_field((name, typ)); + + Field::new(name, DataType::List(Arc::new(value)), true) + } + Kind::Map => { + let children = typ.children(); + assert_eq!(children.len(), 2); + + let (name, typ) = &children[1]; 
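+            // children[0] is the map's key type and children[1] its value type; only the value child is converted here.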
+ let value = create_field((name, typ)); + + Field::new(name, DataType::Map(Arc::new(value), false), true) + } + Kind::Struct => { + let children = typ.children(); + let mut fields = Vec::with_capacity(children.len()); + for (name, child) in &children { + fields.push(create_field((name, child))); + } + + Field::new(name, DataType::Struct(Fields::from(fields)), true) + } + Kind::Union => { + let children = typ.children(); + let mut fields = Vec::with_capacity(children.len()); + for (idx, (name, child)) in children.iter().enumerate() { + fields.push((idx as i8, Arc::new(create_field((name, child))))); + } + + Field::new( + name, + DataType::Union(UnionFields::from_iter(fields), UnionMode::Sparse), + true, + ) + } Kind::Decimal => { let inner = typ.inner.lock().unwrap(); Field::new( @@ -123,6 +161,16 @@ impl TypeDescription { pub fn kind(&self) -> Kind { self.inner.lock().unwrap().category.kind } + + pub fn children(&self) -> Vec<(String, Arc)> { + let inner = self.inner.lock().unwrap(); + + let children = inner.children.clone().unwrap_or_default(); + + let names = inner.field_names.clone(); + + names.into_iter().zip(children).collect() + } } #[derive(Debug)] @@ -220,12 +268,54 @@ pub fn create_schema(types: &[Type], root_column: usize) -> Result Ok(Arc::new(TypeDescription::new(BINARY.clone(), root_column))), - Kind::List => Ok(Arc::new(TypeDescription::new(ARRAY.clone(), root_column))), - Kind::Map => Ok(Arc::new(TypeDescription::new(MAP.clone(), root_column))), - Kind::Union => Ok(Arc::new(TypeDescription::new( - UNIONTYPE.clone(), - root_column, - ))), + Kind::List => { + let sub_types = &root.subtypes; + ensure!( + sub_types.len() == 1, + error::UnexpectedSnafu { + msg: format!("unexpected number of subtypes for list: {:?}", sub_types) + } + ); + + let td = Arc::new(TypeDescription::new(ARRAY.clone(), root_column)); + let fields = &root.field_names; + for (idx, column) in sub_types.iter().enumerate() { + let child = create_schema(types, *column as usize)?; + td.add_field(fields[idx].to_string(), child); + } + + Ok(td) + } + Kind::Map => { + let sub_types = &root.subtypes; + ensure!( + sub_types.len() == 2, + error::UnexpectedSnafu { + msg: format!("unexpected number of subtypes for map: {:?}", sub_types) + } + ); + + let td = Arc::new(TypeDescription::new(MAP.clone(), root_column)); + let fields = &root.field_names; + for (idx, column) in sub_types.iter().enumerate() { + let child = create_schema(types, *column as usize)?; + td.add_field(fields[idx].to_string(), child); + } + + Ok(td) + } + Kind::Union => { + let td = Arc::new(TypeDescription::new(UNIONTYPE.clone(), root_column)); + + let sub_types = &root.subtypes; + let fields = &root.field_names; + for (idx, column) in sub_types.iter().enumerate() { + let child = create_schema(types, *column as usize)?; + td.add_field(fields[idx].to_string(), child); + } + + Ok(td) + } Kind::Decimal => Ok(Arc::new(TypeDescription::new(DECIMAL.clone(), root_column))), Kind::TimestampInstant => todo!(), } diff --git a/tests/basic/data/f32_long_long_gzip.orc b/tests/basic/data/f32_long_long_gzip.orc index daa1214b..99546ce5 100644 Binary files a/tests/basic/data/f32_long_long_gzip.orc and b/tests/basic/data/f32_long_long_gzip.orc differ diff --git a/tests/basic/data/nested_struct.orc b/tests/basic/data/nested_struct.orc new file mode 100644 index 00000000..d81db47d Binary files /dev/null and b/tests/basic/data/nested_struct.orc differ diff --git a/tests/basic/data/write.py b/tests/basic/data/write.py index 0ef46f7e..1defc48e 100644 --- 
a/tests/basic/data/write.py
+++ b/tests/basic/data/write.py
@@ -38,6 +38,8 @@ def infer_schema(data):
             dt = "boolean"
         elif dt == str:
             dt = "string"
+        elif dt == dict:
+            dt = infer_schema(value[0])
         elif key.startswith("timestamp"):
             dt = "timestamp"
         elif key.startswith("date"):
@@ -84,6 +86,17 @@ def _write(
         reader = pyorc.Reader(f)
         list(reader)
 
+nested_struct = {
+    "nest": [
+        (1.0,True),
+        (3.0,None),
+        (None,None),
+        None,
+        (-3.0,None)
+    ],
+}
+
+_write("struct<nest:struct<a:float,b:boolean>>", nested_struct, "nested_struct.orc")
 
 _write(
     infer_schema(data),
diff --git a/tests/basic/main.rs b/tests/basic/main.rs
index 1c178e3f..1361dcb2 100644
--- a/tests/basic/main.rs
+++ b/tests/basic/main.rs
@@ -223,6 +223,26 @@ pub fn basic_test_3() {
     )
 }
 
+#[test]
+pub fn basic_test_nested_struct() {
+    let path = basic_path("nested_struct.orc");
+    let reader = new_arrow_reader_root(&path);
+    let batch = reader.collect::<Result<Vec<_>, _>>().unwrap();
+    let expected = r#"+-------------------+
+| nest              |
++-------------------+
+| {a: 1.0, b: true} |
+| {a: 3.0, b: }     |
+| {a: , b: }        |
+|                   |
+| {a: -3.0, b: }    |
++-------------------+"#;
+    assert_eq!(
+        expected,
+        pretty::pretty_format_batches(&batch).unwrap().to_string()
+    )
+}
+
 #[test]
 pub fn basic_test_0() {
     let path = basic_path("test.orc");