Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support to struct datatype #26

Merged
merged 2 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
772 changes: 598 additions & 174 deletions src/arrow_reader.rs

Large diffs are not rendered by default.

113 changes: 56 additions & 57 deletions src/arrow_reader/column.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;

use arrow::datatypes::Field;
use bytes::Bytes;
use snafu::{OptionExt, ResultExt};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};

use crate::error::{self, Result};
use crate::proto::stream::Kind;
use crate::proto::{ColumnEncoding, StripeFooter, StripeInformation};
use crate::reader::decompress::{Compression, Decompressor};
use crate::reader::schema::TypeDescription;

use crate::reader::schema::{create_field, TypeDescription};
use crate::reader::Reader;

pub mod binary;
Expand All @@ -19,19 +20,24 @@ pub mod float;
pub mod int;
pub mod present;
pub mod string;
pub mod struct_column;
pub mod timestamp;
pub mod tinyint;

#[derive(Debug)]
pub struct Column {
data: Bytes,
number_of_rows: u64,
compression: Option<Compression>,
footer: Arc<StripeFooter>,
name: String,
column: Arc<TypeDescription>,
}

impl From<Column> for Field {
fn from(value: Column) -> Self {
create_field((&value.name, &value.column))
}
}

macro_rules! impl_read_stream {
($reader:ident,$start:ident,$length:ident $($_await:tt)*) => {{
$reader
Expand Down Expand Up @@ -98,67 +104,18 @@ impl Column {
Ok((start, length))
}

pub fn new<R: Read + Seek>(
reader: &mut Reader<R>,
compression: Option<Compression>,
pub fn new(
name: &str,
column: &Arc<TypeDescription>,
footer: &Arc<StripeFooter>,
stripe: &StripeInformation,
) -> Result<Self> {
let (start, length) = Column::get_stream_info(name, column, footer, stripe)?;
let data = Column::read_stream(reader, start, length)?;

Ok(Self {
data,
) -> Self {
Self {
number_of_rows: stripe.number_of_rows(),
compression,
footer: footer.clone(),
column: column.clone(),
name: name.to_string(),
})
}

pub async fn new_async<R: AsyncRead + AsyncSeek + Unpin + Send>(
reader: &mut Reader<R>,
compression: Option<Compression>,
name: &str,
column: &Arc<TypeDescription>,
footer: &Arc<StripeFooter>,
stripe: &StripeInformation,
) -> Result<Self> {
let (start, length) = Column::get_stream_info(name, column, footer, stripe)?;
let data = Column::read_stream_async(reader, start, length).await?;

Ok(Self {
data,
number_of_rows: stripe.number_of_rows(),
compression,
footer: footer.clone(),
column: column.clone(),
name: name.to_string(),
})
}

pub fn stream(&self, kind: Kind) -> Option<Result<Decompressor>> {
let mut start = 0; // the start of the stream

let column_id = self.column.column_id() as u32;
self.footer
.streams
.iter()
.filter(|stream| stream.column() == column_id && stream.kind() != Kind::RowIndex)
.map(|stream| {
start += stream.length() as usize;
stream
})
.find(|stream| stream.kind() == kind)
.map(|stream| {
let length = stream.length() as usize;
let data = self.data.slice((start - length)..start);
Decompressor::new(data, self.compression, vec![])
})
.map(Ok)
}
}

pub fn dictionary_size(&self) -> usize {
Expand All @@ -184,6 +141,27 @@ impl Column {
pub fn name(&self) -> &str {
&self.name
}

pub fn column_id(&self) -> u32 {
self.column.column_id() as u32
}

pub fn children(&self) -> Vec<Column> {
let children = self.column.children();

let mut columns = Vec::with_capacity(children.len());

for (name, column) in children {
columns.push(Column {
number_of_rows: self.number_of_rows,
footer: self.footer.clone(),
name,
column,
});
}

columns
}
}

pub struct NullableIterator<T> {
Expand Down Expand Up @@ -211,3 +189,24 @@ impl<T> Iterator for NullableIterator<T> {
}
}
}

impl<T> NullableIterator<T> {
pub fn collect_chunk(&mut self, chunk: usize) -> Option<Result<Vec<Option<T>>>> {
let mut buf = Vec::with_capacity(chunk);
for _ in 0..chunk {
match self.next() {
Some(Ok(value)) => {
buf.push(value);
}
Some(Err(err)) => return Some(Err(err)),
None => break,
}
}

if buf.is_empty() {
return None;
}

Some(Ok(buf))
}
}
21 changes: 13 additions & 8 deletions src/arrow_reader/column/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,28 @@ use snafu::OptionExt;

use crate::arrow_reader::column::present::new_present_iter;
use crate::arrow_reader::column::{Column, NullableIterator};
use crate::arrow_reader::Stripe;
use crate::error;
use crate::proto::stream::Kind;
use crate::reader::decode::get_direct_unsigned_rle_reader;
use crate::reader::decode::variable_length::Values;
use crate::reader::decompress::Decompressor;

pub fn new_binary_iterator(column: &Column) -> error::Result<NullableIterator<Vec<u8>>> {
let null_mask = new_present_iter(column)?.collect::<error::Result<Vec<_>>>()?;
pub fn new_binary_iterator(
column: &Column,
stripe: &Stripe,
) -> error::Result<NullableIterator<Vec<u8>>> {
let null_mask = new_present_iter(column, stripe)?.collect::<error::Result<Vec<_>>>()?;

let values = column
.stream(Kind::Data)
.transpose()?
let values = stripe
.stream_map
.get(column, Kind::Data)
.map(|reader| Box::new(Values::new(reader, vec![])))
.context(error::InvalidColumnSnafu { name: &column.name })?;
let lengths = column
.stream(Kind::Length)
.transpose()?

let lengths = stripe
.stream_map
.get(column, Kind::Length)
.map(|reader| get_direct_unsigned_rle_reader(column, reader))
.context(error::InvalidColumnSnafu { name: &column.name })??;

Expand Down
11 changes: 6 additions & 5 deletions src/arrow_reader/column/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,18 @@ use snafu::OptionExt;

use crate::arrow_reader::column::present::new_present_iter;
use crate::arrow_reader::column::{Column, NullableIterator};
use crate::arrow_reader::Stripe;
use crate::error::{InvalidColumnSnafu, Result};
use crate::proto::stream::Kind;
use crate::reader::decode::boolean_rle::BooleanIter;

pub fn new_boolean_iter(column: &Column) -> Result<NullableIterator<bool>> {
let present = new_present_iter(column)?.collect::<Result<Vec<_>>>()?;
pub fn new_boolean_iter(column: &Column, stripe: &Stripe) -> Result<NullableIterator<bool>> {
let present = new_present_iter(column, stripe)?.collect::<Result<Vec<_>>>()?;
let rows: usize = present.iter().filter(|&p| *p).count();

let iter = column
.stream(Kind::Data)
.transpose()?
let iter = stripe
.stream_map
.get(column, Kind::Data)
.map(|reader| {
Box::new(BooleanIter::new(reader, rows))
as Box<dyn Iterator<Item = Result<bool>> + Send>
Expand Down
11 changes: 6 additions & 5 deletions src/arrow_reader/column/date.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use snafu::OptionExt;

use crate::arrow_reader::column::present::new_present_iter;
use crate::arrow_reader::column::{Column, NullableIterator};
use crate::arrow_reader::Stripe;
use crate::error::{self, Result};
use crate::proto::stream::Kind;
use crate::reader::decode::get_direct_signed_rle_reader;
Expand Down Expand Up @@ -37,12 +38,12 @@ impl Iterator for DateIterator {
}
}

pub fn new_date_iter(column: &Column) -> Result<NullableIterator<NaiveDate>> {
let present = new_present_iter(column)?.collect::<Result<Vec<_>>>()?;
pub fn new_date_iter(column: &Column, stripe: &Stripe) -> Result<NullableIterator<NaiveDate>> {
let present = new_present_iter(column, stripe)?.collect::<Result<Vec<_>>>()?;

let data = column
.stream(Kind::Data)
.transpose()?
let data = stripe
.stream_map
.get(column, Kind::Data)
.map(|reader| get_direct_signed_rle_reader(column, reader))
.context(error::InvalidColumnSnafu { name: &column.name })??;

Expand Down
11 changes: 6 additions & 5 deletions src/arrow_reader/column/float.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,20 @@ use snafu::OptionExt;

use crate::arrow_reader::column::present::new_present_iter;
use crate::arrow_reader::column::{Column, NullableIterator};
use crate::arrow_reader::Stripe;
use crate::error::{InvalidColumnSnafu, Result};
use crate::proto::stream::Kind;
use crate::reader::decode::float::FloatIter;

macro_rules! impl_float_iter {
($tp:ident) => {
paste::item! {
pub fn [<new_ $tp _iter>] (column: &Column) -> Result<NullableIterator<$tp>> {
let present = new_present_iter(column)?.collect::<Result<Vec<_>>>()?;
pub fn [<new_ $tp _iter>] (column: &Column, stripe: &Stripe) -> Result<NullableIterator<$tp>> {
let present = new_present_iter(column, stripe)?.collect::<Result<Vec<_>>>()?;
let rows: usize = present.iter().filter(|&p| *p).count();
let iter = column
.stream(Kind::Data)
.transpose()?
let iter = stripe
.stream_map
.get(column, Kind::Data)
.map(|reader| Box::new(FloatIter::<$tp, _>::new(reader, rows)))
.context(InvalidColumnSnafu { name: &column.name })?;

Expand Down
11 changes: 6 additions & 5 deletions src/arrow_reader/column/int.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@ use snafu::OptionExt;

use crate::arrow_reader::column::present::new_present_iter;
use crate::arrow_reader::column::{Column, NullableIterator};
use crate::arrow_reader::Stripe;
use crate::error::{InvalidColumnSnafu, Result};
use crate::proto::stream::Kind;
use crate::reader::decode::get_direct_signed_rle_reader;

pub fn new_i64_iter(column: &Column) -> Result<NullableIterator<i64>> {
let present = new_present_iter(column)?.collect::<Result<Vec<_>>>()?;
pub fn new_i64_iter(column: &Column, stripe: &Stripe) -> Result<NullableIterator<i64>> {
let present = new_present_iter(column, stripe)?.collect::<Result<Vec<_>>>()?;

let iter = column
.stream(Kind::Data)
.transpose()?
let iter = stripe
.stream_map
.get(column, Kind::Data)
.map(|reader| get_direct_signed_rle_reader(column, reader))
.context(InvalidColumnSnafu { name: &column.name })??;

Expand Down
12 changes: 8 additions & 4 deletions src/arrow_reader/column/present.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
use crate::arrow_reader::column::Column;
use crate::arrow_reader::Stripe;
use crate::error::Result;
use crate::proto::stream::Kind;
use crate::reader::decode::boolean_rle::BooleanIter;

pub fn new_present_iter(column: &Column) -> Result<Box<dyn Iterator<Item = Result<bool>>>> {
pub fn new_present_iter(
column: &Column,
stripe: &Stripe,
) -> Result<Box<dyn Iterator<Item = Result<bool>>>> {
let rows = column.number_of_rows as usize;
let iter = column
.stream(Kind::Present)
.transpose()?
let iter = stripe
.stream_map
.get(column, Kind::Present)
.map(|reader| {
Box::new(BooleanIter::new(reader, rows)) as Box<dyn Iterator<Item = Result<bool>>>
})
Expand Down
Loading
Loading