Skip to content

Commit

Permalink
Implement retry functionality for binding in the MassTransitConsumer
Browse files Browse the repository at this point in the history
 - We track the options used to bind the exchanges for the message type consumers, then once we are able to reconnect, we re-bind the exchanges as desired.
  • Loading branch information
xtremekforever committed Feb 11, 2025
1 parent bb3b870 commit b5401bc
Showing 1 changed file with 82 additions and 78 deletions.
160 changes: 82 additions & 78 deletions Sources/MassTransit/MassTransitConsumer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,16 @@ public actor MassTransitConsumer: Service {
let onConsumerReady: (@Sendable () async throws -> Void)?
let logger: Logger

struct MessageTypeConsumer {
var messageExchange: String
var exchangeOptions: ExchangeOptions
var routingKey: String
var bindingOptions: BindingOptions
var handler: (ByteBuffer) throws -> Void
}

private(set) public var isConsumerReady = false
private var consumers: [String: (ByteBuffer) throws -> Void] = [:]
private var consumers: [String: MessageTypeConsumer] = [:]

/// Create the MassTransit Consumer.
///
Expand Down Expand Up @@ -54,16 +62,29 @@ public actor MassTransitConsumer: Service {
}

private func createMessageConsumer<T: MassTransitMessage>(
_: T.Type, messageType: String
) -> AnyAsyncSequence<MassTransitWrapper<T>> {
_: T.Type,
_ messageType: String,
_ messageExchange: String,
_ exchangeOptions: ExchangeOptions,
_ routingKey: String,
_ bindingOptions: BindingOptions
) async throws -> AnyAsyncSequence<MassTransitWrapper<T>> {
assert(consumers[messageType] == nil, "Consumer for \(messageType) is already registered!")

// We need to declare & bind an exchange for this message
try await bindMessageExchange(messageExchange, exchangeOptions, routingKey, bindingOptions)

// Create a stream + continuation
logger.info("Consuming messages of type \(messageType) on queue \(queueName)...")
let (stream, continuation) = AsyncStream.makeStream(of: MassTransitWrapper<T>.self)

// Create a message handler
consumers[messageType] = { buffer in
consumers[messageType] = MessageTypeConsumer(
messageExchange: messageExchange,
exchangeOptions: exchangeOptions,
routingKey: routingKey,
bindingOptions: bindingOptions
) { buffer in
let wrapper = try MassTransitWrapper(T.self, from: buffer)
continuation.yield(wrapper)
}
Expand All @@ -86,73 +107,29 @@ public actor MassTransitConsumer: Service {
consumers.removeValue(forKey: urn(from: messageType))
}

private func waitForConnectionAndConsumer(timeout: Duration) async {
let start = ContinuousClock().now
while !Task.isCancelledOrShuttingDown {
if await connection.isConnected && isConsumerReady {
break
}

if ContinuousClock().now - start >= timeout {
break
}

await gracefulCancellableDelay(connection.connectionPollingInterval)
}
}

private func bindMessageExchange(
_ messageExchange: String,
_ exchangeOptions: ExchangeOptions,
_ routingKey: String,
_ bindingOptions: BindingOptions
) async throws {
var firstAttempt = true
let firstAttemptStart = ContinuousClock().now

// Wait for connection and consumer before attempting to bind
await waitForConnectionAndConsumer(timeout: retryInterval)

while !Task.isCancelledOrShuttingDown {
do {
guard let channel = try await connection.getChannel() else {
throw AMQPConnectionError.connectionClosed(replyCode: nil, replyText: nil)
}

// Declare messageExchange using options
try await channel.exchangeDeclare(messageExchange, exchangeOptions, logger)

// Bind messageExchange to main exchange for this consumer
try await channel.exchangeBind(
exchangeName, messageExchange, routingKey, bindingOptions, logger
)

return
} catch AMQPConnectionError.connectionClosed(_, _) {
if !firstAttempt {
logger.error(
"Connection closed while setting up message binding \(messageExchange)"
)
}

// Wait for connection, timeout after retryInterval
await waitForConnectionAndConsumer(timeout: retryInterval)
try await withRetryingConnectionBody(
connection, operationName: "setting up message binding \(messageExchange)",
retryInterval: retryInterval, exitOnSuccess: true
) {
guard let channel = try await self.connection.getChannel() else {
throw AMQPConnectionError.connectionClosed(replyCode: nil, replyText: nil)
}

firstAttempt = false
} catch {
// If this is our first attempt to connect, keep trying until we reach the timeout
if firstAttempt && ContinuousClock().now - firstAttemptStart < retryInterval {
await gracefulCancellableDelay(connection.connectionPollingInterval)
continue
}
// Declare messageExchange using options
try await channel.exchangeDeclare(messageExchange, exchangeOptions, self.logger)

logger.error("Error setting up message binding \(messageExchange): \(error)")
// Bind messageExchange to main exchange for this consumer
try await channel.exchangeBind(
self.exchangeName, messageExchange, routingKey, bindingOptions, self.logger
)

// Consume retry
logger.debug("Will retry setting up \(messageExchange) in \(retryInterval)")
try await Task.sleep(for: retryInterval)
firstAttempt = false
}
return
}
}

Expand Down Expand Up @@ -183,11 +160,10 @@ public actor MassTransitConsumer: Service {
// Determine message type
let messageType = customMessageType ?? messageExchange

// We need to declare & bind an exchange for this message
try await bindMessageExchange(messageExchange, exchangeOptions, routingKey, bindingOptions)

// Create a stream and message handler
let consumerStream = createMessageConsumer(T.self, messageType: messageType)
let consumerStream = try await createMessageConsumer(
T.self, messageType, messageExchange, exchangeOptions, routingKey, bindingOptions
)

return .init(
consumerStream.compactMap { wrapper in
Expand Down Expand Up @@ -223,11 +199,10 @@ public actor MassTransitConsumer: Service {
// Determine message type
let messageType = customMessageType ?? messageExchange

// We need to declare & bind an exchange for this message
try await bindMessageExchange(messageExchange, exchangeOptions, routingKey, bindingOptions)

// Create a stream and message handler
let consumerStream = createMessageConsumer(T.self, messageType: messageType)
let consumerStream = try await createMessageConsumer(
T.self, messageType, messageExchange, exchangeOptions, routingKey, bindingOptions
)

return .init(
consumerStream.compactMap { wrapper in
Expand All @@ -245,6 +220,25 @@ public actor MassTransitConsumer: Service {
)
}

private func handleConsumerStarting() {
isConsumerReady = false
}

private func handleConsumerSuccess() async throws {
try await self.onConsumerReady?()
self.isConsumerReady = true

// Re-bind consumer exchanges
for messageTypeConsumer in consumers.values {
try await bindMessageExchange(
messageTypeConsumer.messageExchange,
messageTypeConsumer.exchangeOptions,
messageTypeConsumer.routingKey,
messageTypeConsumer.bindingOptions
)
}
}

/// Run the consumer.
///
/// This is *REQUIRED* to run the RabbitMq consumer and process messages from the resulting
Expand All @@ -253,13 +247,23 @@ public actor MassTransitConsumer: Service {
public func run() async throws {
logger.info("Starting consumer on queue \(queueName)...")
let consumer = configuration.createConsumer(using: connection, queueName, exchangeName, routingKey)
let consumeStream = try await consumer.retryingConsumeBuffer(retryInterval: retryInterval)
try await onConsumerReady?()
isConsumerReady = true

// Consume messages from the consumer
for await buffer in consumeStream.cancelOnGracefulShutdown() {
process(buffer)
try await withRetryingConnectionBody(
connection, operationName: "consuming from queue \(queueName)", retryInterval: retryInterval
) {
// When we are starting the consumer, it is not ready
await self.handleConsumerStarting()

// Try to consume
let consumeStream = try await consumer.consumeBuffer()

// Consumer is ready, let everyone know and bind any pending consumers
try await self.handleConsumerSuccess()

// Consume messages from the consumer
for await buffer in consumeStream.cancelOnGracefulShutdown() {
await self.process(buffer)
}
}
}

Expand All @@ -274,13 +278,13 @@ public actor MassTransitConsumer: Service {

// Looking for matching consumers
for messageType in wrapper.messageType.map({ $0.replacingOccurrences(of: "urn:message:", with: "") }) {
guard let handler = consumers[messageType] else {
guard let messageTypeConsumer = consumers[messageType] else {
continue
}

// If there is a matching type, try to process it
logger.trace("Message type associated for queue \(queueName): \(messageType)")
try handler(buffer)
try messageTypeConsumer.handler(buffer)
handled = true
}

Expand Down

0 comments on commit b5401bc

Please sign in to comment.