Skip to content

Commit 256c84c

Browse files
committed
Process termination monitoring implementation on Linux conflicts with processes spawned by other means
The approach used by Subprocess to monitor subprocess termination on Linux is fundamentally flawed as it calls waitid with P_ALL, and WEXITED without WNOWAIT, which will end up reaping pids that were spawned outside the Subprocess library. Use an implementation more like swift-testing does for wait tests, which doesn't suffer from this issue. Closes #82
1 parent 80bd50f commit 256c84c

File tree

3 files changed

+207
-67
lines changed

3 files changed

+207
-67
lines changed

Sources/Subprocess/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ target_sources(Subprocess PRIVATE
1313
Execution.swift
1414
Buffer.swift
1515
Error.swift
16+
Locked.swift
1617
Teardown.swift
1718
Result.swift
1819
IO/Output.swift

Sources/Subprocess/Locked.swift

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Swift.org open source project
4+
//
5+
// Copyright (c) 2025 Apple Inc. and the Swift project authors
6+
// Licensed under Apache License v2.0 with Runtime Library Exception
7+
//
8+
// See https://swift.org/LICENSE.txt for license information
9+
//
10+
//===----------------------------------------------------------------------===//
11+
12+
/// A protocol defining a type, generally platform-specific, that satisfies the
13+
/// requirements of a lock or mutex.
14+
protocol Lockable {
15+
/// Initialize the lock at the given address.
16+
///
17+
/// - Parameters:
18+
/// - lock: A pointer to uninitialized memory that should be initialized as
19+
/// an instance of this type.
20+
static func initializeLock(at lock: UnsafeMutablePointer<Self>)
21+
22+
/// Deinitialize the lock at the given address.
23+
///
24+
/// - Parameters:
25+
/// - lock: A pointer to initialized memory that should be deinitialized.
26+
static func deinitializeLock(at lock: UnsafeMutablePointer<Self>)
27+
28+
/// Acquire the lock at the given address.
29+
///
30+
/// - Parameters:
31+
/// - lock: The address of the lock to acquire.
32+
static func unsafelyAcquireLock(at lock: UnsafeMutablePointer<Self>)
33+
34+
/// Relinquish the lock at the given address.
35+
///
36+
/// - Parameters:
37+
/// - lock: The address of the lock to relinquish.
38+
static func unsafelyRelinquishLock(at lock: UnsafeMutablePointer<Self>)
39+
}
40+
41+
// MARK: -
42+
43+
/// A type that wraps a value requiring access from a synchronous caller during
44+
/// concurrent execution.
45+
///
46+
/// Instances of this type use a lock to synchronize access to their raw values.
47+
/// The lock is not recursive.
48+
///
49+
/// Instances of this type can be used to synchronize access to shared data from
50+
/// a synchronous caller. Wherever possible, use actor isolation or other Swift
51+
/// concurrency tools.
52+
///
53+
/// This type is not part of the public interface of the testing library.
54+
struct LockedWith<L, T>: RawRepresentable where L: Lockable {
55+
/// A type providing heap-allocated storage for an instance of ``Locked``.
56+
private final class _Storage: ManagedBuffer<T, L> {
57+
deinit {
58+
withUnsafeMutablePointerToElements { lock in
59+
L.deinitializeLock(at: lock)
60+
}
61+
}
62+
}
63+
64+
/// Storage for the underlying lock and wrapped value.
65+
private nonisolated(unsafe) var _storage: ManagedBuffer<T, L>
66+
67+
init(rawValue: T) {
68+
_storage = _Storage.create(minimumCapacity: 1, makingHeaderWith: { _ in rawValue })
69+
_storage.withUnsafeMutablePointerToElements { lock in
70+
L.initializeLock(at: lock)
71+
}
72+
}
73+
74+
var rawValue: T {
75+
withLock { $0 }
76+
}
77+
78+
/// Acquire the lock and invoke a function while it is held.
79+
///
80+
/// - Parameters:
81+
/// - body: A closure to invoke while the lock is held.
82+
///
83+
/// - Returns: Whatever is returned by `body`.
84+
///
85+
/// - Throws: Whatever is thrown by `body`.
86+
///
87+
/// This function can be used to synchronize access to shared data from a
88+
/// synchronous caller. Wherever possible, use actor isolation or other Swift
89+
/// concurrency tools.
90+
nonmutating func withLock<R>(_ body: (inout T) throws -> R) rethrows -> R where R: ~Copyable {
91+
try _storage.withUnsafeMutablePointers { rawValue, lock in
92+
L.unsafelyAcquireLock(at: lock)
93+
defer {
94+
L.unsafelyRelinquishLock(at: lock)
95+
}
96+
return try body(&rawValue.pointee)
97+
}
98+
}
99+
100+
/// Acquire the lock and invoke a function while it is held, yielding both the
101+
/// protected value and a reference to the underlying lock guarding it.
102+
///
103+
/// - Parameters:
104+
/// - body: A closure to invoke while the lock is held.
105+
///
106+
/// - Returns: Whatever is returned by `body`.
107+
///
108+
/// - Throws: Whatever is thrown by `body`.
109+
///
110+
/// This function is equivalent to ``withLock(_:)`` except that the closure
111+
/// passed to it also takes a reference to the underlying lock guarding this
112+
/// instance's wrapped value. This function can be used when platform-specific
113+
/// functionality such as a `pthread_cond_t` is needed. Because the caller has
114+
/// direct access to the lock and is able to unlock and re-lock it, it is
115+
/// unsafe to modify the protected value.
116+
///
117+
/// - Warning: Callers that unlock the lock _must_ lock it again before the
118+
/// closure returns. If the lock is not acquired when `body` returns, the
119+
/// effect is undefined.
120+
nonmutating func withUnsafeUnderlyingLock<R>(_ body: (UnsafeMutablePointer<L>, T) throws -> R) rethrows -> R where R: ~Copyable {
121+
try withLock { value in
122+
try _storage.withUnsafeMutablePointerToElements { lock in
123+
try body(lock, value)
124+
}
125+
}
126+
}
127+
}
128+
129+
extension LockedWith: Sendable where T: Sendable {}
130+
131+
// MARK: - Additions
132+
133+
extension LockedWith {
134+
/// Initialize an instance of this type with a raw value of `[:]`.
135+
init<K, V>() where T == Dictionary<K, V> {
136+
self.init(rawValue: [:])
137+
}
138+
}

Sources/Subprocess/Platforms/Subprocess+Linux.swift

Lines changed: 68 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -266,32 +266,29 @@ extension String {
266266
internal func monitorProcessTermination(
267267
forProcessWithIdentifier pid: ProcessIdentifier
268268
) async throws -> TerminationStatus {
269+
_ = setup
269270
return try await withCheckedThrowingContinuation { continuation in
270271
_childProcessContinuations.withLock { continuations in
271-
if let existing = continuations.removeValue(forKey: pid.value),
272-
case .status(let existingStatus) = existing
273-
{
274-
// We already have existing status to report
275-
continuation.resume(returning: existingStatus)
276-
} else {
277-
// Save the continuation for handler
278-
continuations[pid.value] = .continuation(continuation)
279-
}
272+
// Save the continuation for handler
273+
let oldContinuation = continuations.updateValue(continuation, forKey: pid.value)
274+
precondition(oldContinuation == nil)
275+
276+
_ = pthread_cond_signal(_waitThreadNoChildrenCondition)
280277
}
281278
}
282279
}
283280

284-
private enum ContinuationOrStatus {
285-
case continuation(CheckedContinuation<TerminationStatus, any Error>)
286-
case status(TerminationStatus)
287-
}
288-
289-
private let _childProcessContinuations:
290-
Mutex<
291-
[pid_t: ContinuationOrStatus]
292-
> = Mutex([:])
281+
private let _childProcessContinuations =
282+
LockedWith<
283+
pthread_mutex_t,
284+
[pid_t: CheckedContinuation<TerminationStatus, any Error>]
285+
>()
293286

294-
private let signalSource: SendableSourceSignal = SendableSourceSignal()
287+
private nonisolated(unsafe) let _waitThreadNoChildrenCondition = {
288+
let result = UnsafeMutablePointer<pthread_cond_t>.allocate(capacity: 1)
289+
_ = pthread_cond_init(result, nil)
290+
return result
291+
}()
295292

296293
private extension siginfo_t {
297294
var si_status: Int32 {
@@ -316,67 +313,71 @@ private extension siginfo_t {
316313
}
317314

318315
private let setup: () = {
319-
signalSource.setEventHandler {
320-
while true {
321-
var siginfo = siginfo_t()
322-
guard waitid(P_ALL, id_t(0), &siginfo, WEXITED) == 0 || errno == EINTR else {
323-
return
324-
}
325-
var status: TerminationStatus? = nil
326-
switch siginfo.si_code {
327-
case .init(CLD_EXITED):
328-
status = .exited(siginfo.si_status)
329-
case .init(CLD_KILLED), .init(CLD_DUMPED):
330-
status = .unhandledException(siginfo.si_status)
331-
case .init(CLD_TRAPPED), .init(CLD_STOPPED), .init(CLD_CONTINUED):
332-
// Ignore these signals because they are not related to
333-
// process exiting
334-
break
335-
default:
336-
fatalError("Unexpected exit status: \(siginfo.si_code)")
337-
}
338-
if let status = status {
339-
_childProcessContinuations.withLock { continuations in
316+
var thread = pthread_t()
317+
_ = pthread_create(
318+
&thread,
319+
nil,
320+
{ _ -> UnsafeMutableRawPointer? in
321+
while true {
322+
var siginfo = siginfo_t()
323+
if waitid(P_ALL, id_t(0), &siginfo, WEXITED | WNOWAIT) == 0 {
340324
let pid = siginfo.si_pid
341-
if let existing = continuations.removeValue(forKey: pid),
342-
case .continuation(let c) = existing
343-
{
344-
c.resume(returning: status)
345-
} else {
346-
// We don't have continuation yet, just state status
347-
continuations[pid] = .status(status)
325+
guard pid != 0, let c = _childProcessContinuations.withLock({ $0.removeValue(forKey: pid) }) else {
326+
continue
327+
}
328+
329+
c.resume(with: Result {
330+
while true {
331+
var siginfo = siginfo_t()
332+
if waitid(P_PID, numericCast(pid), &siginfo, WEXITED) == 0 {
333+
var status: TerminationStatus? = nil
334+
switch siginfo.si_code {
335+
case .init(CLD_EXITED):
336+
return .exited(siginfo.si_status)
337+
case .init(CLD_KILLED), .init(CLD_DUMPED):
338+
return .unhandledException(siginfo.si_status)
339+
default:
340+
fatalError("Unexpected exit status: \(siginfo.si_code)")
341+
}
342+
} else if errno != EINTR {
343+
throw SubprocessError.UnderlyingError(rawValue: errno)
344+
}
345+
}
346+
})
347+
} else if errno == SIGCHLD {
348+
_childProcessContinuations.withUnsafeUnderlyingLock { lock, childProcessContinuations in
349+
if childProcessContinuations.isEmpty {
350+
_ = pthread_cond_wait(_waitThreadNoChildrenCondition, lock)
351+
}
348352
}
349353
}
350354
}
351-
}
352-
}
353-
signalSource.resume()
355+
},
356+
nil
357+
)
354358
}()
355359

356-
/// Unchecked Sendable here since this class is only explicitly
357-
/// initialized once during the lifetime of the process
358-
final class SendableSourceSignal: @unchecked Sendable {
359-
private let signalSource: DispatchSourceSignal
360+
private func _setupMonitorSignalHandler() {
361+
// Only executed once
362+
setup
363+
}
360364

361-
func setEventHandler(handler: @escaping DispatchSourceHandler) {
362-
self.signalSource.setEventHandler(handler: handler)
365+
extension pthread_mutex_t: Lockable {
366+
static func initializeLock(at lock: UnsafeMutablePointer<Self>) {
367+
_ = pthread_mutex_init(lock, nil)
363368
}
364369

365-
func resume() {
366-
self.signalSource.resume()
370+
static func deinitializeLock(at lock: UnsafeMutablePointer<Self>) {
371+
_ = pthread_mutex_destroy(lock)
367372
}
368373

369-
init() {
370-
self.signalSource = DispatchSource.makeSignalSource(
371-
signal: SIGCHLD,
372-
queue: .global()
373-
)
374+
static func unsafelyAcquireLock(at lock: UnsafeMutablePointer<Self>) {
375+
_ = pthread_mutex_lock(lock)
374376
}
375-
}
376377

377-
private func _setupMonitorSignalHandler() {
378-
// Only executed once
379-
setup
378+
static func unsafelyRelinquishLock(at lock: UnsafeMutablePointer<Self>) {
379+
_ = pthread_mutex_unlock(lock)
380+
}
380381
}
381382

382383
#endif // canImport(Glibc) || canImport(Android) || canImport(Musl)

0 commit comments

Comments
 (0)