diff --git a/src/arrow_reader.rs b/src/arrow_reader.rs index 48ee1c5e..49ab601f 100644 --- a/src/arrow_reader.rs +++ b/src/arrow_reader.rs @@ -58,6 +58,30 @@ impl ArrowReader { } } +impl ArrowReader { + fn try_advance_stripe(&mut self) -> Option> { + match self + .cursor + .next() + .map(|r| r.map_err(|err| ArrowError::ExternalError(Box::new(err)))) + { + Some(Ok(stripe)) => { + match NaiveStripeDecoder::new(stripe, self.schema_ref.clone(), self.batch_size) + .map_err(|err| ArrowError::ExternalError(Box::new(err))) + { + Ok(decoder) => { + self.current_stripe = Some(Box::new(decoder)); + self.next() + } + Err(err) => Some(Err(err)), + } + } + Some(Err(err)) => Some(Err(err)), + None => None, + } + } +} + pub fn create_arrow_schema(cursor: &Cursor) -> Schema { let metadata = cursor .reader @@ -93,28 +117,16 @@ impl Iterator for ArrowReader { fn next(&mut self) -> Option { match self.current_stripe.as_mut() { - Some(stripe) => stripe - .next() - .map(|batch| batch.map_err(|err| ArrowError::ExternalError(Box::new(err)))), - None => match self - .cursor - .next() - .map(|r| r.map_err(|err| ArrowError::ExternalError(Box::new(err)))) - { - Some(Ok(stripe)) => { - match NaiveStripeDecoder::new(stripe, self.schema_ref.clone(), self.batch_size) - .map_err(|err| ArrowError::ExternalError(Box::new(err))) - { - Ok(decoder) => { - self.current_stripe = Some(Box::new(decoder)); - self.next() - } - Err(err) => Some(Err(err)), - } + Some(stripe) => { + match stripe + .next() + .map(|batch| batch.map_err(|err| ArrowError::ExternalError(Box::new(err)))) + { + Some(rb) => Some(rb), + None => self.try_advance_stripe(), } - Some(Err(err)) => Some(Err(err)), - None => None, - }, + } + None => self.try_advance_stripe(), } } } diff --git a/tests/basic/main.rs b/tests/basic/main.rs index a0b9df06..1c178e3f 100644 --- a/tests/basic/main.rs +++ b/tests/basic/main.rs @@ -269,11 +269,19 @@ pub async fn async_basic_test_0() { pub fn v0_file_test() { let path = basic_path("demo-11-zlib.orc"); let reader = new_arrow_reader_root(&path); - let _expected_row_count = reader.total_row_count(); + let expected_row_count = reader.total_row_count(); let batches = reader.collect::, _>>().unwrap(); - let _total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); - // TODO: not reading entire file, debug - // assert_eq!(expected_row_count as usize, total_rows); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(expected_row_count as usize, total_rows); +} + +#[tokio::test] +pub async fn v0_file_test_async() { + let path = basic_path("demo-11-zlib.orc"); + let reader = new_arrow_stream_reader_root(&path).await; + let batches = reader.try_collect::>().await.unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(1_920_800, total_rows); } #[test]