Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix crash in PoolStateMachine+ConnectionGroup when closing connection while keepAlive is running #444

Merged
merged 11 commits into from
Dec 12, 2023
2 changes: 1 addition & 1 deletion Sources/ConnectionPoolModule/ConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ public final class ConnectionPool<
self.observabilityDelegate.keepAliveFailed(id: connection.id, error: error)

self.modifyStateAndRunActions { state in
state.stateMachine.connectionClosed(connection)
state.stateMachine.connectionKeepAliveFailed(connection)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,30 @@ extension PoolStateMachine {
return (index, context)
}

@inlinable
mutating func keepAliveFailed(_ connectionID: Connection.ID) -> CloseAction? {
guard let index = self.connections.firstIndex(where: { $0.id == connectionID }) else {
// Connection has already been closed
return nil
}

guard let closeAction = self.connections[index].keepAliveFailed() else {
return nil
}

self.stats.idle -= 1
self.stats.closing += 1
self.stats.runningKeepAlive -= closeAction.runningKeepAlive ? 1 : 0
self.stats.availableStreams -= closeAction.maxStreams - closeAction.usedStreams

// force unwrapping the connection is fine, because a close action due to failed
// keepAlive cannot happen without a connection
return CloseAction(
connection: closeAction.connection!,
timersToCancel: closeAction.cancelTimers
)
}

// MARK: Connection close/removal

@usableFromInline
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,11 @@ extension PoolStateMachine {
}
}

@inlinable
mutating func keepAliveFailed() -> CloseAction? {
return self.close()
}

@inlinable
mutating func timerScheduled(
_ timer: ConnectionTimer,
Expand Down
9 changes: 9 additions & 0 deletions Sources/ConnectionPoolModule/PoolStateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,15 @@ struct PoolStateMachine<
return self.handleAvailableConnection(index: index, availableContext: context)
}

@inlinable
mutating func connectionKeepAliveFailed(_ connection: Connection) -> Action {
guard let closeAction = self.connections.keepAliveFailed(connection.id) else {
return .none()
}

return .init(request: .none, connection: .closeConnection(closeAction.connection, closeAction.timersToCancel))
}

@inlinable
mutating func connectionIdleTimerTriggered(_ connectionID: ConnectionID) -> Action {
precondition(self.requestQueue.isEmpty)
Expand Down
86 changes: 86 additions & 0 deletions Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,92 @@ final class ConnectionPoolTests: XCTestCase {
}
}

func testKeepAliveOnClose() async throws {
let clock = MockClock()
let factory = MockConnectionFactory<MockClock>()
let keepAliveDuration = Duration.seconds(20)
let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self)

var mutableConfig = ConnectionPoolConfiguration()
mutableConfig.minimumConnectionCount = 0
mutableConfig.maximumConnectionSoftLimit = 1
mutableConfig.maximumConnectionHardLimit = 1
let config = mutableConfig

let pool = ConnectionPool(
configuration: config,
idGenerator: ConnectionIDGenerator(),
requestType: ConnectionRequest<MockConnection>.self,
keepAliveBehavior: keepAlive,
observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self),
clock: clock
) {
try await factory.makeConnection(id: $0, for: $1)
}

try await withThrowingTaskGroup(of: Void.self) { taskGroup in
taskGroup.addTask {
await pool.run()
}

async let lease1ConnectionAsync = pool.leaseConnection()

let connection = await factory.nextConnectAttempt { connectionID in
return 1
}

let lease1Connection = try await lease1ConnectionAsync
XCTAssert(connection === lease1Connection)

pool.releaseConnection(lease1Connection)

// keep alive 1

// validate that a keep alive timer and an idle timeout timer is scheduled
var expectedInstants: Set<MockClock.Instant> = [.init(keepAliveDuration), .init(config.idleTimeout)]
let deadline1 = await clock.nextTimerScheduled()
print(deadline1)
XCTAssertNotNil(expectedInstants.remove(deadline1))
let deadline2 = await clock.nextTimerScheduled()
print(deadline2)
XCTAssertNotNil(expectedInstants.remove(deadline2))
XCTAssert(expectedInstants.isEmpty)

// move clock forward to keep alive
let newTime = clock.now.advanced(by: keepAliveDuration)
clock.advance(to: newTime)
print("clock advanced to: \(newTime)")

await keepAlive.nextKeepAlive { keepAliveConnection in
defer { print("keep alive 1 has run") }
XCTAssertTrue(keepAliveConnection === lease1Connection)
return true
}

// keep alive 2

let deadline3 = await clock.nextTimerScheduled()
XCTAssertEqual(deadline3, clock.now.advanced(by: keepAliveDuration))
print(deadline3)
lovetodream marked this conversation as resolved.
Show resolved Hide resolved

clock.advance(to: clock.now.advanced(by: keepAliveDuration))

// the following keep alive should not cause a crash
_ = try? await keepAlive.nextKeepAlive { keepAliveConnection in
defer { print("failing keep alive has run") }
lovetodream marked this conversation as resolved.
Show resolved Hide resolved
XCTAssertTrue(keepAliveConnection === lease1Connection)
keepAliveConnection.close()
throw CancellationError() // any error
} // will fail and it's expected

taskGroup.cancelAll()

for connection in factory.runningConnections {
connection.closeIfClosing()
}
}
}

func testKeepAliveWorksRacesAgainstShutdown() async throws {
let clock = MockClock()
let factory = MockConnectionFactory<MockClock>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,4 +293,35 @@ final class PoolStateMachine_ConnectionGroupTests: XCTestCase {
XCTAssertEqual(afterPingIdleContext.use, .persisted)
XCTAssertEqual(connections.stats, .init(idle: 1, availableStreams: 1))
}

func testKeepAliveShouldNotIndicateCloseConnectionAfterClosed() {
var connections = TestPoolStateMachine.ConnectionGroup(
generator: self.idGenerator,
minimumConcurrentConnections: 0,
maximumConcurrentConnectionSoftLimit: 2,
maximumConcurrentConnectionHardLimit: 2,
keepAlive: true,
keepAliveReducesAvailableStreams: true
)

guard let firstRequest = connections.createNewDemandConnectionIfPossible() else { return XCTFail("Expected to have a request here") }

let newConnection = MockConnection(id: firstRequest.connectionID)
let (connectionIndex, establishedConnectionContext) = connections.newConnectionEstablished(newConnection, maxStreams: 1)
XCTAssertEqual(establishedConnectionContext.info, .idle(availableStreams: 1, newIdle: true))
XCTAssertEqual(connections.stats, .init(idle: 1, availableStreams: 1))
_ = connections.parkConnection(at: connectionIndex, hasBecomeIdle: true)
let keepAliveTimer = TestPoolStateMachine.ConnectionTimer(timerID: 0, connectionID: firstRequest.connectionID, usecase: .keepAlive)
let keepAliveTimerCancellationToken = MockTimerCancellationToken(keepAliveTimer)
XCTAssertNil(connections.timerScheduled(keepAliveTimer, cancelContinuation: keepAliveTimerCancellationToken))
let keepAliveAction = connections.keepAliveIfIdle(newConnection.id)
XCTAssertEqual(keepAliveAction, .init(connection: newConnection, keepAliveTimerCancellationContinuation: keepAliveTimerCancellationToken))
XCTAssertEqual(connections.stats, .init(idle: 1, runningKeepAlive: 1, availableStreams: 0))

_ = connections.closeConnectionIfIdle(newConnection.id)
guard connections.keepAliveFailed(newConnection.id) == nil else {
return XCTFail("Expected keepAliveFailed not to cause close again")
}
XCTAssertEqual(connections.stats, .init(closing: 1))
}
}
111 changes: 111 additions & 0 deletions Tests/ConnectionPoolModuleTests/PoolStateMachineTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -266,4 +266,115 @@ final class PoolStateMachineTests: XCTestCase {
XCTAssertEqual(releaseRequest1.connection, .none)
}

func testKeepAliveOnClosingConnection() {
var configuration = PoolConfiguration()
configuration.minimumConnectionCount = 0
configuration.maximumConnectionSoftLimit = 2
configuration.maximumConnectionHardLimit = 2
configuration.keepAliveDuration = .seconds(2)
configuration.idleTimeoutDuration = .seconds(4)


var stateMachine = TestPoolStateMachine(
configuration: configuration,
generator: .init(),
timerCancellationTokenType: MockTimerCancellationToken.self
)

// don't refill pool
let requests = stateMachine.refillConnections()
XCTAssertEqual(requests.count, 0)

// request connection while none exists
let request1 = MockRequest()
let leaseRequest1 = stateMachine.leaseConnection(request1)
XCTAssertEqual(leaseRequest1.connection, .makeConnection(.init(connectionID: 0), []))
XCTAssertEqual(leaseRequest1.request, .none)

// make connection 1
let connection1 = MockConnection(id: 0)
let createdAction1 = stateMachine.connectionEstablished(connection1, maxStreams: 1)
XCTAssertEqual(createdAction1.request, .leaseConnection(.init(element: request1), connection1))
XCTAssertEqual(createdAction1.connection, .none)
_ = stateMachine.releaseConnection(connection1, streams: 1)

// trigger keep alive
let keepAliveAction1 = stateMachine.connectionKeepAliveTimerTriggered(connection1.id)
XCTAssertEqual(keepAliveAction1.connection, .runKeepAlive(connection1, nil))

// fail keep alive and cause closed
let keepAliveFailed1 = stateMachine.connectionKeepAliveFailed(connection1)
XCTAssertEqual(keepAliveFailed1.connection, .closeConnection(connection1, []))
connection1.closeIfClosing()

// request connection while none exists anymore
let request2 = MockRequest()
let leaseRequest2 = stateMachine.leaseConnection(request2)
XCTAssertEqual(leaseRequest2.connection, .makeConnection(.init(connectionID: 1), []))
XCTAssertEqual(leaseRequest2.request, .none)

// make connection 2
let connection2 = MockConnection(id: 1)
let createdAction2 = stateMachine.connectionEstablished(connection2, maxStreams: 1)
XCTAssertEqual(createdAction2.request, .leaseConnection(.init(element: request2), connection2))
XCTAssertEqual(createdAction2.connection, .none)
_ = stateMachine.releaseConnection(connection2, streams: 1)

// trigger keep alive while connection is still open
let keepAliveAction2 = stateMachine.connectionKeepAliveTimerTriggered(connection2.id)
XCTAssertEqual(keepAliveAction2.connection, .runKeepAlive(connection2, nil))

// close connection in the middle of keep alive
connection2.close()
connection2.closeIfClosing()

// fail keep alive and cause closed
let keepAliveFailed2 = stateMachine.connectionKeepAliveFailed(connection2)
XCTAssertEqual(keepAliveFailed2.connection, .closeConnection(connection2, []))
}

func testConnectionIsEstablishedAfterFailedKeepAliveIfNotEnoughConnectionsLeft() {
var configuration = PoolConfiguration()
configuration.minimumConnectionCount = 1
configuration.maximumConnectionSoftLimit = 2
configuration.maximumConnectionHardLimit = 2
configuration.keepAliveDuration = .seconds(2)
configuration.idleTimeoutDuration = .seconds(4)


var stateMachine = TestPoolStateMachine(
configuration: configuration,
generator: .init(),
timerCancellationTokenType: MockTimerCancellationToken.self
)

// refill pool
let requests = stateMachine.refillConnections()
XCTAssertEqual(requests.count, 1)

// one connection should exist
let request = MockRequest()
let leaseRequest = stateMachine.leaseConnection(request)
XCTAssertEqual(leaseRequest.connection, .none)
XCTAssertEqual(leaseRequest.request, .none)

// make connection 1
let connection = MockConnection(id: 0)
let createdAction = stateMachine.connectionEstablished(connection, maxStreams: 1)
XCTAssertEqual(createdAction.request, .leaseConnection(.init(element: request), connection))
XCTAssertEqual(createdAction.connection, .none)
_ = stateMachine.releaseConnection(connection, streams: 1)

// trigger keep alive
let keepAliveAction = stateMachine.connectionKeepAliveTimerTriggered(connection.id)
XCTAssertEqual(keepAliveAction.connection, .runKeepAlive(connection, nil))

// fail keep alive, cause closed and make new connection
let keepAliveFailed = stateMachine.connectionKeepAliveFailed(connection)
XCTAssertEqual(keepAliveFailed.connection, .closeConnection(connection, []))
let connectionClosed = stateMachine.connectionClosed(connection)
XCTAssertEqual(connectionClosed.connection, .makeConnection(.init(connectionID: 1), []))
connection.closeIfClosing()
}

}
Loading