Skip to content

feat: create Bytes from std::sync::Arc #775

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 134 additions & 14 deletions src/bytes.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use alloc::sync::Arc;
use core::iter::FromIterator;
use core::mem::{self, ManuallyDrop};
use core::ops::{Deref, RangeBounds};
Expand Down Expand Up @@ -289,6 +290,74 @@ impl Bytes {
ret
}

/// Creates a new `Bytes` from an [`Arc<T>`] owner and a function that
/// returns the buffer given a reference to the contained `T`.
///
/// `T` must be [`Sized`] rather than a trait object or slice. It must also
/// be both [`Send`] and [`Sync`]: `Bytes` is itself `Send + Sync`, so its
/// clones may read through `&T` from several threads and the last handle,
/// and with it the `T`, may be dropped on a thread other than the one that
/// created it. That is only sound when `Arc<T>` is `Send`, which requires
/// `T: Send + Sync`.
///
/// The returned `Bytes` can be cloned via `Arc` reference counting,
/// whereas `Bytes` created via conversion from [`Vec`] must perform a new
/// allocation internally on first clone to hold the reference count.
/// This optimization is most significant if many `Bytes` instances are
/// created from the same `owner` and subsequently cloned.
///
/// If `projection` panics, `owner` is dropped normally during unwinding.
///
/// # Examples
///
/// ```
/// # use std::sync::Arc;
/// # use bytes::Bytes;
/// struct Pieces(Vec<Vec<u8>>);
/// let pieces = Arc::new(Pieces(vec![b"hello".to_vec(), b"world".to_vec()]));
/// let bytes: Vec<Bytes> = (0..2).map(|i| {
///     Bytes::from_arc_projection(pieces.clone(), |p| &p.0[i])
/// }).collect();
/// let bytes_cloned = bytes.clone();
/// assert_eq!(bytes[0], b"hello"[..]);
/// assert_eq!(bytes_cloned[1], b"world"[..]);
/// ```
///
/// See also [`Bytes::try_from_arc_projection`] for a fallible version.
pub fn from_arc_projection<T: Send + Sync + 'static>(
    owner: Arc<T>,
    projection: impl FnOnce(&T) -> &[u8],
) -> Self {
    // Project while `owner` is still an ordinary `Arc`, so a panicking
    // projection releases the reference instead of leaking it.
    let buf = projection(&*owner);
    let (ptr, len) = (buf.as_ptr(), buf.len());
    Bytes {
        ptr,
        len,
        // The strong reference is handed over to the vtable functions, which
        // rebuild the `Arc<T>` from this raw pointer to clone or release it.
        data: AtomicPtr::new(Arc::into_raw(owner) as *mut ()),
        vtable: arcproj_vtable::<T>(),
    }
}

/// Tries to create a new `Bytes` from an [`Arc`] owner and a function that
/// either returns the buffer given a reference to the contained `T`, or fails.
///
/// This is similar to [`Bytes::from_arc_projection`] but fallible.
///
/// `T` must be both [`Send`] and [`Sync`]: `Bytes` is itself `Send + Sync`,
/// so the shared `T` may be read from, and finally dropped on, any thread.
/// That is only sound when `Arc<T>` is `Send`, which requires
/// `T: Send + Sync`.
///
/// # Errors
///
/// Returns the error produced by `projection` unchanged. In that case
/// `owner` is dropped before this function returns.
///
/// # Examples
///
/// ```
/// # use std::sync::Arc;
/// # use bytes::Bytes;
/// struct Pieces(Vec<Vec<u8>>);
/// let pieces = Arc::new(Pieces(vec![b"hello".to_vec(), b"world".to_vec()]));
/// let bytes: Vec<Result<Bytes, &str>> = (0..3).map(|i| {
///     Bytes::try_from_arc_projection(pieces.clone(), |p| {
///         p.0.get(i).map(|v| &**v).ok_or("out of bounds")
///     })
/// }).collect();
/// assert_eq!(bytes, [Ok(b"hello"[..].into()), Ok(b"world"[..].into()), Err("out of bounds")]);
/// ```
pub fn try_from_arc_projection<T: Send + Sync + 'static, E>(
    owner: Arc<T>,
    projection: impl FnOnce(&T) -> Result<&[u8], E>,
) -> Result<Self, E> {
    // On `Err` (or panic) `owner` is still an ordinary `Arc` and is released
    // by its destructor; ownership is only transferred on success below.
    let buf = projection(&*owner)?;
    let (ptr, len) = (buf.as_ptr(), buf.len());
    Ok(Bytes {
        ptr,
        len,
        // The strong reference is handed over to the vtable functions, which
        // rebuild the `Arc<T>` from this raw pointer to clone or release it.
        data: AtomicPtr::new(Arc::into_raw(owner) as *mut ()),
        vtable: arcproj_vtable::<T>(),
    })
}

/// Returns the number of bytes contained in this `Bytes`.
///
/// # Examples
Expand Down Expand Up @@ -322,8 +391,9 @@ impl Bytes {
/// Returns true if this is the only reference to the data and
/// `Into<BytesMut>` would avoid cloning the underlying buffer.
///
/// Always returns false if the data is backed by a [static slice](Bytes::from_static),
/// or an [owner](Bytes::from_owner).
/// Always returns false if the data is backed by a
/// [static slice](Bytes::from_static), [owner](Bytes::from_owner),
/// or [Arc projection](Bytes::from_arc_projection).
///
/// The result of this method may be invalidated immediately if another
/// thread clones this value while this is being called. Ensure you have
Expand Down Expand Up @@ -627,8 +697,9 @@ impl Bytes {
/// If `self` is not unique for the entire original buffer, this will fail
/// and return self.
///
/// This will also always fail if the buffer was constructed via either
/// [from_owner](Bytes::from_owner) or [from_static](Bytes::from_static).
/// Always fails if the data is backed by a
/// [static slice](Bytes::from_static), [owner](Bytes::from_owner),
/// or [Arc projection](Bytes::from_arc_projection).
///
/// # Examples
///
Expand Down Expand Up @@ -1073,13 +1144,17 @@ impl fmt::Debug for Vtable {
}
}

/// `is_unique` vtable entry shared by every storage kind that never reports
/// itself as uniquely owned.
///
/// The data pointer is ignored; the answer is unconditionally `false`.
fn never_unique(_data: &AtomicPtr<()>) -> bool {
    false
}

// ===== impl StaticVtable =====

const STATIC_VTABLE: Vtable = Vtable {
clone: static_clone,
into_vec: static_to_vec,
into_mut: static_to_mut,
is_unique: static_is_unique,
is_unique: never_unique,
drop: static_drop,
};

Expand All @@ -1098,10 +1173,6 @@ unsafe fn static_to_mut(_: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesM
BytesMut::from(slice)
}

fn static_is_unique(_: &AtomicPtr<()>) -> bool {
false
}

unsafe fn static_drop(_: &mut AtomicPtr<()>, _: *const u8, _: usize) {
// nothing to drop for &'static [u8]
}
Expand Down Expand Up @@ -1152,10 +1223,6 @@ unsafe fn owned_to_mut(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Byte
BytesMut::from_vec(owned_to_vec(data, ptr, len))
}

unsafe fn owned_is_unique(_data: &AtomicPtr<()>) -> bool {
false
}

unsafe fn owned_drop_impl(owned: *mut ()) {
let lifetime = owned.cast::<OwnedLifetime>();
let ref_cnt = &(*lifetime).ref_cnt;
Expand Down Expand Up @@ -1183,7 +1250,7 @@ static OWNED_VTABLE: Vtable = Vtable {
clone: owned_clone,
into_vec: owned_to_vec,
into_mut: owned_to_mut,
is_unique: owned_is_unique,
is_unique: never_unique,
drop: owned_drop,
};

Expand Down Expand Up @@ -1489,6 +1556,59 @@ unsafe fn shallow_clone_arc(shared: *mut Shared, ptr: *const u8, len: usize) ->
}
}

/// Returns the `'static` vtable whose entries are the `arcproj_*` functions
/// instantiated for the owner type `T`.
fn arcproj_vtable<T: Sync>() -> &'static Vtable {
    // Items nested in a function cannot name the outer `T`, so the per-type
    // table is attached as an associated constant of a private,
    // blanket-implemented trait and borrowed from there; the borrow of the
    // constant is promoted to `&'static`.
    // <https://users.rust-lang.org/t/custom-vtables-with-integers/78508/2>
    trait ArcProjVtable {
        const TABLE: Vtable;
    }
    impl<U: Sync> ArcProjVtable for U {
        const TABLE: Vtable = Vtable {
            clone: arcproj_clone::<U>,
            into_vec: arcproj_into_vec::<U>,
            into_mut: arcproj_into_mut::<U>,
            is_unique: never_unique,
            drop: arcproj_drop::<U>,
        };
    }
    &<T as ArcProjVtable>::TABLE
}

/// `clone` vtable entry for Arc-projection storage: takes one more strong
/// reference on the owning `Arc<T>` and returns a `Bytes` viewing the same
/// `ptr`/`len` range.
///
/// # Safety
///
/// `data` must hold a pointer produced by `Arc::<T>::into_raw` whose strong
/// reference is still held by the calling `Bytes`.
unsafe fn arcproj_clone<T: Sync>(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Bytes {
    let raw = data.load(Ordering::Relaxed);

    // Replicate `Arc::increment_strong_count`, which has a MSRV of 1.51.0:
    // view the raw pointer as an `Arc` without taking over its reference,
    // clone it, and leak the clone so the extra count stays in place.
    let borrowed = core::mem::ManuallyDrop::new(Arc::<T>::from_raw(raw as *const T));
    core::mem::forget(Arc::clone(&*borrowed));

    Bytes {
        ptr,
        len,
        data: AtomicPtr::new(raw),
        vtable: arcproj_vtable::<T>(),
    }
}

/// `into_vec` vtable entry for Arc-projection storage: copies the viewed
/// bytes into a fresh `Vec<u8>` and then releases this handle's reference
/// on the owner.
///
/// # Safety
///
/// `ptr`/`len` must describe readable bytes, and `data` must hold a pointer
/// from `Arc::<T>::into_raw` whose reference the caller gives up here.
unsafe fn arcproj_into_vec<T: Sync>(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec<u8> {
    // Copy before releasing: the reference released below may be the last one.
    let copied = Vec::from(slice::from_raw_parts(ptr, len));
    arcproj_drop_impl::<T>(data);
    copied
}

/// `into_mut` vtable entry for Arc-projection storage: builds a `BytesMut`
/// from the viewed bytes and then releases this handle's reference on the
/// owner.
///
/// # Safety
///
/// `ptr`/`len` must describe readable bytes, and `data` must hold a pointer
/// from `Arc::<T>::into_raw` whose reference the caller gives up here.
unsafe fn arcproj_into_mut<T: Sync>(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesMut {
    // Build the result before releasing: the reference released below may be
    // the last one.
    let view: &[u8] = slice::from_raw_parts(ptr, len);
    let converted = BytesMut::from(view);
    arcproj_drop_impl::<T>(data);
    converted
}

/// `drop` vtable entry for Arc-projection storage: releases this handle's
/// reference on the owning `Arc<T>`. The viewed range is not needed for that
/// and is ignored.
///
/// # Safety
///
/// `data` must hold a pointer from `Arc::<T>::into_raw` whose reference the
/// caller gives up here.
unsafe fn arcproj_drop<T: Sync>(data: &mut AtomicPtr<()>, _: *const u8, _: usize) {
    arcproj_drop_impl::<T>(data);
}

/// Releases one strong reference on the `Arc<T>` whose raw pointer is stored
/// in `data`.
///
/// # Safety
///
/// `data` must hold a pointer from `Arc::<T>::into_raw`, and the caller must
/// own the strong reference being released.
unsafe fn arcproj_drop_impl<T: Sync>(data: &AtomicPtr<()>) {
    // Replicate `Arc::decrement_strong_count`, which has a MSRV of 1.51.0:
    // rebuild the owning `Arc` and let its destructor drop the count.
    let raw = data.load(Ordering::Relaxed) as *const T;
    let owner: Arc<T> = Arc::from_raw(raw);
    drop(owner);
}

#[cold]
unsafe fn shallow_clone_vec(
atom: &AtomicPtr<()>,
Expand Down
99 changes: 99 additions & 0 deletions tests/test_bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1647,3 +1647,102 @@ fn owned_safe_drop_on_as_ref_panic() {
assert!(result.is_err());
assert_eq!(drop_counter.get(), 1);
}

/// `is_unique` reports `false` for Arc-projected bytes no matter how many
/// handles are alive.
#[test]
fn arcproj_is_unique_always_false() {
    let first = Bytes::from_arc_projection(Arc::new([1, 2, 3, 4, 5, 6, 7]), |arr| &arr[..]);
    // A single handle is still not reported as unique.
    assert!(!first.is_unique());

    let second = first.clone();
    assert!(!first.is_unique());
    assert!(!second.is_unique());

    // Back down to a single handle: still not unique.
    drop(first);
    assert!(!second.is_unique());
}

/// A clone of Arc-projected bytes views the very same memory as the original.
#[test]
fn arcproj_buf_sharing() {
    let source = [1, 2, 3, 4, 5, 6, 7];
    let original = Bytes::from_arc_projection(Arc::new(source), |arr| &arr[..]);
    let copy = original.clone();

    // Both handles expose the source contents...
    assert_eq!(&source[..], &original[..]);
    assert_eq!(&source[..], &copy[..]);

    // ...through the same pointer and with the same length.
    assert_eq!(original.as_ptr(), copy.as_ptr());
    assert_eq!(original.len(), copy.len());
    assert_eq!(original.len(), source.len());
}

/// Slicing Arc-projected bytes yields a view into the same buffer, offset by
/// the slice start.
#[test]
fn arcproj_buf_slicing() {
    let whole = Bytes::from_arc_projection(Arc::new(Vec::from(SHORT)), |v| &v[..]);
    assert_eq!(SHORT, &whole[..]);

    // Trim one byte off each end.
    let end = whole.len() - 1;
    let inner = whole.slice(1..end);
    assert_eq!(&SHORT[1..(SHORT.len() - 1)], inner);
    assert_eq!(unsafe { whole.as_ptr().add(1) }, inner.as_ptr());
    assert_eq!(SHORT.len() - 2, inner.len());
}

/// The owner is dropped once, and only when the last handle (clone or slice)
/// goes away.
#[test]
fn arcproj_dropped_exactly_once() {
    let drops = SharedAtomicCounter::new();
    let payload: [u8; 5] = [1, 2, 3, 4, 5];
    let tester = OwnedTester::new(payload, drops.clone());

    let first = Bytes::from_arc_projection(Arc::new(tester), |o| o.as_ref());
    let second = first.clone();
    assert_eq!(drops.get(), 0);

    drop(first);
    assert_eq!(drops.get(), 0);

    // A slice keeps the owner alive just like a clone does.
    let tail = second.slice(1..second.len() - 1);
    drop(second);
    assert_eq!(drops.get(), 0);

    drop(tail);
    assert_eq!(drops.get(), 1);
}

/// `try_into_mut` refuses Arc-projected bytes, while the copying conversion to
/// `BytesMut` succeeds and releases the owner.
#[test]
fn arcproj_to_mut() {
    let drops = SharedAtomicCounter::new();
    let payload: [u8; 10] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
    let tester = OwnedTester::new(payload, drops.clone());
    let bytes = Bytes::from_arc_projection(Arc::new(tester), |o| o.as_ref());

    // The cheap conversion is rejected for this kind of storage, even though
    // only a single handle exists.
    let bytes = bytes.try_into_mut().unwrap_err();

    // The copying conversion still works.
    let mutable: BytesMut = bytes.into();
    let contents = &mutable[..];
    assert_eq!(contents, &payload[..]);

    // Converting consumed the only handle, so the owner has been dropped.
    assert_eq!(drops.get(), 1);
}

/// Converting Arc-projected bytes into a `Vec<u8>` copies the data out and
/// releases the owner.
#[test]
fn arcproj_into_vec() {
    let payload: [u8; 10] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
    let drops = SharedAtomicCounter::new();
    let tester = OwnedTester::new(payload, drops.clone());
    let bytes = Bytes::from_arc_projection(Arc::new(tester), |o| o.as_ref());

    let copied: Vec<u8> = bytes.into();
    assert_eq!(&copied[..], &payload[..]);
    // The conversion consumed the only handle, so the owner is gone.
    assert_eq!(drops.get(), 1);
}

/// A projection that panics must not leak the owner: it is dropped exactly
/// once while the panic unwinds.
#[test]
#[cfg_attr(not(panic = "unwind"), ignore)]
fn arcproj_safe_drop_on_as_ref_panic() {
    let drops = SharedAtomicCounter::new();
    let payload: [u8; 10] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
    let mut tester = OwnedTester::new(payload, drops.clone());
    // Make the owner's `as_ref` panic when the projection calls it.
    tester.panic_as_ref = true;

    let outcome = panic::catch_unwind(AssertUnwindSafe(|| {
        let _ = Bytes::from_arc_projection(Arc::new(tester), |o| o.as_ref());
    }));

    assert!(outcome.is_err());
    assert_eq!(drops.get(), 1);
}
Loading