diff --git a/Sources/MassTransit/MassTransitConsumer.swift b/Sources/MassTransit/MassTransitConsumer.swift index d6f4f68..9edfd0b 100644 --- a/Sources/MassTransit/MassTransitConsumer.swift +++ b/Sources/MassTransit/MassTransitConsumer.swift @@ -19,8 +19,16 @@ public actor MassTransitConsumer: Service { let onConsumerReady: (@Sendable () async throws -> Void)? let logger: Logger + struct MessageTypeConsumer { + var messageExchange: String + var exchangeOptions: ExchangeOptions + var routingKey: String + var bindingOptions: BindingOptions + var handler: (ByteBuffer) throws -> Void + } + private(set) public var isConsumerReady = false - private var consumers: [String: (ByteBuffer) throws -> Void] = [:] + private var consumers: [String: MessageTypeConsumer] = [:] /// Create the MassTransit Consumer. /// @@ -54,16 +62,29 @@ public actor MassTransitConsumer: Service { } private func createMessageConsumer( - _: T.Type, messageType: String - ) -> AnyAsyncSequence> { + _: T.Type, + _ messageType: String, + _ messageExchange: String, + _ exchangeOptions: ExchangeOptions, + _ routingKey: String, + _ bindingOptions: BindingOptions + ) async throws -> AnyAsyncSequence> { assert(consumers[messageType] == nil, "Consumer for \(messageType) is already registered!") + // We need to declare & bind an exchange for this message + try await bindMessageExchange(messageExchange, exchangeOptions, routingKey, bindingOptions) + // Create a stream + continuation logger.info("Consuming messages of type \(messageType) on queue \(queueName)...") let (stream, continuation) = AsyncStream.makeStream(of: MassTransitWrapper.self) // Create a message handler - consumers[messageType] = { buffer in + consumers[messageType] = MessageTypeConsumer( + messageExchange: messageExchange, + exchangeOptions: exchangeOptions, + routingKey: routingKey, + bindingOptions: bindingOptions + ) { buffer in let wrapper = try MassTransitWrapper(T.self, from: buffer) continuation.yield(wrapper) } @@ -86,73 +107,29 @@ public actor MassTransitConsumer: Service { consumers.removeValue(forKey: urn(from: messageType)) } - private func waitForConnectionAndConsumer(timeout: Duration) async { - let start = ContinuousClock().now - while !Task.isCancelledOrShuttingDown { - if await connection.isConnected && isConsumerReady { - break - } - - if ContinuousClock().now - start >= timeout { - break - } - - await gracefulCancellableDelay(connection.connectionPollingInterval) - } - } - private func bindMessageExchange( _ messageExchange: String, _ exchangeOptions: ExchangeOptions, _ routingKey: String, _ bindingOptions: BindingOptions ) async throws { - var firstAttempt = true - let firstAttemptStart = ContinuousClock().now - - // Wait for connection and consumer before attempting to bind - await waitForConnectionAndConsumer(timeout: retryInterval) - - while !Task.isCancelledOrShuttingDown { - do { - guard let channel = try await connection.getChannel() else { - throw AMQPConnectionError.connectionClosed(replyCode: nil, replyText: nil) - } - - // Declare messageExchange using options - try await channel.exchangeDeclare(messageExchange, exchangeOptions, logger) - - // Bind messageExchange to main exchange for this consumer - try await channel.exchangeBind( - exchangeName, messageExchange, routingKey, bindingOptions, logger - ) - - return - } catch AMQPConnectionError.connectionClosed(_, _) { - if !firstAttempt { - logger.error( - "Connection closed while setting up message binding \(messageExchange)" - ) - } - - // Wait for connection, timeout after retryInterval - await waitForConnectionAndConsumer(timeout: retryInterval) + try await withRetryingConnectionBody( + connection, operationName: "setting up message binding \(messageExchange)", + retryInterval: retryInterval, exitOnSuccess: true + ) { + guard let channel = try await self.connection.getChannel() else { + throw AMQPConnectionError.connectionClosed(replyCode: nil, replyText: nil) + } - firstAttempt = false - } catch { - // If this is our first attempt to connect, keep trying until we reach the timeout - if firstAttempt && ContinuousClock().now - firstAttemptStart < retryInterval { - await gracefulCancellableDelay(connection.connectionPollingInterval) - continue - } + // Declare messageExchange using options + try await channel.exchangeDeclare(messageExchange, exchangeOptions, self.logger) - logger.error("Error setting up message binding \(messageExchange): \(error)") + // Bind messageExchange to main exchange for this consumer + try await channel.exchangeBind( + self.exchangeName, messageExchange, routingKey, bindingOptions, self.logger + ) - // Consume retry - logger.debug("Will retry setting up \(messageExchange) in \(retryInterval)") - try await Task.sleep(for: retryInterval) - firstAttempt = false - } + return } } @@ -183,11 +160,10 @@ public actor MassTransitConsumer: Service { // Determine message type let messageType = customMessageType ?? messageExchange - // We need to declare & bind an exchange for this message - try await bindMessageExchange(messageExchange, exchangeOptions, routingKey, bindingOptions) - // Create a stream and message handler - let consumerStream = createMessageConsumer(T.self, messageType: messageType) + let consumerStream = try await createMessageConsumer( + T.self, messageType, messageExchange, exchangeOptions, routingKey, bindingOptions + ) return .init( consumerStream.compactMap { wrapper in @@ -223,11 +199,10 @@ public actor MassTransitConsumer: Service { // Determine message type let messageType = customMessageType ?? messageExchange - // We need to declare & bind an exchange for this message - try await bindMessageExchange(messageExchange, exchangeOptions, routingKey, bindingOptions) - // Create a stream and message handler - let consumerStream = createMessageConsumer(T.self, messageType: messageType) + let consumerStream = try await createMessageConsumer( + T.self, messageType, messageExchange, exchangeOptions, routingKey, bindingOptions + ) return .init( consumerStream.compactMap { wrapper in @@ -245,6 +220,25 @@ public actor MassTransitConsumer: Service { ) } + private func handleConsumerStarting() { + isConsumerReady = false + } + + private func handleConsumerSuccess() async throws { + try await self.onConsumerReady?() + self.isConsumerReady = true + + // Re-bind consumer exchanges + for messageTypeConsumer in consumers.values { + try await bindMessageExchange( + messageTypeConsumer.messageExchange, + messageTypeConsumer.exchangeOptions, + messageTypeConsumer.routingKey, + messageTypeConsumer.bindingOptions + ) + } + } + /// Run the consumer. /// /// This is *REQUIRED* to run the RabbitMq consumer and process messages from the resulting @@ -253,13 +247,23 @@ public actor MassTransitConsumer: Service { public func run() async throws { logger.info("Starting consumer on queue \(queueName)...") let consumer = configuration.createConsumer(using: connection, queueName, exchangeName, routingKey) - let consumeStream = try await consumer.retryingConsumeBuffer(retryInterval: retryInterval) - try await onConsumerReady?() - isConsumerReady = true - // Consume messages from the consumer - for await buffer in consumeStream.cancelOnGracefulShutdown() { - process(buffer) + try await withRetryingConnectionBody( + connection, operationName: "consuming from queue \(queueName)", retryInterval: retryInterval + ) { + // When we are starting the consumer, it is not ready + await self.handleConsumerStarting() + + // Try to consume + let consumeStream = try await consumer.consumeBuffer() + + // Consumer is ready, let everyone know and bind any pending consumers + try await self.handleConsumerSuccess() + + // Consume messages from the consumer + for await buffer in consumeStream.cancelOnGracefulShutdown() { + await self.process(buffer) + } } } @@ -274,13 +278,13 @@ public actor MassTransitConsumer: Service { // Looking for matching consumers for messageType in wrapper.messageType.map({ $0.replacingOccurrences(of: "urn:message:", with: "") }) { - guard let handler = consumers[messageType] else { + guard let messageTypeConsumer = consumers[messageType] else { continue } // If there is a matching type, try to process it logger.trace("Message type associated for queue \(queueName): \(messageType)") - try handler(buffer) + try messageTypeConsumer.handler(buffer) handled = true }