
Add zero-copy make_mut #695

Merged
merged 10 commits on May 5, 2024
src/bytes.rs: 150 changes (149 additions & 1 deletion)
@@ -15,7 +15,7 @@ use crate::buf::IntoIter;
#[allow(unused)]
use crate::loom::sync::atomic::AtomicMut;
use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use crate::Buf;
use crate::{Buf, BytesMut};

/// A cheaply cloneable and sliceable chunk of contiguous memory.
///
@@ -113,6 +113,7 @@ pub(crate) struct Vtable {
///
/// takes `Bytes` to value
pub to_vec: unsafe fn(&AtomicPtr<()>, *const u8, usize) -> Vec<u8>,
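/// fn(data, ptr, len)
///
/// takes `Bytes` to a `BytesMut`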
pub to_mut: unsafe fn(&AtomicPtr<()>, *const u8, usize) -> BytesMut,
/// fn(data)
pub is_unique: unsafe fn(&AtomicPtr<()>) -> bool,
/// fn(data, ptr, len)
@@ -507,6 +508,49 @@ impl Bytes {
self.truncate(0);
}

/// Try to convert `self` into `BytesMut`.
///
/// If `self` is unique for the entire original buffer, this will succeed
/// and return a `BytesMut` with the contents of `self` without copying.
/// If `self` is not unique for the entire original buffer, this will fail
/// and return `self`.
///
/// # Examples
///
/// ```
/// use bytes::{Bytes, BytesMut};
///
/// let bytes = Bytes::from(b"hello".to_vec());
/// assert_eq!(bytes.try_into_mut(), Ok(BytesMut::from(&b"hello"[..])));
/// ```
pub fn try_into_mut(self) -> Result<BytesMut, Bytes> {
if self.is_unique() {
Ok(self.make_mut())
} else {
Err(self)
}
}
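A quick sketch of the failure path (illustrative only, not part of this diff, assuming the API lands as shown): while another handle references the buffer, the conversion is refused and the original `Bytes` is handed back unchanged.

```rust
use bytes::{Bytes, BytesMut};

fn main() {
    let a = Bytes::from(b"hello".to_vec());
    let b = a.clone();                     // two handles now share the buffer
    let a = a.try_into_mut().unwrap_err(); // not unique: the original `Bytes` comes back
    drop(b);                               // `a` is the only handle again
    let m: BytesMut = a.try_into_mut().unwrap();
    assert_eq!(m, BytesMut::from(&b"hello"[..]));
}
```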

/// Convert `self` into `BytesMut`.
///
/// If `self` is unique for the entire original buffer, this will return a
/// `BytesMut` with the contents of `self` without copying.
/// If `self` is not unique for the entire original buffer, this will copy
/// `self`'s view of the original buffer into a new `BytesMut`.
///
/// # Examples
///
/// ```
/// use bytes::{Bytes, BytesMut};
///
/// let bytes = Bytes::from(b"hello".to_vec());
/// assert_eq!(bytes.make_mut(), BytesMut::from(&b"hello"[..]));
/// ```
pub fn make_mut(self) -> BytesMut {
let bytes = ManuallyDrop::new(self);
unsafe { (bytes.vtable.to_mut)(&bytes.data, bytes.ptr, bytes.len) }
}
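For contrast, a sketch of the non-unique case (illustrative only, not part of this diff): unlike `try_into_mut`, `make_mut` never fails; it falls back to copying this handle's view of the buffer.

```rust
use bytes::{Bytes, BytesMut};

fn main() {
    let a = Bytes::from(b"hello world".to_vec());
    let b = a.slice(0..5);   // `a` and `b` now share the same buffer
    let m = b.make_mut();    // not unique: only the 5-byte view is copied
    assert_eq!(m, BytesMut::from(&b"hello"[..]));
    assert_eq!(&a[..], b"hello world"); // `a` is untouched and still usable
}
```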

#[inline]
pub(crate) unsafe fn with_vtable(
ptr: *const u8,
@@ -917,6 +961,7 @@ impl fmt::Debug for Vtable {
const STATIC_VTABLE: Vtable = Vtable {
clone: static_clone,
to_vec: static_to_vec,
to_mut: static_to_mut,
is_unique: static_is_unique,
drop: static_drop,
};
@@ -931,6 +976,11 @@ unsafe fn static_to_vec(_: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec<u8
slice.to_vec()
}

unsafe fn static_to_mut(_: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesMut {
let slice = slice::from_raw_parts(ptr, len);
BytesMut::from(slice)
}

fn static_is_unique(_: &AtomicPtr<()>) -> bool {
false
}
@@ -944,13 +994,15 @@ unsafe fn static_drop(_: &mut AtomicPtr<()>, _: *const u8, _: usize) {
static PROMOTABLE_EVEN_VTABLE: Vtable = Vtable {
clone: promotable_even_clone,
to_vec: promotable_even_to_vec,
to_mut: promotable_even_to_mut,
is_unique: promotable_is_unique,
drop: promotable_even_drop,
};

static PROMOTABLE_ODD_VTABLE: Vtable = Vtable {
clone: promotable_odd_clone,
to_vec: promotable_odd_to_vec,
to_mut: promotable_odd_to_mut,
is_unique: promotable_is_unique,
drop: promotable_odd_drop,
};
@@ -994,12 +1046,47 @@ unsafe fn promotable_to_vec(
}
}

unsafe fn promotable_to_mut(
data: &AtomicPtr<()>,
ptr: *const u8,
len: usize,
f: fn(*mut ()) -> *mut u8,
) -> BytesMut {
let shared = data.load(Ordering::Acquire);
let kind = shared as usize & KIND_MASK;

if kind == KIND_ARC {
shared_to_mut_impl(shared.cast(), ptr, len)
} else {
// KIND_VEC is a view of an underlying buffer at a certain offset.
// A `Bytes` is promoted to KIND_ARC before it can be truncated, so for
// KIND_VEC, `ptr + len` always marks the end of that buffer.
// Thus, we can safely reconstruct the full Vec from it without leaking memory.
debug_assert_eq!(kind, KIND_VEC);

let buf = f(shared);
let off = offset_from(ptr, buf);
let cap = off + len;
let v = Vec::from_raw_parts(buf, cap, cap);

let mut b = BytesMut::from_vec(v);
b.advance_unchecked(off);
b
}
}
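A sketch of the KIND_VEC branch from the caller's side (illustrative only, not part of this diff; `advance` comes from the `Buf` trait): advancing a Vec-backed `Bytes` moves `ptr` forward but leaves `ptr + len` at the end of the allocation, so the full `Vec` is rebuilt with `cap = off + len` and re-advanced without copying.

```rust
use bytes::{Buf, Bytes, BytesMut};

fn main() {
    let mut b = Bytes::from(b"hello world".to_vec());
    b.advance(6);              // still KIND_VEC: `ptr` moved forward by off = 6
    let p = b.as_ptr();
    let m = b.make_mut();      // rebuilds the Vec (cap = 6 + 5), then re-advances it
    assert_eq!(m.as_ptr(), p); // same allocation, no copy
    assert_eq!(m, BytesMut::from(&b"world"[..]));
}
```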

unsafe fn promotable_even_to_vec(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec<u8> {
promotable_to_vec(data, ptr, len, |shared| {
ptr_map(shared.cast(), |addr| addr & !KIND_MASK)
})
}

unsafe fn promotable_even_to_mut(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesMut {
promotable_to_mut(data, ptr, len, |shared| {
ptr_map(shared.cast(), |addr| addr & !KIND_MASK)
})
}

unsafe fn promotable_even_drop(data: &mut AtomicPtr<()>, ptr: *const u8, len: usize) {
data.with_mut(|shared| {
let shared = *shared;
@@ -1031,6 +1118,10 @@ unsafe fn promotable_odd_to_vec(data: &AtomicPtr<()>, ptr: *const u8, len: usize
promotable_to_vec(data, ptr, len, |shared| shared.cast())
}

unsafe fn promotable_odd_to_mut(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesMut {
promotable_to_mut(data, ptr, len, |shared| shared.cast())
}

unsafe fn promotable_odd_drop(data: &mut AtomicPtr<()>, ptr: *const u8, len: usize) {
data.with_mut(|shared| {
let shared = *shared;
@@ -1087,6 +1178,7 @@ const _: [(); 0 - mem::align_of::<Shared>() % 2] = []; // Assert that the alignm
static SHARED_VTABLE: Vtable = Vtable {
clone: shared_clone,
to_vec: shared_to_vec,
to_mut: shared_to_mut,
is_unique: shared_is_unique,
drop: shared_drop,
};
@@ -1133,6 +1225,45 @@ unsafe fn shared_to_vec(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec
shared_to_vec_impl(data.load(Ordering::Relaxed).cast(), ptr, len)
}

unsafe fn shared_to_mut_impl(shared: *mut Shared, ptr: *const u8, len: usize) -> BytesMut {
// The goal is to check if the current handle is the only handle
// that currently has access to the buffer. This is done by
// checking if the `ref_cnt` is currently 1.
//
// The `Acquire` ordering synchronizes with the `Release` as
// part of the `fetch_sub` in `release_shared`. The `fetch_sub`
// operation guarantees that any mutations done in other threads
// are ordered before the `ref_cnt` is decremented. As such,
// this `Acquire` will guarantee that those mutations are
// visible to the current thread.
//
// Otherwise, we take the other branch, copy the data and call `release_shared`.
if (*shared).ref_cnt.load(Ordering::Acquire) == 1 {
// Deallocate the `Shared` instance without running its destructor.
let shared = *Box::from_raw(shared);
let shared = ManuallyDrop::new(shared);
let buf = shared.buf;
let cap = shared.cap;

// Rebuild Vec
let off = offset_from(ptr, buf);
let v = Vec::from_raw_parts(buf, len + off, cap);

let mut b = BytesMut::from_vec(v);
b.advance_unchecked(off);
b
} else {
// Copy the data from Shared in a new Vec, then release it
let v = slice::from_raw_parts(ptr, len).to_vec();
release_shared(shared);
BytesMut::from_vec(v)
}
}
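A sketch of the `ref_cnt == 1` fast path from the caller's side (illustrative only, not part of this diff): once every other handle has been dropped, the shared buffer is reclaimed in place rather than copied.

```rust
use bytes::{Bytes, BytesMut};

fn main() {
    let a = Bytes::from(b"hello".to_vec());
    let b = a.clone();              // promotes the buffer to the shared representation
    drop(b);                        // the reference count drops back to 1
    let p = a.as_ptr();
    let m: BytesMut = a.make_mut(); // zero-copy: the `Shared` is dismantled in place
    assert_eq!(m.as_ptr(), p);      // the same allocation was reused
    assert_eq!(m, BytesMut::from(&b"hello"[..]));
}
```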

unsafe fn shared_to_mut(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesMut {
shared_to_mut_impl(data.load(Ordering::Relaxed).cast(), ptr, len)
}

pub(crate) unsafe fn shared_is_unique(data: &AtomicPtr<()>) -> bool {
let shared = data.load(Ordering::Acquire);
let ref_cnt = (*shared.cast::<Shared>()).ref_cnt.load(Ordering::Relaxed);
@@ -1291,6 +1422,23 @@ where
new_addr as *mut u8
}

/// Precondition: dst >= original
///
/// The following line is equivalent to:
///
/// ```rust,ignore
/// self.ptr.as_ptr().offset_from(ptr) as usize;
/// ```
///
/// But because our minimum supported Rust version is 1.39 and
/// `offset_from` was only stabilized in 1.47, we cannot use it.
#[inline]
fn offset_from(dst: *const u8, original: *const u8) -> usize {
debug_assert!(dst >= original);

dst as usize - original as usize
}

// compile-fails

/// ```compile_fail
src/bytes_mut.rs: 32 changes (31 additions & 1 deletion)
@@ -868,7 +868,7 @@ impl BytesMut {
/// # SAFETY
///
/// The caller must ensure that `count` <= `self.cap`.
unsafe fn advance_unchecked(&mut self, count: usize) {
pub(crate) unsafe fn advance_unchecked(&mut self, count: usize) {
// Setting the start to 0 is a no-op, so return early if this is the
// case.
if count == 0 {
@@ -1713,6 +1713,7 @@ unsafe fn rebuild_vec(ptr: *mut u8, mut len: usize, mut cap: usize, off: usize)
static SHARED_VTABLE: Vtable = Vtable {
clone: shared_v_clone,
to_vec: shared_v_to_vec,
to_mut: shared_v_to_mut,
is_unique: crate::bytes::shared_is_unique,
drop: shared_v_drop,
};
@@ -1747,6 +1748,35 @@ unsafe fn shared_v_to_vec(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> V
}
}

unsafe fn shared_v_to_mut(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesMut {
let shared: *mut Shared = data.load(Ordering::Relaxed).cast();

if (*shared).is_unique() {
let shared = &mut *shared;

// The capacity is always the original capacity of the buffer
// minus the offset from the start of the buffer
let v = &mut shared.vec;
let v_capacity = v.capacity();
let v_ptr = v.as_mut_ptr();
let offset = offset_from(ptr as *mut u8, v_ptr);
let cap = v_capacity - offset;

let ptr = vptr(ptr as *mut u8);

BytesMut {
ptr,
len,
cap,
data: shared,
}
} else {
let v = slice::from_raw_parts(ptr, len).to_vec();
release_shared(shared);
BytesMut::from_vec(v)
}
}
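A sketch of when this vtable is exercised (illustrative only, not part of this diff, assuming `freeze` on an already-split `BytesMut` yields a `Bytes` backed by this vtable): converting such a `Bytes` back while it is unique reuses the allocation.

```rust
use bytes::{Bytes, BytesMut};

fn main() {
    let mut m = BytesMut::from(&b"hello world"[..]);
    let tail = m.split_off(6); // splitting promotes the buffer to the shared representation
    drop(m);                   // `tail` is now the only handle
    let b: Bytes = tail.freeze();
    let p = b.as_ptr();
    let m2 = b.make_mut();     // unique: the buffer is handed back without copying
    assert_eq!(m2.as_ptr(), p);
    assert_eq!(m2, BytesMut::from(&b"world"[..]));
}
```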

unsafe fn shared_v_drop(data: &mut AtomicPtr<()>, _ptr: *const u8, _len: usize) {
data.with_mut(|shared| {
release_shared(*shared as *mut Shared);