Skip to content

Commit 1c9583a

Browse files
XiangpengHaoalamb
andauthored
Avoid unecessary copy when reading arrow files (#11840)
* avoid copy * fmt --------- Co-authored-by: Andrew Lamb <[email protected]>
1 parent 86030a1 commit 1c9583a

File tree

2 files changed

+11
-4
lines changed

2 files changed

+11
-4
lines changed

datafusion/core/src/datasource/physical_plan/arrow_file.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use crate::physical_plan::{
3131
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
3232
};
3333

34+
use arrow::buffer::Buffer;
3435
use arrow_ipc::reader::FileDecoder;
3536
use arrow_schema::SchemaRef;
3637
use datafusion_common::config::ConfigOptions;
@@ -296,7 +297,10 @@ impl FileOpener for ArrowOpener {
296297
for (dict_block, dict_result) in
297298
footer.dictionaries().iter().flatten().zip(dict_results)
298299
{
299-
decoder.read_dictionary(dict_block, &dict_result.into())?;
300+
decoder.read_dictionary(
301+
dict_block,
302+
&Buffer::from_bytes(dict_result.into()),
303+
)?;
300304
}
301305

302306
// filter recordbatches according to range
@@ -332,7 +336,10 @@ impl FileOpener for ArrowOpener {
332336
.zip(recordbatch_results)
333337
.filter_map(move |(block, data)| {
334338
decoder
335-
.read_record_batch(&block, &data.into())
339+
.read_record_batch(
340+
&block,
341+
&Buffer::from_bytes(data.into()),
342+
)
336343
.transpose()
337344
}),
338345
)

datafusion/proto-common/src/from_proto/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -408,7 +408,7 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue {
408408
"Error IPC message while deserializing ScalarValue::List: {e}"
409409
))
410410
})?;
411-
let buffer = Buffer::from(arrow_data);
411+
let buffer = Buffer::from(arrow_data.as_slice());
412412

413413
let ipc_batch = message.header_as_record_batch().ok_or_else(|| {
414414
Error::General(
@@ -423,7 +423,7 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue {
423423
"Error IPC message while deserializing ScalarValue::List dictionary message: {e}"
424424
))
425425
})?;
426-
let buffer = Buffer::from(arrow_data);
426+
let buffer = Buffer::from(arrow_data.as_slice());
427427

428428
let dict_batch = message.header_as_dictionary_batch().ok_or_else(|| {
429429
Error::General(

0 commit comments

Comments
 (0)