Skip to content

Commit

Permalink
Merge pull request #25 from datafusion-contrib/fix/read_entire_file
Browse files Browse the repository at this point in the history
fix: read entire file with ArrowReader
  • Loading branch information
WenyXu authored Nov 5, 2023
2 parents 2b7a2b0 + e0aa734 commit de88a24
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 25 deletions.
54 changes: 33 additions & 21 deletions src/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,30 @@ impl<R: Read> ArrowReader<R> {
}
}

impl<R: Read + Seek> ArrowReader<R> {
    /// Moves on to the next stripe produced by `self.cursor` and yields a batch from it.
    ///
    /// Returns `None` when the cursor has nothing more to offer. A failure from the
    /// cursor or from constructing the `NaiveStripeDecoder` is returned wrapped in
    /// `ArrowError::ExternalError`. On success the new decoder is stored in
    /// `self.current_stripe` and the result of `self.next()` is returned.
    fn try_advance_stripe(&mut self) -> Option<std::result::Result<RecordBatch, ArrowError>> {
        // `?` on the `Option` ends iteration as soon as the cursor is exhausted.
        let stripe = match self.cursor.next()? {
            Ok(stripe) => stripe,
            Err(source) => return Some(Err(ArrowError::ExternalError(Box::new(source)))),
        };

        let decoder =
            match NaiveStripeDecoder::new(stripe, self.schema_ref.clone(), self.batch_size) {
                Ok(decoder) => decoder,
                Err(source) => return Some(Err(ArrowError::ExternalError(Box::new(source)))),
            };

        self.current_stripe = Some(Box::new(decoder));
        // Hand back to `next` so the freshly installed decoder is polled right away.
        self.next()
    }
}

pub fn create_arrow_schema<R>(cursor: &Cursor<R>) -> Schema {
let metadata = cursor
.reader
Expand Down Expand Up @@ -93,28 +117,16 @@ impl<R: Read + Seek> Iterator for ArrowReader<R> {

fn next(&mut self) -> Option<Self::Item> {
match self.current_stripe.as_mut() {
Some(stripe) => stripe
.next()
.map(|batch| batch.map_err(|err| ArrowError::ExternalError(Box::new(err)))),
None => match self
.cursor
.next()
.map(|r| r.map_err(|err| ArrowError::ExternalError(Box::new(err))))
{
Some(Ok(stripe)) => {
match NaiveStripeDecoder::new(stripe, self.schema_ref.clone(), self.batch_size)
.map_err(|err| ArrowError::ExternalError(Box::new(err)))
{
Ok(decoder) => {
self.current_stripe = Some(Box::new(decoder));
self.next()
}
Err(err) => Some(Err(err)),
}
Some(stripe) => {
match stripe
.next()
.map(|batch| batch.map_err(|err| ArrowError::ExternalError(Box::new(err))))
{
Some(rb) => Some(rb),
None => self.try_advance_stripe(),
}
Some(Err(err)) => Some(Err(err)),
None => None,
},
}
None => self.try_advance_stripe(),
}
}
}
Expand Down
16 changes: 12 additions & 4 deletions tests/basic/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,11 +269,19 @@ pub async fn async_basic_test_0() {
/// Reads `demo-11-zlib.orc` through the synchronous Arrow reader and checks that the
/// rows of all collected batches add up to the row count reported by the reader.
pub fn v0_file_test() {
    let path = basic_path("demo-11-zlib.orc");
    let reader = new_arrow_reader_root(&path);
    // Capture the expected count first: `collect` takes the reader by value.
    let expected_row_count = reader.total_row_count();
    let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(expected_row_count as usize, total_rows);
}

/// Reads `demo-11-zlib.orc` through the async stream reader and checks that the
/// collected batches hold 1,920,800 rows in total.
#[tokio::test]
pub async fn v0_file_test_async() {
    let orc_path = basic_path("demo-11-zlib.orc");
    let stream = new_arrow_stream_reader_root(&orc_path).await;
    let record_batches = stream.try_collect::<Vec<_>>().await.unwrap();
    let row_count = record_batches
        .iter()
        .map(|batch| batch.num_rows())
        .sum::<usize>();
    assert_eq!(1_920_800, row_count);
}

#[test]
Expand Down

0 comments on commit de88a24

Please sign in to comment.