Skip to content

Commit

Permalink
feat: support to map datatype
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Nov 11, 2023
1 parent 85baf80 commit 424b021
Show file tree
Hide file tree
Showing 11 changed files with 208 additions and 36 deletions.
60 changes: 29 additions & 31 deletions src/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use arrow::array::{
Array, ArrayBuilder, ArrayRef, BinaryArray, BinaryBuilder, BooleanArray, BooleanBuilder,
Date32Array, Date32Builder, Float32Array, Float32Builder, Float64Builder, Int16Array,
Int16Builder, Int32Array, Int32Builder, Int64Array, Int64Builder, Int8Array, Int8Builder,
ListBuilder, PrimitiveBuilder, StringArray, StringBuilder, StringDictionaryBuilder,
ListBuilder, MapBuilder, PrimitiveBuilder, StringArray, StringBuilder, StringDictionaryBuilder,
StructBuilder, TimestampNanosecondBuilder,
};
use arrow::array::{Float64Array, TimestampNanosecondArray};
Expand All @@ -24,6 +24,7 @@ use chrono::{Datelike, NaiveDate, NaiveDateTime};
use snafu::{OptionExt, ResultExt};

use self::column::list::{new_list_iter, ListDecoder};
use self::column::map::{new_map_iter, MapDecoder};
use self::column::struct_column::new_struct_iter;
use self::column::tinyint::new_i8_iter;
use self::column::Column;
Expand All @@ -36,6 +37,7 @@ use crate::arrow_reader::column::string::StringDecoder;
use crate::arrow_reader::column::struct_column::StructDecoder;
use crate::arrow_reader::column::timestamp::new_timestamp_iter;
use crate::arrow_reader::column::NullableIterator;
use crate::builder::BoxedArrayBuilder;
use crate::error::{self, Result};
use crate::proto::stream::Kind;
use crate::proto::{StripeFooter, StripeInformation};
Expand Down Expand Up @@ -156,6 +158,7 @@ pub enum Decoder {
Binary(NullableIterator<Vec<u8>>),
Struct(StructDecoder),
List(ListDecoder),
Map(MapDecoder),
}

macro_rules! impl_append_struct_value {
Expand Down Expand Up @@ -321,35 +324,6 @@ pub fn append_struct_null(
Ok(())
}

pub struct BoxedArrayBuilder {
pub(crate) builder: Box<dyn ArrayBuilder>,
}
impl ArrayBuilder for BoxedArrayBuilder {
fn len(&self) -> usize {
self.builder.len()
}

fn finish(&mut self) -> ArrayRef {
self.builder.finish()
}

fn finish_cloned(&self) -> ArrayRef {
self.builder.finish_cloned()
}

fn as_any(&self) -> &dyn std::any::Any {
self.builder.as_any()
}

fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
self.builder.as_any_mut()
}

fn into_box_any(self: Box<Self>) -> Box<dyn std::any::Any> {
self.builder.into_box_any()
}
}

impl Decoder {
pub fn new_array_builder(&self, capacity: usize) -> Box<dyn ArrayBuilder> {
match self {
Expand Down Expand Up @@ -386,6 +360,15 @@ impl Decoder {
let builder = decoder.inner.new_array_builder(capacity);
Box::new(ListBuilder::new(BoxedArrayBuilder { builder }))
}
Decoder::Map(decoder) => {
let key = BoxedArrayBuilder {
builder: decoder.key.new_array_builder(capacity),
};
let value = BoxedArrayBuilder {
builder: decoder.value.new_array_builder(capacity),
};
Box::new(MapBuilder::new(None, key, value))
}
}
}

Expand Down Expand Up @@ -581,6 +564,14 @@ impl Decoder {

has_more = iter.collect_chunk(builder, chunk).transpose()?.is_some();
}
Decoder::Map(iter) => {
let builder = builder
.as_any_mut()
.downcast_mut::<MapBuilder<BoxedArrayBuilder, BoxedArrayBuilder>>()
.unwrap();

has_more = iter.collect_chunk(builder, chunk).transpose()?.is_some();
}
}

Ok(has_more)
Expand Down Expand Up @@ -684,6 +675,13 @@ impl Decoder {
.unwrap()
.append_null();
}
Decoder::Map(_) => {
let _ = builder
.as_any_mut()
.downcast_mut::<MapBuilder<BoxedArrayBuilder, BoxedArrayBuilder>>()
.unwrap()
.append(false);
}
}

Ok(())
Expand Down Expand Up @@ -750,7 +748,7 @@ pub fn reader_factory(col: &Column, stripe: &Stripe) -> Result<Decoder> {
Decoder::Timestamp(new_timestamp_iter(col, stripe)?)
}
crate::proto::r#type::Kind::List => Decoder::List(new_list_iter(col, stripe)?),
crate::proto::r#type::Kind::Map => todo!(),
crate::proto::r#type::Kind::Map => Decoder::Map(new_map_iter(col, stripe)?),
crate::proto::r#type::Kind::Struct => Decoder::Struct(new_struct_iter(col, stripe)?),
crate::proto::r#type::Kind::Union => todo!(),
crate::proto::r#type::Kind::Decimal => todo!(),
Expand Down
1 change: 1 addition & 0 deletions src/arrow_reader/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub mod date;
pub mod float;
pub mod int;
pub mod list;
pub mod map;
pub mod present;
pub mod string;
pub mod struct_column;
Expand Down
95 changes: 95 additions & 0 deletions src/arrow_reader/column/map.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
use arrow::array::MapBuilder;
use snafu::{OptionExt, ResultExt};

use super::present::new_present_iter;
use super::Column;
use crate::arrow_reader::{reader_factory, BoxedArrayBuilder, Decoder, Stripe};
use crate::error::{self, Result};
use crate::proto::stream::Kind;
use crate::reader::decode::RleVersion;

pub struct MapDecoder {
pub(crate) key: Box<Decoder>,
pub(crate) value: Box<Decoder>,
present: Box<dyn Iterator<Item = bool> + Send>,
lengths: Box<dyn Iterator<Item = Result<u64>> + Send>,
}

impl MapDecoder {
fn append_value(
&mut self,
root_builder: &mut MapBuilder<BoxedArrayBuilder, BoxedArrayBuilder>,
) -> Result<()> {
let len = self.lengths.next().unwrap()?;
let _ = self
.key
.append_value(&mut root_builder.keys().builder, len as usize);
let _ = self
.value
.append_value(&mut root_builder.values().builder, len as usize);
Ok(())
}

fn next(
&mut self,
root_builder: &mut MapBuilder<BoxedArrayBuilder, BoxedArrayBuilder>,
) -> Option<Result<()>> {
match self.present.next() {
Some(present) => {
if present {
if let Err(err) = self.append_value(root_builder) {
return Some(Err(err));
}
}
if let Err(err) = root_builder.append(present).context(error::MapBuilderSnafu) {
return Some(Err(err));
};
}
None => return None,
}

Some(Ok(()))
}

pub fn collect_chunk(
&mut self,
root_builder: &mut MapBuilder<BoxedArrayBuilder, BoxedArrayBuilder>,
chunk: usize,
) -> Option<Result<()>> {
for _ in 0..chunk {
match self.next(root_builder) {
Some(Ok(_)) => {
// continue
}
Some(Err(err)) => return Some(Err(err)),
None => break,
}
}

Some(Ok(()))
}
}

pub fn new_map_iter(column: &Column, stripe: &Stripe) -> Result<MapDecoder> {
let present = new_present_iter(column, stripe)?.collect::<Result<Vec<_>>>()?;
let version: RleVersion = column.encoding().kind().into();
let lengths = stripe
.stream_map
.get(column, Kind::Length)
.map(|reader| version.get_unsigned_rle_reader(reader))
.context(error::InvalidColumnSnafu { name: &column.name })?;

let children = column.children();
let key = &children[0];
let value = &children[1];

let key = reader_factory(key, stripe)?;
let value = reader_factory(value, stripe)?;

Ok(MapDecoder {
key: Box::new(key),
value: Box::new(value),
present: Box::new(present.into_iter()),
lengths,
})
}
34 changes: 34 additions & 0 deletions src/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use arrow::array::{ArrayBuilder, ArrayRef};

/// BoxedArrayBuilder
///
/// It implements the [ArrayBuilder] and [Sized] trait.
pub struct BoxedArrayBuilder {
pub(crate) builder: Box<dyn ArrayBuilder>,
}

impl ArrayBuilder for BoxedArrayBuilder {
fn len(&self) -> usize {
self.builder.len()
}

fn finish(&mut self) -> ArrayRef {
self.builder.finish()
}

fn finish_cloned(&self) -> ArrayRef {
self.builder.finish_cloned()
}

fn as_any(&self) -> &dyn std::any::Any {
self.builder.as_any()
}

fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
self.builder.as_any_mut()
}

fn into_box_any(self: Box<Self>) -> Box<dyn std::any::Any> {
self.builder.into_box_any()
}
}
8 changes: 7 additions & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,13 @@ pub enum Error {
#[snafu(display("Out of sepc, message: {}", msg))]
OutOfSpec { msg: String, location: Location },

#[snafu(display("failed to new string builder: {}", source))]
#[snafu(display("Error from map builder: {}", source))]
MapBuilder {
source: arrow::error::ArrowError,
location: Location,
},

#[snafu(display("Failed to create new string builder: {}", source))]
StringBuilder {
source: arrow::error::ArrowError,
location: Location,
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod arrow_reader;
pub mod async_arrow_reader;
pub(crate) mod builder;
pub mod error;
pub mod proto;
pub mod reader;
Expand Down
14 changes: 10 additions & 4 deletions src/reader/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,16 @@ pub fn create_field((name, typ): (&str, &Arc<TypeDescription>)) -> Field {
let children = typ.children();
assert_eq!(children.len(), 2);

let (name, typ) = &children[1];
let value = create_field((name, typ));
let (_, typ) = &children[0];
let key = create_field(("key", typ));

Field::new(name, DataType::Map(Arc::new(value), false), true)
let (_, typ) = &children[1];
let value = create_field(("value", typ));
let fields = vec![key, value];

let data_type = DataType::Struct(Fields::from(fields));
let field = Field::new(name, data_type, true);
Field::new(name, DataType::Map(Arc::new(field), false), true)
}
Kind::Struct => {
let children = typ.children();
Expand Down Expand Up @@ -296,7 +302,7 @@ pub fn create_schema(types: &[Type], root_column: usize) -> Result<Arc<TypeDescr
);

let td = Arc::new(TypeDescription::new(MAP.clone(), root_column));
let fields = &root.field_names;
let fields = &["key", "value"];
for (idx, column) in sub_types.iter().enumerate() {
let child = create_schema(types, *column as usize)?;
td.add_field(fields[idx].to_string(), child);
Expand Down
Binary file modified tests/basic/data/f32_long_long_gzip.orc
Binary file not shown.
Binary file added tests/basic/data/nested_map.orc
Binary file not shown.
11 changes: 11 additions & 0 deletions tests/basic/data/write.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,17 @@ def _write(

_write("struct<value:array<int>>", nested_array, "nested_array.orc")

nested_map = {
"map": [
{"zero": 0, "one": 1},
None,
{"two": 2, "tree": 3},
{"one": 1, "two": 2, "nill": None},
],
}

_write("struct<map:map<string,int>>", nested_map, "nested_map.orc")


_write(
infer_schema(data),
Expand Down
20 changes: 20 additions & 0 deletions tests/basic/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,26 @@ pub fn basic_test_nested_array() {
)
}

#[test]
pub fn basic_test_nested_map() {
let path = basic_path("nested_map.orc");
let reader = new_arrow_reader_root(&path);
let batch = reader.collect::<Result<Vec<_>, _>>().unwrap();

let expected = r#"+--------------------------+
| map |
+--------------------------+
| {zero: 0, one: 1} |
| |
| {two: 2, tree: 3} |
| {one: 1, two: 2, nill: } |
+--------------------------+"#;
assert_eq!(
expected,
pretty::pretty_format_batches(&batch).unwrap().to_string()
)
}

#[test]
pub fn basic_test_0() {
let path = basic_path("test.orc");
Expand Down

0 comments on commit 424b021

Please sign in to comment.