Skip to content

Commit f9c66ff

Browse files
committed
Add safe zero-copy converion from bytes::Bytes (#4254)
1 parent a9b9c60 commit f9c66ff

File tree

5 files changed

+37
-3
lines changed

5 files changed

+37
-3
lines changed

arrow-buffer/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,11 @@ name = "arrow_buffer"
3333
path = "src/lib.rs"
3434
bench = false
3535

36+
[package.metadata.docs.rs]
37+
features = ["bytes"]
38+
3639
[dependencies]
40+
bytes = { version = "1.4", optional = true }
3741
num = { version = "0.4", default-features = false, features = ["std"] }
3842
half = { version = "2.1", default-features = false }
3943

arrow-buffer/src/bytes.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,3 +148,32 @@ impl Debug for Bytes {
148148
write!(f, " }}")
149149
}
150150
}
151+
152+
#[cfg(feature = "bytes")]
153+
impl From<bytes::Bytes> for Bytes {
154+
fn from(value: bytes::Bytes) -> Self {
155+
Self {
156+
len: value.len(),
157+
ptr: NonNull::new(value.as_ptr() as _).unwrap(),
158+
deallocation: Deallocation::Custom(std::sync::Arc::new(value)),
159+
}
160+
}
161+
}
162+
163+
#[cfg(all(test, feature = "bytes"))]
164+
mod tests {
165+
use super::*;
166+
167+
#[test]
168+
fn test_from_bytes() {
169+
let bytes = bytes::Bytes::from(vec![1, 2, 3, 4]);
170+
let arrow_bytes: Bytes = bytes.clone().into();
171+
172+
assert_eq!(bytes.as_ptr(), arrow_bytes.as_ptr());
173+
174+
drop(bytes);
175+
drop(arrow_bytes);
176+
177+
let _ = Bytes::from(bytes::Bytes::new());
178+
}
179+
}

arrow-flight/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ license = { workspace = true }
2828

2929
[dependencies]
3030
arrow-array = { workspace = true }
31-
arrow-buffer = { workspace = true }
31+
arrow-buffer = { workspace = true, features = ["bytes"] }
3232
# Cast is needed to work around https://github.com/apache/arrow-rs/issues/3389
3333
arrow-cast = { workspace = true }
3434
arrow-ipc = { workspace = true }

arrow-flight/src/decode.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
use crate::{utils::flight_data_to_arrow_batch, FlightData};
1919
use arrow_array::{ArrayRef, RecordBatch};
20+
use arrow_buffer::Buffer;
2021
use arrow_schema::{Schema, SchemaRef};
2122
use bytes::Bytes;
2223
use futures::{ready, stream::BoxStream, Stream, StreamExt};
@@ -258,7 +259,7 @@ impl FlightDataDecoder {
258259
));
259260
};
260261

261-
let buffer: arrow_buffer::Buffer = data.data_body.into();
262+
let buffer = Buffer::from_bytes(data.data_body.into());
262263
let dictionary_batch =
263264
message.header_as_dictionary_batch().ok_or_else(|| {
264265
FlightError::protocol(

arrow-flight/src/sql/client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -520,7 +520,7 @@ pub fn arrow_data_from_flight_data(
520520

521521
let dictionaries_by_field = HashMap::new();
522522
let record_batch = read_record_batch(
523-
&Buffer::from(&flight_data.data_body),
523+
&Buffer::from_bytes(flight_data.data_body.into()),
524524
ipc_record_batch,
525525
arrow_schema_ref.clone(),
526526
&dictionaries_by_field,

0 commit comments

Comments
 (0)