Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support to struct datatype #26

Merged
merged 2 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
763 changes: 589 additions & 174 deletions src/arrow_reader.rs

Large diffs are not rendered by default.

113 changes: 56 additions & 57 deletions src/arrow_reader/column.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;

use arrow::datatypes::Field;
use bytes::Bytes;
use snafu::{OptionExt, ResultExt};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};

use crate::error::{self, Result};
use crate::proto::stream::Kind;
use crate::proto::{ColumnEncoding, StripeFooter, StripeInformation};
use crate::reader::decompress::{Compression, Decompressor};
use crate::reader::schema::TypeDescription;

use crate::reader::schema::{create_field, TypeDescription};
use crate::reader::Reader;

pub mod binary;
Expand All @@ -19,19 +20,24 @@
pub mod int;
pub mod present;
pub mod string;
pub mod struct_column;
pub mod timestamp;
pub mod tinyint;

#[derive(Debug)]
pub struct Column {
data: Bytes,
number_of_rows: u64,
compression: Option<Compression>,
footer: Arc<StripeFooter>,
name: String,
column: Arc<TypeDescription>,
}

impl From<Column> for Field {
fn from(value: Column) -> Self {
create_field((&value.name, &value.column))
}
}

macro_rules! impl_read_stream {
($reader:ident,$start:ident,$length:ident $($_await:tt)*) => {{
$reader
Expand Down Expand Up @@ -98,67 +104,18 @@
Ok((start, length))
}

pub fn new<R: Read + Seek>(
reader: &mut Reader<R>,
compression: Option<Compression>,
pub fn new(
name: &str,
column: &Arc<TypeDescription>,
footer: &Arc<StripeFooter>,
stripe: &StripeInformation,
) -> Result<Self> {
let (start, length) = Column::get_stream_info(name, column, footer, stripe)?;
let data = Column::read_stream(reader, start, length)?;

Ok(Self {
data,
) -> Self {
Self {
number_of_rows: stripe.number_of_rows(),
compression,
footer: footer.clone(),
column: column.clone(),
name: name.to_string(),
})
}

pub async fn new_async<R: AsyncRead + AsyncSeek + Unpin + Send>(
reader: &mut Reader<R>,
compression: Option<Compression>,
name: &str,
column: &Arc<TypeDescription>,
footer: &Arc<StripeFooter>,
stripe: &StripeInformation,
) -> Result<Self> {
let (start, length) = Column::get_stream_info(name, column, footer, stripe)?;
let data = Column::read_stream_async(reader, start, length).await?;

Ok(Self {
data,
number_of_rows: stripe.number_of_rows(),
compression,
footer: footer.clone(),
column: column.clone(),
name: name.to_string(),
})
}

pub fn stream(&self, kind: Kind) -> Option<Result<Decompressor>> {
let mut start = 0; // the start of the stream

let column_id = self.column.column_id() as u32;
self.footer
.streams
.iter()
.filter(|stream| stream.column() == column_id && stream.kind() != Kind::RowIndex)
.map(|stream| {
start += stream.length() as usize;
stream
})
.find(|stream| stream.kind() == kind)
.map(|stream| {
let length = stream.length() as usize;
let data = self.data.slice((start - length)..start);
Decompressor::new(data, self.compression, vec![])
})
.map(Ok)
}
}

pub fn dictionary_size(&self) -> usize {
Expand All @@ -181,9 +138,30 @@
self.column.kind()
}

pub fn name(&self) -> &str {
&self.name
}

Check warning on line 143 in src/arrow_reader/column.rs

View check run for this annotation

Codecov / codecov/patch

src/arrow_reader/column.rs#L141-L143

Added lines #L141 - L143 were not covered by tests

pub fn column_id(&self) -> u32 {
self.column.column_id() as u32
}

pub fn children(&self) -> Vec<Column> {
let children = self.column.children();

let mut columns = Vec::with_capacity(children.len());

for (name, column) in children {
columns.push(Column {
number_of_rows: self.number_of_rows,
footer: self.footer.clone(),
name,
column,
});
}

columns
}
}

pub struct NullableIterator<T> {
Expand Down Expand Up @@ -211,3 +189,24 @@
}
}
}

impl<T> NullableIterator<T> {
pub fn collect_chunk(&mut self, chunk: usize) -> Option<Result<Vec<Option<T>>>> {
let mut buf = Vec::with_capacity(chunk);
for _ in 0..chunk {
match self.next() {
Some(Ok(value)) => {
buf.push(value);
}
Some(Err(err)) => return Some(Err(err)),
None => break,

Check warning on line 202 in src/arrow_reader/column.rs

View check run for this annotation

Codecov / codecov/patch

src/arrow_reader/column.rs#L201-L202

Added lines #L201 - L202 were not covered by tests
}
}

if buf.is_empty() {
return None;

Check warning on line 207 in src/arrow_reader/column.rs

View check run for this annotation

Codecov / codecov/patch

src/arrow_reader/column.rs#L207

Added line #L207 was not covered by tests
}

Some(Ok(buf))
}
}
21 changes: 13 additions & 8 deletions src/arrow_reader/column/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,28 @@ use snafu::OptionExt;

use crate::arrow_reader::column::present::new_present_iter;
use crate::arrow_reader::column::{Column, NullableIterator};
use crate::arrow_reader::Stripe;
use crate::error;
use crate::proto::stream::Kind;
use crate::reader::decode::get_direct_unsigned_rle_reader;
use crate::reader::decode::variable_length::Values;
use crate::reader::decompress::Decompressor;

pub fn new_binary_iterator(column: &Column) -> error::Result<NullableIterator<Vec<u8>>> {
let null_mask = new_present_iter(column)?.collect::<error::Result<Vec<_>>>()?;
pub fn new_binary_iterator(
column: &Column,
stripe: &Stripe,
) -> error::Result<NullableIterator<Vec<u8>>> {
let null_mask = new_present_iter(column, stripe)?.collect::<error::Result<Vec<_>>>()?;

let values = column
.stream(Kind::Data)
.transpose()?
let values = stripe
.stream_map
.get(column, Kind::Data)
.map(|reader| Box::new(Values::new(reader, vec![])))
.context(error::InvalidColumnSnafu { name: &column.name })?;
let lengths = column
.stream(Kind::Length)
.transpose()?

let lengths = stripe
.stream_map
.get(column, Kind::Length)
.map(|reader| get_direct_unsigned_rle_reader(column, reader))
.context(error::InvalidColumnSnafu { name: &column.name })??;

Expand Down
11 changes: 6 additions & 5 deletions src/arrow_reader/column/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,18 @@ use snafu::OptionExt;

use crate::arrow_reader::column::present::new_present_iter;
use crate::arrow_reader::column::{Column, NullableIterator};
use crate::arrow_reader::Stripe;
use crate::error::{InvalidColumnSnafu, Result};
use crate::proto::stream::Kind;
use crate::reader::decode::boolean_rle::BooleanIter;

pub fn new_boolean_iter(column: &Column) -> Result<NullableIterator<bool>> {
let present = new_present_iter(column)?.collect::<Result<Vec<_>>>()?;
pub fn new_boolean_iter(column: &Column, stripe: &Stripe) -> Result<NullableIterator<bool>> {
let present = new_present_iter(column, stripe)?.collect::<Result<Vec<_>>>()?;
let rows: usize = present.iter().filter(|&p| *p).count();

let iter = column
.stream(Kind::Data)
.transpose()?
let iter = stripe
.stream_map
.get(column, Kind::Data)
.map(|reader| {
Box::new(BooleanIter::new(reader, rows))
as Box<dyn Iterator<Item = Result<bool>> + Send>
Expand Down
11 changes: 6 additions & 5 deletions src/arrow_reader/column/date.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use snafu::OptionExt;

use crate::arrow_reader::column::present::new_present_iter;
use crate::arrow_reader::column::{Column, NullableIterator};
use crate::arrow_reader::Stripe;
use crate::error::{self, Result};
use crate::proto::stream::Kind;
use crate::reader::decode::get_direct_signed_rle_reader;
Expand Down Expand Up @@ -37,12 +38,12 @@ impl Iterator for DateIterator {
}
}

pub fn new_date_iter(column: &Column) -> Result<NullableIterator<NaiveDate>> {
let present = new_present_iter(column)?.collect::<Result<Vec<_>>>()?;
pub fn new_date_iter(column: &Column, stripe: &Stripe) -> Result<NullableIterator<NaiveDate>> {
let present = new_present_iter(column, stripe)?.collect::<Result<Vec<_>>>()?;

let data = column
.stream(Kind::Data)
.transpose()?
let data = stripe
.stream_map
.get(column, Kind::Data)
.map(|reader| get_direct_signed_rle_reader(column, reader))
.context(error::InvalidColumnSnafu { name: &column.name })??;

Expand Down
11 changes: 6 additions & 5 deletions src/arrow_reader/column/float.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,20 @@ use snafu::OptionExt;

use crate::arrow_reader::column::present::new_present_iter;
use crate::arrow_reader::column::{Column, NullableIterator};
use crate::arrow_reader::Stripe;
use crate::error::{InvalidColumnSnafu, Result};
use crate::proto::stream::Kind;
use crate::reader::decode::float::FloatIter;

macro_rules! impl_float_iter {
($tp:ident) => {
paste::item! {
pub fn [<new_ $tp _iter>] (column: &Column) -> Result<NullableIterator<$tp>> {
let present = new_present_iter(column)?.collect::<Result<Vec<_>>>()?;
pub fn [<new_ $tp _iter>] (column: &Column, stripe: &Stripe) -> Result<NullableIterator<$tp>> {
let present = new_present_iter(column, stripe)?.collect::<Result<Vec<_>>>()?;
let rows: usize = present.iter().filter(|&p| *p).count();
let iter = column
.stream(Kind::Data)
.transpose()?
let iter = stripe
.stream_map
.get(column, Kind::Data)
.map(|reader| Box::new(FloatIter::<$tp, _>::new(reader, rows)))
.context(InvalidColumnSnafu { name: &column.name })?;

Expand Down
11 changes: 6 additions & 5 deletions src/arrow_reader/column/int.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@ use snafu::OptionExt;

use crate::arrow_reader::column::present::new_present_iter;
use crate::arrow_reader::column::{Column, NullableIterator};
use crate::arrow_reader::Stripe;
use crate::error::{InvalidColumnSnafu, Result};
use crate::proto::stream::Kind;
use crate::reader::decode::get_direct_signed_rle_reader;

pub fn new_i64_iter(column: &Column) -> Result<NullableIterator<i64>> {
let present = new_present_iter(column)?.collect::<Result<Vec<_>>>()?;
pub fn new_i64_iter(column: &Column, stripe: &Stripe) -> Result<NullableIterator<i64>> {
let present = new_present_iter(column, stripe)?.collect::<Result<Vec<_>>>()?;

let iter = column
.stream(Kind::Data)
.transpose()?
let iter = stripe
.stream_map
.get(column, Kind::Data)
.map(|reader| get_direct_signed_rle_reader(column, reader))
.context(InvalidColumnSnafu { name: &column.name })??;

Expand Down
12 changes: 8 additions & 4 deletions src/arrow_reader/column/present.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
use crate::arrow_reader::column::Column;
use crate::arrow_reader::Stripe;
use crate::error::Result;
use crate::proto::stream::Kind;
use crate::reader::decode::boolean_rle::BooleanIter;

pub fn new_present_iter(column: &Column) -> Result<Box<dyn Iterator<Item = Result<bool>>>> {
pub fn new_present_iter(
column: &Column,
stripe: &Stripe,
) -> Result<Box<dyn Iterator<Item = Result<bool>>>> {
let rows = column.number_of_rows as usize;
let iter = column
.stream(Kind::Present)
.transpose()?
let iter = stripe
.stream_map
.get(column, Kind::Present)
.map(|reader| {
Box::new(BooleanIter::new(reader, rows)) as Box<dyn Iterator<Item = Result<bool>>>
})
Expand Down
Loading
Loading