Skip to content

Commit

Permalink
Refactor to decouple from relying directly on proto (#44)
Browse files Browse the repository at this point in the history
* Refactor to decouple from relying directly on proto

* Fix
  • Loading branch information
Jefffrey committed Nov 18, 2023
1 parent eed416c commit cae1014
Show file tree
Hide file tree
Showing 8 changed files with 339 additions and 98 deletions.
46 changes: 18 additions & 28 deletions src/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ use crate::arrow_reader::column::NullableIterator;
use crate::builder::BoxedArrayBuilder;
use crate::error::{self, InvalidColumnSnafu, Result};
use crate::proto::stream::Kind;
use crate::proto::{StripeFooter, StripeInformation};
use crate::proto::StripeFooter;
use crate::reader::decompress::{Compression, Decompressor};
use crate::reader::schema::{create_field, TypeDescription};
use crate::reader::Reader;
use crate::stripe::StripeMetadata;

pub struct ArrowReader<R: Read> {
cursor: Cursor<R>,
Expand All @@ -65,7 +66,7 @@ impl<R: Read> ArrowReader<R> {
}

pub fn total_row_count(&self) -> u64 {
self.cursor.reader.metadata().footer.number_of_rows()
self.cursor.reader.metadata().number_of_rows()
}
}

Expand Down Expand Up @@ -97,15 +98,9 @@ pub fn create_arrow_schema<R>(cursor: &Cursor<R>) -> Schema {
let metadata = cursor
.reader
.metadata()
.footer
.metadata
.user_custom_metadata()
.iter()
.map(|kv| {
(
kv.name().to_string(),
String::from_utf8_lossy(kv.value()).to_string(),
)
})
.map(|(key, value)| (key.clone(), String::from_utf8_lossy(value).to_string()))
.collect::<HashMap<_, _>>();

let fields = cursor
Expand Down Expand Up @@ -830,7 +825,8 @@ impl<R> Cursor<R> {
let mut columns = Vec::with_capacity(fields.len());
for name in fields {
let field = r
.schema
.metadata()
.type_description()
.field(name.as_ref())
.context(error::FieldNotFoundSnafu {
name: name.as_ref(),
Expand All @@ -845,18 +841,21 @@ impl<R> Cursor<R> {
}

pub fn root(r: Reader<R>) -> Result<Self> {
let root = &r.metadata().footer.types[0];
let fields = &root.field_names.clone();
Self::new(r, fields)
let columns = r.metadata().type_description().children();
Ok(Self {
reader: r,
columns: Arc::new(columns),
stripe_offset: 0,
})
}
}

impl<R: Read + Seek> Iterator for Cursor<R> {
type Item = Result<Stripe>;

fn next(&mut self) -> Option<Self::Item> {
if let Some(info) = self.reader.stripe(self.stripe_offset) {
let stripe = Stripe::new(&mut self.reader, &self.columns, self.stripe_offset, info);
if let Some(info) = self.reader.stripe(self.stripe_offset).cloned() {
let stripe = Stripe::new(&mut self.reader, &self.columns, self.stripe_offset, &info);

self.stripe_offset += 1;

Expand All @@ -872,7 +871,6 @@ pub struct Stripe {
pub(crate) footer: Arc<StripeFooter>,
pub(crate) columns: Vec<Column>,
pub(crate) stripe_offset: usize,
pub(crate) info: StripeInformation,
/// <(ColumnId, Kind), Bytes>
pub(crate) stream_map: Arc<StreamMap>,
}
Expand Down Expand Up @@ -905,18 +903,15 @@ impl Stripe {
r: &mut Reader<R>,
column_defs: &[(String, Arc<TypeDescription>)],
stripe: usize,
info: StripeInformation,
info: &StripeMetadata,
) -> Result<Self> {
let footer = Arc::new(r.stripe_footer(stripe).clone());

let compression = Compression::from_proto(
r.metadata().postscript.compression(),
r.metadata().postscript.compression_block_size,
);
let compression = r.metadata().compression();
//TODO(weny): add tz
let mut columns = Vec::with_capacity(column_defs.len());
for (name, typ) in column_defs.iter() {
columns.push(Column::new(name, typ, &footer, &info));
columns.push(Column::new(name, typ, &footer, info.number_of_rows()));
}

let mut stream_map = HashMap::new();
Expand All @@ -937,7 +932,6 @@ impl Stripe {
footer,
columns,
stripe_offset: stripe,
info,
stream_map: Arc::new(StreamMap {
inner: stream_map,
compression,
Expand All @@ -952,8 +946,4 @@ impl Stripe {
pub fn stripe_offset(&self) -> usize {
self.stripe_offset
}

pub fn stripe_info(&self) -> &StripeInformation {
&self.info
}
}
4 changes: 2 additions & 2 deletions src/arrow_reader/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,10 @@ impl Column {
name: &str,
column: &Arc<TypeDescription>,
footer: &Arc<StripeFooter>,
stripe: &StripeInformation,
number_of_rows: u64,
) -> Self {
Self {
number_of_rows: stripe.number_of_rows(),
number_of_rows,
footer: footer.clone(),
column: column.clone(),
name: name.to_string(),
Expand Down
19 changes: 7 additions & 12 deletions src/async_arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@ use crate::arrow_reader::{
create_arrow_schema, Cursor, NaiveStripeDecoder, StreamMap, Stripe, DEFAULT_BATCH_SIZE,
};
use crate::error::Result;
use crate::proto::StripeInformation;
use crate::reader::decompress::Compression;
use crate::reader::schema::TypeDescription;
use crate::reader::Reader;
use crate::stripe::StripeMetadata;

pub type BoxedDecoder = Box<dyn Iterator<Item = Result<RecordBatch>> + Send>;

Expand Down Expand Up @@ -68,7 +67,7 @@ pub struct ArrowStreamReader<R: AsyncRead + AsyncSeek + Unpin + Send> {
}

impl<R: AsyncRead + AsyncSeek + Unpin + Send + 'static> StripeFactory<R> {
pub async fn read_next_stripe_inner(&mut self, info: StripeInformation) -> Result<Stripe> {
pub async fn read_next_stripe_inner(&mut self, info: &StripeMetadata) -> Result<Stripe> {
let inner = &mut self.inner;

let column_defs = inner.columns.clone();
Expand All @@ -79,10 +78,10 @@ impl<R: AsyncRead + AsyncSeek + Unpin + Send + 'static> StripeFactory<R> {
}

pub async fn read_next_stripe(mut self) -> Result<(Self, Option<Stripe>)> {
let info = self.inner.reader.stripe(self.inner.stripe_offset);
let info = self.inner.reader.stripe(self.inner.stripe_offset).cloned();

if let Some(info) = info {
match self.read_next_stripe_inner(info).await {
match self.read_next_stripe_inner(&info).await {
Ok(stripe) => Ok((self, Some(stripe))),
Err(err) => Err(err),
}
Expand Down Expand Up @@ -186,18 +185,15 @@ impl Stripe {
r: &mut Reader<R>,
column_defs: Arc<Vec<(String, Arc<TypeDescription>)>>,
stripe: usize,
info: StripeInformation,
info: &StripeMetadata,
) -> Result<Self> {
let footer = Arc::new(r.stripe_footer(stripe).clone());

let compression = Compression::from_proto(
r.metadata().postscript.compression(),
r.metadata().postscript.compression_block_size,
);
let compression = r.metadata().compression();
//TODO(weny): add tz
let mut columns = Vec::with_capacity(column_defs.len());
for (name, typ) in column_defs.iter() {
columns.push(Column::new(name, typ, &footer, &info));
columns.push(Column::new(name, typ, &footer, info.number_of_rows()));
}

let mut stream_map = HashMap::new();
Expand All @@ -218,7 +214,6 @@ impl Stripe {
footer,
columns,
stripe_offset: stripe,
info,
stream_map: Arc::new(StreamMap {
inner: stream_map,
compression,
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ pub(crate) mod builder;
pub mod error;
pub mod proto;
pub mod reader;
pub mod statistics;
pub mod stripe;

pub use arrow_reader::{ArrowReader, Cursor};
pub use async_arrow_reader::ArrowStreamReader;
Expand Down
48 changes: 9 additions & 39 deletions src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,49 +5,25 @@ pub mod schema;

use std::fs::File;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::sync::Arc;

use tokio::io::{AsyncRead, AsyncSeek};

use self::metadata::{read_metadata, FileMetadata};
use self::schema::{create_schema, TypeDescription};
use crate::arrow_reader::Cursor;
use self::schema::TypeDescription;
use crate::error::Result;
use crate::proto::{StripeFooter, StripeInformation};
use crate::proto::StripeFooter;
use crate::reader::metadata::read_metadata_async;
use crate::stripe::StripeMetadata;

pub struct Reader<R> {
pub(crate) inner: R,
metadata: Box<FileMetadata>,
pub(crate) schema: Arc<TypeDescription>,
}

impl<R: ChunkReader> Reader<R> {
pub fn new(mut r: R) -> Result<Self> {
let metadata = Box::new(read_metadata(&mut r)?);
let schema = create_schema(&metadata.footer.types, 0)?;

Ok(Self {
inner: r,
metadata,
schema,
})
}
}

impl<R: Read> Reader<R> {
pub fn new_with_metadata(r: R, metadata: FileMetadata) -> Result<Self> {
let schema = create_schema(&metadata.footer.types, 0)?;

Ok(Self {
inner: r,
metadata: Box::new(metadata),
schema,
})
}

pub fn select(self, fields: &[&str]) -> Result<Cursor<R>> {
Cursor::new(self, fields)
Ok(Self { inner: r, metadata })
}
}

Expand All @@ -57,28 +33,22 @@ impl<R> Reader<R> {
}

pub fn schema(&self) -> &TypeDescription {
&self.schema
self.metadata.type_description()
}

pub fn stripe(&self, index: usize) -> Option<StripeInformation> {
self.metadata.footer.stripes.get(index).cloned()
pub fn stripe(&self, index: usize) -> Option<&StripeMetadata> {
self.metadata.stripe_metadatas().get(index)
}

pub fn stripe_footer(&mut self, stripe: usize) -> &StripeFooter {
&self.metadata.stripe_footers[stripe]
&self.metadata.stripe_footers()[stripe]
}
}

impl<R: AsyncRead + AsyncSeek + Unpin + Send> Reader<R> {
pub async fn new_async(mut r: R) -> Result<Self> {
let metadata = Box::new(read_metadata_async(&mut r).await?);
let schema = create_schema(&metadata.footer.types, 0)?;

Ok(Self {
inner: r,
metadata,
schema,
})
Ok(Self { inner: r, metadata })
}
}

Expand Down
Loading

0 comments on commit cae1014

Please sign in to comment.