
Commit 368f823

hide api change + add require_alignment to StreamDecoder too
1 parent a1cb1a6 commit 368f823

File tree

7 files changed (+62 -20 lines)


arrow-flight/src/decode.rs (-1)

@@ -308,7 +308,6 @@ impl FlightDataDecoder {
             &state.schema,
             &mut state.dictionaries_by_field,
             &message.version(),
-            false,
         )
         .map_err(|e| {
             FlightError::DecodeError(format!("Error decoding ipc dictionary: {e}"))

arrow-flight/src/sql/client.rs (-1)

@@ -613,7 +613,6 @@ pub fn arrow_data_from_flight_data(
         &dictionaries_by_field,
         None,
         &ipc_message.version(),
-        false,
     )?;
     Ok(ArrowFlightData::RecordBatch(record_batch))
 }

arrow-flight/src/utils.rs (-1)

@@ -99,7 +99,6 @@ pub fn flight_data_to_arrow_batch(
             dictionaries_by_id,
             None,
             &message.version(),
-            false,
         )
     })?
 }

arrow-integration-testing/src/flight_client_scenarios/integration_test.rs (-1)

@@ -269,7 +269,6 @@ async fn receive_batch_flight_data(
         &schema,
         dictionaries_by_id,
         &message.version(),
-        false,
     )
     .expect("Error reading dictionary");

arrow-integration-testing/src/flight_server_scenarios/integration_test.rs (-2)

@@ -308,7 +308,6 @@ async fn record_batch_from_message(
         dictionaries_by_id,
         None,
         &message.version(),
-        false,
     );

     arrow_batch_result
@@ -331,7 +330,6 @@ async fn dictionary_from_message(
         &schema_ref,
         dictionaries_by_id,
         &message.version(),
-        false,
     );
     dictionary_batch_result
         .map_err(|e| Status::internal(format!("Could not convert to Dictionary: {e:?}")))

arrow-ipc/src/reader.rs (+38 -9)

@@ -485,6 +485,35 @@ impl<'a> ArrayReader<'a> {
     }
 }

+pub fn read_record_batch(
+    buf: &Buffer,
+    batch: crate::RecordBatch,
+    schema: SchemaRef,
+    dictionaries_by_id: &HashMap<i64, ArrayRef>,
+    projection: Option<&[usize]>,
+    metadata: &MetadataVersion,
+) -> Result<RecordBatch, ArrowError> {
+    read_record_batch2(
+        buf,
+        batch,
+        schema,
+        dictionaries_by_id,
+        projection,
+        metadata,
+        false,
+    )
+}
+
+pub fn read_dictionary(
+    buf: &Buffer,
+    batch: crate::DictionaryBatch,
+    schema: &Schema,
+    dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
+    metadata: &MetadataVersion,
+) -> Result<(), ArrowError> {
+    read_dictionary2(buf, batch, schema, dictionaries_by_id, metadata, false)
+}
+
 /// Creates a record batch from binary data using the `crate::RecordBatch` indexes and the `Schema`.
 ///
 /// If `require_alignment` is true, this function will return an error if any array data in the
@@ -495,7 +524,7 @@ impl<'a> ArrayReader<'a> {
 /// and copy over the data if any array data in the input `buf` is not properly aligned.
 /// (Properly aligned array data will remain zero-copy.)
 /// Under the hood it will use [`arrow_data::ArrayDataBuilder::build_aligned`] to construct [`arrow_data::ArrayData`].
-pub fn read_record_batch(
+fn read_record_batch2(
     buf: &Buffer,
     batch: crate::RecordBatch,
     schema: SchemaRef,
@@ -564,7 +593,7 @@ pub fn read_record_batch(

 /// Read the dictionary from the buffer and provided metadata,
 /// updating the `dictionaries_by_id` with the resulting dictionary
-pub fn read_dictionary(
+fn read_dictionary2(
     buf: &Buffer,
     batch: crate::DictionaryBatch,
     schema: &Schema,
@@ -593,7 +622,7 @@ pub fn read_dictionary(
             let value = value_type.as_ref().clone();
             let schema = Schema::new(vec![Field::new("", value, true)]);
             // Read a single column
-            let record_batch = read_record_batch(
+            let record_batch = read_record_batch2(
                 buf,
                 batch.data().unwrap(),
                 Arc::new(schema),
@@ -781,7 +810,7 @@ impl FileDecoder {
         match message.header_type() {
             crate::MessageHeader::DictionaryBatch => {
                 let batch = message.header_as_dictionary_batch().unwrap();
-                read_dictionary(
+                read_dictionary2(
                     &buf.slice(block.metaDataLength() as _),
                     batch,
                     &self.schema,
@@ -812,7 +841,7 @@ impl FileDecoder {
                     ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
                 })?;
                 // read the block that makes up the record batch into a buffer
-                read_record_batch(
+                read_record_batch2(
                     &buf.slice(block.metaDataLength() as _),
                     batch,
                     self.schema.clone(),
@@ -1255,7 +1284,7 @@ impl<R: Read> StreamReader<R> {
                 let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
                 self.reader.read_exact(&mut buf)?;

-                read_record_batch(
+                read_record_batch2(
                     &buf.into(),
                     batch,
                     self.schema(),
@@ -1276,7 +1305,7 @@ impl<R: Read> StreamReader<R> {
                 let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
                 self.reader.read_exact(&mut buf)?;

-                read_dictionary(
+                read_dictionary2(
                     &buf.into(),
                     batch,
                     &self.schema,
@@ -2048,7 +2077,7 @@ mod tests {
         assert_ne!(b.as_ptr().align_offset(8), 0);

         let ipc_batch = message.header_as_record_batch().unwrap();
-        let roundtrip = read_record_batch(
+        let roundtrip = read_record_batch2(
            &b,
            ipc_batch,
            batch.schema(),
@@ -2085,7 +2114,7 @@ mod tests {
         assert_ne!(b.as_ptr().align_offset(8), 0);

         let ipc_batch = message.header_as_record_batch().unwrap();
-        let result = read_record_batch(
+        let result = read_record_batch2(
            &b,
            ipc_batch,
            batch.schema(),
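For context on the reader.rs change above: the pre-existing six-argument signatures of `read_record_batch` and `read_dictionary` stay public, while the `require_alignment` flag is pushed down into the private `read_record_batch2`/`read_dictionary2` helpers (hard-coded to `false` for external callers). Below is a minimal caller sketch, not taken from this diff; the function name `decode_batch` and the use of `arrow_ipc::root_as_message` to parse the message header are illustrative assumptions.

    // Hypothetical caller sketch (not part of this commit): decode one encapsulated IPC
    // RecordBatch message via the re-exposed public `read_record_batch` wrapper.
    use std::collections::HashMap;

    use arrow_array::{ArrayRef, RecordBatch};
    use arrow_buffer::Buffer;
    use arrow_ipc::reader::read_record_batch;
    use arrow_schema::{ArrowError, SchemaRef};

    fn decode_batch(
        data_header: &[u8],                          // flatbuffers-encoded Message bytes
        body: &Buffer,                               // message body (the array data)
        schema: SchemaRef,
        dictionaries_by_id: &HashMap<i64, ArrayRef>,
    ) -> Result<RecordBatch, ArrowError> {
        let message = arrow_ipc::root_as_message(data_header)
            .map_err(|e| ArrowError::ParseError(format!("Unable to parse message: {e:?}")))?;
        let ipc_batch = message
            .header_as_record_batch()
            .ok_or_else(|| ArrowError::IpcError("Not a record batch message".to_string()))?;
        // No `require_alignment` argument here: the public wrapper always passes `false`,
        // so unaligned input buffers are copied into aligned ones rather than rejected.
        read_record_batch(
            body,
            ipc_batch,
            schema,
            dictionaries_by_id,
            None,
            &message.version(),
        )
    }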

arrow-ipc/src/reader/stream.rs (+24 -5)

@@ -24,7 +24,7 @@ use arrow_buffer::{Buffer, MutableBuffer};
 use arrow_schema::{ArrowError, SchemaRef};

 use crate::convert::MessageBuffer;
-use crate::reader::{read_dictionary, read_record_batch};
+use crate::reader::{read_dictionary2, read_record_batch2};
 use crate::{MessageHeader, CONTINUATION_MARKER};

 /// A low-level interface for reading [`RecordBatch`] data from a stream of bytes
@@ -40,6 +40,8 @@ pub struct StreamDecoder {
     state: DecoderState,
     /// A scratch buffer when a read is split across multiple `Buffer`
     buf: MutableBuffer,
+    /// Whether or not array data in input buffers are required to be aligned
+    require_alignment: bool,
 }

 #[derive(Debug)]
@@ -83,6 +85,23 @@ impl StreamDecoder {
         Self::default()
     }

+    /// Specifies whether or not array data in input buffers is required to be properly aligned.
+    ///
+    /// If `require_alignment` is true, this decoder will return an error if any array data in the
+    /// input `buf` is not properly aligned.
+    /// Under the hood it will use [`arrow_data::ArrayDataBuilder::build`] to construct
+    /// [`arrow_data::ArrayData`].
+    ///
+    /// If `require_alignment` is false (the default), this decoder will automatically allocate a
+    /// new aligned buffer and copy over the data if any array data in the input `buf` is not
+    /// properly aligned. (Properly aligned array data will remain zero-copy.)
+    /// Under the hood it will use [`arrow_data::ArrayDataBuilder::build_aligned`] to construct
+    /// [`arrow_data::ArrayData`].
+    pub fn with_require_alignment(mut self, require_alignment: bool) -> Self {
+        self.require_alignment = require_alignment;
+        self
+    }
+
     /// Try to read the next [`RecordBatch`] from the provided [`Buffer`]
     ///
     /// [`Buffer::advance`] will be called on `buffer` for any consumed bytes.
@@ -192,14 +211,14 @@ impl StreamDecoder {
                     let schema = self.schema.clone().ok_or_else(|| {
                         ArrowError::IpcError("Missing schema".to_string())
                     })?;
-                    let batch = read_record_batch(
+                    let batch = read_record_batch2(
                         &body,
                         batch,
                         schema,
                         &self.dictionaries,
                         None,
                         &version,
-                        false,
+                        self.require_alignment,
                     )?;
                     self.state = DecoderState::default();
                     return Ok(Some(batch));
@@ -209,13 +228,13 @@ impl StreamDecoder {
                     let schema = self.schema.as_deref().ok_or_else(|| {
                         ArrowError::IpcError("Missing schema".to_string())
                     })?;
-                    read_dictionary(
+                    read_dictionary2(
                         &body,
                         dictionary,
                         schema,
                         &mut self.dictionaries,
                         &version,
-                        false,
+                        self.require_alignment,
                     )?;
                     self.state = DecoderState::default();
                 }
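The `with_require_alignment` builder added above is the user-facing knob this commit introduces on `StreamDecoder`. A short usage sketch follows, not taken from this diff (the `decode_strict` helper is hypothetical), showing how a caller might opt into strict alignment checking.

    use arrow_buffer::Buffer;
    use arrow_ipc::reader::StreamDecoder;
    use arrow_schema::ArrowError;

    // Hypothetical helper (not part of this commit): feed buffers through a StreamDecoder
    // configured with require_alignment = true, so unaligned array data is reported as an
    // error (ArrayDataBuilder::build) instead of being copied into an aligned buffer.
    fn decode_strict(chunks: impl Iterator<Item = Buffer>) -> Result<(), ArrowError> {
        let mut decoder = StreamDecoder::new().with_require_alignment(true);
        for mut chunk in chunks {
            while !chunk.is_empty() {
                if let Some(batch) = decoder.decode(&mut chunk)? {
                    println!("decoded a batch with {} rows", batch.num_rows());
                }
            }
        }
        // Verify the stream terminated cleanly (no partially-read message left behind).
        decoder.finish()
    }

With the default (`require_alignment = false`) the decoder keeps the previous behaviour described in the doc comment above: unaligned input is copied and realigned via `ArrayDataBuilder::build_aligned`, and properly aligned data stays zero-copy.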
