Skip to content

Commit

Permalink
Add new withRetryingConnectionBody function to retry a function body …
Browse files Browse the repository at this point in the history
…with an active Connection
  • Loading branch information
xtremekforever committed Feb 11, 2025
1 parent 73b6f16 commit bb3b870
Showing 1 changed file with 53 additions and 0 deletions.
53 changes: 53 additions & 0 deletions Sources/MassTransit/Utils/withRetryingConnectionBody.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import AMQPClient
import Logging
import RabbitMq

func withRetryingConnectionBody(
_ connection: Connection,
operationName: String,
retryInterval: Duration = MassTransit.defaultRetryInterval,
exitOnSuccess: Bool = false,
body: @escaping @Sendable () async throws -> Void
) async throws {
var firstAttempt = true
let firstAttemptStart = ContinuousClock().now

// Wait for connection, timeout after retryInterval
await connection.waitForConnection(timeout: retryInterval)

while !Task.isCancelledOrShuttingDown {
do {
connection.logger.trace("Starting body for operation \"\(operationName)\"...")
try await body()
if exitOnSuccess {
return
}
} catch AMQPConnectionError.connectionClosed(let replyCode, let replyText) {
if !firstAttempt {
let error = AMQPConnectionError.connectionClosed(replyCode: replyCode, replyText: replyText)
connection.logger.error(
"Connection closed while \(operationName): \(error)"
)
}

// Wait for connection, timeout after retryInterval
await connection.waitForConnection(timeout: retryInterval)

firstAttempt = false
continue
} catch {
// If this is our first attempt to connect, keep trying until we reach the timeout
if firstAttempt && ContinuousClock().now - firstAttemptStart < retryInterval {
await gracefulCancellableDelay(connection.connectionPollingInterval)
continue
}

connection.logger.error("Error \(operationName): \(error)")
firstAttempt = false
}

// Retry here
connection.logger.trace("Will retry operation \"\(operationName)\" in \(retryInterval)...")
try await Task.sleep(for: retryInterval)
}
}

0 comments on commit bb3b870

Please sign in to comment.