Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TaskExecutor conformance to EventLoops #2732

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 43 additions & 8 deletions Benchmarks/Benchmarks/NIOPosixBenchmarks/Benchmarks.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,32 @@ private let eventLoop = MultiThreadedEventLoopGroup.singleton.next()
let benchmarks = {
let defaultMetrics: [BenchmarkMetric] = [
.mallocCountTotal,
.cpuTotal,
.contextSwitches
]

Benchmark(
"TCPEcho",
"TCPEcho pure NIO 1M times",
configuration: .init(
metrics: defaultMetrics,
timeUnits: .milliseconds,
scalingFactor: .mega
scalingFactor: .one
)
) { benchmark in
try runTCPEcho(
numberOfWrites: benchmark.scaledIterations.upperBound,
numberOfWrites: 1_000_000,
eventLoop: eventLoop
)
}

Benchmark(
"TCPEchoAsyncChannel pure async/await 1M times",
configuration: .init(
metrics: defaultMetrics,
scalingFactor: .one
)
) { benchmark in
try await runTCPEchoAsyncChannel(
numberOfWrites: 1_000_000,
eventLoop: eventLoop
)
}
Expand All @@ -40,11 +54,10 @@ let benchmarks = {
// to serial executor is also gated behind 5.9.
#if compiler(>=5.9)
Benchmark(
"TCPEchoAsyncChannel",
"TCPEchoAsyncChannel using globalHook 1M times",
configuration: .init(
metrics: defaultMetrics,
timeUnits: .milliseconds,
scalingFactor: .mega,
scalingFactor: .one,
// We are expecting a bit of allocation variance due to an allocation
// in the Concurrency runtime which happens when resuming a continuation.
thresholds: [.mallocCountTotal: .init(absolute: [.p90: 2000])],
Expand All @@ -59,9 +72,31 @@ let benchmarks = {
)
) { benchmark in
try await runTCPEchoAsyncChannel(
numberOfWrites: benchmark.scaledIterations.upperBound,
numberOfWrites: 1_000_000,
eventLoop: eventLoop
)
}
#endif

#if compiler(>=6.0)
if #available(macOS 15.0, *) {
Benchmark(
"TCPEchoAsyncChannel using task executor preference 1M times",
configuration: .init(
metrics: defaultMetrics,
scalingFactor: .one
// We are expecting a bit of allocation variance due to an allocation
// in the Concurrency runtime which happens when resuming a continuation.
// thresholds: [.mallocCountTotal: .init(absolute: [.p90: 2000])]
)
) { benchmark in
try await withTaskExecutorPreference(eventLoop.taskExecutor) {
try await runTCPEchoAsyncChannel(
numberOfWrites: 1_000_000,
eventLoop: eventLoop
)
}
}
}
#endif
}
38 changes: 34 additions & 4 deletions Sources/NIOCore/EventLoop+SerialExecutor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ extension NIOSerialEventLoopExecutor {
/// This type is not recommended for use because it risks problems with unowned
/// executors. Adopters are recommended to conform their own event loop
/// types to `SerialExecutor`.
final class NIODefaultSerialEventLoopExecutor {
final class NIODefaultEventLoopExecutor {
@usableFromInline
let loop: EventLoop

Expand All @@ -62,7 +62,7 @@ final class NIODefaultSerialEventLoopExecutor {
}

@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
extension NIODefaultSerialEventLoopExecutor: SerialExecutor {
extension NIODefaultEventLoopExecutor: SerialExecutor {
@inlinable
public func enqueue(_ job: consuming ExecutorJob) {
self.loop.enqueue(job)
Expand All @@ -71,12 +71,42 @@ extension NIODefaultSerialEventLoopExecutor: SerialExecutor {
@inlinable
public func asUnownedSerialExecutor() -> UnownedSerialExecutor {
UnownedSerialExecutor(complexEquality: self)

}

@inlinable
public func isSameExclusiveExecutionContext(other: NIODefaultSerialEventLoopExecutor) -> Bool {
public func isSameExclusiveExecutionContext(other: NIODefaultEventLoopExecutor) -> Bool {
self.loop === other.loop
}
}
#endif

#if compiler(>=6.0)
/// A helper protocol that can be mixed in to a NIO ``EventLoop`` to provide an
/// automatic conformance to `TaskExecutor`.
///
/// Implementers of `EventLoop` should consider conforming to this protocol as
/// well on Swift 6.0 and later.
@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *)
public protocol NIOTaskEventLoopExecutor: NIOSerialEventLoopExecutor & TaskExecutor { }

@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *)
extension NIOTaskEventLoopExecutor {
@inlinable
func asUnownedTaskExecutor() -> UnownedTaskExecutor {
UnownedTaskExecutor(ordinary: self)
}

@inlinable
public var taskExecutor: any TaskExecutor {
self
}
}

@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *)
extension NIODefaultEventLoopExecutor: TaskExecutor {
@inlinable
public func asUnownedTaskExecutor() -> UnownedTaskExecutor {
UnownedTaskExecutor(ordinary: self)
}
}
#endif
9 changes: 8 additions & 1 deletion Sources/NIOCore/EventLoop.swift
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ extension EventLoop {
#if compiler(>=5.9)
@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
public var executor: any SerialExecutor {
NIODefaultSerialEventLoopExecutor(self)
NIODefaultEventLoopExecutor(self)
}

@inlinable
Expand All @@ -398,6 +398,13 @@ extension EventLoop {
}
}
#endif

#if compiler(>=6.0)
@available(macOS 15.0, iOS 9999.0, watchOS 9999.0, tvOS 9999.0, *)
public var taskExecutor: any TaskExecutor {
NIODefaultEventLoopExecutor(self)
}
#endif
}

extension EventLoopGroup {
Expand Down
6 changes: 6 additions & 0 deletions Sources/NIOEmbedded/AsyncTestingEventLoop.swift
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,12 @@ public final class NIOAsyncTestingEventLoop: EventLoop, @unchecked Sendable {
extension NIOAsyncTestingEventLoop: NIOSerialEventLoopExecutor { }
#endif

// MARK: TaskExecutor conformance
#if compiler(>=6.0)
@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *)
extension NIOAsyncTestingEventLoop: NIOTaskEventLoopExecutor { }
#endif

/// This is a thread-safe promise creation store.
///
/// We use this to keep track of where promises come from in the `NIOAsyncTestingEventLoop`.
Expand Down
7 changes: 7 additions & 0 deletions Sources/NIOEmbedded/Embedded.swift
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,13 @@ public final class EmbeddedEventLoop: EventLoop {
fatalError("EmbeddedEventLoop is not thread safe and cannot be used as a SerialExecutor. Use NIOAsyncTestingEventLoop instead.")
}
#endif

#if compiler(>=6.0)
@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *)
public var taskExecutor: any TaskExecutor {
fatalError("EmbeddedEventLoop is not thread safe and cannot be used as a TaskExecutor. Use NIOAsyncTestingEventLoop instead.")
}
#endif
}

@usableFromInline
Expand Down
6 changes: 6 additions & 0 deletions Sources/NIOPosix/SelectableEventLoop.swift
Original file line number Diff line number Diff line change
Expand Up @@ -883,3 +883,9 @@ internal func assertExpression(_ body: () -> Bool) {
return body()
}())
}

// MARK: TaskExecutor conformance
#if compiler(>=6.0)
@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *)
extension SelectableEventLoop: NIOTaskEventLoopExecutor { }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also add conformance for AsyncTestingEventLoop and a fatalErroring conformance for EmbeddedEventLoop?

#endif
108 changes: 108 additions & 0 deletions Tests/NIOPosixTests/TaskExecutorTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2024 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import NIOCore
import NIOEmbedded
import NIOPosix
import XCTest

final class TaskExecutorTests: XCTestCase {

#if compiler(>=6.0)
@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *)
func _runTests(loop1: some EventLoop, loop2: some EventLoop) async {
await withTaskGroup(of: Void.self) { taskGroup in
taskGroup.addTask(executorPreference: loop1.taskExecutor) {
loop1.assertInEventLoop()
loop2.assertNotInEventLoop()

withUnsafeCurrentTask { task in
// this currently fails on macOS
XCTAssertEqual(task?.unownedTaskExecutor, loop1.taskExecutor.asUnownedTaskExecutor())
}
}

taskGroup.addTask(executorPreference: loop2.taskExecutor) {
loop1.assertNotInEventLoop()
loop2.assertInEventLoop()

withUnsafeCurrentTask { task in
// this currently fails on macOS
XCTAssertEqual(task?.unownedTaskExecutor, loop2.taskExecutor.asUnownedTaskExecutor())
}
}
}

let task = Task(executorPreference: loop1.taskExecutor) {
loop1.assertInEventLoop()
loop2.assertNotInEventLoop()

withUnsafeCurrentTask { task in
// this currently fails on macOS
XCTAssertEqual(task?.unownedTaskExecutor, loop1.taskExecutor.asUnownedTaskExecutor())
}
}

await task.value
}
#endif

@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *)
func testSelectableEventLoopAsTaskExecutor() async throws {
#if compiler(>=6.0)
let group = MultiThreadedEventLoopGroup(numberOfThreads: 2)
defer {
try! group.syncShutdownGracefully()
}
var iterator = group.makeIterator()
let loop1 = iterator.next()!
let loop2 = iterator.next()!

await self._runTests(loop1: loop1, loop2: loop2)
Copy link
Member Author

@fabianfett fabianfett May 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this test currently crashes:

Thread 4 Crashed:: NIO-ELT-0-#0
0   libswiftCore.dylib            	       0x19a319644 swift_getObjectType + 88
1   libswift_Concurrency.dylib    	       0x2617ea0e4 swift_task_enqueueImpl(swift::Job*, swift::SerialExecutorRef) + 160
2   libswift_Concurrency.dylib    	       0x2617ec890 swift::AsyncTask::flagAsAndEnqueueOnExecutor(swift::SerialExecutorRef) + 432
3   libswift_Concurrency.dylib    	       0x2617eacdc swift_task_switchImpl(swift::AsyncContext*, void (swift::AsyncContext* swift_async_context) swiftasynccall*, swift::SerialExecutorRef) + 640
4   swift-nioPackageTests         	       0x1080d5ca1 partial apply for closure #1 in closure #1 in TaskExecutorTests._runTests<A, B>(loop1:loop2:) + 1
5   swift-nioPackageTests         	       0x1080d4d3d thunk for @escaping @isolated(any) @callee_guaranteed @Sendable @async () -> (@out A) + 1
6   swift-nioPackageTests         	       0x1080d4e99 partial apply for thunk for @escaping @isolated(any) @callee_guaranteed @Sendable @async () -> (@out A) + 1
7   libswift_Concurrency.dylib    	       0x2617f1921 completeTaskWithClosure(swift::AsyncContext*, swift::SwiftError*) + 1

Why do we call swift_task_switchImpl with a SerialExecutorRef?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

switch is what checks if it can "switch" without releasing actor locks; it won't be able to switch when there's a task executor. I'll look into reproducing this though with a debug runtime to check.

#endif
}

@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *)
func testAsyncTestingEventLoopAsTaskExecutor() async throws {
#if compiler(>=6.0)
let loop1 = NIOAsyncTestingEventLoop()
let loop2 = NIOAsyncTestingEventLoop()
defer {
try? loop1.syncShutdownGracefully()
try? loop2.syncShutdownGracefully()
}

await withTaskGroup(of: Void.self) { taskGroup in
taskGroup.addTask(executorPreference: loop1.taskExecutor) {
loop1.assertInEventLoop()
loop2.assertNotInEventLoop()

withUnsafeCurrentTask { task in
// this currently fails on macOS
XCTAssertEqual(task?.unownedTaskExecutor, loop1.taskExecutor.asUnownedTaskExecutor())
}
}

taskGroup.addTask(executorPreference: loop2) {
loop1.assertNotInEventLoop()
loop2.assertInEventLoop()

withUnsafeCurrentTask { task in
// this currently fails on macOS
XCTAssertEqual(task?.unownedTaskExecutor, loop2.taskExecutor.asUnownedTaskExecutor())
}
}
}
#endif
}
}