Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Queue sendable #681

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 27 additions & 5 deletions crates/dispatch2/src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@

#![allow(missing_docs, non_camel_case_types)]

use core::ffi::{c_long, c_uint, c_ulong, c_void};
use core::ptr::addr_of;
use core::{
ffi::{c_long, c_uint, c_ulong, c_void},
ptr::addr_of,
};

#[cfg(feature = "objc2")]
use objc2::encode::{Encode, Encoding, RefEncode};
use objc2::{
encode::{Encode, Encoding, RefEncode},
Message,
};

// Try to generate as much as possible.
pub use crate::generated::*;
Expand All @@ -29,6 +34,10 @@ macro_rules! create_opaque_type {
unsafe impl RefEncode for $type_name {
const ENCODING_REF: Encoding = Encoding::Object;
}

#[cfg(feature = "objc2")]
// SAFETY: Dispatch types respond to objc messages.
unsafe impl Message for $type_name {}
};
}

Expand Down Expand Up @@ -108,10 +117,13 @@ create_opaque_type!(dispatch_io_s, dispatch_io_t);

/// A dispatch queue that executes blocks serially in FIFO order.
pub const DISPATCH_QUEUE_SERIAL: dispatch_queue_attr_t = core::ptr::null_mut();

/// A dispatch queue that executes blocks concurrently.
pub static DISPATCH_QUEUE_CONCURRENT: &dispatch_queue_attr_s = {
pub static DISPATCH_QUEUE_CONCURRENT: ImmutableStatic<dispatch_queue_attr_t> = {
// Safety: immutable external definition
unsafe { &_dispatch_queue_attr_concurrent }
ImmutableStatic(unsafe {
&_dispatch_queue_attr_concurrent as *const _ as dispatch_queue_attr_t
})
};

pub const DISPATCH_APPLY_AUTO: dispatch_queue_t = core::ptr::null_mut();
Expand Down Expand Up @@ -241,3 +253,13 @@ pub extern "C" fn dispatch_get_main_queue() -> dispatch_queue_main_t {
// SAFETY: Always safe to get pointer from static, only needed for MSRV.
unsafe { addr_of!(_dispatch_main_q) as dispatch_queue_main_t }
}

/// Wrapper type for immutable static variables exported from C,
/// that are documented to be safe for sharing and passing between threads.
#[repr(transparent)]
#[derive(Debug)]
pub struct ImmutableStatic<T>(pub T);
// Safety: safety is guaranteed by the external type.
unsafe impl<T> Sync for ImmutableStatic<T> {}
// Safety: safety is guaranteed by the external type.
unsafe impl<T> Send for ImmutableStatic<T> {}
62 changes: 30 additions & 32 deletions crates/dispatch2/src/group.rs
Original file line number Diff line number Diff line change
@@ -1,38 +1,25 @@
//! Dispatch group definition.

use alloc::boxed::Box;
use core::ffi::c_void;
use core::time::Duration;
use core::{ffi::c_void, time::Duration};

use super::object::DispatchObject;
use super::queue::Queue;
use super::utils::function_wrapper;
use super::{ffi::*, WaitError};
use super::{ffi::*, function_wrapper, queue::Queue, rc::Retained, AsRawDispatchObject, WaitError};

/// Dispatch group.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Copy)]
pub struct Group {
dispatch_object: DispatchObject<dispatch_group_s>,
_inner: [u8; 0],
}

/// Dispatch group guard.
#[derive(Debug)]
pub struct GroupGuard(Group, bool);

impl Group {
/// Creates a new [Group].
pub fn new() -> Option<Self> {
pub fn new() -> Option<Retained<Self>> {
// Safety: valid to call.
let object = unsafe { dispatch_group_create() };
assert!(!object.is_null());

if object.is_null() {
return None;
}

// Safety: object cannot be null.
let dispatch_object = unsafe { DispatchObject::new_owned(object.cast()) };

Some(Group { dispatch_object })
// Safety: object must be valid.
unsafe { Retained::from_raw(object.cast()) }
}

/// Submit a function to a [Queue] and associates it with the [Group].
Expand Down Expand Up @@ -101,28 +88,39 @@ impl Group {
dispatch_group_enter(self.as_raw());
}

GroupGuard(self.clone(), false)
}
let group =
// Safety: group cannot be null.
unsafe { Retained::retain(self.as_raw().cast()) }.expect("failed to retain semaphore");

/// Set the finalizer function for the object.
pub fn set_finalizer<F>(&mut self, destructor: F)
where
F: Send + FnOnce(),
{
self.dispatch_object.set_finalizer(destructor);
GroupGuard(group, false)
}

/// Get the raw [dispatch_group_t] value.
///
/// # Safety
///
/// - Object shouldn't be released manually.
pub const unsafe fn as_raw(&self) -> dispatch_group_t {
// SAFETY: Upheld by caller
unsafe { self.dispatch_object.as_raw() }
pub fn as_raw(&self) -> dispatch_group_t {
self as *const Self as _
}
}

impl AsRawDispatchObject for Group {
fn as_raw_object(&self) -> dispatch_object_t {
self.as_raw().cast()
}
}

// Safety: group is inherently safe to move between threads.
unsafe impl Send for Group {}

// Safety: group is inherently safe to share between threads.
unsafe impl Sync for Group {}

/// Dispatch group guard.
#[derive(Debug)]
pub struct GroupGuard(Retained<Group>, bool);

impl GroupGuard {
/// Explicitly indicates that the function in the [Group] finished executing.
pub fn leave(mut self) {
Expand Down
70 changes: 59 additions & 11 deletions crates/dispatch2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ extern crate alloc;
#[cfg(feature = "std")]
extern crate std;

use self::ffi::dispatch_qos_class_t;

pub mod ffi;
#[allow(clippy::undocumented_unsafe_blocks)]
mod generated;
Expand All @@ -44,8 +42,23 @@ mod main_thread_bound;
pub mod object;
mod once;
pub mod queue;
pub mod rc;
pub mod semaphore;
mod utils;
pub mod workloop;

#[cfg(feature = "objc2")]
pub use self::main_thread_bound::{run_on_main, MainThreadBound};
pub use self::once::*;
pub use group::*;
pub use object::*;
pub use queue::*;
pub use semaphore::*;
pub use workloop::*;

use alloc::boxed::Box;
use core::{ffi::c_void, time::Duration};

use ffi::{dispatch_qos_class_t, dispatch_time, dispatch_time_t, DISPATCH_TIME_NOW};

/// Wait error.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
Expand All @@ -58,14 +71,15 @@ pub enum WaitError {
}

/// Quality of service that specify the priorities for executing tasks.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
#[non_exhaustive]
pub enum QualityOfServiceClass {
/// Quality of service for user-interactive tasks.
UserInteractive,
/// Quality of service for tasks that prevent the user from actively using your app.
UserInitiated,
/// Default Quality of service.
#[default]
Default,
/// Quality of service for tasks that the user does not track actively.
Utility,
Expand All @@ -91,10 +105,44 @@ impl From<QualityOfServiceClass> for dispatch_qos_class_t {
}
}

pub use self::group::*;
#[cfg(feature = "objc2")]
pub use self::main_thread_bound::{run_on_main, MainThreadBound};
pub use self::object::*;
pub use self::once::*;
pub use self::queue::*;
pub use self::semaphore::*;
impl TryFrom<Duration> for dispatch_time_t {
type Error = TryFromDurationError;

fn try_from(value: Duration) -> Result<Self, Self::Error> {
let secs = value.as_secs() as i64;

secs.checked_mul(1_000_000_000)
.and_then(|x| x.checked_add(i64::from(value.subsec_nanos())))
.map(|delta| {
// Safety: delta cannot overflow
unsafe { dispatch_time(DISPATCH_TIME_NOW, delta) }
})
.ok_or(Self::Error::TimeOverflow)
}
}

/// Error returned by [Queue::after].
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
#[non_exhaustive]
pub enum TryFromDurationError {
/// The given timeout value will result in an overflow when converting to dispatch time.
TimeOverflow,
}

/// Error returned by [Queue::set_qos_class_floor] or [WorkloopQueue::set_qos_class_floor].
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
#[non_exhaustive]
pub enum QualityOfServiceClassFloorError {
/// The relative priority is invalid.
InvalidRelativePriority,
}

pub(crate) extern "C" fn function_wrapper<F>(work_boxed: *mut c_void)
where
F: FnOnce(),
{
// Safety: we reconstruct from a Box.
let work = unsafe { Box::from_raw(work_boxed.cast::<F>()) };

(*work)();
}
Loading
Loading