From 26abe917e12130b4abe6731064a9d956f381d953 Mon Sep 17 00:00:00 2001 From: Charles Hu Date: Mon, 5 May 2025 17:03:25 -0700 Subject: [PATCH 1/4] Move .standardOutput and .standardError out of Execution and remove AtomicBox --- Sources/Subprocess/API.swift | 395 +++++++++++++++--- Sources/Subprocess/AsyncBufferSequence.swift | 6 +- Sources/Subprocess/Atomic.swift | 241 ----------- Sources/Subprocess/Buffer.swift | 10 +- Sources/Subprocess/Configuration.swift | 307 +------------- Sources/Subprocess/Execution.swift | 141 +------ Sources/Subprocess/IO/Input.swift | 2 +- Sources/Subprocess/IO/Output.swift | 31 +- .../Platforms/Subprocess+Darwin.swift | 47 ++- .../Platforms/Subprocess+Linux.swift | 46 +- .../Platforms/Subprocess+Unix.swift | 66 ++- .../Platforms/Subprocess+Windows.swift | 128 +++--- Sources/Subprocess/Teardown.swift | 45 +- .../SubprocessTests+Darwin.swift | 4 +- .../SubprocessTests+Linux.swift | 4 +- .../SubprocessTests+Unix.swift | 42 +- .../SubprocessTests+Windows.swift | 23 +- 17 files changed, 600 insertions(+), 938 deletions(-) delete mode 100644 Sources/Subprocess/Atomic.swift diff --git a/Sources/Subprocess/API.swift b/Sources/Subprocess/API.swift index 60af124..4c57d64 100644 --- a/Sources/Subprocess/API.swift +++ b/Sources/Subprocess/API.swift @@ -91,16 +91,78 @@ public func run< output: Output = .string, error: Error = .discarded ) async throws -> CollectedResult { - return try await Configuration( + typealias RunResult = ( + processIdentifier: ProcessIdentifier, + standardOutput: Output.OutputType, + standardError: Error.OutputType + ) + + let customInput = CustomWriteInput() + let result = try await Configuration( executable: executable, arguments: arguments, environment: environment, workingDirectory: workingDirectory, platformOptions: platformOptions - ).run(input: input, output: output, error: error) + ).run( + input: try customInput.createPipe(), + output: try output.createPipe(), + error: try error.createPipe() + ) { execution, inputIO, outputIO, errorIO in + // Write input, capture output and error in parallel + return try await withThrowingTaskGroup( + of: OutputCapturingState.self, + returning: RunResult.self + ) { group in + group.addTask { + let stdout = try await output.captureOutput( + from: outputIO + ) + return .standardOutputCaptured(stdout) + } + group.addTask { + let stderr = try await error.captureOutput( + from: errorIO + ) + return .standardErrorCaptured(stderr) + } + + // Write span at the same isolation + if let writeFd = inputIO { + let writer = StandardInputWriter(diskIO: writeFd) + _ = try await writer.write(input.bytes) + try await writer.finish() + } + + var stdout: Output.OutputType! + var stderror: Error.OutputType! + while let state = try await group.next() { + switch state { + case .standardOutputCaptured(let output): + stdout = output + case .standardErrorCaptured(let error): + stderror = error + } + } + + return ( + processIdentifier: execution.processIdentifier, + standardOutput: stdout, + standardError: stderror + ) + } + } + + return CollectedResult( + processIdentifier: result.value.processIdentifier, + terminationStatus: result.terminationStatus, + standardOutput: result.value.standardOutput, + standardError: result.value.standardError + ) } #endif // SubprocessSpan + // MARK: - Custom Execution Body /// Run a executable with given parameters and a custom closure @@ -122,7 +184,54 @@ public func run< #if SubprocessSpan @available(SubprocessSpan, *) #endif -public func run( +public func run( + _ executable: Executable, + arguments: Arguments = [], + environment: Environment = .inherit, + workingDirectory: FilePath? = nil, + platformOptions: PlatformOptions = PlatformOptions(), + input: Input = .none, + error: Error, + isolation: isolated (any Actor)? = #isolation, + body: ((Execution, AsyncBufferSequence) async throws -> Result) +) async throws -> ExecutionResult where Error.OutputType == Void { + let output = SequenceOutput() + return try await Configuration( + executable: executable, + arguments: arguments, + environment: environment, + workingDirectory: workingDirectory, + platformOptions: platformOptions + ).run( + input: try input.createPipe(), + output: try output.createPipe(), + error: try error.createPipe() + ) { execution, inputIO, outputIO, errorIO in + return try await withThrowingTaskGroup( + of: Void.self, + returning: Result.self + ) { group in + group.addTask { + if let inputIO = inputIO { + let writer = StandardInputWriter(diskIO: inputIO) + try await input.write(with: writer) + try await writer.finish() + } + } + + // Body runs in the same isolation + let outputSequence = AsyncBufferSequence(diskIO: outputIO!) + let result = try await body(execution, outputSequence) + try await group.waitForAll() + return result + } + } +} + +#if SubprocessSpan +@available(SubprocessSpan, *) +#endif +public func run( _ executable: Executable, arguments: Arguments = [], environment: Environment = .inherit, @@ -130,10 +239,57 @@ public func run Result) +) async throws -> ExecutionResult where Output.OutputType == Void { + let error = SequenceOutput() + return try await Configuration( + executable: executable, + arguments: arguments, + environment: environment, + workingDirectory: workingDirectory, + platformOptions: platformOptions + ).run( + input: try input.createPipe(), + output: try output.createPipe(), + error: try error.createPipe() + ) { execution, inputIO, outputIO, errorIO in + return try await withThrowingTaskGroup( + of: Void.self, + returning: Result.self + ) { group in + group.addTask { + if let inputIO = inputIO { + let writer = StandardInputWriter(diskIO: inputIO) + try await input.write(with: writer) + try await writer.finish() + } + } + + // Body runs in the same isolation + let errorSequence = AsyncBufferSequence(diskIO: errorIO!) + let result = try await body(execution, errorSequence) + try await group.waitForAll() + return result + } + } +} + +#if SubprocessSpan +@available(SubprocessSpan, *) +#endif +public func run( + _ executable: Executable, + arguments: Arguments = [], + environment: Environment = .inherit, + workingDirectory: FilePath? = nil, + platformOptions: PlatformOptions = PlatformOptions(), error: Error, isolation: isolated (any Actor)? = #isolation, - body: ((Execution) async throws -> Result) -) async throws -> ExecutionResult where Output.OutputType == Void, Error.OutputType == Void { + body: ((Execution, StandardInputWriter, AsyncBufferSequence) async throws -> Result) +) async throws -> ExecutionResult where Error.OutputType == Void { + let input = CustomWriteInput() + let output = SequenceOutput() return try await Configuration( executable: executable, arguments: arguments, @@ -141,7 +297,48 @@ public func run( + _ executable: Executable, + arguments: Arguments = [], + environment: Environment = .inherit, + workingDirectory: FilePath? = nil, + platformOptions: PlatformOptions = PlatformOptions(), + output: Output, + isolation: isolated (any Actor)? = #isolation, + body: ((Execution, StandardInputWriter, AsyncBufferSequence) async throws -> Result) +) async throws -> ExecutionResult where Output.OutputType == Void { + let input = CustomWriteInput() + let error = SequenceOutput() + return try await Configuration( + executable: executable, + arguments: arguments, + environment: environment, + workingDirectory: workingDirectory, + platformOptions: platformOptions + ) + .run( + input: try input.createPipe(), + output: try output.createPipe(), + error: try error.createPipe() + ) { execution, inputIO, outputIO, errorIO in + let writer = StandardInputWriter(diskIO: inputIO!) + let errorSequence = AsyncBufferSequence(diskIO: errorIO!) + return try await body(execution, writer, errorSequence) + } } /// Run a executable with given parameters and a custom closure @@ -163,25 +360,43 @@ public func run( +public func run( _ executable: Executable, arguments: Arguments = [], environment: Environment = .inherit, workingDirectory: FilePath? = nil, platformOptions: PlatformOptions = PlatformOptions(), - output: Output, - error: Error, isolation: isolated (any Actor)? = #isolation, - body: ((Execution, StandardInputWriter) async throws -> Result) -) async throws -> ExecutionResult where Output.OutputType == Void, Error.OutputType == Void { - return try await Configuration( + body: ( + ( + Execution, + StandardInputWriter, + AsyncBufferSequence, + AsyncBufferSequence + ) async throws -> Result + ) +) async throws -> ExecutionResult { + let configuration = Configuration( executable: executable, arguments: arguments, environment: environment, workingDirectory: workingDirectory, platformOptions: platformOptions ) - .run(output: output, error: error, body) + let input = CustomWriteInput() + let output = SequenceOutput() + let error = SequenceOutput() + + return try await configuration.run( + input: try input.createPipe(), + output: try output.createPipe(), + error: try error.createPipe() + ) { execution, inputIO, outputIO, errorIO in + let writer = StandardInputWriter(diskIO: inputIO!) + let outputSequence = AsyncBufferSequence(diskIO: outputIO!) + let errorSequence = AsyncBufferSequence(diskIO: errorIO!) + return try await body(execution, writer, outputSequence, errorSequence) + } } // MARK: - Configuration Based @@ -207,21 +422,63 @@ public func run< output: Output = .string, error: Error = .discarded ) async throws -> CollectedResult { + typealias RunResult = ( + processIdentifier: ProcessIdentifier, + standardOutput: Output.OutputType, + standardError: Error.OutputType + ) let result = try await configuration.run( - input: input, - output: output, - error: error - ) { execution in - let ( - standardOutput, - standardError - ) = try await execution.captureIOs() - return ( - processIdentifier: execution.processIdentifier, - standardOutput: standardOutput, - standardError: standardError - ) + input: try input.createPipe(), + output: try output.createPipe(), + error: try error.createPipe() + ) { (execution, inputIO, outputIO, errorIO) -> RunResult in + // Write input, capture output and error in parallel + return try await withThrowingTaskGroup( + of: OutputCapturingState?.self, + returning: RunResult.self + ) { group in + group.addTask { + if let writeFd = inputIO { + let writer = StandardInputWriter(diskIO: writeFd) + try await input.write(with: writer) + try await writer.finish() + } + return nil + } + group.addTask { + let stdout = try await output.captureOutput( + from: outputIO + ) + return .standardOutputCaptured(stdout) + } + group.addTask { + let stderr = try await error.captureOutput( + from: errorIO + ) + return .standardErrorCaptured(stderr) + } + + var stdout: Output.OutputType! + var stderror: Error.OutputType! + while let state = try await group.next() { + switch state { + case .standardOutputCaptured(let output): + stdout = output + case .standardErrorCaptured(let error): + stderror = error + case .none: + continue + } + } + + return ( + processIdentifier: execution.processIdentifier, + standardOutput: stdout, + standardError: stderror + ) + } } + return CollectedResult( processIdentifier: result.value.processIdentifier, terminationStatus: result.terminationStatus, @@ -243,14 +500,24 @@ public func run< #if SubprocessSpan @available(SubprocessSpan, *) #endif -public func run( +public func run( _ configuration: Configuration, - output: Output, - error: Error, isolation: isolated (any Actor)? = #isolation, - body: ((Execution, StandardInputWriter) async throws -> Result) -) async throws -> ExecutionResult where Output.OutputType == Void, Error.OutputType == Void { - return try await configuration.run(output: output, error: error, body) + body: ((Execution, StandardInputWriter, AsyncBufferSequence, AsyncBufferSequence) async throws -> Result) +) async throws -> ExecutionResult { + let input = CustomWriteInput() + let output = SequenceOutput() + let error = SequenceOutput() + return try await configuration.run( + input: try input.createPipe(), + output: try output.createPipe(), + error: try error.createPipe() + ) { execution, inputIO, outputIO, errorIO in + let writer = StandardInputWriter(diskIO: inputIO!) + let outputSequence = AsyncBufferSequence(diskIO: outputIO!) + let errorSequence = AsyncBufferSequence(diskIO: errorIO!) + return try await body(execution, writer, outputSequence, errorSequence) + } } // MARK: - Detached @@ -319,36 +586,39 @@ public func runDetached( ) throws -> ProcessIdentifier { switch (input, output, error) { case (.none, .none, .none): + let processInput = NoInput() let processOutput = DiscardedOutput() let processError = DiscardedOutput() return try configuration.spawn( - withInput: NoInput().createPipe(), - output: processOutput, + withInput: try processInput.createPipe(), outputPipe: try processOutput.createPipe(), - error: processError, errorPipe: try processError.createPipe() ).processIdentifier case (.none, .none, .some(let errorFd)): + let processInput = NoInput() let processOutput = DiscardedOutput() - let processError = FileDescriptorOutput(fileDescriptor: errorFd, closeAfterSpawningProcess: false) + let processError = FileDescriptorOutput( + fileDescriptor: errorFd, + closeAfterSpawningProcess: false + ) return try configuration.spawn( - withInput: NoInput().createPipe(), - output: processOutput, + withInput: try processInput.createPipe(), outputPipe: try processOutput.createPipe(), - error: processError, errorPipe: try processError.createPipe() ).processIdentifier case (.none, .some(let outputFd), .none): - let processOutput = FileDescriptorOutput(fileDescriptor: outputFd, closeAfterSpawningProcess: false) + let processInput = NoInput() + let processOutput = FileDescriptorOutput( + fileDescriptor: outputFd, closeAfterSpawningProcess: false + ) let processError = DiscardedOutput() return try configuration.spawn( - withInput: NoInput().createPipe(), - output: processOutput, + withInput: try processInput.createPipe(), outputPipe: try processOutput.createPipe(), - error: processError, errorPipe: try processError.createPipe() ).processIdentifier case (.none, .some(let outputFd), .some(let errorFd)): + let processInput = NoInput() let processOutput = FileDescriptorOutput( fileDescriptor: outputFd, closeAfterSpawningProcess: false @@ -358,52 +628,56 @@ public func runDetached( closeAfterSpawningProcess: false ) return try configuration.spawn( - withInput: NoInput().createPipe(), - output: processOutput, + withInput: try processInput.createPipe(), outputPipe: try processOutput.createPipe(), - error: processError, errorPipe: try processError.createPipe() ).processIdentifier case (.some(let inputFd), .none, .none): + let processInput = FileDescriptorInput( + fileDescriptor: inputFd, + closeAfterSpawningProcess: false + ) let processOutput = DiscardedOutput() let processError = DiscardedOutput() return try configuration.spawn( - withInput: FileDescriptorInput( - fileDescriptor: inputFd, - closeAfterSpawningProcess: false - ).createPipe(), - output: processOutput, + withInput: try processInput.createPipe(), outputPipe: try processOutput.createPipe(), - error: processError, errorPipe: try processError.createPipe() ).processIdentifier case (.some(let inputFd), .none, .some(let errorFd)): + let processInput = FileDescriptorInput( + fileDescriptor: inputFd, closeAfterSpawningProcess: false + ) let processOutput = DiscardedOutput() let processError = FileDescriptorOutput( fileDescriptor: errorFd, closeAfterSpawningProcess: false ) return try configuration.spawn( - withInput: FileDescriptorInput(fileDescriptor: inputFd, closeAfterSpawningProcess: false).createPipe(), - output: processOutput, + withInput: try processInput.createPipe(), outputPipe: try processOutput.createPipe(), - error: processError, errorPipe: try processError.createPipe() ).processIdentifier case (.some(let inputFd), .some(let outputFd), .none): + let processInput = FileDescriptorInput( + fileDescriptor: inputFd, + closeAfterSpawningProcess: false + ) let processOutput = FileDescriptorOutput( fileDescriptor: outputFd, closeAfterSpawningProcess: false ) let processError = DiscardedOutput() return try configuration.spawn( - withInput: FileDescriptorInput(fileDescriptor: inputFd, closeAfterSpawningProcess: false).createPipe(), - output: processOutput, + withInput: try processInput.createPipe(), outputPipe: try processOutput.createPipe(), - error: processError, errorPipe: try processError.createPipe() ).processIdentifier case (.some(let inputFd), .some(let outputFd), .some(let errorFd)): + let processInput = FileDescriptorInput( + fileDescriptor: inputFd, + closeAfterSpawningProcess: false + ) let processOutput = FileDescriptorOutput( fileDescriptor: outputFd, closeAfterSpawningProcess: false @@ -413,11 +687,10 @@ public func runDetached( closeAfterSpawningProcess: false ) return try configuration.spawn( - withInput: FileDescriptorInput(fileDescriptor: inputFd, closeAfterSpawningProcess: false).createPipe(), - output: processOutput, + withInput: try processInput.createPipe(), outputPipe: try processOutput.createPipe(), - error: processError, errorPipe: try processError.createPipe() ).processIdentifier } } + diff --git a/Sources/Subprocess/AsyncBufferSequence.swift b/Sources/Subprocess/AsyncBufferSequence.swift index 163e595..deb62db 100644 --- a/Sources/Subprocess/AsyncBufferSequence.swift +++ b/Sources/Subprocess/AsyncBufferSequence.swift @@ -20,11 +20,11 @@ #endif public struct AsyncBufferSequence: AsyncSequence, Sendable { public typealias Failure = any Swift.Error - public typealias Element = SequenceOutput.Buffer + public typealias Element = Buffer @_nonSendable public struct Iterator: AsyncIteratorProtocol { - public typealias Element = SequenceOutput.Buffer + public typealias Element = Buffer private let diskIO: TrackedPlatformDiskIO private var buffer: [UInt8] @@ -38,7 +38,7 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable { self.finished = false } - public func next() async throws -> SequenceOutput.Buffer? { + public func next() async throws -> Buffer? { let data = try await self.diskIO.readChunk( upToLength: readBufferSize ) diff --git a/Sources/Subprocess/Atomic.swift b/Sources/Subprocess/Atomic.swift deleted file mode 100644 index 18f7e86..0000000 --- a/Sources/Subprocess/Atomic.swift +++ /dev/null @@ -1,241 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift.org open source project -// -// Copyright (c) 2025 Apple Inc. and the Swift project authors -// Licensed under Apache License v2.0 with Runtime Library Exception -// -// See https://swift.org/LICENSE.txt for license information -// -//===----------------------------------------------------------------------===// - -#if canImport(Synchronization) -import Synchronization -#else - -#if canImport(os) -internal import os -#if canImport(C.os.lock) -internal import C.os.lock -#endif -#elseif canImport(Bionic) -@preconcurrency import Bionic -#elseif canImport(Glibc) -@preconcurrency import Glibc -#elseif canImport(Musl) -@preconcurrency import Musl -#elseif canImport(WinSDK) -import WinSDK -#endif // canImport(os) - -#endif // canImport(Synchronization) - -internal struct AtomicBox: Sendable, ~Copyable { - internal typealias BitwiseXorFunc = (OutputConsumptionState) -> OutputConsumptionState - - private let storage: @Sendable () -> BitwiseXorFunc - - internal init() { - #if canImport(Synchronization) - guard #available(macOS 15, *) else { - fatalError("Unexpected configuration") - } - let box = Atomic(UInt8(0)) - self.storage = { - return { input in - return box._bitwiseXor(input) - } - } - #else - let state = LockedState(OutputConsumptionState(rawValue: 0)) - self.storage = { - return state._bitwiseXor - } - #endif - } - - internal func bitwiseXor( - _ operand: OutputConsumptionState - ) -> OutputConsumptionState { - return self.storage()(operand) - } -} - -#if canImport(Synchronization) -@available(macOS 15, *) -extension Atomic where Value == UInt8 { - borrowing func _bitwiseXor( - _ operand: OutputConsumptionState - ) -> OutputConsumptionState { - let newState = self.bitwiseXor( - operand.rawValue, - ordering: .relaxed - ).newValue - return OutputConsumptionState(rawValue: newState) - } - - init(_ initialValue: OutputConsumptionState) { - self = Atomic(initialValue.rawValue) - } -} -#else -// Fallback to LockedState if `Synchronization` is not available -extension LockedState where State == OutputConsumptionState { - init(_ initialValue: OutputConsumptionState) { - self.init(initialState: initialValue) - } - - func _bitwiseXor( - _ operand: OutputConsumptionState - ) -> OutputConsumptionState { - return self.withLock { state in - state = OutputConsumptionState(rawValue: state.rawValue ^ operand.rawValue) - return state - } - } -} - -// MARK: - LockState -internal struct LockedState: ~Copyable { - - // Internal implementation for a cheap lock to aid sharing code across platforms - private struct _Lock { - #if canImport(os) - typealias Primitive = os_unfair_lock - #elseif os(FreeBSD) || os(OpenBSD) - typealias Primitive = pthread_mutex_t? - #elseif canImport(Bionic) || canImport(Glibc) || canImport(Musl) - typealias Primitive = pthread_mutex_t - #elseif canImport(WinSDK) - typealias Primitive = SRWLOCK - #elseif os(WASI) - // WASI is single-threaded, so we don't need a lock. - typealias Primitive = Void - #endif - - typealias PlatformLock = UnsafeMutablePointer - var _platformLock: PlatformLock - - fileprivate static func initialize(_ platformLock: PlatformLock) { - #if canImport(os) - platformLock.initialize(to: os_unfair_lock()) - #elseif canImport(Bionic) || canImport(Glibc) || canImport(Musl) - pthread_mutex_init(platformLock, nil) - #elseif canImport(WinSDK) - InitializeSRWLock(platformLock) - #elseif os(WASI) - // no-op - #else - #error("LockedState._Lock.initialize is unimplemented on this platform") - #endif - } - - fileprivate static func deinitialize(_ platformLock: PlatformLock) { - #if canImport(Bionic) || canImport(Glibc) || canImport(Musl) - pthread_mutex_destroy(platformLock) - #endif - platformLock.deinitialize(count: 1) - } - - static fileprivate func lock(_ platformLock: PlatformLock) { - #if canImport(os) - os_unfair_lock_lock(platformLock) - #elseif canImport(Bionic) || canImport(Glibc) || canImport(Musl) - pthread_mutex_lock(platformLock) - #elseif canImport(WinSDK) - AcquireSRWLockExclusive(platformLock) - #elseif os(WASI) - // no-op - #else - #error("LockedState._Lock.lock is unimplemented on this platform") - #endif - } - - static fileprivate func unlock(_ platformLock: PlatformLock) { - #if canImport(os) - os_unfair_lock_unlock(platformLock) - #elseif canImport(Bionic) || canImport(Glibc) || canImport(Musl) - pthread_mutex_unlock(platformLock) - #elseif canImport(WinSDK) - ReleaseSRWLockExclusive(platformLock) - #elseif os(WASI) - // no-op - #else - #error("LockedState._Lock.unlock is unimplemented on this platform") - #endif - } - } - - private class _Buffer: ManagedBuffer { - deinit { - withUnsafeMutablePointerToElements { - _Lock.deinitialize($0) - } - } - } - - private let _buffer: ManagedBuffer - - package init(initialState: State) { - _buffer = _Buffer.create( - minimumCapacity: 1, - makingHeaderWith: { buf in - buf.withUnsafeMutablePointerToElements { - _Lock.initialize($0) - } - return initialState - } - ) - } - - package func withLock(_ body: @Sendable (inout State) throws -> T) rethrows -> T { - try withLockUnchecked(body) - } - - package func withLockUnchecked(_ body: (inout State) throws -> T) rethrows -> T { - try _buffer.withUnsafeMutablePointers { state, lock in - _Lock.lock(lock) - defer { _Lock.unlock(lock) } - return try body(&state.pointee) - } - } - - // Ensures the managed state outlives the locked scope. - package func withLockExtendingLifetimeOfState(_ body: @Sendable (inout State) throws -> T) rethrows -> T { - try _buffer.withUnsafeMutablePointers { state, lock in - _Lock.lock(lock) - return try withExtendedLifetime(state.pointee) { - defer { _Lock.unlock(lock) } - return try body(&state.pointee) - } - } - } -} - -extension LockedState where State == Void { - internal init() { - self.init(initialState: ()) - } - - internal func withLock(_ body: @Sendable () throws -> R) rethrows -> R { - return try withLock { _ in - try body() - } - } - - internal func lock() { - _buffer.withUnsafeMutablePointerToElements { lock in - _Lock.lock(lock) - } - } - - internal func unlock() { - _buffer.withUnsafeMutablePointerToElements { lock in - _Lock.unlock(lock) - } - } -} - -extension LockedState: @unchecked Sendable where State: Sendable {} - -#endif diff --git a/Sources/Subprocess/Buffer.swift b/Sources/Subprocess/Buffer.swift index f6d6e1b..aae5a09 100644 --- a/Sources/Subprocess/Buffer.swift +++ b/Sources/Subprocess/Buffer.swift @@ -14,7 +14,7 @@ #if SubprocessSpan @available(SubprocessSpan, *) #endif -extension SequenceOutput { +extension AsyncBufferSequence { /// A immutable collection of bytes public struct Buffer: Sendable { #if os(Windows) @@ -37,7 +37,7 @@ extension SequenceOutput { #if SubprocessSpan @available(SubprocessSpan, *) #endif -extension SequenceOutput.Buffer { +extension AsyncBufferSequence.Buffer { /// Number of bytes stored in the buffer public var count: Int { return self.data.count @@ -53,7 +53,7 @@ extension SequenceOutput.Buffer { #if SubprocessSpan @available(SubprocessSpan, *) #endif -extension SequenceOutput.Buffer { +extension AsyncBufferSequence.Buffer { #if !SubprocessSpan /// Access the raw bytes stored in this buffer /// - Parameter body: A closure with an `UnsafeRawBufferPointer` parameter that @@ -140,11 +140,11 @@ extension SequenceOutput.Buffer { #if SubprocessSpan @available(SubprocessSpan, *) #endif -extension SequenceOutput.Buffer: Equatable, Hashable { +extension AsyncBufferSequence.Buffer: Equatable, Hashable { #if os(Windows) // Compiler generated conformances #else - public static func == (lhs: SequenceOutput.Buffer, rhs: SequenceOutput.Buffer) -> Bool { + public static func == (lhs: AsyncBufferSequence.Buffer, rhs: AsyncBufferSequence.Buffer) -> Bool { return lhs.data.elementsEqual(rhs.data) } diff --git a/Sources/Subprocess/Configuration.swift b/Sources/Subprocess/Configuration.swift index 214d750..90244c0 100644 --- a/Sources/Subprocess/Configuration.swift +++ b/Sources/Subprocess/Configuration.swift @@ -61,233 +61,36 @@ public struct Configuration: Sendable { #if SubprocessSpan @available(SubprocessSpan, *) #endif - internal func run< - Result, - Output: OutputProtocol, - Error: OutputProtocol - >( - output: Output, - error: Error, + internal func run( + input: CreatedPipe, + output: CreatedPipe, + error: CreatedPipe, isolation: isolated (any Actor)? = #isolation, - _ body: ( - Execution, - StandardInputWriter - ) async throws -> Result + _ body: ((Execution, TrackedPlatformDiskIO?, TrackedPlatformDiskIO?, TrackedPlatformDiskIO?) async throws -> Result) ) async throws -> ExecutionResult { - let input = CustomWriteInput() - - let inputPipe = try input.createPipe() - let outputPipe = try output.createPipe() - let errorPipe = try error.createPipe() - - let execution = try self.spawn( - withInput: inputPipe, - output: output, - outputPipe: outputPipe, - error: error, - errorPipe: errorPipe - ) - // After spawn, cleanup child side fds - try await self.cleanup( - execution: execution, - childSide: true, - parentSide: false, - attemptToTerminateSubProcess: false - ) - return try await withAsyncTaskCleanupHandler { - async let waitingStatus = try await monitorProcessTermination( - forProcessWithIdentifier: execution.processIdentifier - ) - // Body runs in the same isolation - let result = try await body( - execution, - .init(diskIO: execution.inputPipe.writeEnd!) - ) - return ExecutionResult( - terminationStatus: try await waitingStatus, - value: result - ) - } onCleanup: { - // Attempt to terminate the child process - // Since the task has already been cancelled, - // this is the best we can do - try? await self.cleanup( - execution: execution, - childSide: false, - parentSide: true, - attemptToTerminateSubProcess: true - ) - } - } - - #if SubprocessSpan - @available(SubprocessSpan, *) - internal func run< - InputElement: BitwiseCopyable, - Output: OutputProtocol, - Error: OutputProtocol - >( - input: borrowing Span, - output: Output, - error: Error, - isolation: isolated (any Actor)? = #isolation - ) async throws -> CollectedResult { - let writerInput = CustomWriteInput() - - let inputPipe = try writerInput.createPipe() - let outputPipe = try output.createPipe() - let errorPipe = try error.createPipe() - let execution = try self.spawn( - withInput: inputPipe, - output: output, - outputPipe: outputPipe, - error: error, - errorPipe: errorPipe - ) - // After spawn, clean up child side - try await self.cleanup( - execution: execution, - childSide: true, - parentSide: false, - attemptToTerminateSubProcess: false + withInput: input, + outputPipe: output, + errorPipe: error ) return try await withAsyncTaskCleanupHandler { - // Spawn parallel tasks to monitor exit status - // and capture outputs. Input writing must happen - // in this scope for Span async let terminationStatus = try monitorProcessTermination( forProcessWithIdentifier: execution.processIdentifier ) - async let ( - standardOutput, - standardError - ) = try await execution.captureIOs() - // Write input in the same scope - guard let writeFd = execution.inputPipe.writeEnd else { - fatalError("Trying to write to an input that has been closed") - } - try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in - input.withUnsafeBytes { ptr in - #if os(Windows) - let bytes = ptr - #else - let bytes = DispatchData( - bytesNoCopy: ptr, - deallocator: .custom( - nil, - { - // noop - } - ) - ) - #endif - - writeFd.write(bytes) { _, error in - if let error = error { - continuation.resume(throwing: error) - } else { - continuation.resume() - } - } - } - - } - try writeFd.safelyClose() - return CollectedResult( - processIdentifier: execution.processIdentifier, + // Body runs in the same isolation + let inputIO = input.createInputPlatformDiskIO() + let outputIO = output.createOutputPlatformDiskIO() + let errorIO = error.createOutputPlatformDiskIO() + let result = try await body(execution, inputIO, outputIO, errorIO) + return ExecutionResult( terminationStatus: try await terminationStatus, - standardOutput: try await standardOutput, - standardError: try await standardError + value: result ) } onCleanup: { // Attempt to terminate the child process - // Since the task has already been cancelled, - // this is the best we can do - try? await self.cleanup( - execution: execution, - childSide: false, - parentSide: true, - attemptToTerminateSubProcess: true - ) - } - } - #endif // SubprocessSpan - - #if SubprocessSpan - @available(SubprocessSpan, *) - #endif - internal func run< - Result, - Input: InputProtocol, - Output: OutputProtocol, - Error: OutputProtocol - >( - input: Input, - output: Output, - error: Error, - isolation: isolated (any Actor)? = #isolation, - _ body: ((Execution) async throws -> Result) - ) async throws -> ExecutionResult { - - let inputPipe = try input.createPipe() - let outputPipe = try output.createPipe() - let errorPipe = try error.createPipe() - - let execution = try self.spawn( - withInput: inputPipe, - output: output, - outputPipe: outputPipe, - error: error, - errorPipe: errorPipe - ) - // After spawn, clean up child side - try await self.cleanup( - execution: execution, - childSide: true, - parentSide: false, - attemptToTerminateSubProcess: false - ) - - return try await withAsyncTaskCleanupHandler { - return try await withThrowingTaskGroup( - of: TerminationStatus?.self, - returning: ExecutionResult.self - ) { group in - group.addTask { - if let writeFd = execution.inputPipe.writeEnd { - let writer = StandardInputWriter(diskIO: writeFd) - try await input.write(with: writer) - try await writer.finish() - } - return nil - } - group.addTask { - return try await monitorProcessTermination( - forProcessWithIdentifier: execution.processIdentifier - ) - } - - // Body runs in the same isolation - let result = try await body(execution) - var status: TerminationStatus? = nil - while let monitorResult = try await group.next() { - if let monitorResult = monitorResult { - status = monitorResult - } - } - return ExecutionResult(terminationStatus: status!, value: result) - } - } onCleanup: { - // Attempt to terminate the child process - // Since the task has already been cancelled, - // this is the best we can do - try? await self.cleanup( - execution: execution, - childSide: false, - parentSide: true, - attemptToTerminateSubProcess: true + await Execution.runTeardownSequence( + self.platformOptions.teardownSequence, on: execution.processIdentifier ) } } @@ -322,82 +125,6 @@ extension Configuration: CustomStringConvertible, CustomDebugStringConvertible { // MARK: - Cleanup extension Configuration { - /// Close each input individually, and throw the first error if there's multiple errors thrown - @Sendable - #if SubprocessSpan - @available(SubprocessSpan, *) - #endif - private func cleanup< - Output: OutputProtocol, - Error: OutputProtocol - >( - execution: Execution, - childSide: Bool, - parentSide: Bool, - attemptToTerminateSubProcess: Bool - ) async throws { - func captureError(_ work: () throws -> Void) -> Swift.Error? { - do { - try work() - return nil - } catch { - // Ignore badFileDescriptor for double close - return error - } - } - - guard childSide || parentSide || attemptToTerminateSubProcess else { - return - } - - // Attempt to teardown the subprocess - if attemptToTerminateSubProcess { - await execution.teardown( - using: self.platformOptions.teardownSequence - ) - } - - var inputError: Swift.Error? - var outputError: Swift.Error? - var errorError: Swift.Error? // lol - - if childSide { - inputError = captureError { - try execution.inputPipe.readEnd?.safelyClose() - } - outputError = captureError { - try execution.outputPipe.writeEnd?.safelyClose() - } - errorError = captureError { - try execution.errorPipe.writeEnd?.safelyClose() - } - } - - if parentSide { - inputError = captureError { - try execution.inputPipe.writeEnd?.safelyClose() - } - outputError = captureError { - try execution.outputPipe.readEnd?.safelyClose() - } - errorError = captureError { - try execution.errorPipe.readEnd?.safelyClose() - } - } - - if let inputError = inputError { - throw inputError - } - - if let outputError = outputError { - throw outputError - } - - if let errorError = errorError { - throw errorError - } - } - /// Close each input individually, and throw the first error if there's multiple errors thrown @Sendable internal func cleanupPreSpawn( diff --git a/Sources/Subprocess/Execution.swift b/Sources/Subprocess/Execution.swift index d2b1964..c783c22 100644 --- a/Sources/Subprocess/Execution.swift +++ b/Sources/Subprocess/Execution.swift @@ -33,168 +33,31 @@ import WinSDK #if SubprocessSpan @available(SubprocessSpan, *) #endif -public final class Execution< - Output: OutputProtocol, - Error: OutputProtocol ->: Sendable { +public struct Execution: Sendable { /// The process identifier of the current execution public let processIdentifier: ProcessIdentifier - internal let output: Output - internal let error: Error - internal let inputPipe: InputPipe - internal let outputPipe: OutputPipe - internal let errorPipe: OutputPipe - internal let outputConsumptionState: AtomicBox - #if os(Windows) internal let consoleBehavior: PlatformOptions.ConsoleBehavior init( processIdentifier: ProcessIdentifier, - output: Output, - error: Error, - inputPipe: InputPipe, - outputPipe: OutputPipe, - errorPipe: OutputPipe, consoleBehavior: PlatformOptions.ConsoleBehavior ) { self.processIdentifier = processIdentifier - self.output = output - self.error = error - self.inputPipe = inputPipe - self.outputPipe = outputPipe - self.errorPipe = errorPipe - self.outputConsumptionState = AtomicBox() self.consoleBehavior = consoleBehavior } #else init( - processIdentifier: ProcessIdentifier, - output: Output, - error: Error, - inputPipe: InputPipe, - outputPipe: OutputPipe, - errorPipe: OutputPipe + processIdentifier: ProcessIdentifier ) { self.processIdentifier = processIdentifier - self.output = output - self.error = error - self.inputPipe = inputPipe - self.outputPipe = outputPipe - self.errorPipe = errorPipe - self.outputConsumptionState = AtomicBox() } #endif // os(Windows) } -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif -extension Execution where Output == SequenceOutput { - /// The standard output of the subprocess. - /// - /// Accessing this property will **fatalError** if this property was - /// accessed multiple times. Subprocess communicates with parent process - /// via pipe under the hood and each pipe can only be consumed once. - public var standardOutput: AsyncBufferSequence { - let consumptionState = self.outputConsumptionState.bitwiseXor( - OutputConsumptionState.standardOutputConsumed - ) - - guard consumptionState.contains(.standardOutputConsumed), - let readFd = self.outputPipe.readEnd - else { - fatalError("The standard output has already been consumed") - } - return AsyncBufferSequence(diskIO: readFd) - } -} - -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif -extension Execution where Error == SequenceOutput { - /// The standard error of the subprocess. - /// - /// Accessing this property will **fatalError** if this property was - /// accessed multiple times. Subprocess communicates with parent process - /// via pipe under the hood and each pipe can only be consumed once. - public var standardError: AsyncBufferSequence { - let consumptionState = self.outputConsumptionState.bitwiseXor( - OutputConsumptionState.standardErrorConsumed - ) - - guard consumptionState.contains(.standardErrorConsumed), - let readFd = self.errorPipe.readEnd - else { - fatalError("The standard error has already been consumed") - } - return AsyncBufferSequence(diskIO: readFd) - } -} - // MARK: - Output Capture internal enum OutputCapturingState: Sendable { case standardOutputCaptured(Output) case standardErrorCaptured(Error) } - -internal struct OutputConsumptionState: OptionSet { - typealias RawValue = UInt8 - - internal let rawValue: UInt8 - - internal init(rawValue: UInt8) { - self.rawValue = rawValue - } - - static let standardOutputConsumed: Self = .init(rawValue: 0b0001) - static let standardErrorConsumed: Self = .init(rawValue: 0b0010) -} - -internal typealias CapturedIOs< - Output: Sendable, - Error: Sendable -> = (standardOutput: Output, standardError: Error) - -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif -extension Execution { - internal func captureIOs() async throws -> CapturedIOs< - Output.OutputType, Error.OutputType - > { - return try await withThrowingTaskGroup( - of: OutputCapturingState.self - ) { group in - group.addTask { - let stdout = try await self.output.captureOutput( - from: self.outputPipe.readEnd - ) - return .standardOutputCaptured(stdout) - } - group.addTask { - let stderr = try await self.error.captureOutput( - from: self.errorPipe.readEnd - ) - return .standardErrorCaptured(stderr) - } - - var stdout: Output.OutputType! - var stderror: Error.OutputType! - while let state = try await group.next() { - switch state { - case .standardOutputCaptured(let output): - stdout = output - case .standardErrorCaptured(let error): - stderror = error - } - } - return ( - standardOutput: stdout, - standardError: stderror - ) - } - } -} diff --git a/Sources/Subprocess/IO/Input.swift b/Sources/Subprocess/IO/Input.swift index ff5dc45..5c4e7a8 100644 --- a/Sources/Subprocess/IO/Input.swift +++ b/Sources/Subprocess/IO/Input.swift @@ -138,7 +138,7 @@ public struct ArrayInput: InputProtocol { /// A concrete `Input` type for subprocess that indicates that /// the Subprocess should read its input from `StandardInputWriter`. -public struct CustomWriteInput: InputProtocol { +internal struct CustomWriteInput: InputProtocol { public func write(with writer: StandardInputWriter) async throws { // noop } diff --git a/Sources/Subprocess/IO/Output.swift b/Sources/Subprocess/IO/Output.swift index e33292d..b18f493 100644 --- a/Sources/Subprocess/IO/Output.swift +++ b/Sources/Subprocess/IO/Output.swift @@ -125,7 +125,6 @@ public struct FileDescriptorOutput: OutputProtocol { public struct StringOutput: OutputProtocol { public typealias OutputType = String? public let maxSize: Int - private let encoding: Encoding.Type #if SubprocessSpan public func output(from span: RawSpan) throws -> String? { @@ -134,7 +133,7 @@ public struct StringOutput: OutputProtocol { for index in 0..) throws -> String? { @@ -146,7 +145,6 @@ public struct StringOutput: OutputProtocol { internal init(limit: Int, encoding: Encoding.Type) { self.maxSize = limit - self.encoding = encoding } } @@ -208,7 +206,7 @@ public struct BytesOutput: OutputProtocol { #if SubprocessSpan @available(SubprocessSpan, *) #endif -public struct SequenceOutput: OutputProtocol { +internal struct SequenceOutput: OutputProtocol { public typealias OutputType = Void internal init() {} @@ -276,16 +274,6 @@ extension OutputProtocol where Self == BytesOutput { } } -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif -extension OutputProtocol where Self == SequenceOutput { - /// Create a `Subprocess` output that redirects the output - /// to the `.standardOutput` (or `.standardError`) property - /// of `Execution` as `AsyncSequence`. - public static var sequence: Self { .init() } -} - // MARK: - Span Default Implementations #if SubprocessSpan @available(SubprocessSpan, *) @@ -391,6 +379,20 @@ extension OutputProtocol where OutputType == Void { #if SubprocessSpan @available(SubprocessSpan, *) extension OutputProtocol { + #if os(Windows) + internal func output(from data: [UInt8]) throws -> OutputType { + guard !data.isEmpty else { + let empty = UnsafeRawBufferPointer(start: nil, count: 0) + let span = RawSpan(_unsafeBytes: empty) + return try self.output(from: span) + } + + return try data.withUnsafeBufferPointer { ptr in + let span = RawSpan(_unsafeBytes: UnsafeRawBufferPointer(ptr)) + return try self.output(from: span) + } + } + #else internal func output(from data: DispatchData) throws -> OutputType { guard !data.isEmpty else { let empty = UnsafeRawBufferPointer(start: nil, count: 0) @@ -404,6 +406,7 @@ extension OutputProtocol { return try self.output(from: span) } } + #endif // os(Windows) } #endif diff --git a/Sources/Subprocess/Platforms/Subprocess+Darwin.swift b/Sources/Subprocess/Platforms/Subprocess+Darwin.swift index 384091b..86bb09c 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Darwin.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Darwin.swift @@ -156,22 +156,17 @@ extension Configuration { #if SubprocessSpan @available(SubprocessSpan, *) #endif - internal func spawn< - Output: OutputProtocol, - Error: OutputProtocol - >( + internal func spawn( withInput inputPipe: CreatedPipe, - output: Output, outputPipe: CreatedPipe, - error: Error, errorPipe: CreatedPipe - ) throws -> Execution { + ) throws -> Execution { // Instead of checking if every possible executable path // is valid, spawn each directly and catch ENOENT let possiblePaths = self.executable.possibleExecutablePaths( withPathValue: self.environment.pathValue() ) - return try self.preSpawn { args throws -> Execution in + return try self.preSpawn { args throws -> Execution in let (env, uidPtr, gidPtr, supplementaryGroups) = args for possibleExecutablePath in possiblePaths { var pid: pid_t = 0 @@ -350,13 +345,37 @@ extension Configuration { underlyingError: .init(rawValue: spawnError) ) } + + func captureError(_ work: () throws -> Void) -> (any Swift.Error)? { + do { + try work() + return nil + } catch { + return error + } + } + // After spawn finishes, close all child side fds + let inputCloseError = captureError { + try inputPipe.readFileDescriptor?.safelyClose() + } + let outputCloseError = captureError { + try outputPipe.writeFileDescriptor?.safelyClose() + } + let errorCloseError = captureError { + try errorPipe.writeFileDescriptor?.safelyClose() + } + if let inputCloseError = inputCloseError { + throw inputCloseError + } + if let outputCloseError = outputCloseError { + throw outputCloseError + } + if let errorCloseError = errorCloseError { + throw errorCloseError + } + return Execution( - processIdentifier: .init(value: pid), - output: output, - error: error, - inputPipe: inputPipe.createInputPipe(), - outputPipe: outputPipe.createOutputPipe(), - errorPipe: errorPipe.createOutputPipe() + processIdentifier: .init(value: pid) ) } diff --git a/Sources/Subprocess/Platforms/Subprocess+Linux.swift b/Sources/Subprocess/Platforms/Subprocess+Linux.swift index 92d5b9a..8ec894b 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Linux.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Linux.swift @@ -35,16 +35,11 @@ extension Configuration { #if SubprocessSpan @available(SubprocessSpan, *) #endif - internal func spawn< - Output: OutputProtocol, - Error: OutputProtocol - >( + internal func spawn( withInput inputPipe: CreatedPipe, - output: Output, outputPipe: CreatedPipe, - error: Error, errorPipe: CreatedPipe - ) throws -> Execution { + ) throws -> Execution { _setupMonitorSignalHandler() // Instead of checking if every possible executable path @@ -53,7 +48,7 @@ extension Configuration { withPathValue: self.environment.pathValue() ) - return try self.preSpawn { args throws -> Execution in + return try self.preSpawn { args throws -> Execution in let (env, uidPtr, gidPtr, supplementaryGroups) = args for possibleExecutablePath in possiblePaths { @@ -122,13 +117,36 @@ extension Configuration { underlyingError: .init(rawValue: spawnError) ) } + func captureError(_ work: () throws -> Void) -> (any Swift.Error)? { + do { + try work() + return nil + } catch { + return error + } + } + // After spawn finishes, close all child side fds + let inputCloseError = captureError { + try inputPipe.readFileDescriptor?.safelyClose() + } + let outputCloseError = captureError { + try outputPipe.writeFileDescriptor?.safelyClose() + } + let errorCloseError = captureError { + try errorPipe.writeFileDescriptor?.safelyClose() + } + if let inputCloseError = inputCloseError { + throw inputCloseError + } + if let outputCloseError = outputCloseError { + throw outputCloseError + } + if let errorCloseError = errorCloseError { + throw errorCloseError + } + return Execution( - processIdentifier: .init(value: pid), - output: output, - error: error, - inputPipe: inputPipe.createInputPipe(), - outputPipe: outputPipe.createOutputPipe(), - errorPipe: errorPipe.createOutputPipe() + processIdentifier: .init(value: pid) ) } diff --git a/Sources/Subprocess/Platforms/Subprocess+Unix.swift b/Sources/Subprocess/Platforms/Subprocess+Unix.swift index 19461db..9758b86 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Unix.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Unix.swift @@ -121,7 +121,19 @@ extension Execution { signal: Signal, toProcessGroup shouldSendToProcessGroup: Bool = false ) throws { - let pid = shouldSendToProcessGroup ? -(self.processIdentifier.value) : self.processIdentifier.value + try Self.send( + signal: signal, + to: self.processIdentifier, + toProcessGroup: shouldSendToProcessGroup + ) + } + + internal static func send( + signal: Signal, + to processIdentifier: ProcessIdentifier, + toProcessGroup shouldSendToProcessGroup: Bool + ) throws { + let pid = shouldSendToProcessGroup ? -(processIdentifier.value) : processIdentifier.value guard kill(pid, signal.rawValue) == 0 else { throw SubprocessError( code: .init(.failedToSendSignal(signal.rawValue)), @@ -129,23 +141,6 @@ extension Execution { ) } } - - internal func tryTerminate() -> Swift.Error? { - do { - try self.send(signal: .kill) - } catch { - guard let posixError: SubprocessError = error as? SubprocessError else { - return error - } - // Ignore ESRCH (no such process) - if let underlyingError = posixError.underlyingError, - underlyingError.rawValue != ESRCH - { - return error - } - } - return nil - } } // MARK: - Environment Resolution @@ -398,8 +393,7 @@ internal typealias PlatformFileDescriptor = CInt internal typealias TrackedPlatformDiskIO = TrackedDispatchIO extension CreatedPipe { - internal func createInputPipe() -> InputPipe { - var writeEnd: TrackedPlatformDiskIO? = nil + internal func createInputPlatformDiskIO() -> TrackedPlatformDiskIO? { if let writeFileDescriptor = self.writeFileDescriptor { let dispatchIO: DispatchIO = DispatchIO( type: .stream, @@ -412,19 +406,12 @@ extension CreatedPipe { } } ) - writeEnd = .init( - dispatchIO, - closeWhenDone: writeFileDescriptor.closeWhenDone - ) + return .init(dispatchIO, closeWhenDone: writeFileDescriptor.closeWhenDone) } - return InputPipe( - readEnd: self.readFileDescriptor, - writeEnd: writeEnd - ) + return nil } - internal func createOutputPipe() -> OutputPipe { - var readEnd: TrackedPlatformDiskIO? = nil + internal func createOutputPlatformDiskIO() -> TrackedPlatformDiskIO? { if let readFileDescriptor = self.readFileDescriptor { let dispatchIO: DispatchIO = DispatchIO( type: .stream, @@ -437,15 +424,9 @@ extension CreatedPipe { } } ) - readEnd = .init( - dispatchIO, - closeWhenDone: readFileDescriptor.closeWhenDone - ) + return .init(dispatchIO, closeWhenDone: readFileDescriptor.closeWhenDone) } - return OutputPipe( - readEnd: readEnd, - writeEnd: self.writeFileDescriptor - ) + return nil } } @@ -454,7 +435,7 @@ extension TrackedDispatchIO { #if SubprocessSpan @available(SubprocessSpan, *) #endif - package func readChunk(upToLength maxLength: Int) async throws -> SequenceOutput.Buffer? { + package func readChunk(upToLength maxLength: Int) async throws -> AsyncBufferSequence.Buffer? { return try await withCheckedThrowingContinuation { continuation in var buffer: DispatchData = .empty self.dispatchIO.read( @@ -480,7 +461,7 @@ extension TrackedDispatchIO { } if done { if !buffer.isEmpty { - continuation.resume(returning: SequenceOutput.Buffer(data: buffer)) + continuation.resume(returning: AsyncBufferSequence.Buffer(data: buffer)) } else { continuation.resume(returning: nil) } @@ -500,6 +481,7 @@ extension TrackedDispatchIO { queue: .global() ) { done, data, error in guard error == 0, let chunkData = data else { + self.dispatchIO.close() resultHandler( .failure( SubprocessError( @@ -510,6 +492,10 @@ extension TrackedDispatchIO { ) return } + // Close dispatchIO if we are done + if done { + self.dispatchIO.close() + } // Easy case: if we are done and buffer is nil, this means // there is only one chunk of data if done && buffer == nil { diff --git a/Sources/Subprocess/Platforms/Subprocess+Windows.swift b/Sources/Subprocess/Platforms/Subprocess+Windows.swift index 627cdaa..257b2c7 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Windows.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Windows.swift @@ -24,47 +24,33 @@ extension Configuration { #if SubprocessSpan @available(SubprocessSpan, *) #endif - internal func spawn< - Output: OutputProtocol, - Error: OutputProtocol - >( + internal func spawn( withInput inputPipe: CreatedPipe, - output: Output, outputPipe: CreatedPipe, - error: Error, errorPipe: CreatedPipe - ) throws -> Execution { + ) throws -> Execution { // Spawn differently depending on whether // we need to spawn as a user guard let userCredentials = self.platformOptions.userCredentials else { return try self.spawnDirect( withInput: inputPipe, - output: output, outputPipe: outputPipe, - error: error, errorPipe: errorPipe ) } return try self.spawnAsUser( withInput: inputPipe, - output: output, outputPipe: outputPipe, - error: error, errorPipe: errorPipe, userCredentials: userCredentials ) } - internal func spawnDirect< - Output: OutputProtocol, - Error: OutputProtocol - >( + internal func spawnDirect( withInput inputPipe: CreatedPipe, - output: Output, outputPipe: CreatedPipe, - error: Error, errorPipe: CreatedPipe - ) throws -> Execution { + ) throws -> Execution { let ( applicationName, commandAndArgs, @@ -147,28 +133,46 @@ extension Configuration { let pid = ProcessIdentifier( value: processInfo.dwProcessId ) + + func captureError(_ work: () throws -> Void) -> (any Swift.Error)? { + do { + try work() + return nil + } catch { + return error + } + } + // After spawn finishes, close all child side fds + let inputCloseError = captureError { + try inputPipe.readFileDescriptor?.safelyClose() + } + let outputCloseError = captureError { + try outputPipe.writeFileDescriptor?.safelyClose() + } + let errorCloseError = captureError { + try errorPipe.writeFileDescriptor?.safelyClose() + } + if let inputCloseError = inputCloseError { + throw inputCloseError + } + if let outputCloseError = outputCloseError { + throw outputCloseError + } + if let errorCloseError = errorCloseError { + throw errorCloseError + } return Execution( processIdentifier: pid, - output: output, - error: error, - inputPipe: inputPipe.createInputPipe(), - outputPipe: outputPipe.createOutputPipe(), - errorPipe: errorPipe.createOutputPipe(), consoleBehavior: self.platformOptions.consoleBehavior ) } - internal func spawnAsUser< - Output: OutputProtocol, - Error: OutputProtocol - >( + internal func spawnAsUser( withInput inputPipe: CreatedPipe, - output: Output, outputPipe: CreatedPipe, - error: Error, errorPipe: CreatedPipe, userCredentials: PlatformOptions.UserCredentials - ) throws -> Execution { + ) throws -> Execution { let ( applicationName, commandAndArgs, @@ -264,13 +268,35 @@ extension Configuration { let pid = ProcessIdentifier( value: processInfo.dwProcessId ) + func captureError(_ work: () throws -> Void) -> (any Swift.Error)? { + do { + try work() + return nil + } catch { + return error + } + } + // After spawn finishes, close all child side fds + let inputCloseError = captureError { + try inputPipe.readFileDescriptor?.safelyClose() + } + let outputCloseError = captureError { + try outputPipe.writeFileDescriptor?.safelyClose() + } + let errorCloseError = captureError { + try errorPipe.writeFileDescriptor?.safelyClose() + } + if let inputCloseError = inputCloseError { + throw inputCloseError + } + if let outputCloseError = outputCloseError { + throw outputCloseError + } + if let errorCloseError = errorCloseError { + throw errorCloseError + } return Execution( processIdentifier: pid, - output: output, - error: error, - inputPipe: inputPipe.createInputPipe(), - outputPipe: outputPipe.createOutputPipe(), - errorPipe: errorPipe.createOutputPipe(), consoleBehavior: self.platformOptions.consoleBehavior ) } @@ -497,12 +523,19 @@ extension Execution { /// Terminate the current subprocess with the given exit code /// - Parameter exitCode: The exit code to use for the subprocess. public func terminate(withExitCode exitCode: DWORD) throws { + try Self.terminate(self.processIdentifier, withExitCode: exitCode) + } + + internal static func terminate( + _ processIdentifier: ProcessIdentifier, + withExitCode exitCode: DWORD + ) throws { guard let processHandle = OpenProcess( // PROCESS_ALL_ACCESS DWORD(STANDARD_RIGHTS_REQUIRED | SYNCHRONIZE | 0xFFFF), false, - self.processIdentifier.value + processIdentifier.value ) else { throw SubprocessError( @@ -602,15 +635,6 @@ extension Execution { ) } } - - internal func tryTerminate() -> Swift.Error? { - do { - try self.terminate(withExitCode: 0) - } catch { - return error - } - return nil - } } // MARK: - Executable Searching @@ -1027,18 +1051,12 @@ extension FileDescriptor { extension CreatedPipe { /// On Windows, we use file descriptors directly - internal func createInputPipe() -> InputPipe { - return InputPipe( - readEnd: self.readFileDescriptor, - writeEnd: self.writeFileDescriptor - ) + internal func createInputPlatformDiskIO() -> TrackedPlatformDiskIO? { + return self.writeFileDescriptor } - internal func createOutputPipe() -> OutputPipe { - return OutputPipe( - readEnd: self.readFileDescriptor, - writeEnd: self.writeFileDescriptor - ) + internal func createOutputPlatformDiskIO() -> TrackedPlatformDiskIO? { + return self.readFileDescriptor } } diff --git a/Sources/Subprocess/Teardown.swift b/Sources/Subprocess/Teardown.swift index b417664..66a4ce2 100644 --- a/Sources/Subprocess/Teardown.swift +++ b/Sources/Subprocess/Teardown.swift @@ -81,8 +81,15 @@ extension Execution { /// Teardown sequence always ends with a `.kill` signal /// - Parameter sequence: The steps to perform. public func teardown(using sequence: some Sequence & Sendable) async { + await Self.runTeardownSequence(sequence, on: self.processIdentifier) + } + + internal static func teardown( + using sequence: some Sequence & Sendable, + on processIdentifier: ProcessIdentifier + ) async { await withUncancelledTask { - await self.runTeardownSequence(sequence) + await Self.runTeardownSequence(sequence, on: processIdentifier) } } } @@ -97,7 +104,8 @@ internal enum TeardownStepCompletion { @available(SubprocessSpan, *) #endif extension Execution { - internal func gracefulShutDown( + internal static func gracefulShutDown( + _ processIdentifier: ProcessIdentifier, allowedDurationToNextStep duration: Duration ) async { #if os(Windows) @@ -105,7 +113,7 @@ extension Execution { let processHandle = OpenProcess( DWORD(PROCESS_QUERY_INFORMATION | SYNCHRONIZE), false, - self.processIdentifier.value + processIdentifier.value ) else { // Nothing more we can do @@ -117,13 +125,13 @@ extension Execution { // 1. Attempt to send WM_CLOSE to the main window if _subprocess_windows_send_vm_close( - self.processIdentifier.value + processIdentifier.value ) { try? await Task.sleep(for: duration) } // 2. Attempt to attach to the console and send CTRL_C_EVENT - if AttachConsole(self.processIdentifier.value) { + if AttachConsole(processIdentifier.value) { // Disable Ctrl-C handling in this process if SetConsoleCtrlHandler(nil, true) { if GenerateConsoleCtrlEvent(DWORD(CTRL_C_EVENT), 0) { @@ -138,17 +146,24 @@ extension Execution { } // 3. Attempt to send CTRL_BREAK_EVENT to the process group - if GenerateConsoleCtrlEvent(DWORD(CTRL_BREAK_EVENT), self.processIdentifier.value) { + if GenerateConsoleCtrlEvent(DWORD(CTRL_BREAK_EVENT), processIdentifier.value) { // Wait for process to exit try? await Task.sleep(for: duration) } #else // Send SIGTERM - try? self.send(signal: .terminate) + try? self.send( + signal: .terminate, + to: processIdentifier, + toProcessGroup: false + ) #endif } - internal func runTeardownSequence(_ sequence: some Sequence & Sendable) async { + internal static func runTeardownSequence( + _ sequence: some Sequence & Sendable, + on processIdentifier: ProcessIdentifier + ) async { // First insert the `.kill` step let finalSequence = sequence + [TeardownStep(storage: .kill)] for step in finalSequence { @@ -167,7 +182,10 @@ extension Execution { return .processHasExited } } - await self.gracefulShutDown(allowedDurationToNextStep: allowedDuration) + await self.gracefulShutDown( + processIdentifier, + allowedDurationToNextStep: allowedDuration + ) return await group.next()! } #if !os(Windows) @@ -183,15 +201,18 @@ extension Execution { return .processHasExited } } - try? self.send(signal: signal) + try? self.send(signal: signal, to: processIdentifier, toProcessGroup: false) return await group.next()! } #endif // !os(Windows) case .kill: #if os(Windows) - try? self.terminate(withExitCode: 0) + try? self.terminate( + processIdentifier, + withExitCode: 0 + ) #else - try? self.send(signal: .kill) + try? self.send(signal: .kill, to: processIdentifier, toProcessGroup: false) #endif stepCompletion = .killedTheProcess } diff --git a/Tests/SubprocessTests/SubprocessTests+Darwin.swift b/Tests/SubprocessTests/SubprocessTests+Darwin.swift index f1a3874..2ac21bc 100644 --- a/Tests/SubprocessTests/SubprocessTests+Darwin.swift +++ b/Tests/SubprocessTests/SubprocessTests+Darwin.swift @@ -85,9 +85,8 @@ struct SubprocessDarwinTests { _ = try await Subprocess.run( // This will intentionally hang .path("/bin/cat"), - output: .discarded, error: .discarded - ) { subprocess in + ) { subprocess, standardOutput in // First suspend the procss try subprocess.send(signal: .suspend) var suspendedStatus: Int32 = 0 @@ -101,6 +100,7 @@ struct SubprocessDarwinTests { // Now kill the process try subprocess.send(signal: .terminate) + for try await _ in standardOutput {} } } } diff --git a/Tests/SubprocessTests/SubprocessTests+Linux.swift b/Tests/SubprocessTests/SubprocessTests+Linux.swift index b5ea036..3324343 100644 --- a/Tests/SubprocessTests/SubprocessTests+Linux.swift +++ b/Tests/SubprocessTests/SubprocessTests+Linux.swift @@ -75,9 +75,8 @@ struct SubprocessLinuxTests { // This will intentionally hang .path("/usr/bin/sleep"), arguments: ["infinity"], - output: .discarded, error: .discarded - ) { subprocess in + ) { subprocess, standardOutput in // First suspend the procss try subprocess.send(signal: .suspend) #expect( @@ -90,6 +89,7 @@ struct SubprocessLinuxTests { ) // Now kill the process try subprocess.send(signal: .terminate) + for try await _ in standardOutput {} } } } diff --git a/Tests/SubprocessTests/SubprocessTests+Unix.swift b/Tests/SubprocessTests/SubprocessTests+Unix.swift index f6a82c9..86fe242 100644 --- a/Tests/SubprocessTests/SubprocessTests+Unix.swift +++ b/Tests/SubprocessTests/SubprocessTests+Unix.swift @@ -429,11 +429,10 @@ extension SubprocessUnixTests { let result = try await Subprocess.run( .path("/bin/cat"), input: .data(expected), - output: .sequence, error: .discarded - ) { execution in + ) { execution, standardOutput in var buffer = Data() - for try await chunk in execution.standardOutput { + for try await chunk in standardOutput { let currentChunk = chunk._withUnsafeBytes { Data($0) } buffer += currentChunk } @@ -469,11 +468,10 @@ extension SubprocessUnixTests { let result = try await Subprocess.run( .path("/bin/cat"), input: .sequence(stream), - output: .sequence, error: .discarded - ) { execution in + ) { execution, standardOutput in var buffer = Data() - for try await chunk in execution.standardOutput { + for try await chunk in standardOutput { let currentChunk = chunk._withUnsafeBytes { Data($0) } buffer += currentChunk } @@ -618,11 +616,10 @@ extension SubprocessUnixTests { let catResult = try await Subprocess.run( .path("/bin/cat"), arguments: [theMysteriousIsland.string], - output: .sequence, error: .discarded - ) { execution in + ) { execution, standardOutput in var buffer = Data() - for try await chunk in execution.standardOutput { + for try await chunk in standardOutput { let currentChunk = chunk._withUnsafeBytes { Data($0) } buffer += currentChunk } @@ -816,9 +813,8 @@ extension SubprocessUnixTests { """, ], input: .none, - output: .sequence, error: .discarded - ) { subprocess in + ) { subprocess, standardOutput in return try await withThrowingTaskGroup(of: Void.self) { group in group.addTask { try await Task.sleep(for: .milliseconds(200)) @@ -831,7 +827,7 @@ extension SubprocessUnixTests { } group.addTask { var outputs: [String] = [] - for try await bit in subprocess.standardOutput { + for try await bit in standardOutput { let bitString = bit._withUnsafeBytes { ptr in return String(decoding: ptr, as: UTF8.self) }.trimmingCharacters(in: .whitespacesAndNewlines) @@ -881,11 +877,11 @@ extension SubprocessUnixTests { let stuckResult = try await Subprocess.run( // This will intentionally hang .path("/bin/cat"), - output: .discarded, error: .discarded - ) { subprocess in + ) { subprocess, standardOutput in // Make sure we can send signals to terminate the process try subprocess.send(signal: .terminate) + for try await _ in standardOutput {} } guard case .unhandledException(let exception) = stuckResult.terminationStatus else { Issue.record("Wrong termination status repored: \(stuckResult.terminationStatus)") @@ -894,24 +890,6 @@ extension SubprocessUnixTests { #expect(exception == Signal.terminate.rawValue) } - @Test func testAtomicBox() async throws { - // Start with 0 - let atomicBox = AtomicBox() - // After first insert: 0 ^ .standardErrorConsumed = .standardErrorConsumed - #expect(atomicBox.bitwiseXor(.standardErrorConsumed) == .standardErrorConsumed) - // Second insert: - // .standardErrorConsumed ^ .standardOutputConsumed = - // (.standardErrorConsumed & .standardOutputConsumed) - #expect(atomicBox.bitwiseXor(.standardOutputConsumed).contains(.standardOutputConsumed)) - // Thrid xor insert should remove error, but retain output: - // (.standardErrorConsumed & .standardOutputConsumed) ^ .standardErrorConsumed = - // .standardOutputConsumed - #expect(atomicBox.bitwiseXor(.standardErrorConsumed).contains(.standardOutputConsumed)) - // Fourth xor insert should clear out output as well: - // .standardOutputConsumed ^ .standardOutputConsumed = 0 - #expect(atomicBox.bitwiseXor(.standardOutputConsumed) == OutputConsumptionState(rawValue: 0)) - } - @Test func testExitSignal() async throws { guard #available(SubprocessSpan , *) else { return diff --git a/Tests/SubprocessTests/SubprocessTests+Windows.swift b/Tests/SubprocessTests/SubprocessTests+Windows.swift index 8e20711..0def034 100644 --- a/Tests/SubprocessTests/SubprocessTests+Windows.swift +++ b/Tests/SubprocessTests/SubprocessTests+Windows.swift @@ -363,11 +363,10 @@ extension SubprocessWindowsTests { self.cmdExe, arguments: ["/c", "findstr x*"], input: .data(expected), - output: .sequence, error: .discarded - ) { execution in + ) { execution, standardOutput in var buffer = Data() - for try await chunk in execution.standardOutput { + for try await chunk in standardOutput { let currentChunk = chunk._withUnsafeBytes { Data($0) } buffer += currentChunk } @@ -404,11 +403,10 @@ extension SubprocessWindowsTests { self.cmdExe, arguments: ["/c", "findstr x*"], input: .sequence(stream), - output: .sequence, error: .discarded - ) { execution in + ) { execution, standardOutput in var buffer = Data() - for try await chunk in execution.standardOutput { + for try await chunk in standardOutput { let currentChunk = chunk._withUnsafeBytes { Data($0) } buffer += currentChunk } @@ -502,11 +500,10 @@ extension SubprocessWindowsTests { let catResult = try await Subprocess.run( self.cmdExe, arguments: ["/c", "type \(theMysteriousIsland.string)"], - output: .sequence, error: .discarded - ) { subprocess in + ) { subprocess, standardOutput in var buffer = Data() - for try await chunk in subprocess.standardOutput { + for try await chunk in standardOutput { let currentChunk = chunk._withUnsafeBytes { Data($0) } buffer += currentChunk } @@ -692,11 +689,11 @@ extension SubprocessWindowsTests { self.cmdExe, // This command will intentionally hang arguments: ["/c", "type con"], - output: .discarded, error: .discarded - ) { subprocess in + ) { subprocess, standardOutput in // Make sure we can kill the hung process try subprocess.terminate(withExitCode: 42) + for try await _ in standardOutput {} } // If we got here, the process was terminated guard case .exited(let exitCode) = stuckProcess.terminationStatus else { @@ -714,9 +711,8 @@ extension SubprocessWindowsTests { self.cmdExe, // This command will intentionally hang arguments: ["/c", "type con"], - output: .discarded, error: .discarded - ) { subprocess in + ) { subprocess, standardOutput in try subprocess.suspend() // Now check the to make sure the procss is actually suspended // Why not spawn a nother process to do that? @@ -754,6 +750,7 @@ extension SubprocessWindowsTests { // Now finally kill the process since it's intentionally hung try subprocess.terminate(withExitCode: 0) + for try await _ in standardOutput {} } #expect(stuckProcess.terminationStatus.isSuccess) } From c2d5278e39f0496d13c38b436671dc3f649da90d Mon Sep 17 00:00:00 2001 From: Charles Hu Date: Tue, 6 May 2025 13:52:21 -0700 Subject: [PATCH 2/4] Make CreatedPipe ~Copyable --- Sources/Subprocess/API.swift | 16 ++-- Sources/Subprocess/Configuration.swift | 46 +++++++--- .../Platforms/Subprocess+Darwin.swift | 91 +++++++++++-------- .../Platforms/Subprocess+Linux.swift | 67 +++++++++----- .../Platforms/Subprocess+Unix.swift | 2 +- 5 files changed, 140 insertions(+), 82 deletions(-) diff --git a/Sources/Subprocess/API.swift b/Sources/Subprocess/API.swift index 4c57d64..99f94ab 100644 --- a/Sources/Subprocess/API.swift +++ b/Sources/Subprocess/API.swift @@ -593,7 +593,7 @@ public func runDetached( withInput: try processInput.createPipe(), outputPipe: try processOutput.createPipe(), errorPipe: try processError.createPipe() - ).processIdentifier + ).execution.processIdentifier case (.none, .none, .some(let errorFd)): let processInput = NoInput() let processOutput = DiscardedOutput() @@ -605,7 +605,7 @@ public func runDetached( withInput: try processInput.createPipe(), outputPipe: try processOutput.createPipe(), errorPipe: try processError.createPipe() - ).processIdentifier + ).execution.processIdentifier case (.none, .some(let outputFd), .none): let processInput = NoInput() let processOutput = FileDescriptorOutput( @@ -616,7 +616,7 @@ public func runDetached( withInput: try processInput.createPipe(), outputPipe: try processOutput.createPipe(), errorPipe: try processError.createPipe() - ).processIdentifier + ).execution.processIdentifier case (.none, .some(let outputFd), .some(let errorFd)): let processInput = NoInput() let processOutput = FileDescriptorOutput( @@ -631,7 +631,7 @@ public func runDetached( withInput: try processInput.createPipe(), outputPipe: try processOutput.createPipe(), errorPipe: try processError.createPipe() - ).processIdentifier + ).execution.processIdentifier case (.some(let inputFd), .none, .none): let processInput = FileDescriptorInput( fileDescriptor: inputFd, @@ -643,7 +643,7 @@ public func runDetached( withInput: try processInput.createPipe(), outputPipe: try processOutput.createPipe(), errorPipe: try processError.createPipe() - ).processIdentifier + ).execution.processIdentifier case (.some(let inputFd), .none, .some(let errorFd)): let processInput = FileDescriptorInput( fileDescriptor: inputFd, closeAfterSpawningProcess: false @@ -657,7 +657,7 @@ public func runDetached( withInput: try processInput.createPipe(), outputPipe: try processOutput.createPipe(), errorPipe: try processError.createPipe() - ).processIdentifier + ).execution.processIdentifier case (.some(let inputFd), .some(let outputFd), .none): let processInput = FileDescriptorInput( fileDescriptor: inputFd, @@ -672,7 +672,7 @@ public func runDetached( withInput: try processInput.createPipe(), outputPipe: try processOutput.createPipe(), errorPipe: try processError.createPipe() - ).processIdentifier + ).execution.processIdentifier case (.some(let inputFd), .some(let outputFd), .some(let errorFd)): let processInput = FileDescriptorInput( fileDescriptor: inputFd, @@ -690,7 +690,7 @@ public func runDetached( withInput: try processInput.createPipe(), outputPipe: try processOutput.createPipe(), errorPipe: try processError.createPipe() - ).processIdentifier + ).execution.processIdentifier } } diff --git a/Sources/Subprocess/Configuration.swift b/Sources/Subprocess/Configuration.swift index 90244c0..9af704c 100644 --- a/Sources/Subprocess/Configuration.swift +++ b/Sources/Subprocess/Configuration.swift @@ -62,27 +62,32 @@ public struct Configuration: Sendable { @available(SubprocessSpan, *) #endif internal func run( - input: CreatedPipe, - output: CreatedPipe, - error: CreatedPipe, + input: consuming CreatedPipe, + output: consuming CreatedPipe, + error: consuming CreatedPipe, isolation: isolated (any Actor)? = #isolation, _ body: ((Execution, TrackedPlatformDiskIO?, TrackedPlatformDiskIO?, TrackedPlatformDiskIO?) async throws -> Result) ) async throws -> ExecutionResult { - let execution = try self.spawn( + let spawnResults = try self.spawn( withInput: input, outputPipe: output, errorPipe: error ) + let pid = spawnResults.execution.processIdentifier + + var spawnResultBox: SpawnResult?? = consume spawnResults return try await withAsyncTaskCleanupHandler { + let _spawnResult = spawnResultBox!.take()! + async let terminationStatus = try monitorProcessTermination( - forProcessWithIdentifier: execution.processIdentifier + forProcessWithIdentifier: _spawnResult.execution.processIdentifier ) // Body runs in the same isolation - let inputIO = input.createInputPlatformDiskIO() - let outputIO = output.createOutputPlatformDiskIO() - let errorIO = error.createOutputPlatformDiskIO() - let result = try await body(execution, inputIO, outputIO, errorIO) + let inputIO = _spawnResult.inputPipe.createInputPlatformDiskIO() + let outputIO = _spawnResult.outputPipe.createOutputPlatformDiskIO() + let errorIO = _spawnResult.errorPipe.createOutputPlatformDiskIO() + let result = try await body(_spawnResult.execution, inputIO, outputIO, errorIO) return ExecutionResult( terminationStatus: try await terminationStatus, value: result @@ -90,7 +95,8 @@ public struct Configuration: Sendable { } onCleanup: { // Attempt to terminate the child process await Execution.runTeardownSequence( - self.platformOptions.teardownSequence, on: execution.processIdentifier + self.platformOptions.teardownSequence, + on: pid ) } } @@ -128,9 +134,9 @@ extension Configuration { /// Close each input individually, and throw the first error if there's multiple errors thrown @Sendable internal func cleanupPreSpawn( - input: CreatedPipe, - output: CreatedPipe, - error: CreatedPipe + input: borrowing CreatedPipe, + output: borrowing CreatedPipe, + error: borrowing CreatedPipe ) throws { var inputError: Swift.Error? var outputError: Swift.Error? @@ -472,6 +478,18 @@ extension TerminationStatus: CustomStringConvertible, CustomDebugStringConvertib // MARK: - Internal +extension Configuration { + #if SubprocessSpan + @available(SubprocessSpan, *) + #endif + internal struct SpawnResult: ~Copyable { + let execution: Execution + let inputPipe: CreatedPipe + let outputPipe: CreatedPipe + let errorPipe: CreatedPipe + } +} + internal enum StringOrRawBytes: Sendable, Hashable { case string(String) case rawBytes([UInt8]) @@ -593,7 +611,7 @@ internal struct TrackedDispatchIO { } #endif -internal struct CreatedPipe { +internal struct CreatedPipe: ~Copyable { internal let readFileDescriptor: TrackedFileDescriptor? internal let writeFileDescriptor: TrackedFileDescriptor? diff --git a/Sources/Subprocess/Platforms/Subprocess+Darwin.swift b/Sources/Subprocess/Platforms/Subprocess+Darwin.swift index 86bb09c..d46be0d 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Darwin.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Darwin.swift @@ -157,17 +157,25 @@ extension Configuration { @available(SubprocessSpan, *) #endif internal func spawn( - withInput inputPipe: CreatedPipe, - outputPipe: CreatedPipe, - errorPipe: CreatedPipe - ) throws -> Execution { + withInput inputPipe: consuming CreatedPipe, + outputPipe: consuming CreatedPipe, + errorPipe: consuming CreatedPipe + ) throws -> SpawnResult { // Instead of checking if every possible executable path // is valid, spawn each directly and catch ENOENT let possiblePaths = self.executable.possibleExecutablePaths( withPathValue: self.environment.pathValue() ) - return try self.preSpawn { args throws -> Execution in + var inputPipeBox: CreatedPipe?? = consume inputPipe + var outputPipeBox: CreatedPipe?? = consume outputPipe + var errorPipeBox: CreatedPipe?? = consume errorPipe + + return try self.preSpawn { args throws -> SpawnResult in let (env, uidPtr, gidPtr, supplementaryGroups) = args + let _inputPipe = inputPipeBox!.take()! + let _outputPipe = outputPipeBox!.take()! + let _errorPipe = errorPipeBox!.take()! + for possibleExecutablePath in possiblePaths { var pid: pid_t = 0 @@ -187,23 +195,24 @@ extension Configuration { defer { posix_spawn_file_actions_destroy(&fileActions) } + // Input var result: Int32 = -1 - if let inputRead = inputPipe.readFileDescriptor { + if let inputRead = _inputPipe.readFileDescriptor { result = posix_spawn_file_actions_adddup2(&fileActions, inputRead.platformDescriptor, 0) guard result == 0 else { - try self.cleanupPreSpawn(input: inputPipe, output: outputPipe, error: errorPipe) + try self.cleanupPreSpawn(input: _inputPipe, output: _outputPipe, error: _errorPipe) throw SubprocessError( code: .init(.spawnFailed), underlyingError: .init(rawValue: result) ) } } - if let inputWrite = inputPipe.writeFileDescriptor { + if let inputWrite = _inputPipe.writeFileDescriptor { // Close parent side result = posix_spawn_file_actions_addclose(&fileActions, inputWrite.platformDescriptor) guard result == 0 else { - try self.cleanupPreSpawn(input: inputPipe, output: outputPipe, error: errorPipe) + try self.cleanupPreSpawn(input: _inputPipe, output: _outputPipe, error: _errorPipe) throw SubprocessError( code: .init(.spawnFailed), underlyingError: .init(rawValue: result) @@ -211,21 +220,21 @@ extension Configuration { } } // Output - if let outputWrite = outputPipe.writeFileDescriptor { + if let outputWrite = _outputPipe.writeFileDescriptor { result = posix_spawn_file_actions_adddup2(&fileActions, outputWrite.platformDescriptor, 1) guard result == 0 else { - try self.cleanupPreSpawn(input: inputPipe, output: outputPipe, error: errorPipe) + try self.cleanupPreSpawn(input: _inputPipe, output: _outputPipe, error: _errorPipe) throw SubprocessError( code: .init(.spawnFailed), underlyingError: .init(rawValue: result) ) } } - if let outputRead = outputPipe.readFileDescriptor { + if let outputRead = _outputPipe.readFileDescriptor { // Close parent side result = posix_spawn_file_actions_addclose(&fileActions, outputRead.platformDescriptor) guard result == 0 else { - try self.cleanupPreSpawn(input: inputPipe, output: outputPipe, error: errorPipe) + try self.cleanupPreSpawn(input: _inputPipe, output: _outputPipe, error: _errorPipe) throw SubprocessError( code: .init(.spawnFailed), underlyingError: .init(rawValue: result) @@ -233,21 +242,21 @@ extension Configuration { } } // Error - if let errorWrite = errorPipe.writeFileDescriptor { + if let errorWrite = _errorPipe.writeFileDescriptor { result = posix_spawn_file_actions_adddup2(&fileActions, errorWrite.platformDescriptor, 2) guard result == 0 else { - try self.cleanupPreSpawn(input: inputPipe, output: outputPipe, error: errorPipe) + try self.cleanupPreSpawn(input: _inputPipe, output: _outputPipe, error: _errorPipe) throw SubprocessError( code: .init(.spawnFailed), underlyingError: .init(rawValue: result) ) } } - if let errorRead = errorPipe.readFileDescriptor { + if let errorRead = _errorPipe.readFileDescriptor { // Close parent side result = posix_spawn_file_actions_addclose(&fileActions, errorRead.platformDescriptor) guard result == 0 else { - try self.cleanupPreSpawn(input: inputPipe, output: outputPipe, error: errorPipe) + try self.cleanupPreSpawn(input: _inputPipe, output: _outputPipe, error: _errorPipe) throw SubprocessError( code: .init(.spawnFailed), underlyingError: .init(rawValue: result) @@ -290,7 +299,7 @@ extension Configuration { // Error handling if chdirError != 0 || spawnAttributeError != 0 { - try self.cleanupPreSpawn(input: inputPipe, output: outputPipe, error: errorPipe) + try self.cleanupPreSpawn(input: _inputPipe, output: _outputPipe, error: _errorPipe) if spawnAttributeError != 0 { throw SubprocessError( code: .init(.spawnFailed), @@ -336,9 +345,9 @@ extension Configuration { } // Throw all other errors try self.cleanupPreSpawn( - input: inputPipe, - output: outputPipe, - error: errorPipe + input: _inputPipe, + output: _outputPipe, + error: _errorPipe ) throw SubprocessError( code: .init(.spawnFailed), @@ -346,24 +355,26 @@ extension Configuration { ) } - func captureError(_ work: () throws -> Void) -> (any Swift.Error)? { - do { - try work() - return nil - } catch { - return error - } - } // After spawn finishes, close all child side fds - let inputCloseError = captureError { - try inputPipe.readFileDescriptor?.safelyClose() + var inputCloseError: (any Swift.Error)? = nil + do { + try _inputPipe.readFileDescriptor?.safelyClose() + } catch { + inputCloseError = error } - let outputCloseError = captureError { - try outputPipe.writeFileDescriptor?.safelyClose() + var outputCloseError: (any Swift.Error)? = nil + do { + try _outputPipe.writeFileDescriptor?.safelyClose() + } catch { + outputCloseError = error } - let errorCloseError = captureError { - try errorPipe.writeFileDescriptor?.safelyClose() + var errorCloseError: (any Swift.Error)? = nil + do { + try _errorPipe.writeFileDescriptor?.safelyClose() + } catch { + errorCloseError = error } + if let inputCloseError = inputCloseError { throw inputCloseError } @@ -374,9 +385,15 @@ extension Configuration { throw errorCloseError } - return Execution( + let execution = Execution( processIdentifier: .init(value: pid) ) + return SpawnResult( + execution: execution, + inputPipe: _inputPipe, + outputPipe: _outputPipe, + errorPipe: _errorPipe + ) } // If we reach this point, it means either the executable path @@ -384,7 +401,7 @@ extension Configuration { // provide which one is not valid, here we make a best effort guess // by checking whether the working directory is valid. This technically // still causes TOUTOC issue, but it's the best we can do for error recovery. - try self.cleanupPreSpawn(input: inputPipe, output: outputPipe, error: errorPipe) + try self.cleanupPreSpawn(input: _inputPipe, output: _outputPipe, error: _errorPipe) let workingDirectory = self.workingDirectory.string guard Configuration.pathAccessible(workingDirectory, mode: F_OK) else { throw SubprocessError( diff --git a/Sources/Subprocess/Platforms/Subprocess+Linux.swift b/Sources/Subprocess/Platforms/Subprocess+Linux.swift index 8ec894b..8997dff 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Linux.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Linux.swift @@ -36,10 +36,10 @@ extension Configuration { @available(SubprocessSpan, *) #endif internal func spawn( - withInput inputPipe: CreatedPipe, - outputPipe: CreatedPipe, - errorPipe: CreatedPipe - ) throws -> Execution { + withInput inputPipe: consuming CreatedPipe, + outputPipe: consuming CreatedPipe, + errorPipe: consuming CreatedPipe + ) throws -> SpawnResult { _setupMonitorSignalHandler() // Instead of checking if every possible executable path @@ -47,10 +47,17 @@ extension Configuration { let possiblePaths = self.executable.possibleExecutablePaths( withPathValue: self.environment.pathValue() ) + var inputPipeBox: CreatedPipe?? = consume inputPipe + var outputPipeBox: CreatedPipe?? = consume outputPipe + var errorPipeBox: CreatedPipe?? = consume errorPipe - return try self.preSpawn { args throws -> Execution in + return try self.preSpawn { args throws -> SpawnResult in let (env, uidPtr, gidPtr, supplementaryGroups) = args + let _inputPipe = inputPipeBox!.take()! + let _outputPipe = outputPipeBox!.take()! + let _errorPipe = errorPipeBox!.take()! + for possibleExecutablePath in possiblePaths { var processGroupIDPtr: UnsafeMutablePointer? = nil if let processGroupID = self.platformOptions.processGroupID { @@ -66,12 +73,12 @@ extension Configuration { } // Setup input let fileDescriptors: [CInt] = [ - inputPipe.readFileDescriptor?.platformDescriptor ?? -1, - inputPipe.writeFileDescriptor?.platformDescriptor ?? -1, - outputPipe.writeFileDescriptor?.platformDescriptor ?? -1, - outputPipe.readFileDescriptor?.platformDescriptor ?? -1, - errorPipe.writeFileDescriptor?.platformDescriptor ?? -1, - errorPipe.readFileDescriptor?.platformDescriptor ?? -1, + _inputPipe.readFileDescriptor?.platformDescriptor ?? -1, + _inputPipe.writeFileDescriptor?.platformDescriptor ?? -1, + _outputPipe.writeFileDescriptor?.platformDescriptor ?? -1, + _outputPipe.readFileDescriptor?.platformDescriptor ?? -1, + _errorPipe.writeFileDescriptor?.platformDescriptor ?? -1, + _errorPipe.readFileDescriptor?.platformDescriptor ?? -1, ] let workingDirectory: String = self.workingDirectory.string @@ -108,9 +115,9 @@ extension Configuration { } // Throw all other errors try self.cleanupPreSpawn( - input: inputPipe, - output: outputPipe, - error: errorPipe + input: _inputPipe, + output: _outputPipe, + error: _errorPipe ) throw SubprocessError( code: .init(.spawnFailed), @@ -126,15 +133,25 @@ extension Configuration { } } // After spawn finishes, close all child side fds - let inputCloseError = captureError { - try inputPipe.readFileDescriptor?.safelyClose() + var inputCloseError: (any Swift.Error)? = nil + do { + try _inputPipe.readFileDescriptor?.safelyClose() + } catch { + inputCloseError = error } - let outputCloseError = captureError { - try outputPipe.writeFileDescriptor?.safelyClose() + var outputCloseError: (any Swift.Error)? = nil + do { + try _outputPipe.writeFileDescriptor?.safelyClose() + } catch { + outputCloseError = error } - let errorCloseError = captureError { - try errorPipe.writeFileDescriptor?.safelyClose() + var errorCloseError: (any Swift.Error)? = nil + do { + try _errorPipe.writeFileDescriptor?.safelyClose() + } catch { + errorCloseError = error } + if let inputCloseError = inputCloseError { throw inputCloseError } @@ -145,9 +162,15 @@ extension Configuration { throw errorCloseError } - return Execution( + let execution = Execution( processIdentifier: .init(value: pid) ) + return SpawnResult( + execution: execution, + inputPipe: _inputPipe, + outputPipe: _outputPipe, + errorPipe: _errorPipe + ) } // If we reach this point, it means either the executable path @@ -155,7 +178,7 @@ extension Configuration { // provide which one is not valid, here we make a best effort guess // by checking whether the working directory is valid. This technically // still causes TOUTOC issue, but it's the best we can do for error recovery. - try self.cleanupPreSpawn(input: inputPipe, output: outputPipe, error: errorPipe) + try self.cleanupPreSpawn(input: _inputPipe, output: _outputPipe, error: _errorPipe) let workingDirectory = self.workingDirectory.string guard Configuration.pathAccessible(workingDirectory, mode: F_OK) else { throw SubprocessError( diff --git a/Sources/Subprocess/Platforms/Subprocess+Unix.swift b/Sources/Subprocess/Platforms/Subprocess+Unix.swift index 9758b86..97f5454 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Unix.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Unix.swift @@ -322,7 +322,7 @@ extension Configuration { supplementaryGroups: [gid_t]? ) - internal func preSpawn( + internal func preSpawn( _ work: (PreSpawnArgs) throws -> Result ) throws -> Result { // Prepare environment From 51e18059101a5b8ae14e65365846487f82e96688 Mon Sep 17 00:00:00 2001 From: Charles Hu Date: Wed, 7 May 2025 13:18:52 -0700 Subject: [PATCH 3/4] Make Pipes and FileDescriptor ~Copyable --- Sources/Subprocess/API.swift | 95 +++--- Sources/Subprocess/AsyncBufferSequence.swift | 24 +- Sources/Subprocess/Configuration.swift | 213 ++++++++---- Sources/Subprocess/IO/Input.swift | 21 +- Sources/Subprocess/IO/Output.swift | 32 +- .../Platforms/Subprocess+Darwin.swift | 190 +++++++---- .../Platforms/Subprocess+Linux.swift | 93 +++--- .../Platforms/Subprocess+Unix.swift | 64 ++-- .../Platforms/Subprocess+Windows.swift | 307 ++++++++++-------- Sources/Subprocess/Span+Subprocess.swift | 13 + .../Input+Foundation.swift | 8 +- 11 files changed, 616 insertions(+), 444 deletions(-) diff --git a/Sources/Subprocess/API.swift b/Sources/Subprocess/API.swift index 99f94ab..e32636b 100644 --- a/Sources/Subprocess/API.swift +++ b/Sources/Subprocess/API.swift @@ -109,48 +109,25 @@ public func run< output: try output.createPipe(), error: try error.createPipe() ) { execution, inputIO, outputIO, errorIO in - // Write input, capture output and error in parallel - return try await withThrowingTaskGroup( - of: OutputCapturingState.self, - returning: RunResult.self - ) { group in - group.addTask { - let stdout = try await output.captureOutput( - from: outputIO - ) - return .standardOutputCaptured(stdout) - } - group.addTask { - let stderr = try await error.captureOutput( - from: errorIO - ) - return .standardErrorCaptured(stderr) - } + var inputIOBox: TrackedPlatformDiskIO? = consume inputIO + var outputIOBox: TrackedPlatformDiskIO? = consume outputIO + var errorIOBox: TrackedPlatformDiskIO? = consume errorIO - // Write span at the same isolation - if let writeFd = inputIO { - let writer = StandardInputWriter(diskIO: writeFd) - _ = try await writer.write(input.bytes) - try await writer.finish() - } - - var stdout: Output.OutputType! - var stderror: Error.OutputType! - while let state = try await group.next() { - switch state { - case .standardOutputCaptured(let output): - stdout = output - case .standardErrorCaptured(let error): - stderror = error - } - } - - return ( - processIdentifier: execution.processIdentifier, - standardOutput: stdout, - standardError: stderror - ) + // Write input, capture output and error in parallel + async let stdout = try output.captureOutput(from: outputIOBox.take()) + async let stderr = try error.captureOutput(from: errorIOBox.take()) + // Write span at the same isolation + if let writeFd = inputIOBox.take() { + let writer = StandardInputWriter(diskIO: writeFd) + _ = try await writer.write(input._bytes) + try await writer.finish() } + + return ( + processIdentifier: execution.processIdentifier, + standardOutput: try await stdout, + standardError: try await stderr + ) } return CollectedResult( @@ -207,12 +184,15 @@ public func run( output: try output.createPipe(), error: try error.createPipe() ) { execution, inputIO, outputIO, errorIO in + var inputIOBox: TrackedPlatformDiskIO? = consume inputIO + var outputIOBox: TrackedPlatformDiskIO? = consume outputIO return try await withThrowingTaskGroup( of: Void.self, returning: Result.self ) { group in + var inputIOContainer: TrackedPlatformDiskIO? = inputIOBox.take() group.addTask { - if let inputIO = inputIO { + if let inputIO = inputIOContainer.take() { let writer = StandardInputWriter(diskIO: inputIO) try await input.write(with: writer) try await writer.finish() @@ -220,7 +200,7 @@ public func run( } // Body runs in the same isolation - let outputSequence = AsyncBufferSequence(diskIO: outputIO!) + let outputSequence = AsyncBufferSequence(diskIO: outputIOBox.take()!.consumeDiskIO()) let result = try await body(execution, outputSequence) try await group.waitForAll() return result @@ -254,12 +234,15 @@ public func run( output: try output.createPipe(), error: try error.createPipe() ) { execution, inputIO, outputIO, errorIO in + var inputIOBox: TrackedPlatformDiskIO? = consume inputIO + var errorIOBox: TrackedPlatformDiskIO? = consume errorIO return try await withThrowingTaskGroup( of: Void.self, returning: Result.self ) { group in + var inputIOContainer: TrackedPlatformDiskIO? = inputIOBox.take() group.addTask { - if let inputIO = inputIO { + if let inputIO = inputIOContainer.take() { let writer = StandardInputWriter(diskIO: inputIO) try await input.write(with: writer) try await writer.finish() @@ -267,7 +250,7 @@ public func run( } // Body runs in the same isolation - let errorSequence = AsyncBufferSequence(diskIO: errorIO!) + let errorSequence = AsyncBufferSequence(diskIO: errorIOBox.take()!.consumeDiskIO()) let result = try await body(execution, errorSequence) try await group.waitForAll() return result @@ -303,7 +286,7 @@ public func run( error: try error.createPipe() ) { execution, inputIO, outputIO, errorIO in let writer = StandardInputWriter(diskIO: inputIO!) - let outputSequence = AsyncBufferSequence(diskIO: outputIO!) + let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeDiskIO()) return try await body(execution, writer, outputSequence) } } @@ -336,7 +319,7 @@ public func run( error: try error.createPipe() ) { execution, inputIO, outputIO, errorIO in let writer = StandardInputWriter(diskIO: inputIO!) - let errorSequence = AsyncBufferSequence(diskIO: errorIO!) + let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeDiskIO()) return try await body(execution, writer, errorSequence) } } @@ -393,8 +376,8 @@ public func run( error: try error.createPipe() ) { execution, inputIO, outputIO, errorIO in let writer = StandardInputWriter(diskIO: inputIO!) - let outputSequence = AsyncBufferSequence(diskIO: outputIO!) - let errorSequence = AsyncBufferSequence(diskIO: errorIO!) + let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeDiskIO()) + let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeDiskIO()) return try await body(execution, writer, outputSequence, errorSequence) } } @@ -433,12 +416,18 @@ public func run< error: try error.createPipe() ) { (execution, inputIO, outputIO, errorIO) -> RunResult in // Write input, capture output and error in parallel + var inputIOBox: TrackedPlatformDiskIO? = consume inputIO + var outputIOBox: TrackedPlatformDiskIO? = consume outputIO + var errorIOBox: TrackedPlatformDiskIO? = consume errorIO return try await withThrowingTaskGroup( of: OutputCapturingState?.self, returning: RunResult.self ) { group in + var inputIOContainer: TrackedPlatformDiskIO? = inputIOBox.take() + var outputIOContainer: TrackedPlatformDiskIO? = outputIOBox.take() + var errorIOContainer: TrackedPlatformDiskIO? = errorIOBox.take() group.addTask { - if let writeFd = inputIO { + if let writeFd = inputIOContainer.take() { let writer = StandardInputWriter(diskIO: writeFd) try await input.write(with: writer) try await writer.finish() @@ -447,13 +436,13 @@ public func run< } group.addTask { let stdout = try await output.captureOutput( - from: outputIO + from: outputIOContainer.take() ) return .standardOutputCaptured(stdout) } group.addTask { let stderr = try await error.captureOutput( - from: errorIO + from: errorIOContainer.take() ) return .standardErrorCaptured(stderr) } @@ -514,8 +503,8 @@ public func run( error: try error.createPipe() ) { execution, inputIO, outputIO, errorIO in let writer = StandardInputWriter(diskIO: inputIO!) - let outputSequence = AsyncBufferSequence(diskIO: outputIO!) - let errorSequence = AsyncBufferSequence(diskIO: errorIO!) + let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeDiskIO()) + let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeDiskIO()) return try await body(execution, writer, outputSequence, errorSequence) } } diff --git a/Sources/Subprocess/AsyncBufferSequence.swift b/Sources/Subprocess/AsyncBufferSequence.swift index deb62db..442e07e 100644 --- a/Sources/Subprocess/AsyncBufferSequence.swift +++ b/Sources/Subprocess/AsyncBufferSequence.swift @@ -15,6 +15,10 @@ @preconcurrency import SystemPackage #endif +#if !os(Windows) +internal import Dispatch +#endif + #if SubprocessSpan @available(SubprocessSpan, *) #endif @@ -22,16 +26,22 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable { public typealias Failure = any Swift.Error public typealias Element = Buffer + #if os(Windows) + internal typealias DiskIO = FileDescriptor + #else + internal typealias DiskIO = DispatchIO + #endif + @_nonSendable public struct Iterator: AsyncIteratorProtocol { public typealias Element = Buffer - private let diskIO: TrackedPlatformDiskIO + private let diskIO: DiskIO private var buffer: [UInt8] private var currentPosition: Int private var finished: Bool - internal init(diskIO: TrackedPlatformDiskIO) { + internal init(diskIO: DiskIO) { self.diskIO = diskIO self.buffer = [] self.currentPosition = 0 @@ -44,16 +54,20 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable { ) if data == nil { // We finished reading. Close the file descriptor now - try self.diskIO.safelyClose() + #if os(Windows) + try self.diskIO.close() + #else + self.diskIO.close() + #endif return nil } return data } } - private let diskIO: TrackedPlatformDiskIO + private let diskIO: DiskIO - internal init(diskIO: TrackedPlatformDiskIO) { + internal init(diskIO: DiskIO) { self.diskIO = diskIO } diff --git a/Sources/Subprocess/Configuration.swift b/Sources/Subprocess/Configuration.swift index 9af704c..d21e7b1 100644 --- a/Sources/Subprocess/Configuration.swift +++ b/Sources/Subprocess/Configuration.swift @@ -66,7 +66,7 @@ public struct Configuration: Sendable { output: consuming CreatedPipe, error: consuming CreatedPipe, isolation: isolated (any Actor)? = #isolation, - _ body: ((Execution, TrackedPlatformDiskIO?, TrackedPlatformDiskIO?, TrackedPlatformDiskIO?) async throws -> Result) + _ body: ((Execution, consuming TrackedPlatformDiskIO?, consuming TrackedPlatformDiskIO?, consuming TrackedPlatformDiskIO?) async throws -> Result) ) async throws -> ExecutionResult { let spawnResults = try self.spawn( withInput: input, @@ -78,15 +78,16 @@ public struct Configuration: Sendable { var spawnResultBox: SpawnResult?? = consume spawnResults return try await withAsyncTaskCleanupHandler { - let _spawnResult = spawnResultBox!.take()! + var _spawnResult = spawnResultBox!.take()! + let inputIO = _spawnResult.inputWriteEnd() + let outputIO = _spawnResult.outputReadEnd() + let errorIO = _spawnResult.errorReadEnd() + let processIdentifier = _spawnResult.execution.processIdentifier async let terminationStatus = try monitorProcessTermination( - forProcessWithIdentifier: _spawnResult.execution.processIdentifier + forProcessWithIdentifier: processIdentifier ) // Body runs in the same isolation - let inputIO = _spawnResult.inputPipe.createInputPlatformDiskIO() - let outputIO = _spawnResult.outputPipe.createOutputPlatformDiskIO() - let errorIO = _spawnResult.errorPipe.createOutputPlatformDiskIO() let result = try await body(_spawnResult.execution, inputIO, outputIO, errorIO) return ExecutionResult( terminationStatus: try await terminationStatus, @@ -133,44 +134,49 @@ extension Configuration: CustomStringConvertible, CustomDebugStringConvertible { extension Configuration { /// Close each input individually, and throw the first error if there's multiple errors thrown @Sendable - internal func cleanupPreSpawn( - input: borrowing CreatedPipe, - output: borrowing CreatedPipe, - error: borrowing CreatedPipe + internal func safelyCloseMultuple( + inputRead: consuming TrackedFileDescriptor?, + inputWrite: consuming TrackedFileDescriptor?, + outputRead: consuming TrackedFileDescriptor?, + outputWrite: consuming TrackedFileDescriptor?, + errorRead: consuming TrackedFileDescriptor?, + errorWrite: consuming TrackedFileDescriptor? ) throws { - var inputError: Swift.Error? - var outputError: Swift.Error? - var errorError: Swift.Error? + var possibleError: (any Swift.Error)? = nil do { - try input.readFileDescriptor?.safelyClose() - try input.writeFileDescriptor?.safelyClose() + try inputRead?.safelyClose() } catch { - inputError = error + possibleError = error } - do { - try output.readFileDescriptor?.safelyClose() - try output.writeFileDescriptor?.safelyClose() + try inputWrite?.safelyClose() } catch { - outputError = error + possibleError = error } - do { - try error.readFileDescriptor?.safelyClose() - try error.writeFileDescriptor?.safelyClose() + try outputRead?.safelyClose() } catch { - errorError = error + possibleError = error } - - if let inputError = inputError { - throw inputError + do { + try outputWrite?.safelyClose() + } catch { + possibleError = error } - if let outputError = outputError { - throw outputError + do { + try errorRead?.safelyClose() + } catch { + possibleError = error } - if let errorError = errorError { - throw errorError + do { + try errorWrite?.safelyClose() + } catch { + possibleError = error + } + + if let actualError = possibleError { + throw actualError } } } @@ -482,11 +488,39 @@ extension Configuration { #if SubprocessSpan @available(SubprocessSpan, *) #endif + /// After Spawn finishes, child side file descriptors + /// (input read, output write, error write) will be closed + /// by `spawn()`. It returns the parent side file descriptors + /// via `SpawnResult` to perform actual reads internal struct SpawnResult: ~Copyable { let execution: Execution - let inputPipe: CreatedPipe - let outputPipe: CreatedPipe - let errorPipe: CreatedPipe + var _inputWriteEnd: TrackedPlatformDiskIO? + var _outputReadEnd: TrackedPlatformDiskIO? + var _errorReadEnd: TrackedPlatformDiskIO? + + init( + execution: Execution, + inputWriteEnd: consuming TrackedPlatformDiskIO?, + outputReadEnd: consuming TrackedPlatformDiskIO?, + errorReadEnd: consuming TrackedPlatformDiskIO? + ) { + self.execution = execution + self._inputWriteEnd = consume inputWriteEnd + self._outputReadEnd = consume outputReadEnd + self._errorReadEnd = consume errorReadEnd + } + + mutating func inputWriteEnd() -> TrackedPlatformDiskIO? { + return self._inputWriteEnd.take() + } + + mutating func outputReadEnd() -> TrackedPlatformDiskIO? { + return self._outputReadEnd.take() + } + + mutating func errorReadEnd() -> TrackedPlatformDiskIO? { + return self._errorReadEnd.take() + } } } @@ -548,8 +582,8 @@ internal enum StringOrRawBytes: Sendable, Hashable { /// A wrapped `FileDescriptor` and whether it should be closed /// automactially when done. -internal struct TrackedFileDescriptor { - internal let closeWhenDone: Bool +internal struct TrackedFileDescriptor: ~Copyable { + internal var closeWhenDone: Bool internal let fileDescriptor: FileDescriptor internal init( @@ -560,10 +594,21 @@ internal struct TrackedFileDescriptor { self.closeWhenDone = closeWhenDone } - internal func safelyClose() throws { + #if os(Windows) + consuming func consumeDiskIO() -> FileDescriptor { + let result = self.fileDescriptor + // Transfer the ownership out and therefor + // don't perform close on deinit + self.closeWhenDone = false + return result + } + #endif + + internal mutating func safelyClose() throws { guard self.closeWhenDone else { return } + closeWhenDone = false do { try fileDescriptor.close() @@ -571,13 +616,51 @@ internal struct TrackedFileDescriptor { guard let errno: Errno = error as? Errno else { throw error } - if errno != .badFileDescriptor { - throw errno + // Getting `.badFileDescriptor` suggests that the file descriptor + // might have been closed unexpectedly. This can pose security risks + // if another part of the code inadvertently reuses the same file descriptor + // number. This problem is especially concerning on Unix systems due to POSIX’s + // guarantee of using the lowest available file descriptor number, making reuse + // more probable. We use `preconditionFailure` upon receiving `.badFileDescriptor` + // to prevent accidentally closing a different file descriptor. + guard errno != .badFileDescriptor else { + preconditionFailure( + "FileDescriptor \(fileDescriptor.rawValue) is already closed" + ) } + // Throw other kinds of errors to allow user to catch them + throw error } } - internal var platformDescriptor: PlatformFileDescriptor { + deinit { + guard self.closeWhenDone else { + return + } + + do { + try fileDescriptor.close() + } catch { + guard let errno: Errno = error as? Errno else { + return + } + // Getting `.badFileDescriptor` suggests that the file descriptor + // might have been closed unexpectedly. This can pose security risks + // if another part of the code inadvertently reuses the same file descriptor + // number. This problem is especially concerning on Unix systems due to POSIX’s + // guarantee of using the lowest available file descriptor number, making reuse + // more probable. We use `preconditionFailure` upon receiving `.badFileDescriptor` + // to prevent accidentally closing a different file descriptor. + guard errno != .badFileDescriptor else { + preconditionFailure( + "FileDescriptor \(fileDescriptor.rawValue) is already closed" + ) + } + // Otherwise ignore the error + } + } + + internal func platformDescriptor() -> PlatformFileDescriptor { return self.fileDescriptor.platformDescriptor } } @@ -585,9 +668,9 @@ internal struct TrackedFileDescriptor { #if !os(Windows) /// A wrapped `DispatchIO` and whether it should be closed /// automactially when done. -internal struct TrackedDispatchIO { - internal let closeWhenDone: Bool - internal let dispatchIO: DispatchIO +internal struct TrackedDispatchIO: ~Copyable { + internal var closeWhenDone: Bool + internal var dispatchIO: DispatchIO internal init( _ dispatchIO: DispatchIO, @@ -597,39 +680,59 @@ internal struct TrackedDispatchIO { self.closeWhenDone = closeWhenDone } - internal func safelyClose() throws { + consuming func consumeDiskIO() -> DispatchIO { + let result = self.dispatchIO + // Transfer the ownership out and therefor + // don't perform close on deinit + self.closeWhenDone = false + return result + } + + internal mutating func safelyClose() throws { guard self.closeWhenDone else { return } - + closeWhenDone = false dispatchIO.close() } - internal var platformDescriptor: PlatformFileDescriptor { - return self.dispatchIO.fileDescriptor + deinit { + guard self.closeWhenDone else { + return + } + + dispatchIO.close() } } #endif internal struct CreatedPipe: ~Copyable { - internal let readFileDescriptor: TrackedFileDescriptor? - internal let writeFileDescriptor: TrackedFileDescriptor? + internal var _readFileDescriptor: TrackedFileDescriptor? + internal var _writeFileDescriptor: TrackedFileDescriptor? internal init( - readFileDescriptor: TrackedFileDescriptor?, - writeFileDescriptor: TrackedFileDescriptor? + readFileDescriptor: consuming TrackedFileDescriptor?, + writeFileDescriptor: consuming TrackedFileDescriptor? ) { - self.readFileDescriptor = readFileDescriptor - self.writeFileDescriptor = writeFileDescriptor + self._readFileDescriptor = readFileDescriptor + self._writeFileDescriptor = writeFileDescriptor + } + + mutating func readFileDescriptor() -> TrackedFileDescriptor? { + return self._readFileDescriptor.take() + } + + mutating func writeFileDescriptor() -> TrackedFileDescriptor? { + return self._writeFileDescriptor.take() } internal init(closeWhenDone: Bool) throws { let pipe = try FileDescriptor.ssp_pipe() - self.readFileDescriptor = .init( + self._readFileDescriptor = .init( pipe.readEnd, closeWhenDone: closeWhenDone ) - self.writeFileDescriptor = .init( + self._writeFileDescriptor = .init( pipe.writeEnd, closeWhenDone: closeWhenDone ) diff --git a/Sources/Subprocess/IO/Input.swift b/Sources/Subprocess/IO/Input.swift index 5c4e7a8..65e5da8 100644 --- a/Sources/Subprocess/IO/Input.swift +++ b/Sources/Subprocess/IO/Input.swift @@ -212,9 +212,9 @@ extension InputProtocol { /// A writer that writes to the standard input of the subprocess. public final actor StandardInputWriter: Sendable { - internal let diskIO: TrackedPlatformDiskIO + internal var diskIO: TrackedPlatformDiskIO - init(diskIO: TrackedPlatformDiskIO) { + init(diskIO: consuming TrackedPlatformDiskIO) { self.diskIO = diskIO } @@ -258,23 +258,6 @@ public final actor StandardInputWriter: Sendable { } } - -// MARK: - InputPipe -internal struct InputPipe { - // On Darwin and Linux, parent end (write end) should be - // wrapped as `DispatchIO` for writing - internal let readEnd: TrackedFileDescriptor? - internal let writeEnd: TrackedPlatformDiskIO? - - internal init( - readEnd: TrackedFileDescriptor?, - writeEnd: TrackedPlatformDiskIO? - ) { - self.readEnd = readEnd - self.writeEnd = writeEnd - } -} - extension StringProtocol { #if SubprocessFoundation private func convertEncoding( diff --git a/Sources/Subprocess/IO/Output.swift b/Sources/Subprocess/IO/Output.swift index b18f493..dae7ede 100644 --- a/Sources/Subprocess/IO/Output.swift +++ b/Sources/Subprocess/IO/Output.swift @@ -159,14 +159,16 @@ public struct BytesOutput: OutputProtocol { public let maxSize: Int internal func captureOutput( - from diskIO: TrackedPlatformDiskIO? + from diskIO: consuming TrackedPlatformDiskIO? ) async throws -> [UInt8] { + var diskIOBox: TrackedPlatformDiskIO? = consume diskIO return try await withCheckedThrowingContinuation { continuation in - guard let diskIO = diskIO else { + let _diskIO = diskIOBox.take() + guard let _diskIO = _diskIO else { // Show not happen due to type system constraints fatalError("Trying to capture output without file descriptor") } - diskIO.readUntilEOF(upToLength: self.maxSize) { result in + _diskIO.readUntilEOF(upToLength: self.maxSize) { result in switch result { case .success(let data): // FIXME: remove workaround for @@ -288,21 +290,6 @@ extension OutputProtocol { } #endif -// MARK: - OutputPipe -internal struct OutputPipe { - // On Darwin and Linux, parent end (read end) should be - // wrapped as `DispatchIO` for reading - internal let readEnd: TrackedPlatformDiskIO? - internal let writeEnd: TrackedFileDescriptor? - - internal init( - readEnd: TrackedPlatformDiskIO?, - writeEnd: TrackedFileDescriptor? - ) { - self.readEnd = readEnd - self.writeEnd = writeEnd - } -} // MARK: - Default Implementations #if SubprocessSpan @@ -323,22 +310,23 @@ extension OutputProtocol { /// Capture the output from the subprocess up to maxSize @_disfavoredOverload internal func captureOutput( - from diskIO: TrackedPlatformDiskIO? + from diskIO: consuming TrackedPlatformDiskIO? ) async throws -> OutputType { if let bytesOutput = self as? BytesOutput { return try await bytesOutput.captureOutput(from: diskIO) as! Self.OutputType } + var diskIOBox: TrackedPlatformDiskIO? = consume diskIO return try await withCheckedThrowingContinuation { continuation in if OutputType.self == Void.self { continuation.resume(returning: () as! OutputType) return } - guard let diskIO = diskIO else { + guard let _diskIO = diskIOBox.take() else { // Show not happen due to type system constraints fatalError("Trying to capture output without file descriptor") } - diskIO.readUntilEOF(upToLength: self.maxSize) { result in + _diskIO.readUntilEOF(upToLength: self.maxSize) { result in do { switch result { case .success(let data): @@ -362,7 +350,7 @@ extension OutputProtocol { @available(SubprocessSpan, *) #endif extension OutputProtocol where OutputType == Void { - internal func captureOutput(from fileDescriptor: TrackedPlatformDiskIO?) async throws {} + internal func captureOutput(from fileDescriptor: consuming TrackedPlatformDiskIO?) async throws {} #if SubprocessSpan /// Convert the output from Data to expected output type diff --git a/Sources/Subprocess/Platforms/Subprocess+Darwin.swift b/Sources/Subprocess/Platforms/Subprocess+Darwin.swift index d46be0d..9a56106 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Darwin.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Darwin.swift @@ -166,15 +166,22 @@ extension Configuration { let possiblePaths = self.executable.possibleExecutablePaths( withPathValue: self.environment.pathValue() ) - var inputPipeBox: CreatedPipe?? = consume inputPipe - var outputPipeBox: CreatedPipe?? = consume outputPipe - var errorPipeBox: CreatedPipe?? = consume errorPipe + var inputPipeBox: CreatedPipe? = consume inputPipe + var outputPipeBox: CreatedPipe? = consume outputPipe + var errorPipeBox: CreatedPipe? = consume errorPipe return try self.preSpawn { args throws -> SpawnResult in let (env, uidPtr, gidPtr, supplementaryGroups) = args - let _inputPipe = inputPipeBox!.take()! - let _outputPipe = outputPipeBox!.take()! - let _errorPipe = errorPipeBox!.take()! + var _inputPipe = inputPipeBox.take()! + var _outputPipe = outputPipeBox.take()! + var _errorPipe = errorPipeBox.take()! + + let inputReadFileDescriptor: TrackedFileDescriptor? = _inputPipe.readFileDescriptor() + let inputWriteFileDescriptor: TrackedFileDescriptor? = _inputPipe.writeFileDescriptor() + let outputReadFileDescriptor: TrackedFileDescriptor? = _outputPipe.readFileDescriptor() + let outputWriteFileDescriptor: TrackedFileDescriptor? = _outputPipe.writeFileDescriptor() + let errorReadFileDescriptor: TrackedFileDescriptor? = _errorPipe.readFileDescriptor() + let errorWriteFileDescriptor: TrackedFileDescriptor? = _errorPipe.writeFileDescriptor() for possibleExecutablePath in possiblePaths { var pid: pid_t = 0 @@ -198,21 +205,38 @@ extension Configuration { // Input var result: Int32 = -1 - if let inputRead = _inputPipe.readFileDescriptor { - result = posix_spawn_file_actions_adddup2(&fileActions, inputRead.platformDescriptor, 0) + if inputReadFileDescriptor != nil { + result = posix_spawn_file_actions_adddup2( + &fileActions, inputReadFileDescriptor!.platformDescriptor(), 0) guard result == 0 else { - try self.cleanupPreSpawn(input: _inputPipe, output: _outputPipe, error: _errorPipe) + try self.safelyCloseMultuple( + inputRead: inputReadFileDescriptor, + inputWrite: inputWriteFileDescriptor, + outputRead: outputReadFileDescriptor, + outputWrite: outputWriteFileDescriptor, + errorRead: errorReadFileDescriptor, + errorWrite: errorWriteFileDescriptor + ) throw SubprocessError( code: .init(.spawnFailed), underlyingError: .init(rawValue: result) ) } } - if let inputWrite = _inputPipe.writeFileDescriptor { + if inputWriteFileDescriptor != nil { // Close parent side - result = posix_spawn_file_actions_addclose(&fileActions, inputWrite.platformDescriptor) + result = posix_spawn_file_actions_addclose( + &fileActions, inputWriteFileDescriptor!.platformDescriptor() + ) guard result == 0 else { - try self.cleanupPreSpawn(input: _inputPipe, output: _outputPipe, error: _errorPipe) + try self.safelyCloseMultuple( + inputRead: inputReadFileDescriptor, + inputWrite: inputWriteFileDescriptor, + outputRead: outputReadFileDescriptor, + outputWrite: outputWriteFileDescriptor, + errorRead: errorReadFileDescriptor, + errorWrite: errorWriteFileDescriptor + ) throw SubprocessError( code: .init(.spawnFailed), underlyingError: .init(rawValue: result) @@ -220,21 +244,39 @@ extension Configuration { } } // Output - if let outputWrite = _outputPipe.writeFileDescriptor { - result = posix_spawn_file_actions_adddup2(&fileActions, outputWrite.platformDescriptor, 1) + if outputWriteFileDescriptor != nil { + result = posix_spawn_file_actions_adddup2( + &fileActions, outputWriteFileDescriptor!.platformDescriptor(), 1 + ) guard result == 0 else { - try self.cleanupPreSpawn(input: _inputPipe, output: _outputPipe, error: _errorPipe) + try self.safelyCloseMultuple( + inputRead: inputReadFileDescriptor, + inputWrite: inputWriteFileDescriptor, + outputRead: outputReadFileDescriptor, + outputWrite: outputWriteFileDescriptor, + errorRead: errorReadFileDescriptor, + errorWrite: errorWriteFileDescriptor + ) throw SubprocessError( code: .init(.spawnFailed), underlyingError: .init(rawValue: result) ) } } - if let outputRead = _outputPipe.readFileDescriptor { + if outputReadFileDescriptor != nil { // Close parent side - result = posix_spawn_file_actions_addclose(&fileActions, outputRead.platformDescriptor) + result = posix_spawn_file_actions_addclose( + &fileActions, outputReadFileDescriptor!.platformDescriptor() + ) guard result == 0 else { - try self.cleanupPreSpawn(input: _inputPipe, output: _outputPipe, error: _errorPipe) + try self.safelyCloseMultuple( + inputRead: inputReadFileDescriptor, + inputWrite: inputWriteFileDescriptor, + outputRead: outputReadFileDescriptor, + outputWrite: outputWriteFileDescriptor, + errorRead: errorReadFileDescriptor, + errorWrite: errorWriteFileDescriptor + ) throw SubprocessError( code: .init(.spawnFailed), underlyingError: .init(rawValue: result) @@ -242,21 +284,39 @@ extension Configuration { } } // Error - if let errorWrite = _errorPipe.writeFileDescriptor { - result = posix_spawn_file_actions_adddup2(&fileActions, errorWrite.platformDescriptor, 2) + if errorWriteFileDescriptor != nil { + result = posix_spawn_file_actions_adddup2( + &fileActions, errorWriteFileDescriptor!.platformDescriptor(), 2 + ) guard result == 0 else { - try self.cleanupPreSpawn(input: _inputPipe, output: _outputPipe, error: _errorPipe) + try self.safelyCloseMultuple( + inputRead: inputReadFileDescriptor, + inputWrite: inputWriteFileDescriptor, + outputRead: outputReadFileDescriptor, + outputWrite: outputWriteFileDescriptor, + errorRead: errorReadFileDescriptor, + errorWrite: errorWriteFileDescriptor + ) throw SubprocessError( code: .init(.spawnFailed), underlyingError: .init(rawValue: result) ) } } - if let errorRead = _errorPipe.readFileDescriptor { + if errorReadFileDescriptor != nil { // Close parent side - result = posix_spawn_file_actions_addclose(&fileActions, errorRead.platformDescriptor) + result = posix_spawn_file_actions_addclose( + &fileActions, errorReadFileDescriptor!.platformDescriptor() + ) guard result == 0 else { - try self.cleanupPreSpawn(input: _inputPipe, output: _outputPipe, error: _errorPipe) + try self.safelyCloseMultuple( + inputRead: inputReadFileDescriptor, + inputWrite: inputWriteFileDescriptor, + outputRead: outputReadFileDescriptor, + outputWrite: outputWriteFileDescriptor, + errorRead: errorReadFileDescriptor, + errorWrite: errorWriteFileDescriptor + ) throw SubprocessError( code: .init(.spawnFailed), underlyingError: .init(rawValue: result) @@ -299,20 +359,28 @@ extension Configuration { // Error handling if chdirError != 0 || spawnAttributeError != 0 { - try self.cleanupPreSpawn(input: _inputPipe, output: _outputPipe, error: _errorPipe) + try self.safelyCloseMultuple( + inputRead: inputReadFileDescriptor, + inputWrite: inputWriteFileDescriptor, + outputRead: outputReadFileDescriptor, + outputWrite: outputWriteFileDescriptor, + errorRead: errorReadFileDescriptor, + errorWrite: errorWriteFileDescriptor + ) + + let error: SubprocessError if spawnAttributeError != 0 { - throw SubprocessError( + error = SubprocessError( code: .init(.spawnFailed), underlyingError: .init(rawValue: spawnAttributeError) ) - } - - if chdirError != 0 { - throw SubprocessError( + } else { + error = SubprocessError( code: .init(.spawnFailed), underlyingError: .init(rawValue: spawnAttributeError) ) } + throw error } // Run additional config if let spawnConfig = self.platformOptions.preSpawnProcessConfigurator { @@ -344,10 +412,13 @@ extension Configuration { continue } // Throw all other errors - try self.cleanupPreSpawn( - input: _inputPipe, - output: _outputPipe, - error: _errorPipe + try self.safelyCloseMultuple( + inputRead: inputReadFileDescriptor, + inputWrite: inputWriteFileDescriptor, + outputRead: outputReadFileDescriptor, + outputWrite: outputWriteFileDescriptor, + errorRead: errorReadFileDescriptor, + errorWrite: errorWriteFileDescriptor ) throw SubprocessError( code: .init(.spawnFailed), @@ -356,43 +427,23 @@ extension Configuration { } // After spawn finishes, close all child side fds - var inputCloseError: (any Swift.Error)? = nil - do { - try _inputPipe.readFileDescriptor?.safelyClose() - } catch { - inputCloseError = error - } - var outputCloseError: (any Swift.Error)? = nil - do { - try _outputPipe.writeFileDescriptor?.safelyClose() - } catch { - outputCloseError = error - } - var errorCloseError: (any Swift.Error)? = nil - do { - try _errorPipe.writeFileDescriptor?.safelyClose() - } catch { - errorCloseError = error - } - - if let inputCloseError = inputCloseError { - throw inputCloseError - } - if let outputCloseError = outputCloseError { - throw outputCloseError - } - if let errorCloseError = errorCloseError { - throw errorCloseError - } + try self.safelyCloseMultuple( + inputRead: inputReadFileDescriptor, + inputWrite: nil, + outputRead: nil, + outputWrite: outputWriteFileDescriptor, + errorRead: nil, + errorWrite: errorWriteFileDescriptor + ) let execution = Execution( processIdentifier: .init(value: pid) ) return SpawnResult( execution: execution, - inputPipe: _inputPipe, - outputPipe: _outputPipe, - errorPipe: _errorPipe + inputWriteEnd: inputWriteFileDescriptor?.createPlatformDiskIO(), + outputReadEnd: outputReadFileDescriptor?.createPlatformDiskIO(), + errorReadEnd: errorReadFileDescriptor?.createPlatformDiskIO() ) } @@ -401,7 +452,14 @@ extension Configuration { // provide which one is not valid, here we make a best effort guess // by checking whether the working directory is valid. This technically // still causes TOUTOC issue, but it's the best we can do for error recovery. - try self.cleanupPreSpawn(input: _inputPipe, output: _outputPipe, error: _errorPipe) + try self.safelyCloseMultuple( + inputRead: inputReadFileDescriptor, + inputWrite: inputWriteFileDescriptor, + outputRead: outputReadFileDescriptor, + outputWrite: outputWriteFileDescriptor, + errorRead: errorReadFileDescriptor, + errorWrite: errorWriteFileDescriptor + ) let workingDirectory = self.workingDirectory.string guard Configuration.pathAccessible(workingDirectory, mode: F_OK) else { throw SubprocessError( diff --git a/Sources/Subprocess/Platforms/Subprocess+Linux.swift b/Sources/Subprocess/Platforms/Subprocess+Linux.swift index 8997dff..a488a52 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Linux.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Linux.swift @@ -47,16 +47,23 @@ extension Configuration { let possiblePaths = self.executable.possibleExecutablePaths( withPathValue: self.environment.pathValue() ) - var inputPipeBox: CreatedPipe?? = consume inputPipe - var outputPipeBox: CreatedPipe?? = consume outputPipe - var errorPipeBox: CreatedPipe?? = consume errorPipe + var inputPipeBox: CreatedPipe? = consume inputPipe + var outputPipeBox: CreatedPipe? = consume outputPipe + var errorPipeBox: CreatedPipe? = consume errorPipe return try self.preSpawn { args throws -> SpawnResult in let (env, uidPtr, gidPtr, supplementaryGroups) = args - let _inputPipe = inputPipeBox!.take()! - let _outputPipe = outputPipeBox!.take()! - let _errorPipe = errorPipeBox!.take()! + var _inputPipe = inputPipeBox.take()! + var _outputPipe = outputPipeBox.take()! + var _errorPipe = errorPipeBox.take()! + + let inputReadFileDescriptor: TrackedFileDescriptor? = _inputPipe.readFileDescriptor() + let inputWriteFileDescriptor: TrackedFileDescriptor? = _inputPipe.writeFileDescriptor() + let outputReadFileDescriptor: TrackedFileDescriptor? = _outputPipe.readFileDescriptor() + let outputWriteFileDescriptor: TrackedFileDescriptor? = _outputPipe.writeFileDescriptor() + let errorReadFileDescriptor: TrackedFileDescriptor? = _errorPipe.readFileDescriptor() + let errorWriteFileDescriptor: TrackedFileDescriptor? = _errorPipe.writeFileDescriptor() for possibleExecutablePath in possiblePaths { var processGroupIDPtr: UnsafeMutablePointer? = nil @@ -73,12 +80,12 @@ extension Configuration { } // Setup input let fileDescriptors: [CInt] = [ - _inputPipe.readFileDescriptor?.platformDescriptor ?? -1, - _inputPipe.writeFileDescriptor?.platformDescriptor ?? -1, - _outputPipe.writeFileDescriptor?.platformDescriptor ?? -1, - _outputPipe.readFileDescriptor?.platformDescriptor ?? -1, - _errorPipe.writeFileDescriptor?.platformDescriptor ?? -1, - _errorPipe.readFileDescriptor?.platformDescriptor ?? -1, + inputReadFileDescriptor?.platformDescriptor() ?? -1, + inputWriteFileDescriptor?.platformDescriptor() ?? -1, + outputWriteFileDescriptor?.platformDescriptor() ?? -1, + outputReadFileDescriptor?.platformDescriptor() ?? -1, + errorWriteFileDescriptor?.platformDescriptor() ?? -1, + errorWriteFileDescriptor?.platformDescriptor() ?? -1, ] let workingDirectory: String = self.workingDirectory.string @@ -114,10 +121,13 @@ extension Configuration { continue } // Throw all other errors - try self.cleanupPreSpawn( - input: _inputPipe, - output: _outputPipe, - error: _errorPipe + try self.safelyCloseMultuple( + inputRead: inputReadFileDescriptor, + inputWrite: inputWriteFileDescriptor, + outputRead: outputReadFileDescriptor, + outputWrite: outputWriteFileDescriptor, + errorRead: errorReadFileDescriptor, + errorWrite: errorWriteFileDescriptor ) throw SubprocessError( code: .init(.spawnFailed), @@ -133,43 +143,23 @@ extension Configuration { } } // After spawn finishes, close all child side fds - var inputCloseError: (any Swift.Error)? = nil - do { - try _inputPipe.readFileDescriptor?.safelyClose() - } catch { - inputCloseError = error - } - var outputCloseError: (any Swift.Error)? = nil - do { - try _outputPipe.writeFileDescriptor?.safelyClose() - } catch { - outputCloseError = error - } - var errorCloseError: (any Swift.Error)? = nil - do { - try _errorPipe.writeFileDescriptor?.safelyClose() - } catch { - errorCloseError = error - } - - if let inputCloseError = inputCloseError { - throw inputCloseError - } - if let outputCloseError = outputCloseError { - throw outputCloseError - } - if let errorCloseError = errorCloseError { - throw errorCloseError - } + try self.safelyCloseMultuple( + inputRead: inputReadFileDescriptor, + inputWrite: nil, + outputRead: nil, + outputWrite: outputWriteFileDescriptor, + errorRead: nil, + errorWrite: errorWriteFileDescriptor + ) let execution = Execution( processIdentifier: .init(value: pid) ) return SpawnResult( execution: execution, - inputPipe: _inputPipe, - outputPipe: _outputPipe, - errorPipe: _errorPipe + inputWriteEnd: inputWriteFileDescriptor?.createPlatformDiskIO(), + outputReadEnd: outputReadFileDescriptor?.createPlatformDiskIO(), + errorReadEnd: errorReadFileDescriptor?.createPlatformDiskIO() ) } @@ -178,7 +168,14 @@ extension Configuration { // provide which one is not valid, here we make a best effort guess // by checking whether the working directory is valid. This technically // still causes TOUTOC issue, but it's the best we can do for error recovery. - try self.cleanupPreSpawn(input: _inputPipe, output: _outputPipe, error: _errorPipe) + try self.safelyCloseMultuple( + inputRead: inputReadFileDescriptor, + inputWrite: inputWriteFileDescriptor, + outputRead: outputReadFileDescriptor, + outputWrite: outputWriteFileDescriptor, + errorRead: errorReadFileDescriptor, + errorWrite: errorWriteFileDescriptor + ) let workingDirectory = self.workingDirectory.string guard Configuration.pathAccessible(workingDirectory, mode: F_OK) else { throw SubprocessError( diff --git a/Sources/Subprocess/Platforms/Subprocess+Unix.swift b/Sources/Subprocess/Platforms/Subprocess+Unix.swift index 97f5454..d61df5a 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Unix.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Unix.swift @@ -392,53 +392,32 @@ extension FileDescriptor { internal typealias PlatformFileDescriptor = CInt internal typealias TrackedPlatformDiskIO = TrackedDispatchIO -extension CreatedPipe { - internal func createInputPlatformDiskIO() -> TrackedPlatformDiskIO? { - if let writeFileDescriptor = self.writeFileDescriptor { - let dispatchIO: DispatchIO = DispatchIO( - type: .stream, - fileDescriptor: writeFileDescriptor.platformDescriptor, - queue: .global(), - cleanupHandler: { error in - // Close the file descriptor - if writeFileDescriptor.closeWhenDone { - try? writeFileDescriptor.safelyClose() - } +extension TrackedFileDescriptor { + internal consuming func createPlatformDiskIO() -> TrackedPlatformDiskIO { + let dispatchIO: DispatchIO = DispatchIO( + type: .stream, + fileDescriptor: self.platformDescriptor(), + queue: .global(), + cleanupHandler: { error in + // Close the file descriptor + if self.closeWhenDone { + try? self.safelyClose() } - ) - return .init(dispatchIO, closeWhenDone: writeFileDescriptor.closeWhenDone) - } - return nil - } - - internal func createOutputPlatformDiskIO() -> TrackedPlatformDiskIO? { - if let readFileDescriptor = self.readFileDescriptor { - let dispatchIO: DispatchIO = DispatchIO( - type: .stream, - fileDescriptor: readFileDescriptor.platformDescriptor, - queue: .global(), - cleanupHandler: { error in - // Close the file descriptor - if readFileDescriptor.closeWhenDone { - try? readFileDescriptor.safelyClose() - } - } - ) - return .init(dispatchIO, closeWhenDone: readFileDescriptor.closeWhenDone) - } - return nil + } + ) + return .init(dispatchIO, closeWhenDone: self.closeWhenDone) } } // MARK: - TrackedDispatchIO extensions -extension TrackedDispatchIO { -#if SubprocessSpan +extension DispatchIO { + #if SubprocessSpan @available(SubprocessSpan, *) -#endif - package func readChunk(upToLength maxLength: Int) async throws -> AsyncBufferSequence.Buffer? { + #endif + internal func readChunk(upToLength maxLength: Int) async throws -> AsyncBufferSequence.Buffer? { return try await withCheckedThrowingContinuation { continuation in var buffer: DispatchData = .empty - self.dispatchIO.read( + self.read( offset: 0, length: maxLength, queue: .global() @@ -469,8 +448,13 @@ extension TrackedDispatchIO { } } } +} - internal func readUntilEOF( +extension TrackedDispatchIO { + #if SubprocessSpan + @available(SubprocessSpan, *) + #endif + internal consuming func readUntilEOF( upToLength maxLength: Int, resultHandler: sending @escaping (Swift.Result) -> Void ) { diff --git a/Sources/Subprocess/Platforms/Subprocess+Windows.swift b/Sources/Subprocess/Platforms/Subprocess+Windows.swift index 257b2c7..2e80677 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Windows.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Windows.swift @@ -25,10 +25,10 @@ extension Configuration { @available(SubprocessSpan, *) #endif internal func spawn( - withInput inputPipe: CreatedPipe, - outputPipe: CreatedPipe, - errorPipe: CreatedPipe - ) throws -> Execution { + withInput inputPipe: consuming CreatedPipe, + outputPipe: consuming CreatedPipe, + errorPipe: consuming CreatedPipe + ) throws -> SpawnResult { // Spawn differently depending on whether // we need to spawn as a user guard let userCredentials = self.platformOptions.userCredentials else { @@ -47,20 +47,31 @@ extension Configuration { } internal func spawnDirect( - withInput inputPipe: CreatedPipe, - outputPipe: CreatedPipe, - errorPipe: CreatedPipe - ) throws -> Execution { + withInput inputPipe: consuming CreatedPipe, + outputPipe: consuming CreatedPipe, + errorPipe: consuming CreatedPipe + ) throws -> SpawnResult { let ( applicationName, commandAndArgs, environment, intendedWorkingDir ) = try self.preSpawn() + + var inputReadFileDescriptor: TrackedFileDescriptor? = inputPipe.readFileDescriptor() + var inputWriteFileDescriptor: TrackedFileDescriptor? = inputPipe.writeFileDescriptor() + var outputReadFileDescriptor: TrackedFileDescriptor? = outputPipe.readFileDescriptor() + var outputWriteFileDescriptor: TrackedFileDescriptor? = outputPipe.writeFileDescriptor() + var errorReadFileDescriptor: TrackedFileDescriptor? = errorPipe.readFileDescriptor() + var errorWriteFileDescriptor: TrackedFileDescriptor? = errorPipe.writeFileDescriptor() + var startupInfo = try self.generateStartupInfo( - withInput: inputPipe, - output: outputPipe, - error: errorPipe + withInputRead: inputReadFileDescriptor, + inputWrite: inputWriteFileDescriptor, + outputRead: outputReadFileDescriptor, + outputWrite: outputWriteFileDescriptor, + errorRead: errorReadFileDescriptor, + errorWrite: errorWriteFileDescriptor ) var processInfo: PROCESS_INFORMATION = PROCESS_INFORMATION() var createProcessFlags = self.generateCreateProcessFlag() @@ -91,10 +102,13 @@ extension Configuration { ) guard created else { let windowsError = GetLastError() - try self.cleanupPreSpawn( - input: inputPipe, - output: outputPipe, - error: errorPipe + try self.safelyCloseMultuple( + inputRead: inputReadFileDescriptor.take(), + inputWrite: inputWriteFileDescriptor.take(), + outputRead: outputReadFileDescriptor.take(), + outputWrite: outputWriteFileDescriptor.take(), + errorRead: errorReadFileDescriptor.take(), + errorWrite: errorWriteFileDescriptor.take() ) throw SubprocessError( code: .init(.spawnFailed), @@ -108,10 +122,13 @@ extension Configuration { // We don't need the handle objects, so close it right away guard CloseHandle(processInfo.hThread) else { let windowsError = GetLastError() - try self.cleanupPreSpawn( - input: inputPipe, - output: outputPipe, - error: errorPipe + try self.safelyCloseMultuple( + inputRead: inputReadFileDescriptor, + inputWrite: inputWriteFileDescriptor, + outputRead: outputReadFileDescriptor, + outputWrite: outputWriteFileDescriptor, + errorRead: errorReadFileDescriptor, + errorWrite: errorWriteFileDescriptor ) throw SubprocessError( code: .init(.spawnFailed), @@ -120,69 +137,71 @@ extension Configuration { } guard CloseHandle(processInfo.hProcess) else { let windowsError = GetLastError() - try self.cleanupPreSpawn( - input: inputPipe, - output: outputPipe, - error: errorPipe + try self.safelyCloseMultuple( + inputRead: inputReadFileDescriptor, + inputWrite: inputWriteFileDescriptor, + outputRead: outputReadFileDescriptor, + outputWrite: outputWriteFileDescriptor, + errorRead: errorReadFileDescriptor, + errorWrite: errorWriteFileDescriptor ) throw SubprocessError( code: .init(.spawnFailed), underlyingError: .init(rawValue: windowsError) ) } + + try self.safelyCloseMultuple( + inputRead: inputReadFileDescriptor, + inputWrite: nil, + outputRead: nil, + outputWrite: outputWriteFileDescriptor, + errorRead: nil, + errorWrite: errorWriteFileDescriptor + ) + let pid = ProcessIdentifier( value: processInfo.dwProcessId ) - - func captureError(_ work: () throws -> Void) -> (any Swift.Error)? { - do { - try work() - return nil - } catch { - return error - } - } - // After spawn finishes, close all child side fds - let inputCloseError = captureError { - try inputPipe.readFileDescriptor?.safelyClose() - } - let outputCloseError = captureError { - try outputPipe.writeFileDescriptor?.safelyClose() - } - let errorCloseError = captureError { - try errorPipe.writeFileDescriptor?.safelyClose() - } - if let inputCloseError = inputCloseError { - throw inputCloseError - } - if let outputCloseError = outputCloseError { - throw outputCloseError - } - if let errorCloseError = errorCloseError { - throw errorCloseError - } - return Execution( + let execution = Execution( processIdentifier: pid, consoleBehavior: self.platformOptions.consoleBehavior ) + return SpawnResult( + execution: execution, + inputWriteEnd: inputWriteFileDescriptor?.createPlatformDiskIO(), + outputReadEnd: outputReadFileDescriptor?.createPlatformDiskIO(), + errorReadEnd: errorReadFileDescriptor?.createPlatformDiskIO() + ) } internal func spawnAsUser( - withInput inputPipe: CreatedPipe, - outputPipe: CreatedPipe, - errorPipe: CreatedPipe, + withInput inputPipe: consuming CreatedPipe, + outputPipe: consuming CreatedPipe, + errorPipe: consuming CreatedPipe, userCredentials: PlatformOptions.UserCredentials - ) throws -> Execution { + ) throws -> SpawnResult { let ( applicationName, commandAndArgs, environment, intendedWorkingDir ) = try self.preSpawn() + + var inputReadFileDescriptor: TrackedFileDescriptor? = inputPipe.readFileDescriptor() + var inputWriteFileDescriptor: TrackedFileDescriptor? = inputPipe.writeFileDescriptor() + var outputReadFileDescriptor: TrackedFileDescriptor? = outputPipe.readFileDescriptor() + var outputWriteFileDescriptor: TrackedFileDescriptor? = outputPipe.writeFileDescriptor() + var errorReadFileDescriptor: TrackedFileDescriptor? = errorPipe.readFileDescriptor() + var errorWriteFileDescriptor: TrackedFileDescriptor? = errorPipe.writeFileDescriptor() + var startupInfo = try self.generateStartupInfo( - withInput: inputPipe, - output: outputPipe, - error: errorPipe + withInputRead: inputReadFileDescriptor, + inputWrite: inputWriteFileDescriptor, + outputRead: outputReadFileDescriptor, + outputWrite: outputWriteFileDescriptor, + errorRead: errorReadFileDescriptor, + errorWrite: errorWriteFileDescriptor ) var processInfo: PROCESS_INFORMATION = PROCESS_INFORMATION() var createProcessFlags = self.generateCreateProcessFlag() @@ -223,10 +242,13 @@ extension Configuration { ) guard created else { let windowsError = GetLastError() - try self.cleanupPreSpawn( - input: inputPipe, - output: outputPipe, - error: errorPipe + try self.safelyCloseMultuple( + inputRead: inputReadFileDescriptor.take(), + inputWrite: inputWriteFileDescriptor.take(), + outputRead: outputReadFileDescriptor.take(), + outputWrite: outputWriteFileDescriptor.take(), + errorRead: errorReadFileDescriptor.take(), + errorWrite: errorWriteFileDescriptor.take() ) throw SubprocessError( code: .init(.spawnFailed), @@ -243,10 +265,13 @@ extension Configuration { // We don't need the handle objects, so close it right away guard CloseHandle(processInfo.hThread) else { let windowsError = GetLastError() - try self.cleanupPreSpawn( - input: inputPipe, - output: outputPipe, - error: errorPipe + try self.safelyCloseMultuple( + inputRead: inputReadFileDescriptor, + inputWrite: inputWriteFileDescriptor, + outputRead: outputReadFileDescriptor, + outputWrite: outputWriteFileDescriptor, + errorRead: errorReadFileDescriptor, + errorWrite: errorWriteFileDescriptor ) throw SubprocessError( code: .init(.spawnFailed), @@ -255,50 +280,43 @@ extension Configuration { } guard CloseHandle(processInfo.hProcess) else { let windowsError = GetLastError() - try self.cleanupPreSpawn( - input: inputPipe, - output: outputPipe, - error: errorPipe + try self.safelyCloseMultuple( + inputRead: inputReadFileDescriptor, + inputWrite: inputWriteFileDescriptor, + outputRead: outputReadFileDescriptor, + outputWrite: outputWriteFileDescriptor, + errorRead: errorReadFileDescriptor, + errorWrite: errorWriteFileDescriptor ) throw SubprocessError( code: .init(.spawnFailed), underlyingError: .init(rawValue: windowsError) ) } + + // After spawn finishes, close all child side fds + try self.safelyCloseMultuple( + inputRead: inputReadFileDescriptor, + inputWrite: nil, + outputRead: nil, + outputWrite: outputWriteFileDescriptor, + errorRead: nil, + errorWrite: errorWriteFileDescriptor + ) + let pid = ProcessIdentifier( value: processInfo.dwProcessId ) - func captureError(_ work: () throws -> Void) -> (any Swift.Error)? { - do { - try work() - return nil - } catch { - return error - } - } - // After spawn finishes, close all child side fds - let inputCloseError = captureError { - try inputPipe.readFileDescriptor?.safelyClose() - } - let outputCloseError = captureError { - try outputPipe.writeFileDescriptor?.safelyClose() - } - let errorCloseError = captureError { - try errorPipe.writeFileDescriptor?.safelyClose() - } - if let inputCloseError = inputCloseError { - throw inputCloseError - } - if let outputCloseError = outputCloseError { - throw outputCloseError - } - if let errorCloseError = errorCloseError { - throw errorCloseError - } - return Execution( + let execution = Execution( processIdentifier: pid, consoleBehavior: self.platformOptions.consoleBehavior ) + return SpawnResult( + execution: execution, + inputWriteEnd: inputWriteFileDescriptor?.createPlatformDiskIO(), + outputReadEnd: outputReadFileDescriptor?.createPlatformDiskIO(), + errorReadEnd: errorReadFileDescriptor?.createPlatformDiskIO() + ) } } @@ -826,9 +844,12 @@ extension Configuration { } private func generateStartupInfo( - withInput input: CreatedPipe, - output: CreatedPipe, - error: CreatedPipe + withInputRead inputReadFileDescriptor: borrowing TrackedFileDescriptor?, + inputWrite inputWriteFileDescriptor: borrowing TrackedFileDescriptor?, + outputRead outputReadFileDescriptor: borrowing TrackedFileDescriptor?, + outputWrite outputWriteFileDescriptor: borrowing TrackedFileDescriptor?, + errorRead errorReadFileDescriptor: borrowing TrackedFileDescriptor?, + errorWrite errorWriteFileDescriptor: borrowing TrackedFileDescriptor?, ) throws -> STARTUPINFOW { var info: STARTUPINFOW = STARTUPINFOW() info.cb = DWORD(MemoryLayout.size) @@ -840,37 +861,37 @@ extension Configuration { } // Bind IOs // Input - if let inputRead = input.readFileDescriptor { - info.hStdInput = inputRead.platformDescriptor + if inputReadFileDescriptor != nil { + info.hStdInput = inputReadFileDescriptor!.platformDescriptor() } - if let inputWrite = input.writeFileDescriptor { + if inputWriteFileDescriptor != nil { // Set parent side to be uninhertable SetHandleInformation( - inputWrite.platformDescriptor, + inputWriteFileDescriptor!.platformDescriptor(), DWORD(HANDLE_FLAG_INHERIT), 0 ) } // Output - if let outputWrite = output.writeFileDescriptor { - info.hStdOutput = outputWrite.platformDescriptor + if outputWriteFileDescriptor != nil { + info.hStdOutput = outputWriteFileDescriptor!.platformDescriptor() } - if let outputRead = output.readFileDescriptor { + if outputReadFileDescriptor != nil { // Set parent side to be uninhertable SetHandleInformation( - outputRead.platformDescriptor, + outputReadFileDescriptor!.platformDescriptor(), DWORD(HANDLE_FLAG_INHERIT), 0 ) } // Error - if let errorWrite = error.writeFileDescriptor { - info.hStdError = errorWrite.platformDescriptor + if errorWriteFileDescriptor != nil { + info.hStdError = errorWriteFileDescriptor!.platformDescriptor() } - if let errorRead = error.readFileDescriptor { + if errorReadFileDescriptor != nil { // Set parent side to be uninhertable SetHandleInformation( - errorRead.platformDescriptor, + errorReadFileDescriptor!.platformDescriptor(), DWORD(HANDLE_FLAG_INHERIT), 0 ) @@ -1049,19 +1070,8 @@ extension FileDescriptor { } } -extension CreatedPipe { - /// On Windows, we use file descriptors directly - internal func createInputPlatformDiskIO() -> TrackedPlatformDiskIO? { - return self.writeFileDescriptor - } - - internal func createOutputPlatformDiskIO() -> TrackedPlatformDiskIO? { - return self.readFileDescriptor - } -} - -extension TrackedFileDescriptor { - internal func readChunk(upToLength maxLength: Int) async throws -> SequenceOutput.Buffer? { +extension FileDescriptor { + internal func readChunk(upToLength maxLength: Int) async throws -> AsyncBufferSequence.Buffer? { return try await withCheckedThrowingContinuation { continuation in self.readUntilEOF( upToLength: maxLength @@ -1070,7 +1080,7 @@ extension TrackedFileDescriptor { case .failure(let error): continuation.resume(throwing: error) case .success(let bytes): - continuation.resume(returning: SequenceOutput.Buffer(data: bytes)) + continuation.resume(returning: AsyncBufferSequence.Buffer(data: bytes)) } } } @@ -1094,7 +1104,7 @@ extension TrackedFileDescriptor { let bufferPtr = baseAddress.advanced(by: totalBytesRead) var bytesRead: DWORD = 0 let readSucceed = ReadFile( - self.fileDescriptor.platformDescriptor, + self.platformDescriptor, UnsafeMutableRawPointer(mutating: bufferPtr), DWORD(maxLength - totalBytesRead), &bytesRead, @@ -1134,16 +1144,39 @@ extension TrackedFileDescriptor { } } } +} + +extension TrackedFileDescriptor { + internal consuming func createPlatformDiskIO() -> TrackedPlatformDiskIO { + return .init( + self.fileDescriptor, + closeWhenDone: self.closeWhenDone + ) + } + + internal func readUntilEOF( + upToLength maxLength: Int, + resultHandler: @Sendable @escaping (Swift.Result<[UInt8], any (Error & Sendable)>) -> Void + ) { + self.fileDescriptor.readUntilEOF( + upToLength: maxLength, + resultHandler: resultHandler + ) + } #if SubprocessSpan @available(SubprocessSpan, *) internal func write( _ span: borrowing RawSpan ) async throws -> Int { - try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + let fileDescriptor = self.fileDescriptor + return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in span.withUnsafeBytes { ptr in // TODO: Use WriteFileEx for asyc here - self.write(ptr) { writtenLength, error in + Self.write( + ptr, + to: fileDescriptor + ) { writtenLength, error in if let error = error { continuation.resume(throwing: error) } else { @@ -1160,9 +1193,13 @@ extension TrackedFileDescriptor { ) async throws -> Int { try await withCheckedThrowingContinuation { continuation in // TODO: Figure out a better way to asynchornously write + let fd = self.fileDescriptor DispatchQueue.global(qos: .userInitiated).async { array.withUnsafeBytes { - self.write($0) { writtenLength, error in + Self.write( + $0, + to: fd + ) { writtenLength, error in if let error = error { continuation.resume(throwing: error) } else { @@ -1174,13 +1211,15 @@ extension TrackedFileDescriptor { } } - internal func write( + internal static func write( _ ptr: UnsafeRawBufferPointer, + to fileDescriptor: FileDescriptor, completion: @escaping (Int, Swift.Error?) -> Void ) { + let handle = HANDLE(bitPattern: _get_osfhandle(fileDescriptor.rawValue))! var writtenBytes: DWORD = 0 let writeSucceed = WriteFile( - self.fileDescriptor.platformDescriptor, + handle, ptr.baseAddress, DWORD(ptr.count), &writtenBytes, diff --git a/Sources/Subprocess/Span+Subprocess.swift b/Sources/Subprocess/Span+Subprocess.swift index 14d47aa..366b271 100644 --- a/Sources/Subprocess/Span+Subprocess.swift +++ b/Sources/Subprocess/Span+Subprocess.swift @@ -39,6 +39,19 @@ public func _overrideLifetime< dependent } +@available(SubprocessSpan, *) +extension Span where Element: BitwiseCopyable { + + internal var _bytes: RawSpan { + @lifetime(copy self) + @_alwaysEmitIntoClient + get { + let rawSpan = RawSpan(_elements: self) + return _overrideLifetime(of: rawSpan, copyingFrom: self) + } + } +} + #if canImport(Glibc) || canImport(Bionic) || canImport(Musl) internal import Dispatch diff --git a/Sources/Subprocess/SubprocessFoundation/Input+Foundation.swift b/Sources/Subprocess/SubprocessFoundation/Input+Foundation.swift index 3a18818..39a53ce 100644 --- a/Sources/Subprocess/SubprocessFoundation/Input+Foundation.swift +++ b/Sources/Subprocess/SubprocessFoundation/Input+Foundation.swift @@ -133,11 +133,15 @@ extension TrackedFileDescriptor { internal func write( _ data: Data ) async throws -> Int { - try await withCheckedThrowingContinuation { continuation in + let fileDescriptor = self.fileDescriptor + return try await withCheckedThrowingContinuation { continuation in // TODO: Figure out a better way to asynchornously write DispatchQueue.global(qos: .userInitiated).async { data.withUnsafeBytes { - self.write($0) { writtenLength, error in + Self.write( + $0, + to: fileDescriptor + ) { writtenLength, error in if let error = error { continuation.resume(throwing: error) } else { From 7c35edb83a85bb466fca3d4fcba90425cf799b41 Mon Sep 17 00:00:00 2001 From: Charles Hu Date: Wed, 14 May 2025 14:56:27 -0700 Subject: [PATCH 4/4] Removed support for Swift 6.0. Now Subprocess requires Swift 6.1.0 and above --- Package@swift-6.0.swift | 76 -------------------------- README.md | 4 +- Sources/Subprocess/Configuration.swift | 8 +-- 3 files changed, 6 insertions(+), 82 deletions(-) delete mode 100644 Package@swift-6.0.swift diff --git a/Package@swift-6.0.swift b/Package@swift-6.0.swift deleted file mode 100644 index b22dfcb..0000000 --- a/Package@swift-6.0.swift +++ /dev/null @@ -1,76 +0,0 @@ -// swift-tools-version: 6.0 -// The swift-tools-version declares the minimum version of Swift required to build this package. - -import PackageDescription - -let availabilityMacro: SwiftSetting = .enableExperimentalFeature( - "AvailabilityMacro=SubprocessSpan: macOS 9999" -) - -let package = Package( - name: "Subprocess", - platforms: [.macOS(.v13)], - products: [ - .library( - name: "Subprocess", - targets: ["Subprocess"] - ) - ], - dependencies: [ - .package( - url: "https://github.com/apple/swift-system", - from: "1.4.2" - ), - .package( - url: "https://github.com/apple/swift-docc-plugin", - from: "1.4.3" - ), - ], - targets: [ - .target( - name: "Subprocess", - dependencies: [ - "_SubprocessCShims", - .product(name: "SystemPackage", package: "swift-system"), - ], - path: "Sources/Subprocess", - exclude: [ - "Span+Subprocess.swift", - "SubprocessFoundation/Span+SubprocessFoundation.swift", - ], - swiftSettings: [ - .enableExperimentalFeature("StrictConcurrency"), - .define("SubprocessFoundation"), - availabilityMacro, - ] - ), - .testTarget( - name: "SubprocessTests", - dependencies: [ - "_SubprocessCShims", - "Subprocess", - "TestResources", - .product(name: "SystemPackage", package: "swift-system"), - ], - swiftSettings: [ - availabilityMacro - ] - ), - - .target( - name: "TestResources", - dependencies: [ - .product(name: "SystemPackage", package: "swift-system") - ], - path: "Tests/TestResources", - resources: [ - .copy("Resources") - ] - ), - - .target( - name: "_SubprocessCShims", - path: "Sources/_SubprocessCShims" - ), - ] -) diff --git a/README.md b/README.md index a7eff94..4a9eccd 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,7 @@ Then, adding the `Subprocess` module to your target dependencies: ) ``` -On Swift 6.1 and above, `Subprocess` offers two [package traits](https://github.com/swiftlang/swift-evolution/blob/main/proposals/0450-swiftpm-package-traits.md): +`Subprocess` offers two [package traits](https://github.com/swiftlang/swift-evolution/blob/main/proposals/0450-swiftpm-package-traits.md): - `SubprocessFoundation`: includes a dependency on `Foundation` and adds extensions on Foundation types like `Data`. This trait is enabled by default. - `SubprocessSpan`: makes Subprocess’ API, mainly `OutputProtocol`, `RawSpan` based. This trait is enabled whenever `RawSpan` is available and should only be disabled when `RawSpan` is not available. @@ -35,7 +35,7 @@ Please find the API proposal [here](https://github.com/swiftlang/swift-foundatio ### Swift Versions -The minimal supported Swift version is Swift 6.0. +The minimal supported Swift version is **Swift 6.1**. To experiment with the `SubprocessSpan` trait, Swift 6.2 is required. Currently, you can download the Swift 6.2 toolchain (`main` development snapshot) [here](https://www.swift.org/install/macos/#development-snapshots). diff --git a/Sources/Subprocess/Configuration.swift b/Sources/Subprocess/Configuration.swift index d21e7b1..9908e88 100644 --- a/Sources/Subprocess/Configuration.swift +++ b/Sources/Subprocess/Configuration.swift @@ -621,10 +621,10 @@ internal struct TrackedFileDescriptor: ~Copyable { // if another part of the code inadvertently reuses the same file descriptor // number. This problem is especially concerning on Unix systems due to POSIX’s // guarantee of using the lowest available file descriptor number, making reuse - // more probable. We use `preconditionFailure` upon receiving `.badFileDescriptor` + // more probable. We use `fatalError` upon receiving `.badFileDescriptor` // to prevent accidentally closing a different file descriptor. guard errno != .badFileDescriptor else { - preconditionFailure( + fatalError( "FileDescriptor \(fileDescriptor.rawValue) is already closed" ) } @@ -649,10 +649,10 @@ internal struct TrackedFileDescriptor: ~Copyable { // if another part of the code inadvertently reuses the same file descriptor // number. This problem is especially concerning on Unix systems due to POSIX’s // guarantee of using the lowest available file descriptor number, making reuse - // more probable. We use `preconditionFailure` upon receiving `.badFileDescriptor` + // more probable. We use `fatalError` upon receiving `.badFileDescriptor` // to prevent accidentally closing a different file descriptor. guard errno != .badFileDescriptor else { - preconditionFailure( + fatalError( "FileDescriptor \(fileDescriptor.rawValue) is already closed" ) }