Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Documentation change: refactors dead-letter exchange example and its description #134

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 92 additions & 28 deletions lib/broadway_rabbitmq/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -376,56 +376,120 @@ defmodule BroadwayRabbitMQ.Producer do

## Dead-letter Exchanges

Here's an example of how to use a dead-letter exchange setup with broadway_rabbitmq:
Dead-letter exchanges are normal RabbitMQ exchanges designated for receiving messages
that have been rejected elsewhere. You can reference a dead-letter exchange when
defining a queue by supplying an `"x-dead-letter-exchange"` argument and optionally an
whatyouhide marked this conversation as resolved.
Show resolved Hide resolved
`"x-dead-letter-routing-key"`. When a queue has a dead-letter exchange defined, then
failing a message with `Broadway.failed/2` or raising an exception in `Broadway.handle_message/3`
whatyouhide marked this conversation as resolved.
Show resolved Hide resolved
causes the message to be republished to the exchange named in the `"x-dead-letter-exchange"`
argument. The message's original routing key is kept unless the `"x-dead-letter-routing-key"`
argument specifies an override.

A bit more care is needed during setup when using dead-letter exchanges because the dead-letter
exchange must be declared and exist _before_ you attempt to reference it when declaring a queue.
whatyouhide marked this conversation as resolved.
Show resolved Hide resolved
For this reason, you may need to take additional steps when setting up your RabbitMQ instance
beyond what is available to you in the `:after_connect` option.

You can declare your exchanges and queues in a `Mix` task or before your application starts:

{:ok, connection} = AMQP.Connection.open()
fireproofsocks marked this conversation as resolved.
Show resolved Hide resolved
{:ok, channel} = AMQP.Channel.open(connection)

# Declare exchanges
:ok = AMQP.Exchange.declare(channel, "my_exchange", :fanout, durable: true)
:ok = AMQP.Exchange.declare(channel, "my_exchange.dlx", :fanout, durable: true)

# Define and bind queues within the exchange depending on your needs
{:ok, _} = AMQP.Queue.declare(channel, "my_queue.dlx", durable: true)
:ok = AMQP.Queue.bind(channel, "my_queue.dlx", "my_exchange.dlx", [])

{:ok, _} =
AMQP.Queue.declare(channel, "my_queue",
durable: true,
arguments: [
{"x-dead-letter-exchange", :longstr, "my_exchange.dlx"},
{"x-dead-letter-routing-key", :longstr, "my_queue.dlx"}
]
)

:ok = AMQP.Queue.bind(channel, "my_queue", "my_exchange", [])

whatyouhide marked this conversation as resolved.
Show resolved Hide resolved

Once you have your exchanges and queues established, you can start Broadway pipelines
fireproofsocks marked this conversation as resolved.
Show resolved Hide resolved
to consume messages in the queues. For a thorough example, we need one pipeline which
will fail messages and another pipeline to consume messages from the dead-letter exchange.

In this example, `MyPipeline` represents the primary pipeline which may reject/fail
messages:

defmodule MyPipeline do
use Broadway

require Logger
fireproofsocks marked this conversation as resolved.
Show resolved Hide resolved
@queue "my_queue"
@exchange "my_exchange"
@queue_dlx "my_queue.dlx"
@exchange_dlx "my_exchange.dlx"

def start_link(_opts) do
Broadway.start_link(__MODULE__,
name: __MODULE__,
producer: [
module: {
BroadwayRabbitMQ.Producer,
on_failure: :reject,
after_connect: &declare_rabbitmq_topology/1,
queue: @queue,
declare: [
durable: true,
arguments: [
{"x-dead-letter-exchange", :longstr, @exchange_dlx},
{"x-dead-letter-routing-key", :longstr, @queue_dlx}
]
],
bindings: [{@exchange, []}],
},
module: {BroadwayRabbitMQ.Producer, on_failure: :reject, queue: @queue},
concurrency: 2
],
processors: [default: [concurrency: 4]]
)
end

defp declare_rabbitmq_topology(amqp_channel) do
with :ok <- AMQP.Exchange.declare(amqp_channel, @exchange, :fanout, durable: true),
:ok <- AMQP.Exchange.declare(amqp_channel, @exchange_dlx, :fanout, durable: true),
{:ok, _} <- AMQP.Queue.declare(amqp_channel, @queue_dlx, durable: true),
:ok <- AMQP.Queue.bind(amqp_channel, @queue_dlx, @exchange_dlx) do
:ok
end
@impl true
def handle_message(_processor, message, _context) do
# Raising errors or returning a "failed" message here sends the message to the
# dead-letter queue, e.g.
Logger.debug("Failing message; this should republish to the dead-letter exchange")

Broadway.Message.failed(
message,
"Failing a message triggers republication to the dead-letter exchange"
)
end
end

In order to see the message forwarding in action, we can spin up another pipeline that
consumes messages from the `my_queue.dlx` queue:

defmodule DeadPipeline do
use Broadway
require Logger

@queue_dlx "my_queue.dlx"

def start_link(_opts) do
Broadway.start_link(__MODULE__,
name: __MODULE__,
producer: [
module: {BroadwayRabbitMQ.Producer, on_failure: :reject, queue: @queue_dlx},
concurrency: 2
],
processors: [default: [concurrency: 4]]
)
end

@impl true
def handle_message(_processor, message, _context) do
# Raising errors or returning a "failed" message here sends the message to the
# dead-letter queue.
Logger.debug("Dead letter message received!")
message
end
end


whatyouhide marked this conversation as resolved.
Show resolved Hide resolved

To test out the dead-letter republishing behavior, try publishing a message into your
primary exchange and observe that it gets republished and consumed by the queue in the
dead-letter exchange, e.g.
whatyouhide marked this conversation as resolved.
Show resolved Hide resolved

iex> {:ok, connection} = AMQP.Connection.open()
iex> {:ok, channel} = AMQP.Channel.open(connection)
iex> AMQP.Basic.publish(channel, "my_exchange", "my_queue", "Am I dead?")
:ok
[debug] Failing message; this should republish to the dead-letter exchange
[debug] Dead letter message received!
whatyouhide marked this conversation as resolved.
Show resolved Hide resolved
"""

use GenStage
Expand Down