Skip to content

Commit 0f81a7b

Browse files
committed
Add safe zero-copy converion from bytes::Bytes (#4254)
1 parent a9b9c60 commit 0f81a7b

File tree

5 files changed

+38
-3
lines changed

5 files changed

+38
-3
lines changed

arrow-buffer/Cargo.toml

+4
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,11 @@ name = "arrow_buffer"
3333
path = "src/lib.rs"
3434
bench = false
3535

36+
[package.metadata.docs.rs]
37+
features = ["bytes"]
38+
3639
[dependencies]
40+
bytes = { version = "1.4", optional = true }
3741
num = { version = "0.4", default-features = false, features = ["std"] }
3842
half = { version = "2.1", default-features = false }
3943

arrow-buffer/src/bytes.rs

+30
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
2222
use core::slice;
2323
use std::ptr::NonNull;
24+
use std::sync::Arc;
2425
use std::{fmt::Debug, fmt::Formatter};
2526

2627
use crate::alloc::Deallocation;
@@ -148,3 +149,32 @@ impl Debug for Bytes {
148149
write!(f, " }}")
149150
}
150151
}
152+
153+
#[cfg(feature = "bytes")]
154+
impl From<bytes::Bytes> for Bytes {
155+
fn from(value: bytes::Bytes) -> Self {
156+
Self {
157+
len: value.len(),
158+
ptr: NonNull::new(value.as_ptr() as _).unwrap(),
159+
deallocation: Deallocation::Custom(Arc::new(value)),
160+
}
161+
}
162+
}
163+
164+
#[cfg(all(test, feature = "bytes"))]
165+
mod tests {
166+
use super::*;
167+
168+
#[test]
169+
fn test_from_bytes() {
170+
let bytes = bytes::Bytes::from(vec![1, 2, 3, 4]);
171+
let arrow_bytes: Bytes = bytes.clone().into();
172+
173+
assert_eq!(bytes.as_ptr(), arrow_bytes.as_ptr());
174+
175+
drop(bytes);
176+
drop(arrow_bytes);
177+
178+
let _ = Bytes::from(bytes::Bytes::new());
179+
}
180+
}

arrow-flight/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ license = { workspace = true }
2828

2929
[dependencies]
3030
arrow-array = { workspace = true }
31-
arrow-buffer = { workspace = true }
31+
arrow-buffer = { workspace = true, features = ["bytes"] }
3232
# Cast is needed to work around https://github.com/apache/arrow-rs/issues/3389
3333
arrow-cast = { workspace = true }
3434
arrow-ipc = { workspace = true }

arrow-flight/src/decode.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
use crate::{utils::flight_data_to_arrow_batch, FlightData};
1919
use arrow_array::{ArrayRef, RecordBatch};
20+
use arrow_buffer::Buffer;
2021
use arrow_schema::{Schema, SchemaRef};
2122
use bytes::Bytes;
2223
use futures::{ready, stream::BoxStream, Stream, StreamExt};
@@ -258,7 +259,7 @@ impl FlightDataDecoder {
258259
));
259260
};
260261

261-
let buffer: arrow_buffer::Buffer = data.data_body.into();
262+
let buffer = Buffer::from_bytes(data.data_body.into());
262263
let dictionary_batch =
263264
message.header_as_dictionary_batch().ok_or_else(|| {
264265
FlightError::protocol(

arrow-flight/src/sql/client.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -520,7 +520,7 @@ pub fn arrow_data_from_flight_data(
520520

521521
let dictionaries_by_field = HashMap::new();
522522
let record_batch = read_record_batch(
523-
&Buffer::from(&flight_data.data_body),
523+
&Buffer::from_bytes(flight_data.data_body.into()),
524524
ipc_record_batch,
525525
arrow_schema_ref.clone(),
526526
&dictionaries_by_field,

0 commit comments

Comments
 (0)