Skip to content

Commit 51e1805

Browse files
committed
Make Pipes and FileDescriptor ~Copyable
1 parent c2d5278 commit 51e1805

11 files changed

+616
-444
lines changed

Sources/Subprocess/API.swift

+42-53
Original file line numberDiff line numberDiff line change
@@ -109,48 +109,25 @@ public func run<
109109
output: try output.createPipe(),
110110
error: try error.createPipe()
111111
) { execution, inputIO, outputIO, errorIO in
112-
// Write input, capture output and error in parallel
113-
return try await withThrowingTaskGroup(
114-
of: OutputCapturingState<Output.OutputType, Error.OutputType>.self,
115-
returning: RunResult.self
116-
) { group in
117-
group.addTask {
118-
let stdout = try await output.captureOutput(
119-
from: outputIO
120-
)
121-
return .standardOutputCaptured(stdout)
122-
}
123-
group.addTask {
124-
let stderr = try await error.captureOutput(
125-
from: errorIO
126-
)
127-
return .standardErrorCaptured(stderr)
128-
}
112+
var inputIOBox: TrackedPlatformDiskIO? = consume inputIO
113+
var outputIOBox: TrackedPlatformDiskIO? = consume outputIO
114+
var errorIOBox: TrackedPlatformDiskIO? = consume errorIO
129115

130-
// Write span at the same isolation
131-
if let writeFd = inputIO {
132-
let writer = StandardInputWriter(diskIO: writeFd)
133-
_ = try await writer.write(input.bytes)
134-
try await writer.finish()
135-
}
136-
137-
var stdout: Output.OutputType!
138-
var stderror: Error.OutputType!
139-
while let state = try await group.next() {
140-
switch state {
141-
case .standardOutputCaptured(let output):
142-
stdout = output
143-
case .standardErrorCaptured(let error):
144-
stderror = error
145-
}
146-
}
147-
148-
return (
149-
processIdentifier: execution.processIdentifier,
150-
standardOutput: stdout,
151-
standardError: stderror
152-
)
116+
// Write input, capture output and error in parallel
117+
async let stdout = try output.captureOutput(from: outputIOBox.take())
118+
async let stderr = try error.captureOutput(from: errorIOBox.take())
119+
// Write span at the same isolation
120+
if let writeFd = inputIOBox.take() {
121+
let writer = StandardInputWriter(diskIO: writeFd)
122+
_ = try await writer.write(input._bytes)
123+
try await writer.finish()
153124
}
125+
126+
return (
127+
processIdentifier: execution.processIdentifier,
128+
standardOutput: try await stdout,
129+
standardError: try await stderr
130+
)
154131
}
155132

156133
return CollectedResult(
@@ -207,20 +184,23 @@ public func run<Result, Input: InputProtocol, Error: OutputProtocol>(
207184
output: try output.createPipe(),
208185
error: try error.createPipe()
209186
) { execution, inputIO, outputIO, errorIO in
187+
var inputIOBox: TrackedPlatformDiskIO? = consume inputIO
188+
var outputIOBox: TrackedPlatformDiskIO? = consume outputIO
210189
return try await withThrowingTaskGroup(
211190
of: Void.self,
212191
returning: Result.self
213192
) { group in
193+
var inputIOContainer: TrackedPlatformDiskIO? = inputIOBox.take()
214194
group.addTask {
215-
if let inputIO = inputIO {
195+
if let inputIO = inputIOContainer.take() {
216196
let writer = StandardInputWriter(diskIO: inputIO)
217197
try await input.write(with: writer)
218198
try await writer.finish()
219199
}
220200
}
221201

222202
// Body runs in the same isolation
223-
let outputSequence = AsyncBufferSequence(diskIO: outputIO!)
203+
let outputSequence = AsyncBufferSequence(diskIO: outputIOBox.take()!.consumeDiskIO())
224204
let result = try await body(execution, outputSequence)
225205
try await group.waitForAll()
226206
return result
@@ -254,20 +234,23 @@ public func run<Result, Input: InputProtocol, Output: OutputProtocol>(
254234
output: try output.createPipe(),
255235
error: try error.createPipe()
256236
) { execution, inputIO, outputIO, errorIO in
237+
var inputIOBox: TrackedPlatformDiskIO? = consume inputIO
238+
var errorIOBox: TrackedPlatformDiskIO? = consume errorIO
257239
return try await withThrowingTaskGroup(
258240
of: Void.self,
259241
returning: Result.self
260242
) { group in
243+
var inputIOContainer: TrackedPlatformDiskIO? = inputIOBox.take()
261244
group.addTask {
262-
if let inputIO = inputIO {
245+
if let inputIO = inputIOContainer.take() {
263246
let writer = StandardInputWriter(diskIO: inputIO)
264247
try await input.write(with: writer)
265248
try await writer.finish()
266249
}
267250
}
268251

269252
// Body runs in the same isolation
270-
let errorSequence = AsyncBufferSequence(diskIO: errorIO!)
253+
let errorSequence = AsyncBufferSequence(diskIO: errorIOBox.take()!.consumeDiskIO())
271254
let result = try await body(execution, errorSequence)
272255
try await group.waitForAll()
273256
return result
@@ -303,7 +286,7 @@ public func run<Result, Error: OutputProtocol>(
303286
error: try error.createPipe()
304287
) { execution, inputIO, outputIO, errorIO in
305288
let writer = StandardInputWriter(diskIO: inputIO!)
306-
let outputSequence = AsyncBufferSequence(diskIO: outputIO!)
289+
let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeDiskIO())
307290
return try await body(execution, writer, outputSequence)
308291
}
309292
}
@@ -336,7 +319,7 @@ public func run<Result, Output: OutputProtocol>(
336319
error: try error.createPipe()
337320
) { execution, inputIO, outputIO, errorIO in
338321
let writer = StandardInputWriter(diskIO: inputIO!)
339-
let errorSequence = AsyncBufferSequence(diskIO: errorIO!)
322+
let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeDiskIO())
340323
return try await body(execution, writer, errorSequence)
341324
}
342325
}
@@ -393,8 +376,8 @@ public func run<Result>(
393376
error: try error.createPipe()
394377
) { execution, inputIO, outputIO, errorIO in
395378
let writer = StandardInputWriter(diskIO: inputIO!)
396-
let outputSequence = AsyncBufferSequence(diskIO: outputIO!)
397-
let errorSequence = AsyncBufferSequence(diskIO: errorIO!)
379+
let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeDiskIO())
380+
let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeDiskIO())
398381
return try await body(execution, writer, outputSequence, errorSequence)
399382
}
400383
}
@@ -433,12 +416,18 @@ public func run<
433416
error: try error.createPipe()
434417
) { (execution, inputIO, outputIO, errorIO) -> RunResult in
435418
// Write input, capture output and error in parallel
419+
var inputIOBox: TrackedPlatformDiskIO? = consume inputIO
420+
var outputIOBox: TrackedPlatformDiskIO? = consume outputIO
421+
var errorIOBox: TrackedPlatformDiskIO? = consume errorIO
436422
return try await withThrowingTaskGroup(
437423
of: OutputCapturingState<Output.OutputType, Error.OutputType>?.self,
438424
returning: RunResult.self
439425
) { group in
426+
var inputIOContainer: TrackedPlatformDiskIO? = inputIOBox.take()
427+
var outputIOContainer: TrackedPlatformDiskIO? = outputIOBox.take()
428+
var errorIOContainer: TrackedPlatformDiskIO? = errorIOBox.take()
440429
group.addTask {
441-
if let writeFd = inputIO {
430+
if let writeFd = inputIOContainer.take() {
442431
let writer = StandardInputWriter(diskIO: writeFd)
443432
try await input.write(with: writer)
444433
try await writer.finish()
@@ -447,13 +436,13 @@ public func run<
447436
}
448437
group.addTask {
449438
let stdout = try await output.captureOutput(
450-
from: outputIO
439+
from: outputIOContainer.take()
451440
)
452441
return .standardOutputCaptured(stdout)
453442
}
454443
group.addTask {
455444
let stderr = try await error.captureOutput(
456-
from: errorIO
445+
from: errorIOContainer.take()
457446
)
458447
return .standardErrorCaptured(stderr)
459448
}
@@ -514,8 +503,8 @@ public func run<Result>(
514503
error: try error.createPipe()
515504
) { execution, inputIO, outputIO, errorIO in
516505
let writer = StandardInputWriter(diskIO: inputIO!)
517-
let outputSequence = AsyncBufferSequence(diskIO: outputIO!)
518-
let errorSequence = AsyncBufferSequence(diskIO: errorIO!)
506+
let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeDiskIO())
507+
let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeDiskIO())
519508
return try await body(execution, writer, outputSequence, errorSequence)
520509
}
521510
}

Sources/Subprocess/AsyncBufferSequence.swift

+19-5
Original file line numberDiff line numberDiff line change
@@ -15,23 +15,33 @@
1515
@preconcurrency import SystemPackage
1616
#endif
1717

18+
#if !os(Windows)
19+
internal import Dispatch
20+
#endif
21+
1822
#if SubprocessSpan
1923
@available(SubprocessSpan, *)
2024
#endif
2125
public struct AsyncBufferSequence: AsyncSequence, Sendable {
2226
public typealias Failure = any Swift.Error
2327
public typealias Element = Buffer
2428

29+
#if os(Windows)
30+
internal typealias DiskIO = FileDescriptor
31+
#else
32+
internal typealias DiskIO = DispatchIO
33+
#endif
34+
2535
@_nonSendable
2636
public struct Iterator: AsyncIteratorProtocol {
2737
public typealias Element = Buffer
2838

29-
private let diskIO: TrackedPlatformDiskIO
39+
private let diskIO: DiskIO
3040
private var buffer: [UInt8]
3141
private var currentPosition: Int
3242
private var finished: Bool
3343

34-
internal init(diskIO: TrackedPlatformDiskIO) {
44+
internal init(diskIO: DiskIO) {
3545
self.diskIO = diskIO
3646
self.buffer = []
3747
self.currentPosition = 0
@@ -44,16 +54,20 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable {
4454
)
4555
if data == nil {
4656
// We finished reading. Close the file descriptor now
47-
try self.diskIO.safelyClose()
57+
#if os(Windows)
58+
try self.diskIO.close()
59+
#else
60+
self.diskIO.close()
61+
#endif
4862
return nil
4963
}
5064
return data
5165
}
5266
}
5367

54-
private let diskIO: TrackedPlatformDiskIO
68+
private let diskIO: DiskIO
5569

56-
internal init(diskIO: TrackedPlatformDiskIO) {
70+
internal init(diskIO: DiskIO) {
5771
self.diskIO = diskIO
5872
}
5973

0 commit comments

Comments
 (0)