Skip to content

Commit

Permalink
Fix flaky tests due to Recorder incorrect critical section
Browse files Browse the repository at this point in the history
  • Loading branch information
groue committed Mar 17, 2024
1 parent c52db60 commit 4e0ab25
Showing 1 changed file with 93 additions and 81 deletions.
174 changes: 93 additions & 81 deletions Tests/CombineExpectations/Recorder.swift
Original file line number Diff line number Diff line change
Expand Up @@ -106,53 +106,59 @@ public class Recorder<Input, Failure: Error>: Subscriber {
/// the expectation. For example, the Prefix expectation uses true, but
/// the NextOne expectation uses false.
func fulfillOnInput(_ expectation: XCTestExpectation, includingConsumed: Bool) {
synchronized {
preconditionCanFulfillExpectation()
lock.lock()

preconditionCanFulfillExpectation()

let expectedFulfillmentCount = expectation.expectedFulfillmentCount

switch state {
case .waitingForSubscription:
let exp = RecorderExpectation.onInput(expectation, remainingCount: expectedFulfillmentCount)
state = .waitingForSubscription(exp)
lock.unlock()

let expectedFulfillmentCount = expectation.expectedFulfillmentCount
case let .subscribed(subscription, _, elements):
let maxFulfillmentCount = includingConsumed
? elements.count
: elements.count - consumedCount
let fulfillmentCount = min(expectedFulfillmentCount, maxFulfillmentCount)

switch state {
case .waitingForSubscription:
let exp = RecorderExpectation.onInput(expectation, remainingCount: expectedFulfillmentCount)
state = .waitingForSubscription(exp)

case let .subscribed(subscription, _, elements):
let maxFulfillmentCount = includingConsumed
? elements.count
: elements.count - consumedCount
let fulfillmentCount = min(expectedFulfillmentCount, maxFulfillmentCount)
expectation.fulfill(count: fulfillmentCount)

let remainingCount = expectedFulfillmentCount - fulfillmentCount
if remainingCount > 0 {
let exp = RecorderExpectation.onInput(expectation, remainingCount: remainingCount)
state = .subscribed(subscription, exp, elements)
}

case .completed:
expectation.fulfill(count: expectedFulfillmentCount)
let remainingCount = expectedFulfillmentCount - fulfillmentCount
if remainingCount > 0 {
let exp = RecorderExpectation.onInput(expectation, remainingCount: remainingCount)
state = .subscribed(subscription, exp, elements)
}
lock.unlock()
expectation.fulfill(count: fulfillmentCount)

case .completed:
lock.unlock()
expectation.fulfill(count: expectedFulfillmentCount)
}
}

/// Registers the expectation so that it gets fulfilled when
/// publisher completes.
func fulfillOnCompletion(_ expectation: XCTestExpectation) {
synchronized {
preconditionCanFulfillExpectation()
lock.lock()

preconditionCanFulfillExpectation()

switch state {
case .waitingForSubscription:
let exp = RecorderExpectation.onCompletion(expectation)
state = .waitingForSubscription(exp)
lock.unlock()

switch state {
case .waitingForSubscription:
let exp = RecorderExpectation.onCompletion(expectation)
state = .waitingForSubscription(exp)

case let .subscribed(subscription, _, elements):
let exp = RecorderExpectation.onCompletion(expectation)
state = .subscribed(subscription, exp, elements)

case .completed:
expectation.fulfill()
}
case let .subscribed(subscription, _, elements):
let exp = RecorderExpectation.onCompletion(expectation)
state = .subscribed(subscription, exp, elements)
lock.unlock()

case .completed:
lock.unlock()
expectation.fulfill()
}
}

Expand All @@ -171,7 +177,7 @@ public class Recorder<Input, Failure: Error>: Subscriber {
_ completion: Subscribers.Completion<Failure>?,
_ remainingElements: ArraySlice<Input>,
_ consume: (_ count: Int) -> ()) throws -> T)
rethrows -> T
rethrows -> T
{
try synchronized {
let (elements, completion) = state.elementsAndCompletion
Expand Down Expand Up @@ -217,58 +223,64 @@ public class Recorder<Input, Failure: Error>: Subscriber {
}

public func receive(_ input: Input) -> Subscribers.Demand {
return synchronized {
switch state {
case let .subscribed(subscription, exp, elements):
var elements = elements
elements.append(input)

if case let .onInput(expectation, remainingCount: remainingCount) = exp {
assert(remainingCount > 0)
expectation.fulfill()
if remainingCount > 1 {
let exp = RecorderExpectation.onInput(expectation, remainingCount: remainingCount - 1)
state = .subscribed(subscription, exp, elements)
} else {
state = .subscribed(subscription, nil, elements)
}
} else {
lock.lock()

switch state {
case let .subscribed(subscription, exp, elements):
var elements = elements
elements.append(input)

if case let .onInput(expectation, remainingCount: remainingCount) = exp {
assert(remainingCount > 0)
expectation.fulfill()
if remainingCount > 1 {
let exp = RecorderExpectation.onInput(expectation, remainingCount: remainingCount - 1)
state = .subscribed(subscription, exp, elements)
} else {
state = .subscribed(subscription, nil, elements)
}

return .unlimited

case .waitingForSubscription:
XCTFail("Publisher recorder got unexpected input before subscription: \(String(reflecting: input))")
return .none

case .completed:
XCTFail("Publisher recorder got unexpected input after completion: \(String(reflecting: input))")
return .none
} else {
state = .subscribed(subscription, exp, elements)
}

lock.unlock()
return .unlimited

case .waitingForSubscription:
lock.unlock()
XCTFail("Publisher recorder got unexpected input before subscription: \(String(reflecting: input))")
return .none

case .completed:
lock.unlock()
XCTFail("Publisher recorder got unexpected input after completion: \(String(reflecting: input))")
return .none
}
}

public func receive(completion: Subscribers.Completion<Failure>) {
synchronized {
switch state {
case let .subscribed(_, exp, elements):
if let exp {
switch exp {
case let .onCompletion(expectation):
expectation.fulfill()
case let .onInput(expectation, remainingCount: remainingCount):
expectation.fulfill(count: remainingCount)
}
lock.lock()

switch state {
case let .subscribed(_, exp, elements):
if let exp {
switch exp {
case let .onCompletion(expectation):
expectation.fulfill()
case let .onInput(expectation, remainingCount: remainingCount):
expectation.fulfill(count: remainingCount)
}
state = .completed(elements, completion)

case .waitingForSubscription:
XCTFail("Publisher recorder got unexpected completion before subscription: \(String(describing: completion))")

case .completed:
XCTFail("Publisher recorder got unexpected completion after completion: \(String(describing: completion))")
}
state = .completed(elements, completion)
lock.unlock()

case .waitingForSubscription:
lock.unlock()
XCTFail("Publisher recorder got unexpected completion before subscription: \(String(describing: completion))")

case .completed:
lock.unlock()
XCTFail("Publisher recorder got unexpected completion after completion: \(String(describing: completion))")
}
}
}
Expand Down

0 comments on commit 4e0ab25

Please sign in to comment.