Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Make NIOFileDescriptor/FileRegion/IOData Sendable #2598

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 154 additions & 20 deletions Sources/NIOCore/FileHandle.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Atomics
#if os(Windows)
import ucrt
#elseif canImport(Darwin)
Expand All @@ -29,6 +31,60 @@ public typealias NIOPOSIXFileMode = CInt
public typealias NIOPOSIXFileMode = mode_t
#endif

internal struct FileDescriptorState {
private static let closedValue: UInt = 0xdead
private static let inUseValue: UInt = 0xbeef
private static let openValue: UInt = 0xcafe
internal var rawValue: DoubleWord

internal init(rawValue: DoubleWord) {
self.rawValue = rawValue
}

internal init(descriptor: CInt) {
self.rawValue = DoubleWord(
first: UInt(truncatingIfNeeded: CUnsignedInt(bitPattern: descriptor)),
second: Self.openValue
)
}

internal var descriptor: CInt {
get {
return CInt(bitPattern: UInt32(truncatingIfNeeded: self.rawValue.first))
}
set {
self.rawValue.first = UInt(truncatingIfNeeded: CUnsignedInt(bitPattern: newValue))
}
}

internal var isOpen: Bool {
return self.rawValue.second == Self.openValue
}

internal var isInUse: Bool {
return self.rawValue.second == Self.inUseValue
}

internal var isClosed: Bool {
return self.rawValue.second == Self.closedValue
}

mutating func close() {
assert(self.isOpen)
self.rawValue.second = Self.closedValue
}

mutating func markInUse() {
assert(self.isOpen)
self.rawValue.second = Self.inUseValue
}

mutating func markNotInUse() {
assert(self.rawValue.second == Self.inUseValue)
self.rawValue.second = Self.openValue
}
}

/// A `NIOFileHandle` is a handle to an open file.
///
/// When creating a `NIOFileHandle` it takes ownership of the underlying file descriptor. When a `NIOFileHandle` is no longer
Expand All @@ -39,19 +95,51 @@ public typealias NIOPOSIXFileMode = mode_t
/// - warning: Failing to manage the lifetime of a `NIOFileHandle` correctly will result in undefined behaviour.
///
/// - warning: `NIOFileHandle` objects are not thread-safe and are mutable. They also cannot be fully thread-safe as they refer to a global underlying file descriptor.
public final class NIOFileHandle: FileDescriptor {
public private(set) var isOpen: Bool
private let descriptor: CInt
public final class NIOFileHandle: FileDescriptor & Sendable {
private static let descriptorClosed: CInt = CInt.min
private let descriptor: UnsafeAtomic<DoubleWord>

public var isOpen: Bool {
return FileDescriptorState(
rawValue: self.descriptor.load(ordering: .sequentiallyConsistent)
).isOpen
}

private static func interpretDescriptorValueThrowIfNotOpen(
_ descriptor: DoubleWord,
applyCloseInUseException: Bool
) throws -> FileDescriptorState {
let descriptorState = FileDescriptorState(rawValue: descriptor)
if descriptorState.isOpen {
return descriptorState
} else if descriptorState.isClosed {
throw IOError(errnoCode: EBADF, reason: "can't close file (as it's not open anymore).")
} else {
if applyCloseInUseException {
return descriptorState
} else {
throw IOError(errnoCode: EBUSY, reason: "file descriptor currently in use")
}
}
}

private func peekAtDescriptorIfOpen(applyCloseInUseException: Bool) throws -> FileDescriptorState {
let descriptor = self.descriptor.load(ordering: .relaxed)
return try Self.interpretDescriptorValueThrowIfNotOpen(
descriptor,
applyCloseInUseException: applyCloseInUseException
)
}

/// Create a `NIOFileHandle` taking ownership of `descriptor`. You must call `NIOFileHandle.close` or `NIOFileHandle.takeDescriptorOwnership` before
/// this object can be safely released.
public init(descriptor: CInt) {
self.descriptor = descriptor
self.isOpen = true
self.descriptor = UnsafeAtomic.create(FileDescriptorState(descriptor: descriptor).rawValue)
}

deinit {
assert(!self.isOpen, "leaked open NIOFileHandle(descriptor: \(self.descriptor)). Call `close()` to close or `takeDescriptorOwnership()` to take ownership and close by some other means.")
self.descriptor.destroy()
}

/// Duplicates this `NIOFileHandle`. This means that a new `NIOFileHandle` object with a new underlying file descriptor
Expand All @@ -66,34 +154,80 @@ public final class NIOFileHandle: FileDescriptor {
}
}

private func activateDescriptor(as descriptor: CInt) {
let desired = FileDescriptorState(descriptor: descriptor)
var expected = desired
expected.markInUse()
let (exchanged, original) = self.descriptor.compareExchange(
expected: expected.rawValue,
desired: desired.rawValue,
ordering: .sequentiallyConsistent
)
guard exchanged || FileDescriptorState(rawValue: original).isClosed else {
fatalError("bug in NIO (please report): NIOFileDescritor activate failed \(original)")
}
}

private func deactivateDescriptor(toClosed: Bool) throws -> CInt {
let peekedDescriptor = try self.peekAtDescriptorIfOpen(applyCloseInUseException: toClosed)
assert(peekedDescriptor.isOpen || peekedDescriptor.isInUse)
var desired = peekedDescriptor
if toClosed {
if desired.isInUse {
desired.markNotInUse()
}
desired.close()
} else {
desired.markInUse()
}
let (exchanged, originalDescriptor) = self.descriptor.compareExchange(
expected: peekedDescriptor.rawValue,
desired: desired.rawValue,
ordering: .sequentiallyConsistent
)

if exchanged {
assert(peekedDescriptor.rawValue == originalDescriptor)
return peekedDescriptor.descriptor
} else {
let fauxDescriptor = try Self.interpretDescriptorValueThrowIfNotOpen(
originalDescriptor,
applyCloseInUseException: toClosed
)
// This is impossible, because there are only 4 options in which the exchange above can fail
// 1. Descriptor already closed (would've thrown above)
// 2. Descriptor in use (would've thrown above)
// 3. Descriptor at illegal negative value (would've crashed above)
// 4. Descriptor a different, positive value (this is where we're at) --> memory corruption, let's crash
fatalError("""
bug in NIO (please report): \
NIOFileDescriptor illegal state \
(\(peekedDescriptor), \(originalDescriptor), \(fauxDescriptor))")
""")
}
}

/// Take the ownership of the underlying file descriptor. This is similar to `close()` but the underlying file
/// descriptor remains open. The caller is responsible for closing the file descriptor by some other means.
///
/// After calling this, the `NIOFileHandle` cannot be used for anything else and all the operations will throw.
///
/// - returns: The underlying file descriptor, now owned by the caller.
public func takeDescriptorOwnership() throws -> CInt {
guard self.isOpen else {
throw IOError(errnoCode: EBADF, reason: "can't close file (as it's not open anymore).")
}

self.isOpen = false
return self.descriptor
return try self.deactivateDescriptor(toClosed: true)
}

public func close() throws {
try withUnsafeFileDescriptor { fd in
try SystemCalls.close(descriptor: fd)
}

self.isOpen = false
let descriptor = try self.deactivateDescriptor(toClosed: true)
try SystemCalls.close(descriptor: descriptor)
}

public func withUnsafeFileDescriptor<T>(_ body: (CInt) throws -> T) throws -> T {
guard self.isOpen else {
throw IOError(errnoCode: EBADF, reason: "file descriptor already closed!")
let descriptor = try self.deactivateDescriptor(toClosed: false)
defer {
self.activateDescriptor(as: descriptor)
}
return try body(self.descriptor)
return try body(descriptor)
}
}

Expand Down Expand Up @@ -186,6 +320,6 @@ extension NIOFileHandle {

extension NIOFileHandle: CustomStringConvertible {
public var description: String {
return "FileHandle { descriptor: \(self.descriptor) }"
return "FileHandle { descriptor: \(FileDescriptorState(rawValue: self.descriptor.load(ordering: .relaxed)).descriptor) }"
}
}
2 changes: 1 addition & 1 deletion Sources/NIOCore/FileRegion.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import Musl
///
/// - note: It is important to manually manage the lifetime of the `NIOFileHandle` used to create a `FileRegion`.
/// - warning: `FileRegion` objects are not thread-safe and are mutable. They also cannot be fully thread-safe as they refer to a global underlying file descriptor.
public struct FileRegion {
public struct FileRegion: Sendable {

/// The `NIOFileHandle` that is used by this `FileRegion`.
public let fileHandle: NIOFileHandle
Expand Down
2 changes: 1 addition & 1 deletion Sources/NIOCore/IOData.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
///
/// Many `ChannelHandler`s receive or emit bytes and in most cases this can be either a `ByteBuffer` or a `FileRegion`
/// from disk. To still form a well-typed `ChannelPipeline` such handlers should receive and emit value of type `IOData`.
public enum IOData {
public enum IOData: Sendable {
/// A `ByteBuffer`.
case byteBuffer(ByteBuffer)

Expand Down
6 changes: 3 additions & 3 deletions Sources/NIOPosix/SelectorEpoll.swift
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ extension Selector: _SelectorBackendProtocol {
assert(self.timerFD == -1, "self.timerFD == \(self.timerFD) in deinitAssertions0, forgot close?")
}

func register0<S: Selectable>(selectable: S,
func register0(selectableFD: CInt,
fileDescriptor: CInt,
interested: SelectorEventSet,
registrationID: SelectorRegistrationID) throws {
Expand All @@ -162,7 +162,7 @@ extension Selector: _SelectorBackendProtocol {
try Epoll.epoll_ctl(epfd: self.selectorFD, op: Epoll.EPOLL_CTL_ADD, fd: fileDescriptor, event: &ev)
}

func reregister0<S: Selectable>(selectable: S,
func reregister0(selectableFD: CInt,
fileDescriptor: CInt,
oldInterested: SelectorEventSet,
newInterested: SelectorEventSet,
Expand All @@ -174,7 +174,7 @@ extension Selector: _SelectorBackendProtocol {
_ = try Epoll.epoll_ctl(epfd: self.selectorFD, op: Epoll.EPOLL_CTL_MOD, fd: fileDescriptor, event: &ev)
}

func deregister0<S: Selectable>(selectable: S, fileDescriptor: CInt, oldInterested: SelectorEventSet, registrationID: SelectorRegistrationID) throws {
func deregister0(selectableFD: CInt, fileDescriptor: CInt, oldInterested: SelectorEventSet, registrationID: SelectorRegistrationID) throws {
var ev = Epoll.epoll_event()
_ = try Epoll.epoll_ctl(epfd: self.selectorFD, op: Epoll.EPOLL_CTL_DEL, fd: fileDescriptor, event: &ev)
}
Expand Down
12 changes: 6 additions & 6 deletions Sources/NIOPosix/SelectorGeneric.swift
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ protocol _SelectorBackendProtocol {
associatedtype R: Registration
func initialiseState0() throws
func deinitAssertions0() // allows actual implementation to run some assertions as part of the class deinit
func register0<S: Selectable>(selectable: S, fileDescriptor: CInt, interested: SelectorEventSet, registrationID: SelectorRegistrationID) throws
func reregister0<S: Selectable>(selectable: S, fileDescriptor: CInt, oldInterested: SelectorEventSet, newInterested: SelectorEventSet, registrationID: SelectorRegistrationID) throws
func deregister0<S: Selectable>(selectable: S, fileDescriptor: CInt, oldInterested: SelectorEventSet, registrationID: SelectorRegistrationID) throws
func register0(selectableFD: CInt, fileDescriptor: CInt, interested: SelectorEventSet, registrationID: SelectorRegistrationID) throws
func reregister0(selectableFD: CInt, fileDescriptor: CInt, oldInterested: SelectorEventSet, newInterested: SelectorEventSet, registrationID: SelectorRegistrationID) throws
func deregister0(selectableFD: CInt, fileDescriptor: CInt, oldInterested: SelectorEventSet, registrationID: SelectorRegistrationID) throws
/* attention, this may (will!) be called from outside the event loop, ie. can't access mutable shared state (such as `self.open`) */
func wakeup0() throws
/// Apply the given `SelectorStrategy` and execute `body` once it's complete (which may produce `SelectorEvent`s to handle).
Expand Down Expand Up @@ -223,7 +223,7 @@ internal class Selector<R: Registration> {

try selectable.withUnsafeHandle { fd in
assert(registrations[Int(fd)] == nil)
try self.register0(selectable: selectable,
try self.register0(selectableFD: fd,
fileDescriptor: fd,
interested: interested,
registrationID: self.registrationID)
Expand All @@ -245,7 +245,7 @@ internal class Selector<R: Registration> {
assert(interested.contains(.reset), "must register for at least .reset but tried registering for \(interested)")
try selectable.withUnsafeHandle { fd in
var reg = registrations[Int(fd)]!
try self.reregister0(selectable: selectable,
try self.reregister0(selectableFD: fd,
fileDescriptor: fd,
oldInterested: reg.interested,
newInterested: interested,
Expand All @@ -271,7 +271,7 @@ internal class Selector<R: Registration> {
guard let reg = registrations.removeValue(forKey: Int(fd)) else {
return
}
try self.deregister0(selectable: selectable,
try self.deregister0(selectableFD: fd,
fileDescriptor: fd,
oldInterested: reg.interested,
registrationID: reg.registrationID)
Expand Down
18 changes: 8 additions & 10 deletions Sources/NIOPosix/SelectorKqueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -149,19 +149,17 @@ extension Selector: _SelectorBackendProtocol {
}
}

private func kqueueUpdateEventNotifications<S: Selectable>(selectable: S, interested: SelectorEventSet, oldInterested: SelectorEventSet?, registrationID: SelectorRegistrationID) throws {
private func kqueueUpdateEventNotifications(selectableFD: CInt, interested: SelectorEventSet, oldInterested: SelectorEventSet?, registrationID: SelectorRegistrationID) throws {
assert(self.myThread == NIOThread.current)
let oldKQueueFilters = KQueueEventFilterSet(selectorEventSet: oldInterested ?? ._none)
let newKQueueFilters = KQueueEventFilterSet(selectorEventSet: interested)
assert(interested.contains(.reset))
assert(oldInterested?.contains(.reset) ?? true)

try selectable.withUnsafeHandle {
try newKQueueFilters.calculateKQueueFilterSetChanges(previousKQueueFilterSet: oldKQueueFilters,
fileDescriptor: $0,
fileDescriptor: selectableFD,
registrationID: registrationID,
kqueueApplyEventChangeSet)
}
}

func initialiseState0() throws {
Expand All @@ -184,16 +182,16 @@ extension Selector: _SelectorBackendProtocol {
func deinitAssertions0() {
}

func register0<S: Selectable>(selectable: S, fileDescriptor: CInt, interested: SelectorEventSet, registrationID: SelectorRegistrationID) throws {
try kqueueUpdateEventNotifications(selectable: selectable, interested: interested, oldInterested: nil, registrationID: registrationID)
func register0(selectableFD: CInt, fileDescriptor: CInt, interested: SelectorEventSet, registrationID: SelectorRegistrationID) throws {
try kqueueUpdateEventNotifications(selectableFD: selectableFD, interested: interested, oldInterested: nil, registrationID: registrationID)
}

func reregister0<S: Selectable>(selectable: S, fileDescriptor: CInt, oldInterested: SelectorEventSet, newInterested: SelectorEventSet, registrationID: SelectorRegistrationID) throws {
try kqueueUpdateEventNotifications(selectable: selectable, interested: newInterested, oldInterested: oldInterested, registrationID: registrationID)
func reregister0(selectableFD: CInt, fileDescriptor: CInt, oldInterested: SelectorEventSet, newInterested: SelectorEventSet, registrationID: SelectorRegistrationID) throws {
try kqueueUpdateEventNotifications(selectableFD: selectableFD, interested: newInterested, oldInterested: oldInterested, registrationID: registrationID)
}

func deregister0<S: Selectable>(selectable: S, fileDescriptor: CInt, oldInterested: SelectorEventSet, registrationID: SelectorRegistrationID) throws {
try kqueueUpdateEventNotifications(selectable: selectable, interested: .reset, oldInterested: oldInterested, registrationID: registrationID)
func deregister0(selectableFD: CInt, fileDescriptor: CInt, oldInterested: SelectorEventSet, registrationID: SelectorRegistrationID) throws {
try kqueueUpdateEventNotifications(selectableFD: selectableFD, interested: .reset, oldInterested: oldInterested, registrationID: registrationID)
}

/// Apply the given `SelectorStrategy` and execute `body` once it's complete (which may produce `SelectorEvent`s to handle).
Expand Down
6 changes: 3 additions & 3 deletions Sources/NIOPosix/SelectorUring.swift
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ extension Selector: _SelectorBackendProtocol {
assert(self.eventFD == -1, "self.eventFD == \(self.eventFD) on deinitAssertions0 deinit, forgot close?")
}

func register0<S: Selectable>(selectable: S,
func register0(selectableFD: CInt,
fileDescriptor: CInt,
interested: SelectorEventSet,
registrationID: SelectorRegistrationID) throws {
Expand All @@ -142,7 +142,7 @@ extension Selector: _SelectorBackendProtocol {
multishot: multishot)
}

func reregister0<S: Selectable>(selectable: S,
func reregister0(selectableFD: CInt,
fileDescriptor: CInt,
oldInterested: SelectorEventSet,
newInterested: SelectorEventSet,
Expand Down Expand Up @@ -172,7 +172,7 @@ extension Selector: _SelectorBackendProtocol {
}
}

func deregister0<S: Selectable>(selectable: S, fileDescriptor: CInt, oldInterested: SelectorEventSet, registrationID: SelectorRegistrationID) throws {
func deregister0(selectableFD: CInt, fileDescriptor: CInt, oldInterested: SelectorEventSet, registrationID: SelectorRegistrationID) throws {
_debugPrint("deregister interested \(selectable) reg.interested.uringEventSet [\(oldInterested.uringEventSet)]")

self.deferredReregistrationsPending = true
Expand Down
4 changes: 2 additions & 2 deletions Tests/NIOHTTP1Tests/HTTPServerClientTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ class HTTPServerClientTest : XCTestCase {
XCTAssertNoThrow(try content.write(to: URL(fileURLWithPath: filePath)))
let fh = try! NIOFileHandle(path: filePath)
let region = FileRegion(fileHandle: fh,
readerIndex: 0,
endIndex: buffer.readableBytes)
readerIndex: 0,
endIndex: buffer.readableBytes)
return (.body(.fileRegion(region)), { try! fh.close() })
}
}
Expand Down