From 8f1b58a93d7e6063c8489874d559773a42d096be Mon Sep 17 00:00:00 2001 From: Scott Lamb Date: Wed, 5 Mar 2025 17:41:34 -0800 Subject: [PATCH] feat: create `Bytes` from `std::sync::Arc` ...via infallible (`Bytes::from_arc_projection`) and fallible (`Bytes::try_from_arc_projection`) constructors. https://github.com/tokio-rs/bytes/issues/359#issuecomment-640812016 https://github.com/tokio-rs/bytes/issues/437#issuecomment-2701699739 --- src/bytes.rs | 148 +++++++++++++++++++++++++++++++++++++++----- tests/test_bytes.rs | 99 +++++++++++++++++++++++++++++ 2 files changed, 233 insertions(+), 14 deletions(-) diff --git a/src/bytes.rs b/src/bytes.rs index e2c187cc1..cb9b04da1 100644 --- a/src/bytes.rs +++ b/src/bytes.rs @@ -1,3 +1,4 @@ +use alloc::sync::Arc; use core::iter::FromIterator; use core::mem::{self, ManuallyDrop}; use core::ops::{Deref, RangeBounds}; @@ -289,6 +290,74 @@ impl Bytes { ret } + /// Creates a new `Bytes` from an [`Arc`] owner and a function that + /// returns the buffer given a reference to the contained `T`. + /// + /// `T` must be [`Sized`] rather than a trait object or slice. + /// + /// The returned `Bytes` can be cloned via `Arc` reference counting, + /// whereas `Bytes` created via conversion from [`Vec`] must perform a new + /// allocation internally on first clone to hold the reference count. + /// This optimization is most significant if many `Bytes` instances are + /// created from the same `owner` and subsequently cloned. + /// + /// ``` + /// # use std::sync::Arc; + /// # use bytes::Bytes; + /// struct Pieces(Vec>); + /// let pieces = Arc::new(Pieces(vec![b"hello".to_vec(), b"world".to_vec()])); + /// let bytes: Vec = (0..2).map(|i| { + /// Bytes::from_arc_projection(pieces.clone(), |p| &p.0[i]) + /// }).collect(); + /// let bytes_cloned = bytes.clone(); + /// assert_eq!(bytes[0], b"hello"[..]); + /// assert_eq!(bytes_cloned[1], b"world"[..]); + /// ``` + /// + /// See also [`Bytes::try_from_arc_projection`] for a fallible version. 
+ pub fn from_arc_projection( + owner: Arc, + projection: impl FnOnce(&T) -> &[u8], + ) -> Self { + let buf = projection(&*owner); + Bytes { + ptr: buf.as_ptr(), + len: buf.len(), + data: AtomicPtr::new(Arc::into_raw(owner) as *mut ()), + vtable: arcproj_vtable::(), + } + } + + /// Tries to create a new `Bytes` from an [`Arc`] owner and a function that + /// returns the buffer given a reference to the contained `T` or fails. + /// + /// This is similar to [`Bytes::from_arc_projection`] but fallible. + /// + /// ``` + /// # use std::sync::Arc; + /// # use bytes::Bytes; + /// struct Pieces(Vec>); + /// let pieces = Arc::new(Pieces(vec![b"hello".to_vec(), b"world".to_vec()])); + /// let bytes: Vec> = (0..3).map(|i| { + /// Bytes::try_from_arc_projection(pieces.clone(), |p| { + /// p.0.get(i).map(|v| &**v).ok_or("out of bounds") + /// }) + /// }).collect(); + /// assert_eq!(bytes, [Ok(b"hello"[..].into()), Ok(b"world"[..].into()), Err("out of bounds")]); + /// ``` + pub fn try_from_arc_projection( + owner: Arc, + projection: impl FnOnce(&T) -> Result<&[u8], E>, + ) -> Result { + let buf = projection(&*owner)?; + Ok(Bytes { + ptr: buf.as_ptr(), + len: buf.len(), + data: AtomicPtr::new(Arc::into_raw(owner) as *mut ()), + vtable: arcproj_vtable::(), + }) + } + /// Returns the number of bytes contained in this `Bytes`. /// /// # Examples @@ -322,8 +391,9 @@ impl Bytes { /// Returns true if this is the only reference to the data and /// `Into` would avoid cloning the underlying buffer. /// - /// Always returns false if the data is backed by a [static slice](Bytes::from_static), - /// or an [owner](Bytes::from_owner). + /// Always returns false if the data is backed by a + /// [static slice](Bytes::from_static), [owner](Bytes::from_owner), + /// or [Arc projection](Bytes::from_arc_projection). /// /// The result of this method may be invalidated immediately if another /// thread clones this value while this is being called. 
Ensure you have @@ -627,8 +697,9 @@ impl Bytes { /// If `self` is not unique for the entire original buffer, this will fail /// and return self. /// - /// This will also always fail if the buffer was constructed via either - /// [from_owner](Bytes::from_owner) or [from_static](Bytes::from_static). + /// Always fails if the data is backed by a + /// [static slice](Bytes::from_static), [owner](Bytes::from_owner), + /// or [Arc projection](Bytes::from_arc_projection). /// /// # Examples /// @@ -1073,13 +1144,17 @@ impl fmt::Debug for Vtable { } } +fn never_unique(_: &AtomicPtr<()>) -> bool { + false +} + // ===== impl StaticVtable ===== const STATIC_VTABLE: Vtable = Vtable { clone: static_clone, into_vec: static_to_vec, into_mut: static_to_mut, - is_unique: static_is_unique, + is_unique: never_unique, drop: static_drop, }; @@ -1098,10 +1173,6 @@ unsafe fn static_to_mut(_: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesM BytesMut::from(slice) } -fn static_is_unique(_: &AtomicPtr<()>) -> bool { - false -} - unsafe fn static_drop(_: &mut AtomicPtr<()>, _: *const u8, _: usize) { // nothing to drop for &'static [u8] } @@ -1152,10 +1223,6 @@ unsafe fn owned_to_mut(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Byte BytesMut::from_vec(owned_to_vec(data, ptr, len)) } -unsafe fn owned_is_unique(_data: &AtomicPtr<()>) -> bool { - false -} - unsafe fn owned_drop_impl(owned: *mut ()) { let lifetime = owned.cast::(); let ref_cnt = &(*lifetime).ref_cnt; @@ -1183,7 +1250,7 @@ static OWNED_VTABLE: Vtable = Vtable { clone: owned_clone, into_vec: owned_to_vec, into_mut: owned_to_mut, - is_unique: owned_is_unique, + is_unique: never_unique, drop: owned_drop, }; @@ -1489,6 +1556,59 @@ unsafe fn shallow_clone_arc(shared: *mut Shared, ptr: *const u8, len: usize) -> } } +fn arcproj_vtable() -> &'static Vtable { + // Produce vtable via const promotion to &'static. 
+ // + trait V { + const VTABLE: Vtable; + } + impl V for T { + const VTABLE: Vtable = Vtable { + clone: arcproj_clone::, + into_vec: arcproj_into_vec::, + into_mut: arcproj_into_mut::, + is_unique: never_unique, + drop: arcproj_drop::, + }; + } + &::VTABLE +} + +unsafe fn arcproj_clone(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Bytes { + let arc = data.load(Ordering::Relaxed); + + // Replicate `Arc::increment_strong_count`, which has a MSRV of 1.51.0. + let _ = core::mem::ManuallyDrop::new(Arc::::from_raw(arc as *const T)).clone(); + + Bytes { + ptr, + len, + data: AtomicPtr::new(arc), + vtable: arcproj_vtable::(), + } +} + +unsafe fn arcproj_into_vec(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec { + let vec = slice::from_raw_parts(ptr, len).to_vec(); + arcproj_drop_impl::(data); + vec +} + +unsafe fn arcproj_into_mut(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesMut { + let out = BytesMut::from(slice::from_raw_parts(ptr, len)); + arcproj_drop_impl::(data); + out +} + +unsafe fn arcproj_drop(data: &mut AtomicPtr<()>, _ptr: *const u8, _len: usize) { + arcproj_drop_impl::(data); +} + +unsafe fn arcproj_drop_impl(data: &AtomicPtr<()>) { + // Replicate `Arc::decrement_strong_count`, which has a MSRV of 1.51.0. 
+ drop(Arc::from_raw(data.load(Ordering::Relaxed) as *const T)); +} + #[cold] unsafe fn shallow_clone_vec( atom: &AtomicPtr<()>, diff --git a/tests/test_bytes.rs b/tests/test_bytes.rs index 613efc886..a16417698 100644 --- a/tests/test_bytes.rs +++ b/tests/test_bytes.rs @@ -1647,3 +1647,102 @@ fn owned_safe_drop_on_as_ref_panic() { assert!(result.is_err()); assert_eq!(drop_counter.get(), 1); } + +#[test] +fn arcproj_is_unique_always_false() { + let b1 = Bytes::from_arc_projection(Arc::new([1, 2, 3, 4, 5, 6, 7]), |v| &v[..]); + assert!(!b1.is_unique()); // even if ref_cnt == 1 + let b2 = b1.clone(); + assert!(!b1.is_unique()); + assert!(!b2.is_unique()); + drop(b1); + assert!(!b2.is_unique()); // even if ref_cnt == 1 +} + +#[test] +fn arcproj_buf_sharing() { + let buf = [1, 2, 3, 4, 5, 6, 7]; + let b1 = Bytes::from_arc_projection(Arc::new(buf), |v| &v[..]); + let b2 = b1.clone(); + assert_eq!(&buf[..], &b1[..]); + assert_eq!(&buf[..], &b2[..]); + assert_eq!(b1.as_ptr(), b2.as_ptr()); + assert_eq!(b1.len(), b2.len()); + assert_eq!(b1.len(), buf.len()); +} + +#[test] +fn arcproj_buf_slicing() { + let b1 = Bytes::from_arc_projection(Arc::new(Vec::from(SHORT)), |v| &v[..]); + assert_eq!(SHORT, &b1[..]); + let b2 = b1.slice(1..(b1.len() - 1)); + assert_eq!(&SHORT[1..(SHORT.len() - 1)], b2); + assert_eq!(unsafe { b1.as_ptr().add(1) }, b2.as_ptr()); + assert_eq!(SHORT.len() - 2, b2.len()); +} + +#[test] +fn arcproj_dropped_exactly_once() { + let buf: [u8; 5] = [1, 2, 3, 4, 5]; + let drop_counter = SharedAtomicCounter::new(); + let owner = OwnedTester::new(buf, drop_counter.clone()); + let b1 = Bytes::from_arc_projection(Arc::new(owner), |o| o.as_ref()); + let b2 = b1.clone(); + assert_eq!(drop_counter.get(), 0); + drop(b1); + assert_eq!(drop_counter.get(), 0); + let b3 = b2.slice(1..b2.len() - 1); + drop(b2); + assert_eq!(drop_counter.get(), 0); + drop(b3); + assert_eq!(drop_counter.get(), 1); +} + +#[test] +fn arcproj_to_mut() { + let buf: [u8; 10] = [0, 1, 2, 3, 4, 5, 6, 
7, 8, 9]; + let drop_counter = SharedAtomicCounter::new(); + let owner = OwnedTester::new(buf, drop_counter.clone()); + let b1 = Bytes::from_arc_projection(Arc::new(owner), |o| o.as_ref()); + + // Holding an owner will fail converting to a BytesMut, + // even when the bytes instance has a ref_cnt == 1. + let b1 = b1.try_into_mut().unwrap_err(); + + // That said, it's still possible, just not cheap. + let bm1: BytesMut = b1.into(); + let new_buf = &bm1[..]; + assert_eq!(new_buf, &buf[..]); + + // `.into::()` has correctly dropped the owner + assert_eq!(drop_counter.get(), 1); +} + +#[test] +fn arcproj_into_vec() { + let drop_counter = SharedAtomicCounter::new(); + let buf: [u8; 10] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]; + let owner = OwnedTester::new(buf, drop_counter.clone()); + let b1 = Bytes::from_arc_projection(Arc::new(owner), |o| o.as_ref()); + + let v1: Vec = b1.into(); + assert_eq!(&v1[..], &buf[..]); + // into() vec will copy out of the owner and drop it + assert_eq!(drop_counter.get(), 1); +} + +#[test] +#[cfg_attr(not(panic = "unwind"), ignore)] +fn arcproj_safe_drop_on_as_ref_panic() { + let buf: [u8; 10] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]; + let drop_counter = SharedAtomicCounter::new(); + let mut owner = OwnedTester::new(buf, drop_counter.clone()); + owner.panic_as_ref = true; + + let result = panic::catch_unwind(AssertUnwindSafe(|| { + let _ = Bytes::from_arc_projection(Arc::new(owner), |o| o.as_ref()); + })); + + assert!(result.is_err()); + assert_eq!(drop_counter.get(), 1); +}