Skip to content

Commit

Permalink
Add termination of body in helpers that use task groups, add document…
Browse files Browse the repository at this point in the history
…ation for with-style helpers
  • Loading branch information
xtremekforever committed Feb 13, 2025
1 parent 705e14a commit c43bcc5
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 5 deletions.
14 changes: 14 additions & 0 deletions Sources/MassTransit/Utils/withMassTransitConnection.swift
Original file line number Diff line number Diff line change
@@ -1,6 +1,20 @@
import Logging
import RabbitMq

/// Run a `body` with a MassTransit connection in scope.
///
/// This helper makes it easy to connect to a RabbitMQ broker using a `BasicConnection`
/// and create a `MassTransit` instance that can be used in the passed `body`. When the
/// `body` exits, the connection is closed.
///
/// - Parameters:
/// - connectionString: Connection string to use to configure the `BasicConnection`.
/// - connectionConfiguration: The configuration for the `BasicConnection`.
/// - connect: Whether or not to actually connect to the broker.
/// This can be used to test failure scenarios or delay connecting to the broker until later.
/// - logger: Logger to use for the `BasicConnection` and `MassTransit` instance.
/// - body: The body to run. When the body exits, the connection is closed.
/// - Throws: Any error thrown by the `BasicConnection` or `body`.
public func withMassTransitConnection(
connectionString: String = "amqp://guest:guest@localhost/%2F",
connectionConfiguration: ConnectionConfiguration = .init(),
Expand Down
21 changes: 18 additions & 3 deletions Sources/MassTransit/Utils/withRetryingMassTransitConnection.swift
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@
import Logging
import RabbitMq

/// Run a `body` with a MassTransit connection in scope on a `RetryingConnection`.
///
/// This helper makes it easy to connect to a RabbitMQ broker using a `RetryingConnection`
/// and create a `MassTransit` instance that can be used in the passed `body`.
///
/// - Parameters:
/// - connectionString: Connection string to use to configure the `RetryingConnection`.
/// - connectionConfiguration: The configuration for the `RetryingConnection`.
/// - reconnectionInterval: The reconnection interval to use for retrying the connection.
/// - logger: Logger to use for the `RetryingConnection` and `MassTransit` instance.
/// - body: The body to run. When the body exits, the connection is closed.
/// - Throws: Any error thrown by the `RetryingConnection` or `body`.
public func withRetryingMassTransitConnection(
connectionString: String = "amqp://guest:guest@localhost/%2F",
connectionConfiguration: ConnectionConfiguration = .init(),
Expand All @@ -14,13 +26,16 @@ public func withRetryingMassTransitConnection(
)
let massTransit = MassTransit(connection, logger: logger)

try await withThrowingDiscardingTaskGroup { group in
try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask {
await connection.run()
}
group.addTask {
try await body(connection, massTransit)
}

try await body(connection, massTransit)

// Exit task group once one task exits
try await group.next()
group.cancelAll()
}
}
21 changes: 19 additions & 2 deletions Sources/MassTransit/Utils/withRunningMassTransitConsumer.swift
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
import Logging
import RabbitMq

/// Run a `body` with a running `MassTransitConsumer` on a given `Connection`.
///
/// This is useful for easily testing a `MassTransitConsumer` that is constructed with a
/// consumer name and configuration, that gets terminated when the `body` exits.
///
/// - Parameters:
/// - connection: The `Connection` to use for the `MassTransitConsumer`.
/// - consumerName: The name of the consumer to use. This will be used for the `queueName` and `exchangeName` of the consumr.
/// - routingKey: Optional routing key to use for this consumer.
/// - configuration: Consumer configuration to use when constructing the consumer.
/// - retryInterval: Retry interval to use for retrying consume and bind operations.
/// - body: The body to run. When the body exits, the consumer is terminated along with streams.
/// - Throws: Any error thrown by the `MassTransitConsumer` or `body`.
public func withRunningMassTransitConsumer(
using connection: Connection,
consumerName: String,
Expand All @@ -15,7 +28,7 @@ public func withRunningMassTransitConsumer(
logger: connection.logger
)

try await withThrowingDiscardingTaskGroup { group in
try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask {
do {
try await consumer.run()
Expand All @@ -26,8 +39,12 @@ public func withRunningMassTransitConsumer(
}
}

try await body(consumer)
group.addTask {
try await body(consumer)
}

// Exit task group once one task exits
try await group.next()
group.cancelAll()
}
}

0 comments on commit c43bcc5

Please sign in to comment.