Skip to content

Commit a8c4232

Browse files
tustvold and alamb
authored
Add IPC StreamDecoder (#5531)
* Add IPC StreamDecoder
* Apply suggestions from code review

Co-authored-by: Andrew Lamb <[email protected]>

* Test EOS

---------

Co-authored-by: Andrew Lamb <[email protected]>
1 parent 37d1d3d commit a8c4232

File tree

7 files changed

+401
-15
lines changed

7 files changed

+401
-15
lines changed

arrow-buffer/src/buffer/immutable.rs

+16-6
Original file line number | Diff line number | Diff line change
@@ -171,23 +171,33 @@ impl Buffer {
171171

172172
/// Returns a new [Buffer] that is a slice of this buffer starting at `offset`.
173173
/// Doing so allows the same memory region to be shared between buffers.
174+
///
174175
/// # Panics
176+
///
175177
/// Panics iff `offset` is larger than `len`.
176178
pub fn slice(&self, offset: usize) -> Self {
179+
let mut s = self.clone();
180+
s.advance(offset);
181+
s
182+
}
183+
184+
/// Increases the offset of this buffer by `offset`
185+
///
186+
/// # Panics
187+
///
188+
/// Panics iff `offset` is larger than `len`.
189+
#[inline]
190+
pub fn advance(&mut self, offset: usize) {
177191
assert!(
178192
offset <= self.length,
179193
"the offset of the new Buffer cannot exceed the existing length"
180194
);
195+
self.length -= offset;
181196
// Safety:
182197
// This cannot overflow as
183198
// `self.offset + self.length < self.data.len()`
184199
// `offset < self.length`
185-
let ptr = unsafe { self.ptr.add(offset) };
186-
Self {
187-
data: self.data.clone(),
188-
length: self.length - offset,
189-
ptr,
190-
}
200+
self.ptr = unsafe { self.ptr.add(offset) };
191201
}
192202

193203
/// Returns a new [Buffer] that is a slice of this buffer starting at `offset`,

arrow-integration-testing/tests/ipc_reader.rs

+32-3
Original file line number | Diff line number | Diff line change
@@ -19,10 +19,12 @@
1919
//! in `testing/arrow-ipc-stream/integration/...`
2020
2121
use arrow::error::ArrowError;
22-
use arrow::ipc::reader::{FileReader, StreamReader};
22+
use arrow::ipc::reader::{FileReader, StreamDecoder, StreamReader};
2323
use arrow::util::test_util::arrow_test_data;
24+
use arrow_buffer::Buffer;
2425
use arrow_integration_testing::read_gzip_json;
2526
use std::fs::File;
27+
use std::io::Read;
2628

2729
#[test]
2830
fn read_0_1_4() {
@@ -182,18 +184,45 @@ fn verify_arrow_stream(testdata: &str, version: &str, path: &str) {
182184
let filename = format!("{testdata}/arrow-ipc-stream/integration/{version}/{path}.stream");
183185
println!("Verifying {filename}");
184186

187+
// read expected JSON output
188+
let arrow_json = read_gzip_json(version, path);
189+
185190
// Compare contents to the expected output format in JSON
186191
{
187192
println!(" verifying content");
188193
let file = File::open(&filename).unwrap();
189194
let mut reader = StreamReader::try_new(file, None).unwrap();
190195

191-
// read expected JSON output
192-
let arrow_json = read_gzip_json(version, path);
193196
assert!(arrow_json.equals_reader(&mut reader).unwrap());
194197
// the next batch must be empty
195198
assert!(reader.next().is_none());
196199
// the stream must indicate that it's finished
197200
assert!(reader.is_finished());
198201
}
202+
203+
// Test stream decoder
204+
let expected = arrow_json.get_record_batches().unwrap();
205+
for chunk_sizes in [1, 2, 8, 123] {
206+
let mut decoder = StreamDecoder::new();
207+
let stream = chunked_file(&filename, chunk_sizes);
208+
let mut actual = Vec::with_capacity(expected.len());
209+
for mut x in stream {
210+
while !x.is_empty() {
211+
if let Some(x) = decoder.decode(&mut x).unwrap() {
212+
actual.push(x);
213+
}
214+
}
215+
}
216+
decoder.finish().unwrap();
217+
assert_eq!(expected, actual);
218+
}
219+
}
220+
221+
fn chunked_file(filename: &str, chunk_size: u64) -> impl Iterator<Item = Buffer> {
222+
let mut file = File::open(filename).unwrap();
223+
std::iter::from_fn(move || {
224+
let mut buf = vec![];
225+
let read = (&mut file).take(chunk_size).read_to_end(&mut buf).unwrap();
226+
(read != 0).then(|| Buffer::from_vec(buf))
227+
})
199228
}

arrow-ipc/src/convert.rs

+46-2
Original file line number | Diff line number | Diff line change
@@ -17,12 +17,17 @@
1717

1818
//! Utilities for converting between IPC types and native Arrow types
1919
20+
use arrow_buffer::Buffer;
2021
use arrow_schema::*;
21-
use flatbuffers::{FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, WIPOffset};
22+
use flatbuffers::{
23+
FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, Verifiable, Verifier,
24+
VerifierOptions, WIPOffset,
25+
};
2226
use std::collections::HashMap;
27+
use std::fmt::{Debug, Formatter};
2328
use std::sync::Arc;
2429

25-
use crate::{size_prefixed_root_as_message, KeyValue, CONTINUATION_MARKER};
30+
use crate::{size_prefixed_root_as_message, KeyValue, Message, CONTINUATION_MARKER};
2631
use DataType::*;
2732

2833
/// Serialize a schema in IPC format
@@ -806,6 +811,45 @@ pub(crate) fn get_fb_dictionary<'a>(
806811
builder.finish()
807812
}
808813

814+
/// An owned container for a validated [`Message`]
815+
///
816+
/// Safely decoding a flatbuffer requires validating the various embedded offsets,
817+
/// see [`Verifier`]. This is a potentially expensive operation, and it is therefore desirable
818+
/// to only do this once. [`crate::root_as_message`] performs this validation on construction,
819+
/// however, it returns a [`Message`] borrowing the provided byte slice. This prevents
820+
/// storing this [`Message`] in the same data structure that owns the buffer, as this
821+
/// would require self-referential borrows.
822+
///
823+
/// [`MessageBuffer`] solves this problem by providing a safe API for a [`Message`]
824+
/// without a lifetime bound.
825+
#[derive(Clone)]
826+
pub struct MessageBuffer(Buffer);
827+
828+
impl Debug for MessageBuffer {
829+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
830+
self.as_ref().fmt(f)
831+
}
832+
}
833+
834+
impl MessageBuffer {
835+
/// Try to create a [`MessageBuffer`] from the provided [`Buffer`]
836+
pub fn try_new(buf: Buffer) -> Result<Self, ArrowError> {
837+
let opts = VerifierOptions::default();
838+
let mut v = Verifier::new(&opts, &buf);
839+
<ForwardsUOffset<Message>>::run_verifier(&mut v, 0).map_err(|err| {
840+
ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
841+
})?;
842+
Ok(Self(buf))
843+
}
844+
845+
/// Return the [`Message`]
846+
#[inline]
847+
pub fn as_ref(&self) -> Message<'_> {
848+
// SAFETY: Run verifier on construction
849+
unsafe { crate::root_as_message_unchecked(&self.0) }
850+
}
851+
}
852+
809853
#[cfg(test)]
810854
mod tests {
811855
use super::*;

arrow-ipc/src/reader.rs

+4
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,10 @@
2020
//! The `FileReader` and `StreamReader` have similar interfaces,
2121
//! however the `FileReader` expects a reader that supports `Seek`ing
2222
23+
mod stream;
24+
25+
pub use stream::*;
26+
2327
use flatbuffers::{VectorIter, VerifierOptions};
2428
use std::collections::HashMap;
2529
use std::fmt;

0 commit comments

Comments
 (0)