From a5defdc9c5446e39521de455d7265e16f54741bb Mon Sep 17 00:00:00 2001 From: Jefffrey <22608443+Jefffrey@users.noreply.github.com> Date: Sun, 5 Nov 2023 21:50:14 +1100 Subject: [PATCH 1/2] Fix bug where not reading all stripes in file --- src/arrow_reader.rs | 54 +++++++++++++++++++++++++++------------------ tests/basic/main.rs | 7 +++--- 2 files changed, 36 insertions(+), 25 deletions(-) diff --git a/src/arrow_reader.rs b/src/arrow_reader.rs index 48ee1c5e..49ab601f 100644 --- a/src/arrow_reader.rs +++ b/src/arrow_reader.rs @@ -58,6 +58,30 @@ impl ArrowReader { } } +impl ArrowReader { + fn try_advance_stripe(&mut self) -> Option> { + match self + .cursor + .next() + .map(|r| r.map_err(|err| ArrowError::ExternalError(Box::new(err)))) + { + Some(Ok(stripe)) => { + match NaiveStripeDecoder::new(stripe, self.schema_ref.clone(), self.batch_size) + .map_err(|err| ArrowError::ExternalError(Box::new(err))) + { + Ok(decoder) => { + self.current_stripe = Some(Box::new(decoder)); + self.next() + } + Err(err) => Some(Err(err)), + } + } + Some(Err(err)) => Some(Err(err)), + None => None, + } + } +} + pub fn create_arrow_schema(cursor: &Cursor) -> Schema { let metadata = cursor .reader @@ -93,28 +117,16 @@ impl Iterator for ArrowReader { fn next(&mut self) -> Option { match self.current_stripe.as_mut() { - Some(stripe) => stripe - .next() - .map(|batch| batch.map_err(|err| ArrowError::ExternalError(Box::new(err)))), - None => match self - .cursor - .next() - .map(|r| r.map_err(|err| ArrowError::ExternalError(Box::new(err)))) - { - Some(Ok(stripe)) => { - match NaiveStripeDecoder::new(stripe, self.schema_ref.clone(), self.batch_size) - .map_err(|err| ArrowError::ExternalError(Box::new(err))) - { - Ok(decoder) => { - self.current_stripe = Some(Box::new(decoder)); - self.next() - } - Err(err) => Some(Err(err)), - } + Some(stripe) => { + match stripe + .next() + .map(|batch| batch.map_err(|err| ArrowError::ExternalError(Box::new(err)))) + { + Some(rb) => Some(rb), + None => self.try_advance_stripe(), } - Some(Err(err)) => Some(Err(err)), - None => None, - }, + } + None => self.try_advance_stripe(), } } } diff --git a/tests/basic/main.rs b/tests/basic/main.rs index a0b9df06..3722afc7 100644 --- a/tests/basic/main.rs +++ b/tests/basic/main.rs @@ -269,11 +269,10 @@ pub async fn async_basic_test_0() { pub fn v0_file_test() { let path = basic_path("demo-11-zlib.orc"); let reader = new_arrow_reader_root(&path); - let _expected_row_count = reader.total_row_count(); + let expected_row_count = reader.total_row_count(); let batches = reader.collect::, _>>().unwrap(); - let _total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); - // TODO: not reading entire file, debug - // assert_eq!(expected_row_count as usize, total_rows); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(expected_row_count as usize, total_rows); } #[test] From e0aa734ca2acc59183d4eafcdf789cc0a49ff665 Mon Sep 17 00:00:00 2001 From: Jefffrey <22608443+Jefffrey@users.noreply.github.com> Date: Sun, 5 Nov 2023 21:56:14 +1100 Subject: [PATCH 2/2] Add async test --- tests/basic/main.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/basic/main.rs b/tests/basic/main.rs index 3722afc7..1c178e3f 100644 --- a/tests/basic/main.rs +++ b/tests/basic/main.rs @@ -275,6 +275,15 @@ pub fn v0_file_test() { assert_eq!(expected_row_count as usize, total_rows); } +#[tokio::test] +pub async fn v0_file_test_async() { + let path = basic_path("demo-11-zlib.orc"); + let reader = new_arrow_stream_reader_root(&path).await; + let batches = reader.try_collect::>().await.unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(1_920_800, total_rows); +} + #[test] pub fn alltypes_test() { let compressions = ["none", "snappy", "zlib", "lzo", "zstd", "lz4"];