From e0a87fc3a46e0b18691f8af2891d02f8ad668aa2 Mon Sep 17 00:00:00 2001 From: Franz Busch Date: Fri, 16 Feb 2024 15:59:31 +0000 Subject: [PATCH 01/10] Fix `EventLoopFuture` and `EventLoopPromise` under strict concurrency checking # Motivation We need to tackle the remaining strict concurrency checking related `Sendable` warnings in NIO. The first place to start is making sure that `EventLoopFuture` and `EventLoopPromise` are properly annotated. # Modification In a previous https://github.com/apple/swift-nio/pull/2496, @weissi changed the `@unchecked Sendable` conformances of `EventLoopFuture/Promise` to be conditional on the sendability of the generic `Value` type. After having looked at all the APIs on the future and promise types as well as reading the latest Concurrency evolution proposals, specifically the [Region based Isolation](https://github.com/apple/swift-evolution/blob/main/proposals/0414-region-based-isolation.md), I came to the conclusion that the previous `@unchecked Sendable` annotations were correct. The reasoning for this is: 1. An `EventLoopPromise` and `EventLoopFuture` pair are tied to a specific `EventLoop` 2. An `EventLoop` represents an isolation region and values tied to its isolation are not allowed to be shared outside of it unless they are disconnected from the region 3. The `value` used to succeed a promise often come from outside the isolation domain of the `EventLoop` hence they must be transferred into the promise. 4. The isolation region of the event loop is enforced through `@Sendable` annotations on all closures that receive the value in some kind of transformation e.g. `map()` 5. Any method on `EventLoopFuture` that combines itself with another future must require `Sendable` of the other futures `Value` since we cannot statically enforce that futures are bound to the same event loop i.e. to the same isolation domain Due to the above rules, this PR adds back the `@unchecked Sendable` conformances to both types. Furthermore, this PR revisits every single method on `EventLoopPromise/Future` and adds missing `Sendable` and `@Sendable` annotation where necessary to uphold the above rules. A few important things to call out: - Since `transferring` is currently not available this PR requires a `Sendable` conformance for some methods on `EventLoopPromise/Future` that should rather take a `transffering` argument - To enable the common case where a value from the same event loop is used to succeed a promise I added two additional methods that take a `eventLoopBoundResult` and enforce dynamic isolation checking. We might have to do this for more methods once we adopt those changes in other targets/packages. # Result After this PR has landed our lowest level building block should be inline with what the rest of the language enforces in Concurrency. The `EventLoopFuture.swift` produces no more warnings under strict concurrency checking on the latest 5.10 snapshots. --- Sources/NIOCore/AsyncAwaitSupport.swift | 13 +- .../NIOCore/DispatchQueue+WithFuture.swift | 5 +- Sources/NIOCore/EventLoop.swift | 50 +- .../NIOCore/EventLoopFuture+Deprecated.swift | 68 +- .../EventLoopFuture+WithEventLoop.swift | 13 +- Sources/NIOCore/EventLoopFuture.swift | 587 ++++++++++++------ Tests/NIOPosixTests/EventLoopFutureTest.swift | 8 +- 7 files changed, 470 insertions(+), 274 deletions(-) diff --git a/Sources/NIOCore/AsyncAwaitSupport.swift b/Sources/NIOCore/AsyncAwaitSupport.swift index bcdc7a848d..105ce1c3c5 100644 --- a/Sources/NIOCore/AsyncAwaitSupport.swift +++ b/Sources/NIOCore/AsyncAwaitSupport.swift @@ -18,8 +18,9 @@ extension EventLoopFuture { /// This function can be used to bridge an `EventLoopFuture` into the `async` world. Ie. if you're in an `async` /// function and want to get the result of this future. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) + @preconcurrency @inlinable - public func get() async throws -> Value { + public func get() async throws -> Value where Value: Sendable { return try await withUnsafeThrowingContinuation { (cont: UnsafeContinuation, Error>) in self.whenComplete { result in switch result { @@ -60,8 +61,11 @@ extension EventLoopPromise { /// - returns: A `Task` which was created to `await` the `body`. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) @discardableResult + @preconcurrency @inlinable - public func completeWithTask(_ body: @escaping @Sendable () async throws -> Value) -> Task { + public func completeWithTask( + _ body: @escaping @Sendable () async throws -> Value + ) -> Task where Value: Sendable { Task { do { let value = try await body() @@ -333,8 +337,11 @@ struct AsyncSequenceFromIterator: AsyncSeq @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) extension EventLoop { + @preconcurrency @inlinable - public func makeFutureWithTask(_ body: @Sendable @escaping () async throws -> Return) -> EventLoopFuture { + public func makeFutureWithTask( + _ body: @Sendable @escaping () async throws -> Return + ) -> EventLoopFuture { let promise = self.makePromise(of: Return.self) promise.completeWithTask(body) return promise.futureResult diff --git a/Sources/NIOCore/DispatchQueue+WithFuture.swift b/Sources/NIOCore/DispatchQueue+WithFuture.swift index f85ea96dbf..beb0e2c74e 100644 --- a/Sources/NIOCore/DispatchQueue+WithFuture.swift +++ b/Sources/NIOCore/DispatchQueue+WithFuture.swift @@ -28,9 +28,10 @@ extension DispatchQueue { /// - callbackMayBlock: The scheduled callback for the IO / task. /// - returns a new `EventLoopFuture` with value returned by the `block` parameter. @inlinable - public func asyncWithFuture( + @preconcurrency + public func asyncWithFuture( eventLoop: EventLoop, - _ callbackMayBlock: @escaping () throws -> NewValue + _ callbackMayBlock: @escaping @Sendable () throws -> NewValue ) -> EventLoopFuture { let promise = eventLoop.makePromise(of: NewValue.self) diff --git a/Sources/NIOCore/EventLoop.swift b/Sources/NIOCore/EventLoop.swift index 50b90ed925..b2fe4c8f14 100644 --- a/Sources/NIOCore/EventLoop.swift +++ b/Sources/NIOCore/EventLoop.swift @@ -713,12 +713,6 @@ extension EventLoop { @inlinable @preconcurrency public func submit(_ task: @escaping @Sendable () throws -> T) -> EventLoopFuture { - _submit(task) - } - @usableFromInline typealias SubmitCallback = @Sendable () throws -> T - - @inlinable - func _submit(_ task: @escaping SubmitCallback) -> EventLoopFuture { let promise: EventLoopPromise = makePromise(file: #fileID, line: #line) self.execute { @@ -742,18 +736,15 @@ extension EventLoop { /// - returns: An `EventLoopFuture` identical to the `EventLoopFuture` returned from `task`. @inlinable @preconcurrency - public func flatSubmit(_ task: @escaping @Sendable () -> EventLoopFuture) -> EventLoopFuture { - self._flatSubmit(task) - } - @usableFromInline typealias FlatSubmitCallback = @Sendable () -> EventLoopFuture - - @inlinable - func _flatSubmit(_ task: @escaping FlatSubmitCallback) -> EventLoopFuture { + public func flatSubmit(_ task: @escaping @Sendable () -> EventLoopFuture) -> EventLoopFuture { // TODO: This should take a closure that returns fresh self.submit(task).flatMap { $0 } } /// Schedule a `task` that is executed by this `EventLoop` at the given time. /// + /// - Note: The `T` must be `Sendable` since the isolation domains of the event loop future returned from `task` and + /// this event loop might differ. + /// /// - parameters: /// - task: The asynchronous task to run. As with everything that runs on the `EventLoop`, it must not block. /// - returns: A `Scheduled` object which may be used to cancel the task if it has not yet run, or to wait @@ -763,23 +754,11 @@ extension EventLoop { @discardableResult @inlinable @preconcurrency - public func flatScheduleTask( + public func flatScheduleTask( deadline: NIODeadline, file: StaticString = #fileID, line: UInt = #line, _ task: @escaping @Sendable () throws -> EventLoopFuture - ) -> Scheduled { - self._flatScheduleTask(deadline: deadline, file: file, line: line, task) - } - @usableFromInline typealias FlatScheduleTaskDeadlineCallback = () throws -> EventLoopFuture - - @discardableResult - @inlinable - func _flatScheduleTask( - deadline: NIODeadline, - file: StaticString, - line: UInt, - _ task: @escaping FlatScheduleTaskDelayCallback ) -> Scheduled { let promise: EventLoopPromise = self.makePromise(file: file, line: line) let scheduled = self.scheduleTask(deadline: deadline, task) @@ -790,6 +769,9 @@ extension EventLoop { /// Schedule a `task` that is executed by this `EventLoop` after the given amount of time. /// + /// - Note: The `T` must be `Sendable` since the isolation domains of the event loop future returned from `task` and + /// this event loop might differ. + /// /// - parameters: /// - task: The asynchronous task to run. As everything that runs on the `EventLoop`, it must not block. /// - returns: A `Scheduled` object which may be used to cancel the task if it has not yet run, or to wait @@ -799,23 +781,11 @@ extension EventLoop { @discardableResult @inlinable @preconcurrency - public func flatScheduleTask( + public func flatScheduleTask( in delay: TimeAmount, file: StaticString = #fileID, line: UInt = #line, _ task: @escaping @Sendable () throws -> EventLoopFuture - ) -> Scheduled { - self._flatScheduleTask(in: delay, file: file, line: line, task) - } - - @usableFromInline typealias FlatScheduleTaskDelayCallback = @Sendable () throws -> EventLoopFuture - - @inlinable - func _flatScheduleTask( - in delay: TimeAmount, - file: StaticString, - line: UInt, - _ task: @escaping FlatScheduleTaskDelayCallback ) -> Scheduled { let promise: EventLoopPromise = self.makePromise(file: file, line: line) let scheduled = self.scheduleTask(in: delay, task) @@ -951,7 +921,7 @@ extension EventLoop { notifying promise: EventLoopPromise?, _ task: @escaping ScheduleRepeatedTaskCallback ) -> RepeatedTask { - let futureTask: (RepeatedTask) -> EventLoopFuture = { repeatedTask in + let futureTask: @Sendable (RepeatedTask) -> EventLoopFuture = { repeatedTask in do { try task(repeatedTask) return self.makeSucceededFuture(()) diff --git a/Sources/NIOCore/EventLoopFuture+Deprecated.swift b/Sources/NIOCore/EventLoopFuture+Deprecated.swift index 75cfb07162..d2e370ebe4 100644 --- a/Sources/NIOCore/EventLoopFuture+Deprecated.swift +++ b/Sources/NIOCore/EventLoopFuture+Deprecated.swift @@ -13,65 +13,99 @@ //===----------------------------------------------------------------------===// extension EventLoopFuture { + @preconcurrency @inlinable @available(*, deprecated, message: "Please don't pass file:line:, there's no point.") - public func flatMap(file: StaticString = #fileID, line: UInt = #line, _ callback: @escaping (Value) -> EventLoopFuture) -> EventLoopFuture { + public func flatMap( + file: StaticString = #fileID, + line: UInt = #line, + _ callback: @escaping @Sendable (Value) -> EventLoopFuture + ) -> EventLoopFuture { return self.flatMap(callback) } + @preconcurrency @inlinable @available(*, deprecated, message: "Please don't pass file:line:, there's no point.") - public func flatMapThrowing(file: StaticString = #fileID, - line: UInt = #line, - _ callback: @escaping (Value) throws -> NewValue) -> EventLoopFuture { + public func flatMapThrowing( + file: StaticString = #fileID, + line: UInt = #line, + _ callback: @escaping @Sendable (Value) throws -> NewValue + ) -> EventLoopFuture { return self.flatMapThrowing(callback) } @inlinable @available(*, deprecated, message: "Please don't pass file:line:, there's no point.") - public func flatMapErrorThrowing(file: StaticString = #fileID, line: UInt = #line, _ callback: @escaping (Error) throws -> Value) -> EventLoopFuture { + public func flatMapErrorThrowing( + file: StaticString = #fileID, + line: UInt = #line, + _ callback: @escaping @Sendable (Error) throws -> Value + ) -> EventLoopFuture { return self.flatMapErrorThrowing(callback) } @inlinable @available(*, deprecated, message: "Please don't pass file:line:, there's no point.") - public func map(file: StaticString = #fileID, line: UInt = #line, _ callback: @escaping (Value) -> (NewValue)) -> EventLoopFuture { + public func map( + file: StaticString = #fileID, + line: UInt = #line, + _ callback: @escaping @Sendable (Value) -> (NewValue) + ) -> EventLoopFuture { return self.map(callback) } @inlinable @available(*, deprecated, message: "Please don't pass file:line:, there's no point.") - public func flatMapError(file: StaticString = #fileID, line: UInt = #line, _ callback: @escaping (Error) -> EventLoopFuture) -> EventLoopFuture { + public func flatMapError( + file: StaticString = #fileID, + line: UInt = #line, + _ callback: @escaping @Sendable (Error) -> EventLoopFuture + ) -> EventLoopFuture where Value: Sendable { return self.flatMapError(callback) } + @preconcurrency @inlinable @available(*, deprecated, message: "Please don't pass file:line:, there's no point.") - public func flatMapResult(file: StaticString = #fileID, - line: UInt = #line, - _ body: @escaping (Value) -> Result) -> EventLoopFuture { + public func flatMapResult( + file: StaticString = #fileID, + line: UInt = #line, + _ body: @escaping @Sendable (Value) -> Result + ) -> EventLoopFuture { return self.flatMapResult(body) } + @preconcurrency @inlinable @available(*, deprecated, message: "Please don't pass file:line:, there's no point.") - public func recover(file: StaticString = #fileID, line: UInt = #line, _ callback: @escaping (Error) -> Value) -> EventLoopFuture { + public func recover( + file: StaticString = #fileID, + line: UInt = #line, + _ callback: @escaping @Sendable (Error) -> Value + ) -> EventLoopFuture { return self.recover(callback) } + @preconcurrency @inlinable @available(*, deprecated, message: "Please don't pass file:line:, there's no point.") - public func and(_ other: EventLoopFuture, - file: StaticString = #fileID, - line: UInt = #line) -> EventLoopFuture<(Value, OtherValue)> { + public func and( + _ other: EventLoopFuture, + file: StaticString = #fileID, + line: UInt = #line + ) -> EventLoopFuture<(Value, OtherValue)> { return self.and(other) } + @preconcurrency @inlinable @available(*, deprecated, message: "Please don't pass file:line:, there's no point.") - public func and(value: OtherValue, - file: StaticString = #fileID, - line: UInt = #line) -> EventLoopFuture<(Value, OtherValue)> { + public func and( + value: OtherValue, + file: StaticString = #fileID, + line: UInt = #line + ) -> EventLoopFuture<(Value, OtherValue)> { return self.and(value: value) } } diff --git a/Sources/NIOCore/EventLoopFuture+WithEventLoop.swift b/Sources/NIOCore/EventLoopFuture+WithEventLoop.swift index 4d105fa6e5..637f674242 100644 --- a/Sources/NIOCore/EventLoopFuture+WithEventLoop.swift +++ b/Sources/NIOCore/EventLoopFuture+WithEventLoop.swift @@ -41,7 +41,9 @@ extension EventLoopFuture { /// - returns: A future that will receive the eventual value. @inlinable @preconcurrency - public func flatMapWithEventLoop(_ callback: @escaping @Sendable (Value, EventLoop) -> EventLoopFuture) -> EventLoopFuture { + public func flatMapWithEventLoop( + _ callback: @escaping @Sendable (Value, EventLoop) -> EventLoopFuture + ) -> EventLoopFuture { let next = EventLoopPromise.makeUnleakablePromise(eventLoop: self.eventLoop) self._whenComplete { [eventLoop = self.eventLoop] in switch self._value! { @@ -75,7 +77,9 @@ extension EventLoopFuture { /// - returns: A future that will receive the recovered value. @inlinable @preconcurrency - public func flatMapErrorWithEventLoop(_ callback: @escaping @Sendable (Error, EventLoop) -> EventLoopFuture) -> EventLoopFuture { + public func flatMapErrorWithEventLoop( + _ callback: @escaping @Sendable (Error, EventLoop) -> EventLoopFuture + ) -> EventLoopFuture where Value: Sendable { let next = EventLoopPromise.makeUnleakablePromise(eventLoop: self.eventLoop) self._whenComplete { [eventLoop = self.eventLoop] in switch self._value! { @@ -114,10 +118,11 @@ extension EventLoopFuture { /// - returns: A new `EventLoopFuture` with the folded value whose callbacks run on `self.eventLoop`. @inlinable @preconcurrency - public func foldWithEventLoop( + public func foldWithEventLoop( _ futures: [EventLoopFuture], with combiningFunction: @escaping @Sendable (Value, OtherValue, EventLoop) -> EventLoopFuture - ) -> EventLoopFuture { + ) -> EventLoopFuture where Value: Sendable { + @Sendable func fold0(eventLoop: EventLoop) -> EventLoopFuture { let body = futures.reduce(self) { (f1: EventLoopFuture, f2: EventLoopFuture) -> EventLoopFuture in let newFuture = f1.and(f2).flatMap { (args: (Value, OtherValue)) -> EventLoopFuture in diff --git a/Sources/NIOCore/EventLoopFuture.swift b/Sources/NIOCore/EventLoopFuture.swift index 185e00fecc..55e1756ceb 100644 --- a/Sources/NIOCore/EventLoopFuture.swift +++ b/Sources/NIOCore/EventLoopFuture.swift @@ -178,30 +178,52 @@ public struct EventLoopPromise { /// /// - parameters: /// - value: The successful result of the operation. + @preconcurrency @inlinable - public func succeed(_ value: Value) { + public func succeed(_ value: Value) where Value: Sendable { self._resolve(value: .success(value)) } + /// Deliver a successful result to the associated `EventLoopFuture` object. + /// + /// - Note: The call to this method must happen on the same event loop as this promise was created from. + /// + /// - parameters: + /// - eventLoopBoundValue: The successful result of the operation. + @inlinable + public func succeed(eventLoopBoundValue: Value) { + self._resolve(eventLoopBoundResult: .success(eventLoopBoundValue)) + } + /// Deliver an error to the associated `EventLoopFuture` object. /// /// - parameters: /// - error: The error from the operation. @inlinable public func fail(_ error: Error) { - self._resolve(value: .failure(error)) + if self.futureResult.eventLoop.inEventLoop { + self.futureResult._setError(error)._run() + } else { + self.futureResult.eventLoop.execute { + self.futureResult._setError(error)._run() + } + } } /// Complete the promise with the passed in `EventLoopFuture`. /// /// This method is equivalent to invoking `future.cascade(to: promise)`, /// but sometimes may read better than its cascade counterpart. - /// + /// + /// - Note: The `Value` must be `Sendable` since the isolation domains of the passed future and this promise might differ i.e. + /// they might be bound to different event loops. + /// /// - parameters: /// - future: The future whose value will be used to succeed or fail this promise. /// - seealso: `EventLoopFuture.cascade(to:)` + @preconcurrency @inlinable - public func completeWith(_ future: EventLoopFuture) { + public func completeWith(_ future: EventLoopFuture) where Value: Sendable { future.cascade(to: self) } @@ -219,11 +241,33 @@ public struct EventLoopPromise { /// /// - parameters: /// - result: The result which will be used to succeed or fail this promise. + @preconcurrency @inlinable - public func completeWith(_ result: Result) { + public func completeWith(_ result: Result) where Value: Sendable { self._resolve(value: result) } + /// Complete the promise with the passed in `Result`. + /// + /// This method is equivalent to invoking: + /// ``` + /// switch result { + /// case .success(let value): + /// promise.succeed(value) + /// case .failure(let error): + /// promise.fail(error) + /// } + /// ``` + /// + /// - Note: The call to this method must happen on the same event loop as this promise was created from. + /// + /// - parameters: + /// - result: The result which will be used to succeed or fail this promise. + @inlinable + public func completeWith(eventLoopBoundResult: Result) { + self._resolve(eventLoopBoundResult: eventLoopBoundResult) + } + /// Fire the associated `EventLoopFuture` on the appropriate event loop. /// /// This method provides the primary difference between the `EventLoopPromise` and most @@ -233,7 +277,7 @@ public struct EventLoopPromise { /// - parameters: /// - value: The value to fire the future with. @inlinable - internal func _resolve(value: Result) { + internal func _resolve(value: Result) where Value: Sendable { if self.futureResult.eventLoop.inEventLoop { self._setValue(value: value)._run() } else { @@ -243,6 +287,23 @@ public struct EventLoopPromise { } } + /// Fire the associated `EventLoopFuture` on the appropriate event loop. + /// + /// This method provides the primary difference between the `EventLoopPromise` and most + /// other `Promise` implementations: specifically, all callbacks fire on the `EventLoop` + /// that was used to create the promise. + /// + /// - Note: The call to this method must happen on the same event loop as this promise was created from. + /// + /// - parameters: + /// - value: The value to fire the future with. + @inlinable + internal func _resolve(eventLoopBoundResult: Result) { + self.futureResult.eventLoop.assertInEventLoop() + + self._setValue(value: eventLoopBoundResult)._run() + } + /// Set the future result and get the associated callbacks. /// /// - parameters: @@ -464,19 +525,18 @@ extension EventLoopFuture { /// /// Note: In a sense, the `EventLoopFuture` is returned before it's created. /// + /// - Note: The `NewValue` must be `Sendable` since the isolation domains of this future and the future returned from the callback + /// might differ i.e. they might be bound to different event loops. + /// /// - parameters: /// - callback: Function that will receive the value of this `EventLoopFuture` and return /// a new `EventLoopFuture`. /// - returns: A future that will receive the eventual value. @inlinable @preconcurrency - public func flatMap(_ callback: @escaping @Sendable (Value) -> EventLoopFuture) -> EventLoopFuture { - self._flatMap(callback) - } - @usableFromInline typealias FlatMapCallback = @Sendable (Value) -> EventLoopFuture - - @inlinable - func _flatMap(_ callback: @escaping FlatMapCallback) -> EventLoopFuture { + public func flatMap( + _ callback: @escaping @Sendable (Value) -> EventLoopFuture + ) -> EventLoopFuture { let next = EventLoopPromise.makeUnleakablePromise(eventLoop: self.eventLoop) self._whenComplete { switch self._value! { @@ -507,19 +567,18 @@ extension EventLoopFuture { /// /// If your callback function throws, the returned `EventLoopFuture` will error. /// + /// - Note: The `NewValue` must be `Sendable` since the isolation domains of this future and the future returned from the callback + /// might differ i.e. they might be bound to different event loops. + /// /// - parameters: /// - callback: Function that will receive the value of this `EventLoopFuture` and return /// a new value lifted into a new `EventLoopFuture`. /// - returns: A future that will receive the eventual value. @inlinable @preconcurrency - public func flatMapThrowing(_ callback: @escaping @Sendable (Value) throws -> NewValue) -> EventLoopFuture { - self._flatMapThrowing(callback) - } - @usableFromInline typealias FlatMapThrowingCallback = @Sendable (Value) throws -> NewValue - - @inlinable - func _flatMapThrowing(_ callback: @escaping FlatMapThrowingCallback) -> EventLoopFuture { + public func flatMapThrowing( + _ callback: @escaping @Sendable (Value) throws -> NewValue + ) -> EventLoopFuture { let next = EventLoopPromise.makeUnleakablePromise(eventLoop: self.eventLoop) self._whenComplete { switch self._value! { @@ -553,13 +612,9 @@ extension EventLoopFuture { /// - returns: A future that will receive the eventual value or a rethrown error. @inlinable @preconcurrency - public func flatMapErrorThrowing(_ callback: @escaping @Sendable (Error) throws -> Value) -> EventLoopFuture { - self._flatMapErrorThrowing(callback) - } - @usableFromInline typealias FlatMapErrorThrowingCallback = @Sendable (Error) throws -> Value - - @inlinable - func _flatMapErrorThrowing(_ callback: @escaping FlatMapErrorThrowingCallback) -> EventLoopFuture { + public func flatMapErrorThrowing( + _ callback: @escaping @Sendable (Error) throws -> Value + ) -> EventLoopFuture { let next = EventLoopPromise.makeUnleakablePromise(eventLoop: self.eventLoop) self._whenComplete { switch self._value! { @@ -605,13 +660,9 @@ extension EventLoopFuture { /// - returns: A future that will receive the eventual value. @inlinable @preconcurrency - public func map(_ callback: @escaping @Sendable (Value) -> (NewValue)) -> EventLoopFuture { - self._map(callback) - } - @usableFromInline typealias MapCallback = @Sendable (Value) -> (NewValue) - - @inlinable - func _map(_ callback: @escaping MapCallback) -> EventLoopFuture { + public func map( + _ callback: @escaping @Sendable (Value) -> (NewValue) + ) -> EventLoopFuture { if NewValue.self == Value.self && NewValue.self == Void.self { self.whenSuccess(callback as! @Sendable (Value) -> Void) return self as! EventLoopFuture @@ -631,19 +682,18 @@ extension EventLoopFuture { /// /// If the callback cannot recover it should return a failed `EventLoopFuture`. /// + /// - Note: The `Value` must be `Sendable` since the isolation domains of this future and the future returned from the callback + /// might differ i.e. they might be bound to different event loops. + /// /// - parameters: /// - callback: Function that will receive the error value of this `EventLoopFuture` and return /// a new value lifted into a new `EventLoopFuture`. /// - returns: A future that will receive the recovered value. @inlinable @preconcurrency - public func flatMapError(_ callback: @escaping @Sendable (Error) -> EventLoopFuture) -> EventLoopFuture { - self._flatMapError(callback) - } - @usableFromInline typealias FlatMapErrorCallback = @Sendable (Error) -> EventLoopFuture - - @inlinable - func _flatMapError(_ callback: @escaping FlatMapErrorCallback) -> EventLoopFuture { + public func flatMapError( + _ callback: @escaping @Sendable (Error) -> EventLoopFuture + ) -> EventLoopFuture where Value: Sendable { let next = EventLoopPromise.makeUnleakablePromise(eventLoop: self.eventLoop) self._whenComplete { switch self._value! { @@ -679,13 +729,9 @@ extension EventLoopFuture { /// - returns: A future that will receive the eventual value. @inlinable @preconcurrency - public func flatMapResult(_ body: @escaping @Sendable (Value) -> Result) -> EventLoopFuture { - self._flatMapResult(body) - } - @usableFromInline typealias FlatMapResultCallback = @Sendable (Value) -> Result - - @inlinable - func _flatMapResult(_ body: @escaping FlatMapResultCallback) -> EventLoopFuture { + public func flatMapResult( + _ body: @escaping @Sendable (Value) -> Result + ) -> EventLoopFuture { let next = EventLoopPromise.makeUnleakablePromise(eventLoop: self.eventLoop) self._whenComplete { switch self._value! { @@ -718,12 +764,6 @@ extension EventLoopFuture { @inlinable @preconcurrency public func recover(_ callback: @escaping @Sendable (Error) -> Value) -> EventLoopFuture { - self._recover(callback) - } - @usableFromInline typealias RecoverCallback = @Sendable (Error) -> Value - - @inlinable - func _recover(_ callback: @escaping RecoverCallback) -> EventLoopFuture { let next = EventLoopPromise.makeUnleakablePromise(eventLoop: self.eventLoop) self._whenComplete { switch self._value! { @@ -736,10 +776,9 @@ extension EventLoopFuture { return next.futureResult } - @usableFromInline typealias AddCallbackCallback = @Sendable () -> CallbackList /// Add a callback. If there's already a value, invoke it and return the resulting list of new callback functions. @inlinable - internal func _addCallback(_ callback: @escaping AddCallbackCallback) -> CallbackList { + internal func _addCallback(_ callback: @escaping @Sendable () -> CallbackList) -> CallbackList { self.eventLoop.assertInEventLoop() if self._value == nil { self._callbacks.append(callback) @@ -754,11 +793,10 @@ extension EventLoopFuture { internal func _whenComplete(_ callback: @escaping @Sendable () -> CallbackList) { self._internalWhenComplete(callback) } - @usableFromInline typealias InternalWhenCompleteCallback = @Sendable () -> CallbackList /// Add a callback. If there's already a value, run as much of the chain as we can. @inlinable - internal func _internalWhenComplete(_ callback: @escaping InternalWhenCompleteCallback) { + internal func _internalWhenComplete(_ callback: @escaping @Sendable () -> CallbackList) { if self.eventLoop.inEventLoop { self._addCallback(callback)._run() } else { @@ -781,12 +819,6 @@ extension EventLoopFuture { @inlinable @preconcurrency public func whenSuccess(_ callback: @escaping @Sendable (Value) -> Void) { - self._whenSuccess(callback) - } - @usableFromInline typealias WhenSuccessCallback = @Sendable (Value) -> Void - - @inlinable - func _whenSuccess(_ callback: @escaping WhenSuccessCallback) { self._whenComplete { if case .success(let t) = self._value! { callback(t) @@ -808,12 +840,6 @@ extension EventLoopFuture { @inlinable @preconcurrency public func whenFailure(_ callback: @escaping @Sendable (Error) -> Void) { - self._whenFailure(callback) - } - @usableFromInline typealias WhenFailureCallback = @Sendable (Error) -> Void - - @inlinable - func _whenFailure(_ callback: @escaping WhenFailureCallback) { self._whenComplete { if case .failure(let e) = self._value! { callback(e) @@ -830,11 +856,6 @@ extension EventLoopFuture { @inlinable @preconcurrency public func whenComplete(_ callback: @escaping @Sendable (Result) -> Void) { - self._publicWhenComplete(callback) - } - @usableFromInline typealias WhenCompleteCallback = @Sendable (Result) -> Void - @inlinable - func _publicWhenComplete(_ callback: @escaping WhenCompleteCallback) { self._whenComplete { callback(self._value!) return CallbackList() @@ -853,6 +874,21 @@ extension EventLoopFuture { } return CallbackList() } + + /// Internal: Set the value and return a list of callbacks that should be invoked as a result. + /// + /// We need a seperate method for setting the error to avoid Sendable checking of `Value` + @inlinable + internal func _setError(_ error: Error) -> CallbackList { + self.eventLoop.assertInEventLoop() + if self._value == nil { + self._value = .failure(error) + let callbacks = self._callbacks + self._callbacks = CallbackList() + return callbacks + } + return CallbackList() + } } // MARK: and @@ -862,8 +898,14 @@ extension EventLoopFuture { /// provided `EventLoopFuture` both succeed. It then provides the pair /// of results. If either one fails, the combined `EventLoopFuture` will fail with /// the first error encountered. + /// + /// - Note: The `NewValue` must be `Sendable` since the isolation domains of this future and the other future might differ i.e. + /// they might be bound to different event loops. + @preconcurrency @inlinable - public func and(_ other: EventLoopFuture) -> EventLoopFuture<(Value, OtherValue)> { + public func and( + _ other: EventLoopFuture + ) -> EventLoopFuture<(Value, OtherValue)> { let promise = EventLoopPromise<(Value, OtherValue)>.makeUnleakablePromise(eventLoop: self.eventLoop) let box: UnsafeMutableTransferBox<(t:Value?, u: OtherValue?)> = .init((nil, nil)) @@ -903,8 +945,11 @@ extension EventLoopFuture { /// Return a new EventLoopFuture that contains this "and" another value. /// This is just syntactic sugar for `future.and(loop.makeSucceedFuture(value))`. + @preconcurrency @inlinable - public func and(value: OtherValue) -> EventLoopFuture<(Value, OtherValue)> { + public func and( + value: OtherValue // TODO: This should be transferring + ) -> EventLoopFuture<(Value, OtherValue)> { return self.and(EventLoopFuture(eventLoop: self.eventLoop, value: value)) } } @@ -931,10 +976,14 @@ extension EventLoopFuture { /// }.cascade(to: userPromise) /// ``` /// + /// - Note: The `Value` must be `Sendable` since the isolation domains of this future and the promise might differ i.e. + /// they might be bound to different event loops. + /// /// - Parameter to: The `EventLoopPromise` to fulfill with the results of this future. /// - SeeAlso: `EventLoopPromise.completeWith(_:)` + @preconcurrency @inlinable - public func cascade(to promise: EventLoopPromise?) { + public func cascade(to promise: EventLoopPromise?) where Value: Sendable { guard let promise = promise else { return } self.whenComplete { result in switch result { @@ -955,9 +1004,13 @@ extension EventLoopFuture { /// doWorkReturningInt().map({ $0 >= 0 }).cascade(to: boolPromise) /// ``` /// + /// - Note: The `Value` must be `Sendable` since the isolation domains of this future and the promise might differ i.e. + /// they might be bound to different event loops. + /// /// - Parameter to: The `EventLoopPromise` to fulfill when a successful result is available. + @preconcurrency @inlinable - public func cascadeSuccess(to promise: EventLoopPromise?) { + public func cascadeSuccess(to promise: EventLoopPromise?) where Value: Sendable { guard let promise = promise else { return } self.whenSuccess { promise.succeed($0) } } @@ -967,6 +1020,7 @@ extension EventLoopFuture { /// This is an alternative variant of `cascade` that allows you to potentially return early failures in /// error cases, while passing the user `EventLoopPromise` onwards. /// + /// /// - Parameter to: The `EventLoopPromise` that should fail with the error of this `EventLoopFuture`. @inlinable public func cascadeFailure(to promise: EventLoopPromise?) { @@ -989,16 +1043,14 @@ extension EventLoopFuture { /// /// This is also forbidden in async contexts: prefer ``EventLoopFuture/get()``. /// + /// - Note: The `Value` must be `Sendable` since it is shared outside of the isolation domain of the event loop. + /// /// - returns: The value of the `EventLoopFuture` when it completes. /// - throws: The error value of the `EventLoopFuture` if it errors. @available(*, noasync, message: "wait() can block indefinitely, prefer get()", renamed: "get()") + @preconcurrency @inlinable - public func wait(file: StaticString = #file, line: UInt = #line) throws -> Value { - return try self._wait(file: file, line: line) - } - - @inlinable - func _wait(file: StaticString, line: UInt) throws -> Value { + public func wait(file: StaticString = #file, line: UInt = #line) throws -> Value where Value: Sendable { self.eventLoop._preconditionSafeToWait(file: file, line: line) let v: UnsafeMutableTransferBox?> = .init(nil) @@ -1036,25 +1088,20 @@ extension EventLoopFuture { /// `EventLoopFuture` objects will no longer be waited for. This function therefore fails fast: once /// a failure is encountered, it will immediately fail the overall EventLoopFuture. /// + /// - Note: The `Value` and `NewValue` must be `Sendable` since the isolation domains of this future and the other futures might differ i.e. + /// they might be bound to different event loops. + /// /// - parameters: /// - futures: An array of `EventLoopFuture` to wait for. /// - with: A function that will be used to fold the values of two `EventLoopFuture`s and return a new value wrapped in an `EventLoopFuture`. /// - returns: A new `EventLoopFuture` with the folded value whose callbacks run on `self.eventLoop`. @inlinable @preconcurrency - public func fold( + public func fold( _ futures: [EventLoopFuture], with combiningFunction: @escaping @Sendable (Value, OtherValue) -> EventLoopFuture - ) -> EventLoopFuture { - self._fold(futures, with: combiningFunction) - } - @usableFromInline typealias FoldCallback = @Sendable (Value, OtherValue) -> EventLoopFuture - - @inlinable - func _fold( - _ futures: [EventLoopFuture], - with combiningFunction: @escaping FoldCallback - ) -> EventLoopFuture { + ) -> EventLoopFuture where Value: Sendable { + @Sendable func fold0() -> EventLoopFuture { let body = futures.reduce(self) { (f1: EventLoopFuture, f2: EventLoopFuture) -> EventLoopFuture in let newFuture = f1.and(f2).flatMap { (args: (Value, OtherValue)) -> EventLoopFuture in @@ -1096,6 +1143,9 @@ extension EventLoopFuture { /// `EventLoopFuture` objects will no longer be waited for. This function therefore fails fast: once /// a failure is encountered, it will immediately fail the overall `EventLoopFuture`. /// + /// - Note: The `Value` and `InputValue` must be `Sendable` since the isolation domains of this future and the other futures might differ i.e. + /// they might be bound to different event loops. + /// /// - parameters: /// - initialResult: An initial result to begin the reduction. /// - futures: An array of `EventLoopFuture` to wait for. @@ -1104,23 +1154,12 @@ extension EventLoopFuture { /// - returns: A new `EventLoopFuture` with the reduced value. @preconcurrency @inlinable - public static func reduce( - _ initialResult: Value, - _ futures: [EventLoopFuture], - on eventLoop: EventLoop, - _ nextPartialResult: @escaping @Sendable (Value, InputValue) -> Value - ) -> EventLoopFuture { - Self._reduce(initialResult, futures, on: eventLoop, nextPartialResult) - } - @usableFromInline typealias ReduceCallback = @Sendable (Value, InputValue) -> Value - - @inlinable - static func _reduce( + public static func reduce( _ initialResult: Value, _ futures: [EventLoopFuture], on eventLoop: EventLoop, - _ nextPartialResult: @escaping ReduceCallback - ) -> EventLoopFuture { + _ nextPartialResult: @escaping @Sendable (Value, InputValue) -> Value + ) -> EventLoopFuture where Value: Sendable { let f0 = eventLoop.makeSucceededFuture(initialResult) let body = f0.fold(futures) { (t: Value, u: InputValue) -> EventLoopFuture in @@ -1141,6 +1180,9 @@ extension EventLoopFuture { /// `EventLoopFuture` objects will no longer be waited for. This function therefore fails fast: once /// a failure is encountered, it will immediately fail the overall `EventLoopFuture`. /// + /// - Note: The `Value` and `InputValue` must be `Sendable` since the isolation domains of this future and the other futures might differ i.e. + /// they might be bound to different event loops. + /// /// - parameters: /// - initialResult: An initial result to begin the reduction. /// - futures: An array of `EventLoopFuture` to wait for. @@ -1149,36 +1191,27 @@ extension EventLoopFuture { /// - returns: A new `EventLoopFuture` with the combined value. @inlinable @preconcurrency - public static func reduce( + public static func reduce( into initialResult: Value, _ futures: [EventLoopFuture], on eventLoop: EventLoop, _ updateAccumulatingResult: @escaping @Sendable (inout Value, InputValue) -> Void - ) -> EventLoopFuture { - Self._reduce(into: initialResult, futures, on: eventLoop, updateAccumulatingResult) - } - @usableFromInline typealias ReduceIntoCallback = @Sendable (inout Value, InputValue) -> Void - - @inlinable - static func _reduce( - into initialResult: Value, - _ futures: [EventLoopFuture], - on eventLoop: EventLoop, - _ updateAccumulatingResult: @escaping ReduceIntoCallback - ) -> EventLoopFuture { + ) -> EventLoopFuture where Value: Sendable { let p0 = eventLoop.makePromise(of: Value.self) - var value: Value = initialResult + let value = NIOLoopBoundBox(_value: initialResult, uncheckedEventLoop: eventLoop) let f0 = eventLoop.makeSucceededFuture(()) let future = f0.fold(futures) { (_: (), newValue: InputValue) -> EventLoopFuture in eventLoop.assertInEventLoop() - updateAccumulatingResult(&value, newValue) + var v = value.value + updateAccumulatingResult(&v, newValue) + value.value = v return eventLoop.makeSucceededFuture(()) } future.whenSuccess { eventLoop.assertInEventLoop() - p0.succeed(value) + p0.succeed(value.value) } future.whenFailure { (error) in eventLoop.assertInEventLoop() @@ -1203,7 +1236,10 @@ extension EventLoopFuture { /// - on: The `EventLoop` on which the new `EventLoopFuture` callbacks will execute on. /// - Returns: A new `EventLoopFuture` that waits for the other futures to succeed. @inlinable - public static func andAllSucceed(_ futures: [EventLoopFuture], on eventLoop: EventLoop) -> EventLoopFuture { + public static func andAllSucceed( + _ futures: [EventLoopFuture], + on eventLoop: EventLoop + ) -> EventLoopFuture { let promise = eventLoop.makePromise(of: Void.self) EventLoopFuture.andAllSucceed(futures, promise: promise) return promise.futureResult @@ -1218,14 +1254,17 @@ extension EventLoopFuture { /// - futures: An array of homogenous `EventLoopFutures`s to wait for. /// - promise: The `EventLoopPromise` to complete with the result of this call. @inlinable - public static func andAllSucceed(_ futures: [EventLoopFuture], promise: EventLoopPromise) { + public static func andAllSucceed( + _ futures: [EventLoopFuture], + promise: EventLoopPromise + ) { let eventLoop = promise.futureResult.eventLoop if eventLoop.inEventLoop { - self._reduceSuccesses0(promise, futures, eventLoop, onValue: { _, _ in }) + self._reduceSuccesses0(promise, futures, eventLoop) } else { eventLoop.execute { - self._reduceSuccesses0(promise, futures, eventLoop, onValue: { _, _ in }) + self._reduceSuccesses0(promise, futures, eventLoop) } } } @@ -1234,11 +1273,19 @@ extension EventLoopFuture { /// The new `EventLoopFuture` will contain all of the values fulfilled by the futures. /// /// The returned `EventLoopFuture` will fail as soon as any of the futures fails. + /// + /// - Note: The `Value` must be `Sendable` since the isolation domains of the futures might differ i.e. + /// they might be bound to different event loops. + /// /// - Parameters: /// - futures: An array of homogenous `EventLoopFuture`s to wait on for fulfilled values. /// - on: The `EventLoop` on which the new `EventLoopFuture` callbacks will fire. /// - Returns: A new `EventLoopFuture` with all of the values fulfilled by the provided futures. - public static func whenAllSucceed(_ futures: [EventLoopFuture], on eventLoop: EventLoop) -> EventLoopFuture<[Value]> { + @preconcurrency + public static func whenAllSucceed( + _ futures: [EventLoopFuture], + on eventLoop: EventLoop + ) -> EventLoopFuture<[Value]> where Value: Sendable { let promise = eventLoop.makePromise(of: [Value].self) EventLoopFuture.whenAllSucceed(futures, promise: promise) return promise.futureResult @@ -1249,10 +1296,17 @@ extension EventLoopFuture { /// /// If the _results of all futures should be collected use `andAllComplete` instead. /// + /// - Note: The `Value` must be `Sendable` since the isolation domains of the futures might differ i.e. + /// they might be bound to different event loops. + /// /// - Parameters: /// - futures: An array of homogenous `EventLoopFutures`s to wait for. /// - promise: The `EventLoopPromise` to complete with the result of this call. - public static func whenAllSucceed(_ futures: [EventLoopFuture], promise: EventLoopPromise<[Value]>) { + @preconcurrency + public static func whenAllSucceed( + _ futures: [EventLoopFuture], + promise: EventLoopPromise<[Value]> + ) where Value: Sendable { let eventLoop = promise.futureResult.eventLoop let reduced = eventLoop.makePromise(of: Void.self) @@ -1281,7 +1335,6 @@ extension EventLoopFuture { } } - @usableFromInline typealias ReduceSuccessCallback = @Sendable (Int, InputValue) -> Void /// Loops through the futures array and attaches callbacks to execute `onValue` on the provided `EventLoop` when /// they succeed. The `onValue` will receive the index of the future that fulfilled the provided `Result`. /// @@ -1292,25 +1345,26 @@ extension EventLoopFuture { _ promise: EventLoopPromise, _ futures: [EventLoopFuture], _ eventLoop: EventLoop, - onValue: @escaping ReduceSuccessCallback - ) { + onValue: @escaping @Sendable (Int, InputValue) -> Void + ) where InputValue: Sendable { eventLoop.assertInEventLoop() - var remainingCount = futures.count - - if remainingCount == 0 { + if futures.count == 0 { promise.succeed(()) return } + let remainingCount = NIOLoopBoundBox(_value: futures.count, uncheckedEventLoop: eventLoop) + // Sends the result to `onValue` in case of success and succeeds/fails the input promise, if appropriate. + @Sendable func processResult(_ index: Int, _ result: Result) { switch result { case .success(let result): onValue(index, result) - remainingCount -= 1 + remainingCount.value -= 1 - if remainingCount == 0 { + if remainingCount.value == 0 { promise.succeed(()) } case .failure(let error): @@ -1334,6 +1388,67 @@ extension EventLoopFuture { } } } + + /// Loops through the futures array and attaches callbacks to execute `onValue` on the provided `EventLoop` when + /// they succeed. The `onValue` will receive the index of the future that fulfilled the provided `Result`. + /// + /// Once all the futures have succeed, the provided promise will succeed. + /// Once any future fails, the provided promise will fail. + @inlinable + internal static func _reduceSuccesses0( + _ promise: EventLoopPromise, + _ futures: [EventLoopFuture], + _ eventLoop: EventLoop + ) { + eventLoop.assertInEventLoop() + + if futures.count == 0 { + promise.succeed(()) + return + } + + let remainingCount = NIOLoopBoundBox(_value: futures.count, uncheckedEventLoop: eventLoop) + + // Sends the result to `onValue` in case of success and succeeds/fails the input promise, if appropriate. + @Sendable + func processResult(_ index: Int, _ result: Result) { + switch result { + case .success: + remainingCount.value -= 1 + + if remainingCount.value == 0 { + promise.succeed(()) + } + case .failure(let error): + promise.fail(error) + } + } + // loop through the futures to chain callbacks to execute on the initiating event loop and grab their index + // in the "futures" to pass their result to the caller + for (index, future) in futures.enumerated() { + if future.eventLoop.inEventLoop, + let result = future._value { + // Fast-track already-fulfilled results without the overhead of calling `whenComplete`. This can yield a + // ~20% performance improvement in the case of large arrays where all elements are already fulfilled. + switch result { + case .success: + processResult(index, .success(())) + case .failure(let error): + processResult(index, .failure(error)) + } + if case .failure = result { + return // Once the promise is failed, future results do not need to be processed. + } + } else { + // We have to map to `Void` here to avoid sharing the potentially non-Sendable + // value across event loops. + future + .map { _ in () } + .hop(to: eventLoop) + .whenComplete { result in processResult(index, result) } + } + } + } } // MARK: "fail slow" reduce @@ -1350,7 +1465,10 @@ extension EventLoopFuture { /// - on: The `EventLoop` on which the new `EventLoopFuture` callbacks will execute on. /// - Returns: A new `EventLoopFuture` that succeeds after all futures complete. @inlinable - public static func andAllComplete(_ futures: [EventLoopFuture], on eventLoop: EventLoop) -> EventLoopFuture { + public static func andAllComplete( + _ futures: [EventLoopFuture], + on eventLoop: EventLoop + ) -> EventLoopFuture { let promise = eventLoop.makePromise(of: Void.self) EventLoopFuture.andAllComplete(futures, promise: promise) return promise.futureResult @@ -1366,14 +1484,17 @@ extension EventLoopFuture { /// - futures: An array of homogenous `EventLoopFuture`s to wait for. /// - promise: The `EventLoopPromise` to succeed when all futures have completed. @inlinable - public static func andAllComplete(_ futures: [EventLoopFuture], promise: EventLoopPromise) { + public static func andAllComplete( + _ futures: [EventLoopFuture], + promise: EventLoopPromise + ) { let eventLoop = promise.futureResult.eventLoop if eventLoop.inEventLoop { - self._reduceCompletions0(promise, futures, eventLoop, onResult: { _, _ in }) + self._reduceCompletions0(promise, futures, eventLoop) } else { eventLoop.execute { - self._reduceCompletions0(promise, futures, eventLoop, onResult: { _, _ in }) + self._reduceCompletions0(promise, futures, eventLoop) } } } @@ -1383,15 +1504,21 @@ extension EventLoopFuture { /// /// The returned `EventLoopFuture` always succeeds, regardless of any failures from the waiting futures. /// + /// - Note: The `Value` must be `Sendable` since the isolation domains of the futures might differ i.e. + /// they might be bound to different event loops. + /// /// If it is desired to flatten them into a single `EventLoopFuture` that fails on the first `EventLoopFuture` failure, /// use one of the `reduce` methods instead. /// - Parameters: /// - futures: An array of homogenous `EventLoopFuture`s to gather results from. /// - on: The `EventLoop` on which the new `EventLoopFuture` callbacks will fire. /// - Returns: A new `EventLoopFuture` with all the results of the provided futures. + @preconcurrency @inlinable - public static func whenAllComplete(_ futures: [EventLoopFuture], - on eventLoop: EventLoop) -> EventLoopFuture<[Result]> { + public static func whenAllComplete( + _ futures: [EventLoopFuture], + on eventLoop: EventLoop + ) -> EventLoopFuture<[Result]> where Value: Sendable { let promise = eventLoop.makePromise(of: [Result].self) EventLoopFuture.whenAllComplete(futures, promise: promise) return promise.futureResult @@ -1401,12 +1528,18 @@ extension EventLoopFuture { /// /// The promise will always be succeeded, regardless of the outcome of the futures. /// + /// - Note: The `Value` must be `Sendable` since the isolation domains of the futures might differ i.e. + /// they might be bound to different event loops. + /// /// - Parameters: /// - futures: An array of homogenous `EventLoopFuture`s to gather results from. /// - promise: The `EventLoopPromise` to complete with the result of the futures. + @preconcurrency @inlinable - public static func whenAllComplete(_ futures: [EventLoopFuture], - promise: EventLoopPromise<[Result]>) { + public static func whenAllComplete( + _ futures: [EventLoopFuture], + promise: EventLoopPromise<[Result]> + ) where Value: Sendable { let eventLoop = promise.futureResult.eventLoop let reduced = eventLoop.makePromise(of: Void.self) @@ -1439,34 +1572,33 @@ extension EventLoopFuture { } } - @usableFromInline typealias ReduceCompletions = @Sendable (Int, Result) -> Void - /// Loops through the futures array and attaches callbacks to execute `onResult` on the provided `EventLoop` when /// they complete. The `onResult` will receive the index of the future that fulfilled the provided `Result`. /// /// Once all the futures have completed, the provided promise will succeed. @inlinable - internal static func _reduceCompletions0( + internal static func _reduceCompletions0( _ promise: EventLoopPromise, _ futures: [EventLoopFuture], _ eventLoop: EventLoop, - onResult: @escaping ReduceCompletions + onResult: @escaping @Sendable (Int, Result) -> Void ) { eventLoop.assertInEventLoop() - var remainingCount = futures.count - - if remainingCount == 0 { + if futures.count == 0 { promise.succeed(()) return } + let remainingCount = NIOLoopBoundBox(_value: futures.count, uncheckedEventLoop: eventLoop) + // Sends the result to `onResult` in case of success and succeeds the input promise, if appropriate. + @Sendable func processResult(_ index: Int, _ result: Result) { onResult(index, result) - remainingCount -= 1 + remainingCount.value -= 1 - if remainingCount == 0 { + if remainingCount.value == 0 { promise.succeed(()) } } @@ -1484,6 +1616,58 @@ extension EventLoopFuture { } } } + + /// Loops through the futures array and attaches callbacks to execute `onResult` on the provided `EventLoop` when + /// they complete. The `onResult` will receive the index of the future that fulfilled the provided `Result`. + /// + /// Once all the futures have completed, the provided promise will succeed. + @inlinable + internal static func _reduceCompletions0( + _ promise: EventLoopPromise, + _ futures: [EventLoopFuture], + _ eventLoop: EventLoop + ) { + eventLoop.assertInEventLoop() + + if futures.count == 0 { + promise.succeed(()) + return + } + + let remainingCount = NIOLoopBoundBox(_value: futures.count, uncheckedEventLoop: eventLoop) + + // Sends the result to `onResult` in case of success and succeeds the input promise, if appropriate. + @Sendable + func processResult(_ index: Int, _ result: Result) { + remainingCount.value -= 1 + + if remainingCount.value == 0 { + promise.succeed(()) + } + } + // loop through the futures to chain callbacks to execute on the initiating event loop and grab their index + // in the "futures" to pass their result to the caller + for (index, future) in futures.enumerated() { + if future.eventLoop.inEventLoop, + let result = future._value { + // Fast-track already-fulfilled results without the overhead of calling `whenComplete`. This can yield a + // ~30% performance improvement in the case of large arrays where all elements are already fulfilled. + switch result { + case .success: + processResult(index, .success(())) + case .failure(let error): + processResult(index, .failure(error)) + } + } else { + // We have to map to `Void` here to avoid sharing the potentially non-Sendable + // value across event loops. + future + .map { _ in () } + .hop(to: eventLoop) + .whenComplete { result in processResult(index, result) } + } + } + } } // MARK: hop @@ -1497,11 +1681,14 @@ extension EventLoopFuture { /// succinctly. It also contains an optimisation for the case when the loop you're hopping *from* is the same as /// the one you're hopping *to*, allowing you to avoid doing allocations in that case. /// + /// - Note: The `Value` must be `Sendable` since it is shared with the isolation domain of the target event loop. + /// /// - parameters: /// - to: The `EventLoop` that the returned `EventLoopFuture` will run on. /// - returns: An `EventLoopFuture` whose callbacks run on `target` instead of the original loop. + @preconcurrency @inlinable - public func hop(to target: EventLoop) -> EventLoopFuture { + public func hop(to target: EventLoop) -> EventLoopFuture where Value: Sendable { if target === self.eventLoop { // We're already on that event loop, nothing to do here. Save an allocation. return self @@ -1524,12 +1711,6 @@ extension EventLoopFuture { @inlinable @preconcurrency public func always(_ callback: @escaping @Sendable (Result) -> Void) -> EventLoopFuture { - self._always(callback) - } - @usableFromInline typealias AlwaysCallback = @Sendable (Result) -> Void - - @inlinable - func _always(_ callback: @escaping AlwaysCallback) -> EventLoopFuture { self.whenComplete { result in callback(result) } return self } @@ -1577,8 +1758,11 @@ extension EventLoopFuture { /// - parameters: /// - orReplace: the value of the returned `EventLoopFuture` when then resolved future's value is `Optional.some()`. /// - returns: an new `EventLoopFuture` with new type parameter `NewValue` and the value passed in the `orReplace` parameter. + @preconcurrency @inlinable - public func unwrap(orReplace replacement: NewValue) -> EventLoopFuture where Value == Optional { + public func unwrap( + orReplace replacement: NewValue + ) -> EventLoopFuture where Value == Optional { return self.map { (value) -> NewValue in guard let value = value else { return replacement @@ -1605,14 +1789,6 @@ extension EventLoopFuture { @preconcurrency public func unwrap( orElse callback: @escaping @Sendable () -> NewValue - ) -> EventLoopFuture where Value == Optional { - self._unwrap(orElse: callback) - } - @usableFromInline typealias UnwrapCallback = @Sendable () -> NewValue - - @inlinable - func _unwrap( - orElse callback: @escaping UnwrapCallback ) -> EventLoopFuture where Value == Optional { return self.map { (value) -> NewValue in guard let value = value else { @@ -1632,25 +1808,18 @@ extension EventLoopFuture { /// blockingTask(value) /// } /// + /// - Note: The `Value` and `NewValue` must be `Sendable` since it is shared between the isolation region queue and the event loop. + /// /// - parameters: /// - onto: the `DispatchQueue` on which the blocking IO / task specified by `callbackMayBlock` is scheduled. /// - callbackMayBlock: Function that will receive the value of this `EventLoopFuture` and return /// a new `EventLoopFuture`. @inlinable @preconcurrency - public func flatMapBlocking( + public func flatMapBlocking( onto queue: DispatchQueue, _ callbackMayBlock: @escaping @Sendable (Value) throws -> NewValue - ) -> EventLoopFuture { - self._flatMapBlocking(onto: queue, callbackMayBlock) - } - @usableFromInline typealias FlatMapBlockingCallback = @Sendable (Value) throws -> NewValue - - @inlinable - func _flatMapBlocking( - onto queue: DispatchQueue, - _ callbackMayBlock: @escaping FlatMapBlockingCallback - ) -> EventLoopFuture { + ) -> EventLoopFuture where Value: Sendable { return self.flatMap { result in queue.asyncWithFuture(eventLoop: self.eventLoop) { try callbackMayBlock(result) } } @@ -1664,11 +1833,17 @@ extension EventLoopFuture { /// If you find yourself passing the results from this `EventLoopFuture` to a new `EventLoopPromise` /// in the body of this function, consider using `cascade` instead. /// + /// - Note: The `NewValue` must be `Sendable` since it is shared between the isolation region queue and the event loop. + /// /// - parameters: /// - onto: the `DispatchQueue` on which the blocking IO / task specified by `callbackMayBlock` is scheduled. /// - callbackMayBlock: The callback that is called with the successful result of the `EventLoopFuture`. + @preconcurrency @inlinable - public func whenSuccessBlocking(onto queue: DispatchQueue, _ callbackMayBlock: @escaping (Value) -> Void) { + public func whenSuccessBlocking( + onto queue: DispatchQueue, + _ callbackMayBlock: @escaping @Sendable (Value) -> Void + ) where Value: Sendable { self.whenSuccess { value in queue.async { callbackMayBlock(value) } } @@ -1687,13 +1862,10 @@ extension EventLoopFuture { /// - callbackMayBlock: The callback that is called with the failed result of the `EventLoopFuture`. @inlinable @preconcurrency - public func whenFailureBlocking(onto queue: DispatchQueue, _ callbackMayBlock: @escaping @Sendable (Error) -> Void) { - self._whenFailureBlocking(onto: queue, callbackMayBlock) - } - @usableFromInline typealias WhenFailureBlockingCallback = @Sendable (Error) -> Void - - @inlinable - func _whenFailureBlocking(onto queue: DispatchQueue, _ callbackMayBlock: @escaping WhenFailureBlockingCallback) { + public func whenFailureBlocking( + onto queue: DispatchQueue, + _ callbackMayBlock: @escaping @Sendable (Error) -> Void + ) { self.whenFailure { err in queue.async { callbackMayBlock(err) } } @@ -1702,18 +1874,17 @@ extension EventLoopFuture { /// Adds an observer callback to this `EventLoopFuture` that is called when the /// `EventLoopFuture` has any result. The observer callback is permitted to block. /// + /// - Note: The `NewValue` must be `Sendable` since it is shared between the isolation region queue and the event loop. + /// /// - parameters: /// - onto: the `DispatchQueue` on which the blocking IO / task specified by `callbackMayBlock` is scheduled. /// - callbackMayBlock: The callback that is called when the `EventLoopFuture` is fulfilled. @inlinable @preconcurrency - public func whenCompleteBlocking(onto queue: DispatchQueue, _ callbackMayBlock: @escaping @Sendable (Result) -> Void) { - self._whenCompleteBlocking(onto: queue, callbackMayBlock) - } - @usableFromInline typealias WhenCompleteBlocking = @Sendable (Result) -> Void - - @inlinable - func _whenCompleteBlocking(onto queue: DispatchQueue, _ callbackMayBlock: @escaping WhenCompleteBlocking) { + public func whenCompleteBlocking( + onto queue: DispatchQueue, + _ callbackMayBlock: @escaping @Sendable (Result) -> Void + ) where Value: Sendable { self.whenComplete { value in queue.async { callbackMayBlock(value) } } @@ -1825,13 +1996,19 @@ public struct _NIOEventLoopFutureIdentifier: Hashable, Sendable { } } -// EventLoopPromise is a reference type, but by its very nature is Sendable (if its Value is). -extension EventLoopPromise: Sendable where Value: Sendable { } +// The promise and future are both unchecked Sendable following the below isolation rules this is safe +// +// 1. Receiving the value of the future is always done on the EventLoop of the future, hence +// the value is never transferred out of the event loops isolation domain. It only gets transferred +// by certain methods such as `hop()` and those methods are annotated with requiring the Value to be +// Sendable +// 2. The promise is `Sendable` but fulfilling the promise with a value requires the user to +// transfer the value to the promise. This ensures that the value is now isolated to the event loops +// isolation domain. Note: Sendable values can always be transferred + +extension EventLoopPromise: @unchecked Sendable { } -// EventLoopFuture is a reference type, but it is Sendable (if its Value is). However, we enforce -// that by way of the guarantees of the EventLoop protocol, so the compiler cannot -// check it. -extension EventLoopFuture: @unchecked Sendable where Value: Sendable { } +extension EventLoopFuture: @unchecked Sendable { } extension EventLoopPromise where Value == Void { // Deliver a successful result to the associated `EventLoopFuture` object. diff --git a/Tests/NIOPosixTests/EventLoopFutureTest.swift b/Tests/NIOPosixTests/EventLoopFutureTest.swift index d7939adc31..54dbdf4363 100644 --- a/Tests/NIOPosixTests/EventLoopFutureTest.swift +++ b/Tests/NIOPosixTests/EventLoopFutureTest.swift @@ -17,6 +17,7 @@ import Dispatch @testable import NIOCore import NIOEmbedded import NIOPosix +import NIOConcurrencyHelpers enum EventLoopFutureTestError : Error { case example @@ -1380,18 +1381,19 @@ class EventLoopFutureTest : XCTestCase { func testWhenSuccessBlocking() { let eventLoop = EmbeddedEventLoop() let sem = DispatchSemaphore(value: 0) - var nonBlockingRan = false + let nonBlockingRan = NIOLockedValueBox(false) let p = eventLoop.makePromise(of: String.self) p.futureResult.whenSuccessBlocking(onto: DispatchQueue.global()) { sem.wait() // Block in callback XCTAssertEqual($0, "hello") - XCTAssertTrue(nonBlockingRan) + nonBlockingRan.withLockedValue { XCTAssertTrue($0) } + } p.succeed("hello") let p2 = eventLoop.makePromise(of: Bool.self) p2.futureResult.whenSuccess { _ in - nonBlockingRan = true + nonBlockingRan.withLockedValue { $0 = true } } p2.succeed(true) From 5d20621e1fae0f6eb13f8d7282f8c79e89608f1d Mon Sep 17 00:00:00 2001 From: Franz Busch Date: Mon, 19 Feb 2024 15:02:56 +0000 Subject: [PATCH 02/10] George review --- Sources/NIOCore/EventLoop.swift | 2 +- Sources/NIOCore/EventLoopFuture.swift | 73 +++++---------------------- 2 files changed, 14 insertions(+), 61 deletions(-) diff --git a/Sources/NIOCore/EventLoop.swift b/Sources/NIOCore/EventLoop.swift index b2fe4c8f14..4c9d6c6018 100644 --- a/Sources/NIOCore/EventLoop.swift +++ b/Sources/NIOCore/EventLoop.swift @@ -736,7 +736,7 @@ extension EventLoop { /// - returns: An `EventLoopFuture` identical to the `EventLoopFuture` returned from `task`. @inlinable @preconcurrency - public func flatSubmit(_ task: @escaping @Sendable () -> EventLoopFuture) -> EventLoopFuture { // TODO: This should take a closure that returns fresh + public func flatSubmit(_ task: @escaping @Sendable () -> EventLoopFuture) -> EventLoopFuture { self.submit(task).flatMap { $0 } } diff --git a/Sources/NIOCore/EventLoopFuture.swift b/Sources/NIOCore/EventLoopFuture.swift index 55e1756ceb..4e58738844 100644 --- a/Sources/NIOCore/EventLoopFuture.swift +++ b/Sources/NIOCore/EventLoopFuture.swift @@ -184,17 +184,6 @@ public struct EventLoopPromise { self._resolve(value: .success(value)) } - /// Deliver a successful result to the associated `EventLoopFuture` object. - /// - /// - Note: The call to this method must happen on the same event loop as this promise was created from. - /// - /// - parameters: - /// - eventLoopBoundValue: The successful result of the operation. - @inlinable - public func succeed(eventLoopBoundValue: Value) { - self._resolve(eventLoopBoundResult: .success(eventLoopBoundValue)) - } - /// Deliver an error to the associated `EventLoopFuture` object. /// /// - parameters: @@ -247,27 +236,6 @@ public struct EventLoopPromise { self._resolve(value: result) } - /// Complete the promise with the passed in `Result`. - /// - /// This method is equivalent to invoking: - /// ``` - /// switch result { - /// case .success(let value): - /// promise.succeed(value) - /// case .failure(let error): - /// promise.fail(error) - /// } - /// ``` - /// - /// - Note: The call to this method must happen on the same event loop as this promise was created from. - /// - /// - parameters: - /// - result: The result which will be used to succeed or fail this promise. - @inlinable - public func completeWith(eventLoopBoundResult: Result) { - self._resolve(eventLoopBoundResult: eventLoopBoundResult) - } - /// Fire the associated `EventLoopFuture` on the appropriate event loop. /// /// This method provides the primary difference between the `EventLoopPromise` and most @@ -287,23 +255,6 @@ public struct EventLoopPromise { } } - /// Fire the associated `EventLoopFuture` on the appropriate event loop. - /// - /// This method provides the primary difference between the `EventLoopPromise` and most - /// other `Promise` implementations: specifically, all callbacks fire on the `EventLoop` - /// that was used to create the promise. - /// - /// - Note: The call to this method must happen on the same event loop as this promise was created from. - /// - /// - parameters: - /// - value: The value to fire the future with. - @inlinable - internal func _resolve(eventLoopBoundResult: Result) { - self.futureResult.eventLoop.assertInEventLoop() - - self._setValue(value: eventLoopBoundResult)._run() - } - /// Set the future result and get the associated callbacks. /// /// - parameters: @@ -1435,17 +1386,17 @@ extension EventLoopFuture { processResult(index, .success(())) case .failure(let error): processResult(index, .failure(error)) - } - if case .failure = result { - return // Once the promise is failed, future results do not need to be processed. + return } } else { // We have to map to `Void` here to avoid sharing the potentially non-Sendable // value across event loops. - future - .map { _ in () } - .hop(to: eventLoop) - .whenComplete { result in processResult(index, result) } + future.whenComplete { result in + let voidResult = result.map { _ in } + future.eventLoop.execute { + processResult(index, voidResult) + } + } } } } @@ -1661,10 +1612,12 @@ extension EventLoopFuture { } else { // We have to map to `Void` here to avoid sharing the potentially non-Sendable // value across event loops. - future - .map { _ in () } - .hop(to: eventLoop) - .whenComplete { result in processResult(index, result) } + future.whenComplete { result in + let voidResult = result.map { _ in } + future.eventLoop.execute { + processResult(index, voidResult) + } + } } } } From 469e4750688080e01b3ab878c4ed0058a14bf8f4 Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Wed, 9 Oct 2024 15:49:02 +0100 Subject: [PATCH 03/10] Clean up catch-up-merge issues --- Sources/NIOCore/EventLoopFuture.swift | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/Sources/NIOCore/EventLoopFuture.swift b/Sources/NIOCore/EventLoopFuture.swift index d2e57a0b3d..85fc1d474d 100644 --- a/Sources/NIOCore/EventLoopFuture.swift +++ b/Sources/NIOCore/EventLoopFuture.swift @@ -639,11 +639,14 @@ extension EventLoopFuture { @inlinable @preconcurrency public func map( + _ callback: @escaping @Sendable (Value) -> (NewValue) + ) -> EventLoopFuture { self._map(callback) } @usableFromInline typealias MapCallback = @Sendable (Value) -> (NewValue) @inlinable + func _map( _ callback: @escaping @Sendable (Value) -> (NewValue) ) -> EventLoopFuture { if NewValue.self == Value.self && NewValue.self == Void.self { @@ -1043,12 +1046,12 @@ extension EventLoopFuture { /// - throws: The error value of the `EventLoopFuture` if it errors. @available(*, noasync, message: "wait() can block indefinitely, prefer get()", renamed: "get()") @preconcurrency - public func wait(file: StaticString = #file, line: UInt = #line) throws -> Value { + public func wait(file: StaticString = #file, line: UInt = #line) throws -> Value where Value: Sendable { try self._wait(file: file, line: line) } @inlinable - public func wait(file: StaticString = #file, line: UInt = #line) throws -> Value where Value: Sendable { + func _wait(file: StaticString = #file, line: UInt = #line) throws -> Value where Value: Sendable { self.eventLoop._preconditionSafeToWait(file: file, line: line) let v: UnsafeMutableTransferBox?> = .init(nil) From 7f83110050637f92f7789788bad9e3ab62c47a16 Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Wed, 16 Oct 2024 13:25:19 +0100 Subject: [PATCH 04/10] Pick up some missing annotations --- Sources/NIOCore/EventLoop+Deprecated.swift | 3 ++- Sources/NIOCore/EventLoop.swift | 13 ++++++++----- Sources/NIOCore/EventLoopFuture.swift | 7 ++++--- Sources/NIOCore/NIOScheduledCallback.swift | 6 ++++-- 4 files changed, 18 insertions(+), 11 deletions(-) diff --git a/Sources/NIOCore/EventLoop+Deprecated.swift b/Sources/NIOCore/EventLoop+Deprecated.swift index e2321ceb74..dc3d356b5e 100644 --- a/Sources/NIOCore/EventLoop+Deprecated.swift +++ b/Sources/NIOCore/EventLoop+Deprecated.swift @@ -23,9 +23,10 @@ extension EventLoop { self.makeFailedFuture(error) } + @preconcurrency @inlinable @available(*, deprecated, message: "Please don't pass file:line:, there's no point.") - public func makeSucceededFuture( + public func makeSucceededFuture( _ value: Success, file: StaticString = #fileID, line: UInt = #line diff --git a/Sources/NIOCore/EventLoop.swift b/Sources/NIOCore/EventLoop.swift index 7456d51ed9..7544c4a10f 100644 --- a/Sources/NIOCore/EventLoop.swift +++ b/Sources/NIOCore/EventLoop.swift @@ -60,7 +60,7 @@ public struct Scheduled { } } -extension Scheduled: Sendable where T: Sendable {} +extension Scheduled: Sendable {} /// Returned once a task was scheduled to be repeatedly executed on the `EventLoop`. /// @@ -753,7 +753,7 @@ extension EventLoop { /// - returns: An `EventLoopFuture` containing the result of `task`'s execution. @inlinable @preconcurrency - public func submit(_ task: @escaping @Sendable () throws -> T) -> EventLoopFuture { + public func submit(_ task: @escaping @Sendable () throws -> T) -> EventLoopFuture { let promise: EventLoopPromise = makePromise(file: #fileID, line: #line) self.execute { @@ -872,8 +872,9 @@ extension EventLoop { /// - parameters: /// - result: the value that is used by the `EventLoopFuture`. /// - returns: a succeeded `EventLoopFuture`. + @preconcurrency @inlinable - public func makeSucceededFuture(_ value: Success) -> EventLoopFuture { + public func makeSucceededFuture(_ value: Success) -> EventLoopFuture { if Success.self == Void.self { // The as! will always succeed because we previously checked that Success.self == Void.self. return self.makeSucceededVoidFuture() as! EventLoopFuture @@ -887,8 +888,9 @@ extension EventLoop { /// - Parameters: /// - result: The value that is used by the `EventLoopFuture` /// - Returns: A completed `EventLoopFuture`. + @preconcurrency @inlinable - public func makeCompletedFuture(_ result: Result) -> EventLoopFuture { + public func makeCompletedFuture(_ result: Result) -> EventLoopFuture { switch result { case .success(let value): return self.makeSucceededFuture(value) @@ -902,8 +904,9 @@ extension EventLoop { /// - Parameters: /// - body: The function that is used to complete the `EventLoopFuture` /// - Returns: A completed `EventLoopFuture`. + @preconcurrency @inlinable - public func makeCompletedFuture(withResultOf body: () throws -> Success) -> EventLoopFuture { + public func makeCompletedFuture(withResultOf body: () throws -> Success) -> EventLoopFuture { let trans = Result(catching: body) return self.makeCompletedFuture(trans) } diff --git a/Sources/NIOCore/EventLoopFuture.swift b/Sources/NIOCore/EventLoopFuture.swift index 85fc1d474d..6b69d0f456 100644 --- a/Sources/NIOCore/EventLoopFuture.swift +++ b/Sources/NIOCore/EventLoopFuture.swift @@ -26,7 +26,7 @@ import Dispatch /// In particular, note that _run() here continues to obtain and execute lists of callbacks until it completes. /// This eliminates recursion when processing `flatMap()` chains. @usableFromInline -internal struct CallbackList { +internal struct CallbackList: Sendable { @usableFromInline internal typealias Element = @Sendable () -> CallbackList @usableFromInline @@ -423,7 +423,7 @@ public final class EventLoopFuture { /// A EventLoopFuture that has already succeeded @inlinable - internal init(eventLoop: EventLoop, value: Value) { + internal init(eventLoop: EventLoop, value: Value) where Value: Sendable { self.eventLoop = eventLoop self._value = .success(value) self._callbacks = .init() @@ -2063,7 +2063,8 @@ extension Optional { /// to `promise`. /// /// - Parameter promise: The promise to set or cascade to. - public mutating func setOrCascade(to promise: EventLoopPromise?) + @preconcurrency + public mutating func setOrCascade(to promise: EventLoopPromise?) where Wrapped == EventLoopPromise { guard let promise = promise else { return } diff --git a/Sources/NIOCore/NIOScheduledCallback.swift b/Sources/NIOCore/NIOScheduledCallback.swift index 84c9f8d202..c450f0023f 100644 --- a/Sources/NIOCore/NIOScheduledCallback.swift +++ b/Sources/NIOCore/NIOScheduledCallback.swift @@ -90,9 +90,10 @@ public struct NIOScheduledCallback: Sendable { extension EventLoop { // This could be package once we drop Swift 5.8. + @preconcurrency public func _scheduleCallback( at deadline: NIODeadline, - handler: some NIOScheduledCallbackHandler + handler: some (NIOScheduledCallbackHandler & Sendable) ) -> NIOScheduledCallback { let task = self.scheduleTask(deadline: deadline) { handler.handleScheduledCallback(eventLoop: self) } task.futureResult.whenFailure { error in @@ -131,10 +132,11 @@ extension EventLoop { /// /// The implementation of this default conformance has been further factored out so we can use it in /// `NIOAsyncTestingEventLoop`, where the use of `wait()` is _less bad_. + @preconcurrency @discardableResult public func scheduleCallback( at deadline: NIODeadline, - handler: some NIOScheduledCallbackHandler + handler: some (NIOScheduledCallbackHandler & Sendable) ) -> NIOScheduledCallback { self._scheduleCallback(at: deadline, handler: handler) } From b9b9ea69a91756bc649510fd32e6c1d8165bdec2 Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Wed, 16 Oct 2024 13:43:51 +0100 Subject: [PATCH 05/10] Formatting --- Sources/NIOCore/EventLoop.swift | 4 +++- .../NIOCore/EventLoopFuture+Deprecated.swift | 2 +- Sources/NIOCore/EventLoopFuture.swift | 22 ++++++++++--------- Tests/NIOPosixTests/EventLoopFutureTest.swift | 2 +- 4 files changed, 17 insertions(+), 13 deletions(-) diff --git a/Sources/NIOCore/EventLoop.swift b/Sources/NIOCore/EventLoop.swift index 7544c4a10f..3cd440163f 100644 --- a/Sources/NIOCore/EventLoop.swift +++ b/Sources/NIOCore/EventLoop.swift @@ -906,7 +906,9 @@ extension EventLoop { /// - Returns: A completed `EventLoopFuture`. @preconcurrency @inlinable - public func makeCompletedFuture(withResultOf body: () throws -> Success) -> EventLoopFuture { + public func makeCompletedFuture( + withResultOf body: () throws -> Success + ) -> EventLoopFuture { let trans = Result(catching: body) return self.makeCompletedFuture(trans) } diff --git a/Sources/NIOCore/EventLoopFuture+Deprecated.swift b/Sources/NIOCore/EventLoopFuture+Deprecated.swift index cff384c68b..2e48e7c798 100644 --- a/Sources/NIOCore/EventLoopFuture+Deprecated.swift +++ b/Sources/NIOCore/EventLoopFuture+Deprecated.swift @@ -62,7 +62,7 @@ extension EventLoopFuture { line: UInt = #line, _ callback: @escaping @Sendable (Error) -> EventLoopFuture ) -> EventLoopFuture where Value: Sendable { - return self.flatMapError(callback) + self.flatMapError(callback) } @preconcurrency diff --git a/Sources/NIOCore/EventLoopFuture.swift b/Sources/NIOCore/EventLoopFuture.swift index 6b69d0f456..b5b4b4352f 100644 --- a/Sources/NIOCore/EventLoopFuture.swift +++ b/Sources/NIOCore/EventLoopFuture.swift @@ -945,9 +945,9 @@ extension EventLoopFuture { @preconcurrency @inlinable public func and( - value: OtherValue // TODO: This should be transferring + value: OtherValue // TODO: This should be transferring ) -> EventLoopFuture<(Value, OtherValue)> { - return self.and(EventLoopFuture(eventLoop: self.eventLoop, value: value)) + self.and(EventLoopFuture(eventLoop: self.eventLoop, value: value)) } } @@ -1441,7 +1441,8 @@ extension EventLoopFuture { // in the "futures" to pass their result to the caller for (index, future) in futures.enumerated() { if future.eventLoop.inEventLoop, - let result = future._value { + let result = future._value + { // Fast-track already-fulfilled results without the overhead of calling `whenComplete`. This can yield a // ~20% performance improvement in the case of large arrays where all elements are already fulfilled. switch result { @@ -1668,7 +1669,8 @@ extension EventLoopFuture { // in the "futures" to pass their result to the caller for (index, future) in futures.enumerated() { if future.eventLoop.inEventLoop, - let result = future._value { + let result = future._value + { // Fast-track already-fulfilled results without the overhead of calling `whenComplete`. This can yield a // ~30% performance improvement in the case of large arrays where all elements are already fulfilled. switch result { @@ -1784,7 +1786,7 @@ extension EventLoopFuture { public func unwrap( orReplace replacement: NewValue ) -> EventLoopFuture where Value == NewValue? { - return self.map { (value) -> NewValue in + self.map { (value) -> NewValue in guard let value = value else { return replacement } @@ -1894,8 +1896,8 @@ extension EventLoopFuture { @preconcurrency public func whenFailureBlocking( onto queue: DispatchQueue, - _ callbackMayBlock: @escaping @Sendable (Error) -> Void) - { + _ callbackMayBlock: @escaping @Sendable (Error) -> Void + ) { self._whenFailureBlocking(onto: queue, callbackMayBlock) } @usableFromInline typealias WhenFailureBlockingCallback = @Sendable (Error) -> Void @@ -2033,7 +2035,7 @@ public struct _NIOEventLoopFutureIdentifier: Hashable, Sendable { } } -// The promise and future are both unchecked Sendable following the below isolation rules this is safe +// The future is unchecked Sendable following the below isolation rules this is safe // // 1. Receiving the value of the future is always done on the EventLoop of the future, hence // the value is never transferred out of the event loops isolation domain. It only gets transferred @@ -2043,9 +2045,9 @@ public struct _NIOEventLoopFutureIdentifier: Hashable, Sendable { // transfer the value to the promise. This ensures that the value is now isolated to the event loops // isolation domain. Note: Sendable values can always be transferred -extension EventLoopPromise: @unchecked Sendable { } +extension EventLoopPromise: Sendable {} -extension EventLoopFuture: @unchecked Sendable { } +extension EventLoopFuture: @unchecked Sendable {} extension EventLoopPromise where Value == Void { // Deliver a successful result to the associated `EventLoopFuture` object. diff --git a/Tests/NIOPosixTests/EventLoopFutureTest.swift b/Tests/NIOPosixTests/EventLoopFutureTest.swift index cd6fe3610d..d0f7bb3335 100644 --- a/Tests/NIOPosixTests/EventLoopFutureTest.swift +++ b/Tests/NIOPosixTests/EventLoopFutureTest.swift @@ -13,9 +13,9 @@ //===----------------------------------------------------------------------===// import Dispatch +import NIOConcurrencyHelpers import NIOEmbedded import NIOPosix -import NIOConcurrencyHelpers import XCTest @testable import NIOCore From 77da82cebddcae3aefa88f2d9cdcaecbfd989613 Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Wed, 16 Oct 2024 14:17:05 +0100 Subject: [PATCH 06/10] Get tests passing on older Swifts --- Sources/NIOCore/ChannelPipeline.swift | 12 +++--- .../AsyncChannel/AsyncChannelTests.swift | 4 +- .../NIOPosixTests/NonBlockingFileIOTest.swift | 38 ++++++++++--------- 3 files changed, 30 insertions(+), 24 deletions(-) diff --git a/Sources/NIOCore/ChannelPipeline.swift b/Sources/NIOCore/ChannelPipeline.swift index b3c0ef580f..b2c90c142d 100644 --- a/Sources/NIOCore/ChannelPipeline.swift +++ b/Sources/NIOCore/ChannelPipeline.swift @@ -457,10 +457,10 @@ public final class ChannelPipeline: ChannelInvoker { let promise = self.eventLoop.makePromise(of: ChannelHandlerContext.self) if self.eventLoop.inEventLoop { - promise.completeWith(self.contextSync(handler: handler)) + promise.assumeIsolated().completeWith(self.contextSync(handler: handler)) } else { self.eventLoop.execute { - promise.completeWith(self.contextSync(handler: handler)) + promise.assumeIsolated().completeWith(self.contextSync(handler: handler)) } } @@ -486,10 +486,10 @@ public final class ChannelPipeline: ChannelInvoker { let promise = self.eventLoop.makePromise(of: ChannelHandlerContext.self) if self.eventLoop.inEventLoop { - promise.completeWith(self.contextSync(name: name)) + promise.assumeIsolated().completeWith(self.contextSync(name: name)) } else { self.eventLoop.execute { - promise.completeWith(self.contextSync(name: name)) + promise.assumeIsolated().completeWith(self.contextSync(name: name)) } } @@ -519,10 +519,10 @@ public final class ChannelPipeline: ChannelInvoker { let promise = self.eventLoop.makePromise(of: ChannelHandlerContext.self) if self.eventLoop.inEventLoop { - promise.completeWith(self._contextSync(handlerType: handlerType)) + promise.assumeIsolated().completeWith(self._contextSync(handlerType: handlerType)) } else { self.eventLoop.execute { - promise.completeWith(self._contextSync(handlerType: handlerType)) + promise.assumeIsolated().completeWith(self._contextSync(handlerType: handlerType)) } } diff --git a/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift b/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift index 7b38e49301..4e4c6a34e0 100644 --- a/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift +++ b/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift @@ -253,7 +253,9 @@ final class AsyncChannelTests: XCTestCase { let strongSentinel: Sentinel? = Sentinel() sentinel = strongSentinel! try await XCTAsyncAssertNotNil( - await channel.pipeline.handler(type: NIOAsyncChannelHandler.self).get() + await channel.pipeline.handler(type: NIOAsyncChannelHandler.self).map { _ in + true + }.get() ) try await channel.writeInbound(strongSentinel!) _ = try await channel.readInbound(as: Sentinel.self) diff --git a/Tests/NIOPosixTests/NonBlockingFileIOTest.swift b/Tests/NIOPosixTests/NonBlockingFileIOTest.swift index 3fbdc71e30..1019b35616 100644 --- a/Tests/NIOPosixTests/NonBlockingFileIOTest.swift +++ b/Tests/NIOPosixTests/NonBlockingFileIOTest.swift @@ -634,34 +634,38 @@ class NonBlockingFileIOTest: XCTestCase { func testFileOpenWorks() throws { let content = "123" try withTemporaryFile(content: content) { (fileHandle, path) -> Void in - let (fh, fr) = try self.fileIO.openFile(path: path, eventLoop: self.eventLoop).wait() - try fh.withUnsafeFileDescriptor { fd in - XCTAssertGreaterThanOrEqual(fd, 0) - } - XCTAssertTrue(fh.isOpen) - XCTAssertEqual(0, fr.readerIndex) - XCTAssertEqual(3, fr.endIndex) - try fh.close() + try self.fileIO.openFile(path: path, eventLoop: self.eventLoop).flatMapThrowing { vals in + let (fh, fr) = vals + try fh.withUnsafeFileDescriptor { fd in + XCTAssertGreaterThanOrEqual(fd, 0) + } + XCTAssertTrue(fh.isOpen) + XCTAssertEqual(0, fr.readerIndex) + XCTAssertEqual(3, fr.endIndex) + try fh.close() + }.wait() } } func testFileOpenWorksWithEmptyFile() throws { let content = "" try withTemporaryFile(content: content) { (fileHandle, path) -> Void in - let (fh, fr) = try self.fileIO.openFile(path: path, eventLoop: self.eventLoop).wait() - try fh.withUnsafeFileDescriptor { fd in - XCTAssertGreaterThanOrEqual(fd, 0) - } - XCTAssertTrue(fh.isOpen) - XCTAssertEqual(0, fr.readerIndex) - XCTAssertEqual(0, fr.endIndex) - try fh.close() + try self.fileIO.openFile(path: path, eventLoop: self.eventLoop).flatMapThrowing { vals in + let (fh, fr) = vals + try fh.withUnsafeFileDescriptor { fd in + XCTAssertGreaterThanOrEqual(fd, 0) + } + XCTAssertTrue(fh.isOpen) + XCTAssertEqual(0, fr.readerIndex) + XCTAssertEqual(0, fr.endIndex) + try fh.close() + }.wait() } } func testFileOpenFails() throws { do { - _ = try self.fileIO.openFile(path: "/dev/null/this/does/not/exist", eventLoop: self.eventLoop).wait() + try self.fileIO.openFile(path: "/dev/null/this/does/not/exist", eventLoop: self.eventLoop).map { _ in }.wait() XCTFail("should've thrown") } catch let e as IOError where e.errnoCode == ENOTDIR { // OK From 75ad4d70e56f41cd3af1549badc624d02b75142b Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Thu, 17 Oct 2024 16:36:13 +0100 Subject: [PATCH 07/10] Further tweaks --- Sources/NIOCore/EventLoop.swift | 6 ++++-- Sources/NIOCore/EventLoopFuture.swift | 12 ++++++++++-- Sources/NIOCore/NIOScheduledCallback.swift | 3 ++- Sources/NIOEmbedded/AsyncTestingEventLoop.swift | 6 ++++-- Sources/NIOEmbedded/Embedded.swift | 3 ++- 5 files changed, 22 insertions(+), 8 deletions(-) diff --git a/Sources/NIOCore/EventLoop.swift b/Sources/NIOCore/EventLoop.swift index ddcfcaf030..5732a989be 100644 --- a/Sources/NIOCore/EventLoop.swift +++ b/Sources/NIOCore/EventLoop.swift @@ -368,20 +368,22 @@ public protocol EventLoop: EventLoopGroup { /// /// - NOTE: Event loops that provide a custom scheduled callback implementation **must** also implement /// `cancelScheduledCallback`. Failure to do so will result in a runtime error. + @preconcurrency @discardableResult func scheduleCallback( at deadline: NIODeadline, - handler: some NIOScheduledCallbackHandler + handler: some (NIOScheduledCallbackHandler & Sendable) ) throws -> NIOScheduledCallback /// Schedule a callback after given time. /// /// - NOTE: Event loops that provide a custom scheduled callback implementation **must** also implement /// `cancelScheduledCallback`. Failure to do so will result in a runtime error. + @preconcurrency @discardableResult func scheduleCallback( in amount: TimeAmount, - handler: some NIOScheduledCallbackHandler + handler: some (NIOScheduledCallbackHandler & Sendable) ) throws -> NIOScheduledCallback /// Cancel a scheduled callback. diff --git a/Sources/NIOCore/EventLoopFuture.swift b/Sources/NIOCore/EventLoopFuture.swift index b5b4b4352f..ab90364407 100644 --- a/Sources/NIOCore/EventLoopFuture.swift +++ b/Sources/NIOCore/EventLoopFuture.swift @@ -1457,8 +1457,12 @@ extension EventLoopFuture { // value across event loops. future.whenComplete { result in let voidResult = result.map { _ in } - future.eventLoop.execute { + if eventLoop.inEventLoop { processResult(index, voidResult) + } else { + eventLoop.execute { + processResult(index, voidResult) + } } } } @@ -1684,8 +1688,12 @@ extension EventLoopFuture { // value across event loops. future.whenComplete { result in let voidResult = result.map { _ in } - future.eventLoop.execute { + if eventLoop.inEventLoop { processResult(index, voidResult) + } else { + eventLoop.execute { + processResult(index, voidResult) + } } } } diff --git a/Sources/NIOCore/NIOScheduledCallback.swift b/Sources/NIOCore/NIOScheduledCallback.swift index b5ac26431f..b2af415aad 100644 --- a/Sources/NIOCore/NIOScheduledCallback.swift +++ b/Sources/NIOCore/NIOScheduledCallback.swift @@ -141,11 +141,12 @@ extension EventLoop { } /// Default implementation of `scheduleCallback(in amount:handler:)`: calls `scheduleCallback(at deadline:handler:)`. + @preconcurrency @discardableResult @inlinable public func scheduleCallback( in amount: TimeAmount, - handler: some NIOScheduledCallbackHandler + handler: some (NIOScheduledCallbackHandler & Sendable) ) throws -> NIOScheduledCallback { try self.scheduleCallback(at: .now() + amount, handler: handler) } diff --git a/Sources/NIOEmbedded/AsyncTestingEventLoop.swift b/Sources/NIOEmbedded/AsyncTestingEventLoop.swift index 52d56bf9ca..14e5242df0 100644 --- a/Sources/NIOEmbedded/AsyncTestingEventLoop.swift +++ b/Sources/NIOEmbedded/AsyncTestingEventLoop.swift @@ -192,9 +192,10 @@ public final class NIOAsyncTestingEventLoop: EventLoop, @unchecked Sendable { self.scheduleTask(deadline: self.now + `in`, task) } + @preconcurrency public func scheduleCallback( at deadline: NIODeadline, - handler: some NIOScheduledCallbackHandler + handler: some (NIOScheduledCallbackHandler & Sendable) ) throws -> NIOScheduledCallback { /// The default implementation of `scheduledCallback(at:handler)` makes two calls to the event loop because it /// needs to hook the future of the backing scheduled task, which can lead to lost cancellation callbacks when @@ -213,10 +214,11 @@ public final class NIOAsyncTestingEventLoop: EventLoop, @unchecked Sendable { } } + @preconcurrency @discardableResult public func scheduleCallback( in amount: TimeAmount, - handler: some NIOScheduledCallbackHandler + handler: some (NIOScheduledCallbackHandler & Sendable) ) throws -> NIOScheduledCallback { /// Even though this type does not implement a custom `scheduleCallback(at:handler)`, it uses a manual clock so /// it cannot rely on the default implementation of `scheduleCallback(in:handler:)`, which computes the deadline diff --git a/Sources/NIOEmbedded/Embedded.swift b/Sources/NIOEmbedded/Embedded.swift index a6c2b33b5b..3105f3af85 100644 --- a/Sources/NIOEmbedded/Embedded.swift +++ b/Sources/NIOEmbedded/Embedded.swift @@ -160,10 +160,11 @@ public final class EmbeddedEventLoop: EventLoop, CustomStringConvertible { scheduleTask(deadline: self._now + `in`, task) } + @preconcurrency @discardableResult public func scheduleCallback( in amount: TimeAmount, - handler: some NIOScheduledCallbackHandler + handler: some (NIOScheduledCallbackHandler & Sendable) ) -> NIOScheduledCallback { /// Even though this type does not implement a custom `scheduleCallback(at:handler)`, it uses a manual clock so /// it cannot rely on the default implementation of `scheduleCallback(in:handler:)`, which computes the deadline From 28029bf95a0e8ade4cd32a2c077cef49f467929a Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Thu, 17 Oct 2024 17:03:13 +0100 Subject: [PATCH 08/10] Add explanatory documentation --- Sources/NIOCore/Docs.docc/index.md | 1 + .../Docs.docc/loops-futures-concurrency.md | 176 ++++++++++++++++++ 2 files changed, 177 insertions(+) create mode 100644 Sources/NIOCore/Docs.docc/loops-futures-concurrency.md diff --git a/Sources/NIOCore/Docs.docc/index.md b/Sources/NIOCore/Docs.docc/index.md index 65e595732d..13d76894ec 100644 --- a/Sources/NIOCore/Docs.docc/index.md +++ b/Sources/NIOCore/Docs.docc/index.md @@ -15,6 +15,7 @@ More specialized modules provide concrete implementations of many of the abstrac - - +- ### Event Loops and Event Loop Groups diff --git a/Sources/NIOCore/Docs.docc/loops-futures-concurrency.md b/Sources/NIOCore/Docs.docc/loops-futures-concurrency.md new file mode 100644 index 0000000000..9bbfbda11e --- /dev/null +++ b/Sources/NIOCore/Docs.docc/loops-futures-concurrency.md @@ -0,0 +1,176 @@ +# EventLoops, EventLoopFutures, and Swift Concurrency + +This article aims to communicate how NIO's ``EventLoop``s and ``EventLoopFuture``s interact with the Swift 6 +concurrency model, particularly regarding data-race safety. It aims to be a reference for writing correct +concurrent code in the NIO model. + +NIO predates the Swift concurrency model. As a result, several of NIO's concepts are not perfect matches to +the concepts that Swift uses, or have overlapping responsibilities. + +## Isolation domains and executors + +First, a quick recap. The core of Swift 6's data-race safety protection is the concept of an "isolation +domain". Some valuable reading regarding the concept can be found in +[SE-0414 (Region based isolation)](https://github.com/swiftlang/swift-evolution/blob/main/proposals/0414-region-based-isolation.md) +but at a high level an isolation domain can be understood to be a region within which there cannot be +multiple executors executing code at the same time. + +In standard Swift Concurrency, the main boundaries of isolation regions are actors and tasks. Each actor, +including global actors, defines an isolation domain. Additionally, for functions and methods that are +not isolated to an actor, the `Task` within which that code executes defines an isolation domain. Passing +values between these isolation domains requires that these values are either `Sendable` (safe to hold in +multiple domains), or that the `sending` keyword is used to force the value to be passed from one domain +to another. + +A related concept to an "isolation domain" is an "executor". Again, useful reading can be found in +[SE-0392 (Custom actor executors)](https://github.com/swiftlang/swift-evolution/blob/main/proposals/0392-custom-actor-executors.md). +At a high level, an executor is simply an object that is capable of executing Swift `Task`s. Executors can be +concurrent, or they can be serial. Serial executors are the most common, as they can be used to back an +actor. + +## Event Loops + +NIO's core execution primitive is the ``EventLoop``. An ``EventLoop`` is fundamentally nothing more than +a Swift Concurrency Serial Executor that can also perform I/O operations directly. Indeed, NIO's +``EventLoop``s can be exposed as serial executors, using ``EventLoop/executor``. This provides a mechanism +to protect actor-isolated state using a NIO event-loop. With [the introduction of task executors](https://github.com/swiftlang/swift-evolution/blob/main/proposals/0417-task-executor-preference.md), +future versions of SwiftNIO will also be able to offer their event loops for individual `Task`s to execute +on as well. + +In a Swift 6 world, it is possible that these would be the API that NIO offered to execute tasks on the +loop. However, as NIO predates Swift 6, it also offers its own set of APIs to enqueue work. This includes +(but is not limited to): + +- ``EventLoop/execute(_:)`` +- ``EventLoop/submit(_:)`` +- ``EventLoop/scheduleTask(in:_:)`` +- ``EventLoop/scheduleRepeatedTask(initialDelay:delay:notifying:_:)`` +- ``EventLoop/scheduleCallback(at:handler:)-2xm6l`` + +The existence of these APIs requires us to also ask the question of where the submitted code executes. The +answer is that the submitted code executes on the event loop (or, in Swift Concurrency terms, on the +executor provided by the event loop). + +As the event loop only ever executes a single item of work (either an `async` function or one of the +closures above) at a time, it is a _serial_ executor. It also provides an _isolation domain_: code +submitted to a given `EventLoop` never runs in parallel with other code submitted to the same loop. + +The result here is that a all closures passed into the event loop to do work must be transferred +in: they may not be kept hold of outside of the event loop. That means they must be sent using +the `sending` keyword. + +> Note: As of the current 2.75.0 release, NIO enforces the stricter requirement that these closures + are `@Sendable`. This is not a long-term position, but reflects the need to continue + to support Swift 5 code which requires this stricter standard. In a future release of + SwiftNIO we expect to relax this constraint: if you need this relaxed constraint + then please file an issue. + +## Event loop futures + +In Swift NIO the most common mechanism to arrange a series of asynchronous work items is +_not_ to queue up a series of ``EventLoop/execute(_:)`` calls. Instead, users typically +use ``EventLoopFuture``. + +``EventLoopFuture`` has some extensive semantics documented in its API documentation. The +most important principal for this discussion is that all callbacks added to an +``EventLoopFuture`` will execute on the ``EventLoop`` to which that ``EventLoopFuture`` is +bound. By extension, then, all callbacks added to an ``EventLoopFuture`` execute on the same +executor (the ``EventLoop``) in the same isolation domain. + +The analogy to an actor here is hopefully fairly clear. Conceptually, an ``EventLoopFuture`` +could be modelled as an actor. That means all the callbacks have the same logical semantics: +the ``EventLoopFuture`` defines an isolation domain, and all the callbacks are `sent` into the +isolation domain. To that end, all the callback-taking APIs require that the callback is sent +using `sending` into the ``EventLoopFuture``. + +> Note: As of the current 2.75.0 release, NIO enforces the stricter requirement that these callbacks + are `@Sendable`. This is not a long-term position, but reflects the need to continue + to support Swift 5 code which requires this stricter standard. In a future release of + SwiftNIO we expect to relax this constraint: if you need this relaxed constraint + then please file an issue. + +Unlike ``EventLoop``s, however, ``EventLoopFuture``s also have value-receiving and value-taking +APIs. This is because ``EventLoopFuture``s pass a value along to their various callbacks, and +so need to be both given an initial value (via an ``EventLoopPromise``) and in some cases to +extract that value from the ``EventLoopFuture`` wrapper. + +This implies that ``EventLoopPromise``'s various success functions +(_and_ ``EventLoop/makeSucceededFuture(_:)``) need to take their value as `sending`. The value +is potentially sent from its current isolation domain into the ``EventLoop``, which will require +that the value is safe to move. + +> Note: As of the current 2.75.0 release, NIO enforces the stricter requirement that these values + are `Sendable`. This is not a long-term position, but reflects the need to continue + to support Swift 5 code which requires this stricter standard. In a future release of + SwiftNIO we expect to relax this constraint: if you need this relaxed constraint + then please file an issue. + +There are also a few ways to extract a value, such as ``EventLoopFuture/wait(file:line:)`` +and ``EventLoopFuture/get()``. These APIs can only safely be called when the ``EventLoopFuture`` +is carrying a `Sendable` value. This is because ``EventLoopFuture``s hold on to their value and +can give it to other closures or other callers of `get` and `wait`. Thus, `sending` is not +sufficient. + +## Combining Futures + +NIO provides a number of APIs for combining futures, such as ``EventLoopFuture/and(_:)``. +This potentially represents an issue, as two futures may not share the same isolation domain. +As a result, we can only safely call these combining functions when the ``EventLoopFuture`` +values are `Sendable`. + +> Note: We can conceptually relax this constraint somewhat by offering equivalent + functions that can only safely be called when all the combined futures share the + same bound event loop: that is, when they are all within the same isolation domain. + + This can be enforced with runtime isolation checks. If you have a need for these + functions, please reach out to the NIO team. + +## Interacting with Futures on the Event Loop + +In a number of contexts (such as in ``ChannelHandler``s), the programmer has static knowledge +that they are within an isolation domain. That isolation domain may well be shared with the +isolation domain of many futures and promises with which they interact. For example, +futures that are provided from ``ChannelHandlerContext/write(_:promise:)`` will be bound to +the event loop on which the ``ChannelHandler`` resides. + +In this context, the `sending` constraint is unnecessarily strict. The future callbacks are +guaranteed to fire on the same isolation domain as the ``ChannelHandlerContext``: no risk +of data race is present. However, Swift Concurrency cannot guarantee this at compile time, +as the specific isolation domain is determined only at runtime. + +In these contexts, today users can make their callbacks safe using ``NIOLoopBound`` and +``NIOLoopBoundBox``. These values can only be constructed on the event loop, and only allow +access to their values on the same event loop. These constraints are enforced at runtime, +so at compile time these are unconditionally `Sendable`. + +> Warning: ``NIOloopBound`` and ``NIOLoopBoundBox`` replace compile-time isolation checks + with runtime ones. This makes it possible to introduce crashes in your code. Please + ensure that you are 100% confident that the isolation domains align. If you are not + sure that the ``EventLoopFuture`` you wish to attach a callback to is bound to your + ``EventLoop``, use ``EventLoopFuture/hop(to:)`` to move it to your isolation domain + before using these types. + +> Note: In a future NIO release we intend to improve the ergonomics of this common problem + by offering a related type that can only be created from an ``EventLoopFuture`` on a + given ``EventLoop``. This minimises the number of runtime checks, and will make it + easier and more pleasant to write this kind of code. + +## Interacting with Event Loops on the Event Loop + +As with Futures, there are occasionally times where it is necessary to schedule +``EventLoop`` operations on the ``EventLoop`` where your code is currently executing. + +Much like with ``EventLoopFuture``, you can use ``NIOLoopBound`` and ``NIOLoopBoundBox`` +to make these callbacks safe. + +> Warning: ``NIOloopBound`` and ``NIOLoopBoundBox`` replace compile-time isolation checks + with runtime ones. This makes it possible to introduce crashes in your code. Please + ensure that you are 100% confident that the isolation domains align. If you are not + sure that the ``EventLoopFuture`` you wish to attach a callback to is bound to your + ``EventLoop``, use ``EventLoopFuture/hop(to:)`` to move it to your isolation domain + before using these types. + +> Note: In a future NIO release we intend to improve the ergonomics of this common problem + by offering a related type that can only be created from an ``EventLoopFuture`` on a + given ``EventLoop``. This minimises the number of runtime checks, and will make it + easier and more pleasant to write this kind of code. From dacc452d596aa90a6f2b13110ee786ae9dab79ef Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Thu, 17 Oct 2024 19:30:56 +0100 Subject: [PATCH 09/10] Formatting --- Sources/NIOCore/Docs.docc/loops-futures-concurrency.md | 2 +- Tests/NIOPosixTests/NonBlockingFileIOTest.swift | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/Sources/NIOCore/Docs.docc/loops-futures-concurrency.md b/Sources/NIOCore/Docs.docc/loops-futures-concurrency.md index 9bbfbda11e..1aae6312b5 100644 --- a/Sources/NIOCore/Docs.docc/loops-futures-concurrency.md +++ b/Sources/NIOCore/Docs.docc/loops-futures-concurrency.md @@ -143,7 +143,7 @@ In these contexts, today users can make their callbacks safe using ``NIOLoopBoun access to their values on the same event loop. These constraints are enforced at runtime, so at compile time these are unconditionally `Sendable`. -> Warning: ``NIOloopBound`` and ``NIOLoopBoundBox`` replace compile-time isolation checks +> Warning: ``NIOLoopBound`` and ``NIOLoopBoundBox`` replace compile-time isolation checks with runtime ones. This makes it possible to introduce crashes in your code. Please ensure that you are 100% confident that the isolation domains align. If you are not sure that the ``EventLoopFuture`` you wish to attach a callback to is bound to your diff --git a/Tests/NIOPosixTests/NonBlockingFileIOTest.swift b/Tests/NIOPosixTests/NonBlockingFileIOTest.swift index 1019b35616..16bbfdd496 100644 --- a/Tests/NIOPosixTests/NonBlockingFileIOTest.swift +++ b/Tests/NIOPosixTests/NonBlockingFileIOTest.swift @@ -665,7 +665,9 @@ class NonBlockingFileIOTest: XCTestCase { func testFileOpenFails() throws { do { - try self.fileIO.openFile(path: "/dev/null/this/does/not/exist", eventLoop: self.eventLoop).map { _ in }.wait() + try self.fileIO.openFile( + path: "/dev/null/this/does/not/exist", eventLoop: self.eventLoop + ).map { _ in }.wait() XCTFail("should've thrown") } catch let e as IOError where e.errnoCode == ENOTDIR { // OK From c41d45d14b5cd2e0286693a5013873d2cb321808 Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Thu, 17 Oct 2024 20:46:58 +0100 Subject: [PATCH 10/10] More formatting --- Sources/NIOCore/Docs.docc/loops-futures-concurrency.md | 2 +- Tests/NIOPosixTests/NonBlockingFileIOTest.swift | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/Sources/NIOCore/Docs.docc/loops-futures-concurrency.md b/Sources/NIOCore/Docs.docc/loops-futures-concurrency.md index 1aae6312b5..17399cd6d0 100644 --- a/Sources/NIOCore/Docs.docc/loops-futures-concurrency.md +++ b/Sources/NIOCore/Docs.docc/loops-futures-concurrency.md @@ -163,7 +163,7 @@ As with Futures, there are occasionally times where it is necessary to schedule Much like with ``EventLoopFuture``, you can use ``NIOLoopBound`` and ``NIOLoopBoundBox`` to make these callbacks safe. -> Warning: ``NIOloopBound`` and ``NIOLoopBoundBox`` replace compile-time isolation checks +> Warning: ``NIOLoopBound`` and ``NIOLoopBoundBox`` replace compile-time isolation checks with runtime ones. This makes it possible to introduce crashes in your code. Please ensure that you are 100% confident that the isolation domains align. If you are not sure that the ``EventLoopFuture`` you wish to attach a callback to is bound to your diff --git a/Tests/NIOPosixTests/NonBlockingFileIOTest.swift b/Tests/NIOPosixTests/NonBlockingFileIOTest.swift index 16bbfdd496..405f7e84b7 100644 --- a/Tests/NIOPosixTests/NonBlockingFileIOTest.swift +++ b/Tests/NIOPosixTests/NonBlockingFileIOTest.swift @@ -666,7 +666,8 @@ class NonBlockingFileIOTest: XCTestCase { func testFileOpenFails() throws { do { try self.fileIO.openFile( - path: "/dev/null/this/does/not/exist", eventLoop: self.eventLoop + path: "/dev/null/this/does/not/exist", + eventLoop: self.eventLoop ).map { _ in }.wait() XCTFail("should've thrown") } catch let e as IOError where e.errnoCode == ENOTDIR {