Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix crash in PoolStateMachine+ConnectionGroup when closing connection while keepAlive is running #444

Merged
merged 11 commits into from
Dec 12, 2023
2 changes: 1 addition & 1 deletion Sources/ConnectionPoolModule/ConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ public final class ConnectionPool<
self.observabilityDelegate.keepAliveFailed(id: connection.id, error: error)

self.modifyStateAndRunActions { state in
state.stateMachine.connectionClosed(connection)
state.stateMachine.connectionKeepAliveFailed(connection)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,21 @@ extension PoolStateMachine {
return (index, context)
}

/// Returns `true` if the connection should be explicitly closed, `false` if nothing needs to be done.
@inlinable
mutating func keepAliveFailed(_ connectionID: Connection.ID) -> Bool {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we return an enum for this? KeepAliveFailedAction with two cases: close and none? Makes this more explicit

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now returning an optional CloseAction. It should be explicit enough now, let me know if not.

// We don't have to inform the ConnectionStates about any of this, because the connection will close
// immediately after this or is closed already
switch self.connections.first(where: { $0.id == connectionID })?.state {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't access the connection's state here. The state is only visible here to allow inlining. Instead, please forward a `keepAliveFailed` call into the ConnectionStateMachine. Also make sure to change the state to closing if we need to explicitly close the connection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The connection state is receiving the failure now and changes the state accordingly.

case .closing, .closed, .none:
// There might've been a race between closing and keeping the connection alive.
// The connection has already been closed in that case.
return false
default:
return true
}
}

// MARK: Connection close/removal

@usableFromInline
Expand Down Expand Up @@ -547,8 +562,9 @@ extension PoolStateMachine {

if closedAction.wasRunningKeepAlive {
self.stats.runningKeepAlive -= 1
} else {
lovetodream marked this conversation as resolved.
Show resolved Hide resolved
self.stats.leasedStreams -= closedAction.usedStreams
}
self.stats.leasedStreams -= closedAction.usedStreams
self.stats.availableStreams -= closedAction.maxStreams - closedAction.usedStreams

switch closedAction.previousConnectionState {
Expand Down
8 changes: 8 additions & 0 deletions Sources/ConnectionPoolModule/PoolStateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,14 @@ struct PoolStateMachine<
return self.handleAvailableConnection(index: index, availableContext: context)
}

@inlinable
mutating func connectionKeepAliveFailed(_ connection: Connection) -> Action {
if self.connections.keepAliveFailed(connection.id) {
return self.connectionClosed(connection)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That doesn't look right. If the keep-alive failed, we can't declare the connection dead right away; we need to create a close action. Also, if we close and have waiting requests, we might need to create a new connection right away.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The connection is now properly closed by reusing the existing closed states and actions. I've added a test to ensure a new connection is established if required.

}
return .none()
}

@inlinable
mutating func connectionIdleTimerTriggered(_ connectionID: ConnectionID) -> Action {
precondition(self.requestQueue.isEmpty)
Expand Down
88 changes: 88 additions & 0 deletions Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,94 @@ final class ConnectionPoolTests: XCTestCase {
}
}

/// Drives a pool with a single connection through one successful keep-alive and then through a
/// keep-alive whose closure closes the connection and throws, to check that the failing
/// keep-alive does not crash the pool.
func testKeepAliveOnClose() async throws {
let clock = MockClock()
let factory = MockConnectionFactory<MockClock>()
let keepAliveDuration = Duration.seconds(20)
let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self)

// Limit the pool to at most one connection and do not pre-create any.
var mutableConfig = ConnectionPoolConfiguration()
mutableConfig.minimumConnectionCount = 0
mutableConfig.maximumConnectionSoftLimit = 1
mutableConfig.maximumConnectionHardLimit = 1
let config = mutableConfig

let pool = ConnectionPool(
configuration: config,
idGenerator: ConnectionIDGenerator(),
requestType: ConnectionRequest<MockConnection>.self,
keepAliveBehavior: keepAlive,
observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self),
clock: clock
) {
try await factory.makeConnection(id: $0, for: $1)
}

try await withThrowingTaskGroup(of: Void.self) { taskGroup in
taskGroup.addTask {
await pool.run()
}

// Start the lease before any connection exists; the connect attempt below is awaited next
// and the lease result is awaited afterwards.
async let lease1ConnectionAsync = pool.leaseConnection()

// presumably the returned 1 is the number of streams the new connection offers — TODO confirm
let connection = await factory.nextConnectAttempt { connectionID in
return 1
}

let lease1Connection = try await lease1ConnectionAsync
XCTAssert(connection === lease1Connection)

pool.releaseConnection(lease1Connection)

// keep alive 1

// validate that a keep alive timer and an idle timeout timer is scheduled
// The two timers may be scheduled in either order, so each observed deadline is removed from a set.
var expectedInstants: Set<MockClock.Instant> = [.init(keepAliveDuration), .init(config.idleTimeout)]
let deadline1 = await clock.nextTimerScheduled()
print(deadline1)
XCTAssertNotNil(expectedInstants.remove(deadline1))
let deadline2 = await clock.nextTimerScheduled()
print(deadline2)
XCTAssertNotNil(expectedInstants.remove(deadline2))
XCTAssert(expectedInstants.isEmpty)

// move clock forward to keep alive
let newTime = clock.now.advanced(by: keepAliveDuration)
clock.advance(to: newTime)
print("clock advanced to: \(newTime)")

// First keep-alive: runs on the leased connection and the closure returns `true`.
await keepAlive.nextKeepAlive { keepAliveConnection in
defer { print("keep alive 1 has run") }
XCTAssertTrue(keepAliveConnection === lease1Connection)
return true
}

// keep alive 2

// After the first keep-alive, the next timer is expected one keep-alive interval from now.
let deadline3 = await clock.nextTimerScheduled()
XCTAssertEqual(deadline3, clock.now.advanced(by: keepAliveDuration))
print(deadline3)
lovetodream marked this conversation as resolved.
Show resolved Hide resolved

clock.advance(to: clock.now.advanced(by: keepAliveDuration))

// the following keep alive should not cause a crash
// The closure closes the connection itself and then throws; `try?` discards that error on purpose.
_ = try? await keepAlive.nextKeepAlive { keepAliveConnection in
defer { print("failing keep alive has run") }
lovetodream marked this conversation as resolved.
Show resolved Hide resolved
XCTAssertTrue(keepAliveConnection === lease1Connection)
keepAliveConnection.close()
keepAliveConnection.closeIfClosing()
XCTAssertTrue(keepAliveConnection.isClosing)
throw CancellationError() // any error
} // will fail and it's expected

// Cancel the pool's run task, then finish closing any connection the factory still tracks.
taskGroup.cancelAll()

for connection in factory.runningConnections {
connection.closeIfClosing()
}
}
}

func testKeepAliveWorksRacesAgainstShutdown() async throws {
let clock = MockClock()
let factory = MockConnectionFactory<MockClock>()
Expand Down
11 changes: 11 additions & 0 deletions Tests/ConnectionPoolModuleTests/Mocks/MockConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,17 @@ final class MockConnection: PooledConnection, Sendable {
callback(nil)
}
}

/// Reports whether this mock connection is closing; the state is inspected via `lock.withLockedValue`.
var isClosing: Bool {
self.lock.withLockedValue { state in
switch state {
case .closing:
return true
case .running, .closed:
// NOTE(review): this branch returns `true` as well, so every case listed in this switch
// yields `true`. Presumably `false` was intended for `.running`/`.closed` — confirm, and
// re-check any `XCTAssertTrue(….isClosing)` assertions, which cannot fail while this holds.
return true
lovetodream marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
}

extension MockConnection: CustomStringConvertible {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,4 +293,35 @@ final class PoolStateMachine_ConnectionGroupTests: XCTestCase {
XCTAssertEqual(afterPingIdleContext.use, .persisted)
XCTAssertEqual(connections.stats, .init(idle: 1, availableStreams: 1))
}

/// Checks that `keepAliveFailed` returns `false` (no explicit close needed) when the connection
/// was already closed via `closeConnectionIfIdle` while its keep-alive was still running.
func testKeepAliveShouldNotIndicateCloseConnectionAfterClosed() {
    var connections = TestPoolStateMachine.ConnectionGroup(
        generator: self.idGenerator,
        minimumConcurrentConnections: 0,
        maximumConcurrentConnectionSoftLimit: 2,
        maximumConcurrentConnectionHardLimit: 2,
        keepAlive: true,
        keepAliveReducesAvailableStreams: true
    )

    guard let firstRequest = connections.createNewDemandConnectionIfPossible() else {
        return XCTFail("Expected to have a request here")
    }

    // Establish the requested connection with a single stream; it is reported as newly idle.
    let newConnection = MockConnection(id: firstRequest.connectionID)
    let (connectionIndex, establishedConnectionContext) = connections.newConnectionEstablished(newConnection, maxStreams: 1)
    XCTAssertEqual(establishedConnectionContext.info, .idle(availableStreams: 1, newIdle: true))
    XCTAssertEqual(connections.stats, .init(idle: 1, availableStreams: 1))

    // Park the connection. The value returned by `parkConnection` is not inspected by this test,
    // so it is discarded explicitly instead of being bound to an unused local.
    _ = connections.parkConnection(at: connectionIndex, hasBecomeIdle: true)

    // Register the keep-alive timer and start the keep-alive run.
    let keepAliveTimer = TestPoolStateMachine.ConnectionTimer(timerID: 0, connectionID: firstRequest.connectionID, usecase: .keepAlive)
    let keepAliveTimerCancellationToken = MockTimerCancellationToken(keepAliveTimer)
    XCTAssertNil(connections.timerScheduled(keepAliveTimer, cancelContinuation: keepAliveTimerCancellationToken))
    let keepAliveAction = connections.keepAliveIfIdle(newConnection.id)
    XCTAssertEqual(keepAliveAction, .init(connection: newConnection, keepAliveTimerCancellationContinuation: keepAliveTimerCancellationToken))
    // While the keep-alive runs, the connection's single stream is no longer counted as available.
    XCTAssertEqual(connections.stats, .init(idle: 1, runningKeepAlive: 1, availableStreams: 0))

    // Close the connection while the keep-alive is still in flight, then report the keep-alive failure.
    _ = connections.closeConnectionIfIdle(newConnection.id)
    guard !connections.keepAliveFailed(newConnection.id) else {
        return XCTFail("Expected keepAliveFailed to be false due to closing connection")
    }
    XCTAssertEqual(connections.stats, .init(closing: 1))
}
}
70 changes: 70 additions & 0 deletions Tests/ConnectionPoolModuleTests/PoolStateMachineTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -266,4 +266,74 @@ final class PoolStateMachineTests: XCTestCase {
XCTAssertEqual(releaseRequest1.connection, .none)
}

/// Exercises `connectionKeepAliveFailed` on the pool state machine in two situations:
/// first for a connection that is still open when its keep-alive fails, then for a connection
/// that was reported closed while its keep-alive was running.
func testKeepAliveOnClosingConnection() {
    var poolConfig = PoolConfiguration()
    poolConfig.minimumConnectionCount = 0
    poolConfig.maximumConnectionSoftLimit = 2
    poolConfig.maximumConnectionHardLimit = 2
    poolConfig.keepAliveDuration = .seconds(2)
    poolConfig.idleTimeoutDuration = .seconds(4)

    var machine = TestPoolStateMachine(
        configuration: poolConfig,
        generator: .init(),
        timerCancellationTokenType: MockTimerCancellationToken.self
    )

    // Refilling is expected to request no connections for this configuration.
    XCTAssertEqual(machine.refillConnections().count, 0)

    // MARK: Scenario 1 – keep-alive fails on an open connection

    // Leasing without any connection asks for connection 0 and yields no request action yet.
    let firstRequest = MockRequest()
    let firstLease = machine.leaseConnection(firstRequest)
    XCTAssertEqual(firstLease.connection, .makeConnection(.init(connectionID: 0), []))
    XCTAssertEqual(firstLease.request, .none)

    // Once connection 0 is established, the waiting request is handed that connection.
    let firstConnection = MockConnection(id: 0)
    let firstEstablished = machine.connectionEstablished(firstConnection, maxStreams: 1)
    XCTAssertEqual(firstEstablished.request, .leaseConnection(.init(element: firstRequest), firstConnection))
    XCTAssertEqual(firstEstablished.connection, .none)
    _ = machine.releaseConnection(firstConnection, streams: 1)

    // The keep-alive timer firing starts a keep-alive run on the released connection.
    let firstKeepAlive = machine.connectionKeepAliveTimerTriggered(firstConnection.id)
    XCTAssertEqual(firstKeepAlive.connection, .runKeepAlive(firstConnection, nil))

    // Reporting the keep-alive failure is expected to yield a timer-cancellation action.
    let firstFailure = machine.connectionKeepAliveFailed(firstConnection)
    XCTAssertEqual(firstFailure.connection, .cancelTimers([]))

    XCTAssertTrue(firstConnection.isClosing)
    firstConnection.closeIfClosing()

    // MARK: Scenario 2 – connection is closed while its keep-alive is running

    // With the first connection gone, a new lease asks for connection 1.
    let secondRequest = MockRequest()
    let secondLease = machine.leaseConnection(secondRequest)
    XCTAssertEqual(secondLease.connection, .makeConnection(.init(connectionID: 1), []))
    XCTAssertEqual(secondLease.request, .none)

    let secondConnection = MockConnection(id: 1)
    let secondEstablished = machine.connectionEstablished(secondConnection, maxStreams: 1)
    XCTAssertEqual(secondEstablished.request, .leaseConnection(.init(element: secondRequest), secondConnection))
    XCTAssertEqual(secondEstablished.connection, .none)
    _ = machine.releaseConnection(secondConnection, streams: 1)

    // Start a keep-alive run on the still-open second connection.
    let secondKeepAlive = machine.connectionKeepAliveTimerTriggered(secondConnection.id)
    XCTAssertEqual(secondKeepAlive.connection, .runKeepAlive(secondConnection, nil))

    // Report the connection as closed before the keep-alive result arrives.
    _ = machine.connectionClosed(secondConnection)
    XCTAssertTrue(secondConnection.isClosing)
    secondConnection.closeIfClosing()

    // The late keep-alive failure for the already-closed connection must produce no connection action.
    let secondFailure = machine.connectionKeepAliveFailed(secondConnection)
    XCTAssertEqual(secondFailure.connection, .none)
}

}
Loading