Skip to content

Commit 2754c5f

Browse files
committed
Make Pipes and FileDescriptor ~Copyable
1 parent 495232d commit 2754c5f

11 files changed

+612
-382
lines changed

Sources/Subprocess/API.swift

+34-17
Original file line numberDiff line numberDiff line change
@@ -110,27 +110,32 @@ public func run<
110110
error: try error.createPipe()
111111
) { execution, inputIO, outputIO, errorIO in
112112
// Write input, capture output and error in parallel
113+
var inputIOBox: TrackedPlatformDiskIO? = consume inputIO
114+
var outputIOBox: TrackedPlatformDiskIO? = consume outputIO
115+
var errorIOBox: TrackedPlatformDiskIO? = consume errorIO
113116
return try await withThrowingTaskGroup(
114117
of: OutputCapturingState<Output.OutputType, Error.OutputType>.self,
115118
returning: RunResult.self
116119
) { group in
120+
var outputIOContainer: TrackedPlatformDiskIO? = outputIOBox.take()
121+
var errorIOContainer: TrackedPlatformDiskIO? = errorIOBox.take()
117122
group.addTask {
118123
let stdout = try await output.captureOutput(
119-
from: outputIO
124+
from: outputIOContainer.take()
120125
)
121126
return .standardOutputCaptured(stdout)
122127
}
123128
group.addTask {
124129
let stderr = try await error.captureOutput(
125-
from: errorIO
130+
from: errorIOContainer.take()
126131
)
127132
return .standardErrorCaptured(stderr)
128133
}
129134

130135
// Write span at the same isolation
131-
if let writeFd = inputIO {
136+
if let writeFd = inputIOBox.take() {
132137
let writer = StandardInputWriter(diskIO: writeFd)
133-
_ = try await writer.write(input.bytes)
138+
_ = try await writer.write(input._bytes)
134139
try await writer.finish()
135140
}
136141

@@ -207,20 +212,23 @@ public func run<Result, Input: InputProtocol, Error: OutputProtocol>(
207212
output: try output.createPipe(),
208213
error: try error.createPipe()
209214
) { execution, inputIO, outputIO, errorIO in
215+
var inputIOBox: TrackedPlatformDiskIO? = consume inputIO
216+
var outputIOBox: TrackedPlatformDiskIO? = consume outputIO
210217
return try await withThrowingTaskGroup(
211218
of: Void.self,
212219
returning: Result.self
213220
) { group in
221+
var inputIOContainer: TrackedPlatformDiskIO? = inputIOBox.take()
214222
group.addTask {
215-
if let inputIO = inputIO {
223+
if let inputIO = inputIOContainer.take() {
216224
let writer = StandardInputWriter(diskIO: inputIO)
217225
try await input.write(with: writer)
218226
try await writer.finish()
219227
}
220228
}
221229

222230
// Body runs in the same isolation
223-
let outputSequence = AsyncBufferSequence(diskIO: outputIO!)
231+
let outputSequence = AsyncBufferSequence(diskIO: outputIOBox.take()!.consumeDiskIO())
224232
let result = try await body(execution, outputSequence)
225233
try await group.waitForAll()
226234
return result
@@ -254,20 +262,23 @@ public func run<Result, Input: InputProtocol, Output: OutputProtocol>(
254262
output: try output.createPipe(),
255263
error: try error.createPipe()
256264
) { execution, inputIO, outputIO, errorIO in
265+
var inputIOBox: TrackedPlatformDiskIO? = consume inputIO
266+
var errorIOBox: TrackedPlatformDiskIO? = consume errorIO
257267
return try await withThrowingTaskGroup(
258268
of: Void.self,
259269
returning: Result.self
260270
) { group in
271+
var inputIOContainer: TrackedPlatformDiskIO? = inputIOBox.take()
261272
group.addTask {
262-
if let inputIO = inputIO {
273+
if let inputIO = inputIOContainer.take() {
263274
let writer = StandardInputWriter(diskIO: inputIO)
264275
try await input.write(with: writer)
265276
try await writer.finish()
266277
}
267278
}
268279

269280
// Body runs in the same isolation
270-
let errorSequence = AsyncBufferSequence(diskIO: errorIO!)
281+
let errorSequence = AsyncBufferSequence(diskIO: errorIOBox.take()!.consumeDiskIO())
271282
let result = try await body(execution, errorSequence)
272283
try await group.waitForAll()
273284
return result
@@ -303,7 +314,7 @@ public func run<Result, Error: OutputProtocol>(
303314
error: try error.createPipe()
304315
) { execution, inputIO, outputIO, errorIO in
305316
let writer = StandardInputWriter(diskIO: inputIO!)
306-
let outputSequence = AsyncBufferSequence(diskIO: outputIO!)
317+
let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeDiskIO())
307318
return try await body(execution, writer, outputSequence)
308319
}
309320
}
@@ -336,7 +347,7 @@ public func run<Result, Output: OutputProtocol>(
336347
error: try error.createPipe()
337348
) { execution, inputIO, outputIO, errorIO in
338349
let writer = StandardInputWriter(diskIO: inputIO!)
339-
let errorSequence = AsyncBufferSequence(diskIO: errorIO!)
350+
let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeDiskIO())
340351
return try await body(execution, writer, errorSequence)
341352
}
342353
}
@@ -393,8 +404,8 @@ public func run<Result>(
393404
error: try error.createPipe()
394405
) { execution, inputIO, outputIO, errorIO in
395406
let writer = StandardInputWriter(diskIO: inputIO!)
396-
let outputSequence = AsyncBufferSequence(diskIO: outputIO!)
397-
let errorSequence = AsyncBufferSequence(diskIO: errorIO!)
407+
let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeDiskIO())
408+
let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeDiskIO())
398409
return try await body(execution, writer, outputSequence, errorSequence)
399410
}
400411
}
@@ -433,12 +444,18 @@ public func run<
433444
error: try error.createPipe()
434445
) { (execution, inputIO, outputIO, errorIO) -> RunResult in
435446
// Write input, capture output and error in parallel
447+
var inputIOBox: TrackedPlatformDiskIO? = consume inputIO
448+
var outputIOBox: TrackedPlatformDiskIO? = consume outputIO
449+
var errorIOBox: TrackedPlatformDiskIO? = consume errorIO
436450
return try await withThrowingTaskGroup(
437451
of: OutputCapturingState<Output.OutputType, Error.OutputType>?.self,
438452
returning: RunResult.self
439453
) { group in
454+
var inputIOContainer: TrackedPlatformDiskIO? = inputIOBox.take()
455+
var outputIOContainer: TrackedPlatformDiskIO? = outputIOBox.take()
456+
var errorIOContainer: TrackedPlatformDiskIO? = errorIOBox.take()
440457
group.addTask {
441-
if let writeFd = inputIO {
458+
if let writeFd = inputIOContainer.take() {
442459
let writer = StandardInputWriter(diskIO: writeFd)
443460
try await input.write(with: writer)
444461
try await writer.finish()
@@ -447,13 +464,13 @@ public func run<
447464
}
448465
group.addTask {
449466
let stdout = try await output.captureOutput(
450-
from: outputIO
467+
from: outputIOContainer.take()
451468
)
452469
return .standardOutputCaptured(stdout)
453470
}
454471
group.addTask {
455472
let stderr = try await error.captureOutput(
456-
from: errorIO
473+
from: errorIOContainer.take()
457474
)
458475
return .standardErrorCaptured(stderr)
459476
}
@@ -514,8 +531,8 @@ public func run<Result>(
514531
error: try error.createPipe()
515532
) { execution, inputIO, outputIO, errorIO in
516533
let writer = StandardInputWriter(diskIO: inputIO!)
517-
let outputSequence = AsyncBufferSequence(diskIO: outputIO!)
518-
let errorSequence = AsyncBufferSequence(diskIO: errorIO!)
534+
let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeDiskIO())
535+
let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeDiskIO())
519536
return try await body(execution, writer, outputSequence, errorSequence)
520537
}
521538
}

Sources/Subprocess/AsyncBufferSequence.swift

+17-5
Original file line numberDiff line numberDiff line change
@@ -15,23 +15,31 @@
1515
@preconcurrency import SystemPackage
1616
#endif
1717

18+
internal import Dispatch
19+
1820
#if SubprocessSpan
1921
@available(SubprocessSpan, *)
2022
#endif
2123
public struct AsyncBufferSequence: AsyncSequence, Sendable {
2224
public typealias Failure = any Swift.Error
2325
public typealias Element = Buffer
2426

27+
#if os(Windows)
28+
internal typealias DiskIO = FileDescriptor
29+
#else
30+
internal typealias DiskIO = DispatchIO
31+
#endif
32+
2533
@_nonSendable
2634
public struct Iterator: AsyncIteratorProtocol {
2735
public typealias Element = Buffer
2836

29-
private let diskIO: TrackedPlatformDiskIO
37+
private let diskIO: DiskIO
3038
private var buffer: [UInt8]
3139
private var currentPosition: Int
3240
private var finished: Bool
3341

34-
internal init(diskIO: TrackedPlatformDiskIO) {
42+
internal init(diskIO: DiskIO) {
3543
self.diskIO = diskIO
3644
self.buffer = []
3745
self.currentPosition = 0
@@ -44,16 +52,20 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable {
4452
)
4553
if data == nil {
4654
// We finished reading. Close the file descriptor now
47-
try self.diskIO.safelyClose()
55+
#if os(Windows)
56+
try self.diskIO.close()
57+
#else
58+
self.diskIO.close()
59+
#endif
4860
return nil
4961
}
5062
return data
5163
}
5264
}
5365

54-
private let diskIO: TrackedPlatformDiskIO
66+
private let diskIO: DiskIO
5567

56-
internal init(diskIO: TrackedPlatformDiskIO) {
68+
internal init(diskIO: DiskIO) {
5769
self.diskIO = diskIO
5870
}
5971

0 commit comments

Comments
 (0)