Skip to content

Commit

Permalink
allow setting MTELG.singleton as Swift Concurrency executor (#2564)
Browse files Browse the repository at this point in the history
  • Loading branch information
weissi authored Jan 2, 2024
1 parent 1445dca commit 5c668eb
Show file tree
Hide file tree
Showing 8 changed files with 168 additions and 10 deletions.
33 changes: 32 additions & 1 deletion Sources/NIOCrashTester/CrashTests+EventLoop.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

#if !canImport(Darwin) || os(macOS)
import Dispatch
import NIOCore
import NIOPosix

Expand Down Expand Up @@ -177,5 +179,34 @@ struct EventLoopCrashTests {
) {
NIOSingletons.groupLoopCountSuggestion = -1
}

#if compiler(>=5.9) // We only support Concurrency executor take-over on 5.9+
let testInstallingSingletonMTELGAsConcurrencyExecutorWorksButOnlyOnce = CrashTest(
regex: #"Fatal error: Must be called only once"#
) {
guard NIOSingletons.unsafeTryInstallSingletonPosixEventLoopGroupAsConcurrencyGlobalExecutor() else {
print("Installation failed, that's unexpected -> let's not crash")
return
}

// Yes, this pattern is bad abuse but this is a crash test, we don't mind.
let semaphoreAbuse = DispatchSemaphore(value: 0)
if #available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *) {
Task {
precondition(MultiThreadedEventLoopGroup.currentEventLoop != nil)
try await Task.sleep(nanoseconds: 123)
precondition(MultiThreadedEventLoopGroup.currentEventLoop != nil)
semaphoreAbuse.signal()
}
} else {
semaphoreAbuse.signal()
}
semaphoreAbuse.wait()
print("Okay, worked")

// This should crash
_ = NIOSingletons.unsafeTryInstallSingletonPosixEventLoopGroupAsConcurrencyGlobalExecutor()
}
#endif // compiler(>=5.9)
}
#endif
#endif // !canImport(Darwin) || os(macOS)
6 changes: 3 additions & 3 deletions Sources/NIOPosix/MultiThreadedEventLoopGroup.swift
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ extension MultiThreadedEventLoopGroup: CustomStringConvertible {
}
}

#if swift(>=5.9)
#if compiler(>=5.9)
@usableFromInline
struct ErasedUnownedJob {
@usableFromInline
Expand All @@ -427,7 +427,7 @@ internal struct ScheduledTask {
@usableFromInline
enum UnderlyingTask {
case function(() -> Void)
#if swift(>=5.9)
#if compiler(>=5.9)
case unownedJob(ErasedUnownedJob)
#endif
}
Expand All @@ -452,7 +452,7 @@ internal struct ScheduledTask {
self.readyTime = time
}

#if swift(>=5.9)
#if compiler(>=5.9)
@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
@usableFromInline
init(id: UInt64, job: consuming ExecutorJob, readyTime: NIODeadline) {
Expand Down
123 changes: 123 additions & 0 deletions Sources/NIOPosix/PosixSingletons+ConcurrencyTakeOver.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2023 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Atomics
import NIOCore

#if compiler(>=5.9)
private protocol SilenceWarning {
@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
func enqueue(_ job: UnownedJob)
}
@available(macOS 14, *)
extension SelectableEventLoop: SilenceWarning {}
#endif

private let _haveWeTakenOverTheConcurrencyPool = ManagedAtomic(false)
extension NIOSingletons {
/// Install ``MultiThreadedEventLoopGroup/singleton`` as Swift Concurrency's global executor.
///
/// This allows to use Swift Concurrency and retain high-performance I/O alleviating the otherwise necessary thread switches between
/// Swift Concurrency's own global pool and a place (like an `EventLoop`) that allows to perform I/O
///
/// This method uses an atomic compare and exchange to install the hook (and makes sure it's not already set). This unilateral atomic memory
/// operation doesn't guarantee anything because another piece of code could have done the same without using atomic operations. But we
/// do our best.
///
/// - warning: You may only call this method from the main thread.
/// - warning: You may only call this method once.
@discardableResult
public static func unsafeTryInstallSingletonPosixEventLoopGroupAsConcurrencyGlobalExecutor() -> Bool {
#if /* minimum supported */ compiler(>=5.9) && /* maximum tested */ swift(<5.11)
guard #available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *) else {
return false
}

typealias ConcurrencyEnqueueGlobalHook = @convention(thin) (
UnownedJob, @convention(thin) (UnownedJob) -> Void
) -> Void

guard
_haveWeTakenOverTheConcurrencyPool.compareExchange(
expected: false,
desired: true,
ordering: .relaxed
).exchanged
else {
fatalError("Must be called only once")
}

#if canImport(Darwin)
guard pthread_main_np() == 1 else {
fatalError("Must be called from the main thread")
}
#endif

// Unsafe 1: We pretend that the hook's type is actually fully equivalent to `ConcurrencyEnqueueGlobalHook`
// @convention(thin) (UnownedJob, @convention(thin) (UnownedJob) -> Void) -> Void
// which isn't formally guaranteed.
let concurrencyEnqueueGlobalHookPtr = dlsym(
dlopen(nil, RTLD_NOW),
"swift_task_enqueueGlobal_hook"
)?.assumingMemoryBound(to: UnsafeRawPointer?.AtomicRepresentation.self)
guard let concurrencyEnqueueGlobalHookPtr = concurrencyEnqueueGlobalHookPtr else {
return false
}

// We will use an atomic operation to swap the pointers aiming to protect against other code that attempts
// to swap the pointer. This isn't guaranteed to work as we can't be sure that the other code will actually
// use atomic compares and exchanges to. Nevertheless, we're doing our best.
let concurrencyEnqueueGlobalHookAtomic = UnsafeAtomic<UnsafeRawPointer?>(at: concurrencyEnqueueGlobalHookPtr)
// note: We don't need to destroy this atomic as we're borrowing the storage from `dlsym`.

return withUnsafeTemporaryAllocation(
of: ConcurrencyEnqueueGlobalHook.self,
capacity: 1
) { enqueueOnNIOPtr -> Bool in
// Unsafe 2: We mandate that we're actually getting _the_ function pointer to the closure below which
// isn't formally guaranteed by Swift.
enqueueOnNIOPtr.baseAddress!.initialize(to: { job, _ in
// This formally picks a random EventLoop from the singleton group. However, `EventLoopGroup.any()`
// attempts to be sticky. So if we're already in an `EventLoop` that's part of the singleton
// `EventLoopGroup`, we'll get that one and be very fast (avoid a thread switch).
let targetEL = MultiThreadedEventLoopGroup.singleton.any()

(targetEL.executor as! any SilenceWarning).enqueue(job)
})

// Unsafe 3: We mandate that the function pointer can be reinterpreted as `UnsafeRawPointer` which isn't
// formally guaranteed by Swift
return enqueueOnNIOPtr.baseAddress!.withMemoryRebound(
to: UnsafeRawPointer.self,
capacity: 1
) { enqueueOnNIOPtr in
// Unsafe 4: We just pretend that we're the only ones in the world pulling this trick (or at least
// that the others also use a `compareExchange`)...
guard concurrencyEnqueueGlobalHookAtomic.compareExchange(
expected: nil,
desired: enqueueOnNIOPtr.pointee,
ordering: .relaxed
).exchanged else {
return false
}

// nice, everything worked.
return true
}
}
#else
return false
#endif
}
}
4 changes: 2 additions & 2 deletions Sources/NIOPosix/SelectableEventLoop.swift
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ Further information:
}, .now()))
}

#if swift(>=5.9)
#if compiler(>=5.9)
@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
@usableFromInline
func enqueue(_ job: consuming ExecutorJob) {
Expand Down Expand Up @@ -533,7 +533,7 @@ Further information:
case .function(let function):
function()

#if swift(>=5.9)
#if compiler(>=5.9)
case .unownedJob(let erasedUnownedJob):
if #available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *) {
erasedUnownedJob.unownedJob.runSynchronously(on: self.asUnownedSerialExecutor())
Expand Down
3 changes: 2 additions & 1 deletion Sources/NIOTCPEchoClient/Client.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
#if swift(>=5.9)

#if compiler(>=5.9)
import NIOCore
import NIOPosix

Expand Down
3 changes: 2 additions & 1 deletion Sources/NIOTCPEchoServer/Server.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
#if swift(>=5.9)

#if compiler(>=5.9)
import NIOCore
import NIOPosix

Expand Down
3 changes: 2 additions & 1 deletion Sources/NIOWebSocketClient/Client.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
#if (!canImport(Darwin) && swift(>=5.9)) || (canImport(Darwin) && swift(>=5.10))

#if (!canImport(Darwin) && compiler(>=5.9)) || (canImport(Darwin) && compiler(>=5.10))
import NIOCore
import NIOPosix
import NIOHTTP1
Expand Down
3 changes: 2 additions & 1 deletion Sources/NIOWebSocketServer/Server.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
#if (!canImport(Darwin) && swift(>=5.9)) || (canImport(Darwin) && swift(>=5.10))

#if (!canImport(Darwin) && compiler(>=5.9)) || (canImport(Darwin) && compiler(>=5.10))
import NIOCore
import NIOPosix
import NIOHTTP1
Expand Down

0 comments on commit 5c668eb

Please sign in to comment.