From cae10149f4b2234c632e1ba67776c6ff6b6289b6 Mon Sep 17 00:00:00 2001 From: Jeffrey <22608443+Jefffrey@users.noreply.github.com> Date: Sat, 18 Nov 2023 16:35:19 +1100 Subject: [PATCH] Refactor to decouple from relying directly on proto (#44) * Refactor to decouple from relying directly on proto * Fix --- src/arrow_reader.rs | 46 +++++------- src/arrow_reader/column.rs | 4 +- src/async_arrow_reader.rs | 19 ++--- src/lib.rs | 2 + src/reader.rs | 48 +++--------- src/reader/metadata.rs | 103 +++++++++++++++++++++----- src/statistics.rs | 146 +++++++++++++++++++++++++++++++++++++ src/stripe.rs | 69 ++++++++++++++++++ 8 files changed, 339 insertions(+), 98 deletions(-) create mode 100644 src/statistics.rs create mode 100644 src/stripe.rs diff --git a/src/arrow_reader.rs b/src/arrow_reader.rs index d52eda4c..801b7098 100644 --- a/src/arrow_reader.rs +++ b/src/arrow_reader.rs @@ -38,10 +38,11 @@ use crate::arrow_reader::column::NullableIterator; use crate::builder::BoxedArrayBuilder; use crate::error::{self, InvalidColumnSnafu, Result}; use crate::proto::stream::Kind; -use crate::proto::{StripeFooter, StripeInformation}; +use crate::proto::StripeFooter; use crate::reader::decompress::{Compression, Decompressor}; use crate::reader::schema::{create_field, TypeDescription}; use crate::reader::Reader; +use crate::stripe::StripeMetadata; pub struct ArrowReader { cursor: Cursor, @@ -65,7 +66,7 @@ impl ArrowReader { } pub fn total_row_count(&self) -> u64 { - self.cursor.reader.metadata().footer.number_of_rows() + self.cursor.reader.metadata().number_of_rows() } } @@ -97,15 +98,9 @@ pub fn create_arrow_schema(cursor: &Cursor) -> Schema { let metadata = cursor .reader .metadata() - .footer - .metadata + .user_custom_metadata() .iter() - .map(|kv| { - ( - kv.name().to_string(), - String::from_utf8_lossy(kv.value()).to_string(), - ) - }) + .map(|(key, value)| (key.clone(), String::from_utf8_lossy(value).to_string())) .collect::>(); let fields = cursor @@ -830,7 +825,8 @@ impl Cursor { let mut columns = Vec::with_capacity(fields.len()); for name in fields { let field = r - .schema + .metadata() + .type_description() .field(name.as_ref()) .context(error::FieldNotFoundSnafu { name: name.as_ref(), @@ -845,9 +841,12 @@ impl Cursor { } pub fn root(r: Reader) -> Result { - let root = &r.metadata().footer.types[0]; - let fields = &root.field_names.clone(); - Self::new(r, fields) + let columns = r.metadata().type_description().children(); + Ok(Self { + reader: r, + columns: Arc::new(columns), + stripe_offset: 0, + }) } } @@ -855,8 +854,8 @@ impl Iterator for Cursor { type Item = Result; fn next(&mut self) -> Option { - if let Some(info) = self.reader.stripe(self.stripe_offset) { - let stripe = Stripe::new(&mut self.reader, &self.columns, self.stripe_offset, info); + if let Some(info) = self.reader.stripe(self.stripe_offset).cloned() { + let stripe = Stripe::new(&mut self.reader, &self.columns, self.stripe_offset, &info); self.stripe_offset += 1; @@ -872,7 +871,6 @@ pub struct Stripe { pub(crate) footer: Arc, pub(crate) columns: Vec, pub(crate) stripe_offset: usize, - pub(crate) info: StripeInformation, /// <(ColumnId, Kind), Bytes> pub(crate) stream_map: Arc, } @@ -905,18 +903,15 @@ impl Stripe { r: &mut Reader, column_defs: &[(String, Arc)], stripe: usize, - info: StripeInformation, + info: &StripeMetadata, ) -> Result { let footer = Arc::new(r.stripe_footer(stripe).clone()); - let compression = Compression::from_proto( - r.metadata().postscript.compression(), - r.metadata().postscript.compression_block_size, - ); + let compression = r.metadata().compression(); //TODO(weny): add tz let mut columns = Vec::with_capacity(column_defs.len()); for (name, typ) in column_defs.iter() { - columns.push(Column::new(name, typ, &footer, &info)); + columns.push(Column::new(name, typ, &footer, info.number_of_rows())); } let mut stream_map = HashMap::new(); @@ -937,7 +932,6 @@ impl Stripe { footer, columns, stripe_offset: stripe, - info, stream_map: Arc::new(StreamMap { inner: stream_map, compression, @@ -952,8 +946,4 @@ impl Stripe { pub fn stripe_offset(&self) -> usize { self.stripe_offset } - - pub fn stripe_info(&self) -> &StripeInformation { - &self.info - } } diff --git a/src/arrow_reader/column.rs b/src/arrow_reader/column.rs index 761293df..53a242c1 100644 --- a/src/arrow_reader/column.rs +++ b/src/arrow_reader/column.rs @@ -115,10 +115,10 @@ impl Column { name: &str, column: &Arc, footer: &Arc, - stripe: &StripeInformation, + number_of_rows: u64, ) -> Self { Self { - number_of_rows: stripe.number_of_rows(), + number_of_rows, footer: footer.clone(), column: column.clone(), name: name.to_string(), diff --git a/src/async_arrow_reader.rs b/src/async_arrow_reader.rs index 920dd9db..6dd7082e 100644 --- a/src/async_arrow_reader.rs +++ b/src/async_arrow_reader.rs @@ -17,10 +17,9 @@ use crate::arrow_reader::{ create_arrow_schema, Cursor, NaiveStripeDecoder, StreamMap, Stripe, DEFAULT_BATCH_SIZE, }; use crate::error::Result; -use crate::proto::StripeInformation; -use crate::reader::decompress::Compression; use crate::reader::schema::TypeDescription; use crate::reader::Reader; +use crate::stripe::StripeMetadata; pub type BoxedDecoder = Box> + Send>; @@ -68,7 +67,7 @@ pub struct ArrowStreamReader { } impl StripeFactory { - pub async fn read_next_stripe_inner(&mut self, info: StripeInformation) -> Result { + pub async fn read_next_stripe_inner(&mut self, info: &StripeMetadata) -> Result { let inner = &mut self.inner; let column_defs = inner.columns.clone(); @@ -79,10 +78,10 @@ impl StripeFactory { } pub async fn read_next_stripe(mut self) -> Result<(Self, Option)> { - let info = self.inner.reader.stripe(self.inner.stripe_offset); + let info = self.inner.reader.stripe(self.inner.stripe_offset).cloned(); if let Some(info) = info { - match self.read_next_stripe_inner(info).await { + match self.read_next_stripe_inner(&info).await { Ok(stripe) => Ok((self, Some(stripe))), Err(err) => Err(err), } @@ -186,18 +185,15 @@ impl Stripe { r: &mut Reader, column_defs: Arc)>>, stripe: usize, - info: StripeInformation, + info: &StripeMetadata, ) -> Result { let footer = Arc::new(r.stripe_footer(stripe).clone()); - let compression = Compression::from_proto( - r.metadata().postscript.compression(), - r.metadata().postscript.compression_block_size, - ); + let compression = r.metadata().compression(); //TODO(weny): add tz let mut columns = Vec::with_capacity(column_defs.len()); for (name, typ) in column_defs.iter() { - columns.push(Column::new(name, typ, &footer, &info)); + columns.push(Column::new(name, typ, &footer, info.number_of_rows())); } let mut stream_map = HashMap::new(); @@ -218,7 +214,6 @@ impl Stripe { footer, columns, stripe_offset: stripe, - info, stream_map: Arc::new(StreamMap { inner: stream_map, compression, diff --git a/src/lib.rs b/src/lib.rs index cc711f19..1ab9f46b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,8 @@ pub(crate) mod builder; pub mod error; pub mod proto; pub mod reader; +pub mod statistics; +pub mod stripe; pub use arrow_reader::{ArrowReader, Cursor}; pub use async_arrow_reader::ArrowStreamReader; diff --git a/src/reader.rs b/src/reader.rs index df739e96..e7f6ea2e 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -5,49 +5,25 @@ pub mod schema; use std::fs::File; use std::io::{BufReader, Read, Seek, SeekFrom}; -use std::sync::Arc; use tokio::io::{AsyncRead, AsyncSeek}; use self::metadata::{read_metadata, FileMetadata}; -use self::schema::{create_schema, TypeDescription}; -use crate::arrow_reader::Cursor; +use self::schema::TypeDescription; use crate::error::Result; -use crate::proto::{StripeFooter, StripeInformation}; +use crate::proto::StripeFooter; use crate::reader::metadata::read_metadata_async; +use crate::stripe::StripeMetadata; pub struct Reader { pub(crate) inner: R, metadata: Box, - pub(crate) schema: Arc, } impl Reader { pub fn new(mut r: R) -> Result { let metadata = Box::new(read_metadata(&mut r)?); - let schema = create_schema(&metadata.footer.types, 0)?; - - Ok(Self { - inner: r, - metadata, - schema, - }) - } -} - -impl Reader { - pub fn new_with_metadata(r: R, metadata: FileMetadata) -> Result { - let schema = create_schema(&metadata.footer.types, 0)?; - - Ok(Self { - inner: r, - metadata: Box::new(metadata), - schema, - }) - } - - pub fn select(self, fields: &[&str]) -> Result> { - Cursor::new(self, fields) + Ok(Self { inner: r, metadata }) } } @@ -57,28 +33,22 @@ impl Reader { } pub fn schema(&self) -> &TypeDescription { - &self.schema + self.metadata.type_description() } - pub fn stripe(&self, index: usize) -> Option { - self.metadata.footer.stripes.get(index).cloned() + pub fn stripe(&self, index: usize) -> Option<&StripeMetadata> { + self.metadata.stripe_metadatas().get(index) } pub fn stripe_footer(&mut self, stripe: usize) -> &StripeFooter { - &self.metadata.stripe_footers[stripe] + &self.metadata.stripe_footers()[stripe] } } impl Reader { pub async fn new_async(mut r: R) -> Result { let metadata = Box::new(read_metadata_async(&mut r).await?); - let schema = create_schema(&metadata.footer.types, 0)?; - - Ok(Self { - inner: r, - metadata, - schema, - }) + Ok(Self { inner: r, metadata }) } } diff --git a/src/reader/metadata.rs b/src/reader/metadata.rs index fa33354f..844f938e 100644 --- a/src/reader/metadata.rs +++ b/src/reader/metadata.rs @@ -22,7 +22,9 @@ //! If they are compressed then their lengths indicate their //! compressed lengths. +use std::collections::HashMap; use std::io::{Read, SeekFrom}; +use std::sync::Arc; use bytes::Bytes; use prost::Message; @@ -30,10 +32,13 @@ use snafu::{OptionExt, ResultExt}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; use crate::error::{self, Result}; -use crate::proto::{Footer, Metadata, PostScript, StripeFooter}; +use crate::proto::{self, Footer, Metadata, PostScript, StripeFooter}; use crate::reader::decompress::Decompressor; +use crate::statistics::ColumnStatistics; +use crate::stripe::StripeMetadata; use super::decompress::Compression; +use super::schema::{create_schema, TypeDescription}; use super::ChunkReader; const DEFAULT_FOOTER_SIZE: u64 = 16 * 1024; @@ -41,10 +46,84 @@ const DEFAULT_FOOTER_SIZE: u64 = 16 * 1024; /// The file's metadata. #[derive(Debug)] pub struct FileMetadata { - pub postscript: PostScript, - pub footer: Footer, - pub metadata: Metadata, - pub stripe_footers: Vec, + compression: Option, + type_description: Arc, + number_of_rows: u64, + /// Statistics of columns across entire file + column_statistics: Vec, + stripes: Vec, + user_custom_metadata: HashMap>, + // TODO: for now keeping this, but ideally won't want all stripe footers here + // since don't want to require parsing all stripe footers in file unless actually required + stripe_footers: Vec, +} + +impl FileMetadata { + fn from_proto( + postscript: &proto::PostScript, + footer: &proto::Footer, + metadata: &proto::Metadata, + stripe_footers: Vec, + ) -> Result { + let compression = + Compression::from_proto(postscript.compression(), postscript.compression_block_size); + let type_description = create_schema(&footer.types, 0)?; + let number_of_rows = footer.number_of_rows(); + let column_statistics = footer + .statistics + .iter() + .map(TryFrom::try_from) + .collect::>>()?; + let stripes = footer + .stripes + .iter() + .zip(metadata.stripe_stats.iter()) + .map(TryFrom::try_from) + .collect::>>()?; + let user_custom_metadata = footer + .metadata + .iter() + .map(|kv| (kv.name().to_owned(), kv.value().to_vec())) + .collect::>(); + + Ok(Self { + compression, + type_description, + number_of_rows, + column_statistics, + stripes, + user_custom_metadata, + stripe_footers, + }) + } + + pub fn number_of_rows(&self) -> u64 { + self.number_of_rows + } + + pub fn compression(&self) -> Option { + self.compression + } + + pub fn type_description(&self) -> &Arc { + &self.type_description + } + + pub fn column_file_statistics(&self) -> &[ColumnStatistics] { + &self.column_statistics + } + + pub fn stripe_metadatas(&self) -> &[StripeMetadata] { + &self.stripes + } + + pub fn stripe_footers(&self) -> &[StripeFooter] { + &self.stripe_footers + } + + pub fn user_custom_metadata(&self) -> &HashMap> { + &self.user_custom_metadata + } } pub fn read_metadata(reader: &mut R) -> Result @@ -121,12 +200,7 @@ where stripe_footers.push(deserialize_stripe_footer(&scratch, compression)?); } - Ok(FileMetadata { - postscript, - footer, - metadata, - stripe_footers, - }) + FileMetadata::from_proto(&postscript, &footer, &metadata, stripe_footers) } // TODO: refactor like for sync @@ -241,12 +315,7 @@ where stripe_footers.push(deserialize_stripe_footer(&scratch, compression)?); } - Ok(FileMetadata { - postscript, - footer, - metadata, - stripe_footers, - }) + FileMetadata::from_proto(&postscript, &footer, &metadata, stripe_footers) } fn deserialize_footer(bytes: &[u8], compression: Option) -> Result