From f9e091b70fed7e3f3481aae818717d50c313ec0b Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Mon, 10 Feb 2025 11:56:17 -0800 Subject: [PATCH 1/2] Avoid use of flatbuffers::size_prefixed_root --- arrow-ipc/src/convert.rs | 61 ++++++++++++++++++++++++---------------- arrow-ipc/src/reader.rs | 47 +++++++++++++++++++++++++++++-- 2 files changed, 80 insertions(+), 28 deletions(-) diff --git a/arrow-ipc/src/convert.rs b/arrow-ipc/src/convert.rs index aadeb7703371..79dd1726ed70 100644 --- a/arrow-ipc/src/convert.rs +++ b/arrow-ipc/src/convert.rs @@ -28,7 +28,7 @@ use std::fmt::{Debug, Formatter}; use std::sync::Arc; use crate::writer::DictionaryTracker; -use crate::{size_prefixed_root_as_message, KeyValue, Message, CONTINUATION_MARKER}; +use crate::{KeyValue, Message, CONTINUATION_MARKER}; use DataType::*; /// Low level Arrow [Schema] to IPC bytes converter @@ -255,32 +255,43 @@ pub fn try_schema_from_ipc_buffer(buffer: &[u8]) -> Result { // 4 bytes - an optional IPC_CONTINUATION_TOKEN prefix // 4 bytes - the byte length of the payload // a flatbuffer Message whose header is the Schema - if buffer.len() >= 4 { - // check continuation marker - let continuation_marker = &buffer[0..4]; - let begin_offset: usize = if continuation_marker.eq(&CONTINUATION_MARKER) { - // 4 bytes: CONTINUATION_MARKER - // 4 bytes: length - // buffer - 4 - } else { - // backward compatibility for buffer without the continuation marker - // 4 bytes: length - // buffer - 0 - }; - let msg = size_prefixed_root_as_message(&buffer[begin_offset..]).map_err(|err| { - ArrowError::ParseError(format!("Unable to convert flight info to a message: {err}")) - })?; - let ipc_schema = msg.header_as_schema().ok_or_else(|| { - ArrowError::ParseError("Unable to convert flight info to a schema".to_string()) - })?; - Ok(fb_to_schema(ipc_schema)) - } else { - Err(ArrowError::ParseError( + if buffer.len() < 4 { + return Err(ArrowError::ParseError( "The buffer length is less than 4 and missing the continuation marker or length of buffer".to_string() - )) + )); + } + + let (len, buffer) = if buffer[..4] == CONTINUATION_MARKER { + if buffer.len() < 8 { + return Err(ArrowError::ParseError( + "The buffer length is less than 8 and missing the length of buffer".to_string(), + )); + } + buffer[4..].split_at(4) + } else { + buffer.split_at(4) + }; + + let len = ::from_le_bytes(len.try_into().unwrap()); + if len < 0 { + return Err(ArrowError::ParseError(format!( + "The encapsulated message's reported length is negative ({len})" + ))); + } + + if buffer.len() < len as usize { + let actual_len = buffer.len(); + return Err(ArrowError::ParseError( + format!("The buffer length ({actual_len}) is less than the encapsulated message's reported length ({len})") + )); } + + let msg = crate::root_as_message(buffer) + .map_err(|err| ArrowError::ParseError(format!("Unable to get root as message: {err:?}")))?; + let ipc_schema = msg.header_as_schema().ok_or_else(|| { + ArrowError::ParseError("Unable to convert flight info to a schema".to_string()) + })?; + Ok(fb_to_schema(ipc_schema)) } /// Get the Arrow data type from the flatbuffer Field table diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index ca0d09e2282f..722b6cb738b3 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -1480,12 +1480,14 @@ impl RecordBatchReader for StreamReader { #[cfg(test)] mod tests { - use crate::writer::{unslice_run_array, DictionaryTracker, IpcDataGenerator, IpcWriteOptions}; + use crate::convert::fb_to_schema; + use crate::writer::{ + unslice_run_array, write_message, DictionaryTracker, IpcDataGenerator, IpcWriteOptions, + }; use super::*; - use crate::convert::fb_to_schema; - use crate::{root_as_footer, root_as_message}; + use crate::{root_as_footer, root_as_message, size_prefixed_root_as_message}; use arrow_array::builder::{PrimitiveRunBuilder, UnionBuilder}; use arrow_array::types::*; use arrow_buffer::{NullBuffer, OffsetBuffer}; @@ -2617,4 +2619,43 @@ mod tests { let err = read_ipc_with_decoder(buf).unwrap_err(); assert_eq!(err.to_string(), expected_err); } + + #[test] + fn test_roundtrip_schema() { + let schema = Schema::new(vec![ + Field::new( + "a", + DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)), + false, + ), + Field::new( + "b", + DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)), + false, + ), + ]); + + let options = IpcWriteOptions::default(); + let data_gen = IpcDataGenerator::default(); + let mut dict_tracker = DictionaryTracker::new(false); + let encoded_data = + data_gen.schema_to_bytes_with_dictionary_tracker(&schema, &mut dict_tracker, &options); + let mut schema_bytes = vec![]; + write_message(&mut schema_bytes, encoded_data, &options).expect("write_message"); + + let begin_offset: usize = if schema_bytes[0..4].eq(&CONTINUATION_MARKER) { + 4 + } else { + 0 + }; + + size_prefixed_root_as_message(&schema_bytes[begin_offset..]) + .expect_err("size_prefixed_root_as_message"); + + let msg = parse_message(&schema_bytes).expect("parse_message"); + let ipc_schema = msg.header_as_schema().expect("header_as_schema"); + let new_schema = fb_to_schema(ipc_schema); + + assert_eq!(schema, new_schema); + } } From c39c9d6b6a015523039e327a990702243a3dc668 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 6 Mar 2025 13:29:18 -0500 Subject: [PATCH 2/2] Add schema encoding reprodicer --- arrow-flight/src/lib.rs | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/arrow-flight/src/lib.rs b/arrow-flight/src/lib.rs index 1dd2700794f3..1af89b70dbbf 100644 --- a/arrow-flight/src/lib.rs +++ b/arrow-flight/src/lib.rs @@ -892,4 +892,26 @@ mod tests { let des_schema: Schema = (&result).try_into().unwrap(); assert_eq!(schema, des_schema); } + + #[test] + fn test_dict_schema() { + // Test for https://github.com/apache/arrow-rs/issues/7058 + let schema = Schema::new(vec![ + Field::new( + "a", + DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)), + false, + ), + Field::new( + "b", + DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)), + false, + ), + ]); + + let flight_info = FlightInfo::new().try_with_schema(&schema).unwrap(); + + let new_schema = Schema::try_from(flight_info).unwrap(); + assert_eq!(schema, new_schema); + } }