Skip to content

akash-akya/off_broadway_redis_stream

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

OffBroadwayRedisStream

CI Hex.pm docs

A Redis Stream consumer for Broadway. A Redis Stream consumer implementation for Broadway, enabling concurrent processing of Redis Stream messages with built-in fault tolerance and automatic failover.

Features

  • Seamless integration with Broadway for concurrent message processing
  • Automatic failover and message recovery when nodes fail
  • Configurable consumer groups for distributed processing
  • Message retry handling with configurable attempts

It supports failover by automatically claiming pending messages when a node dies. A node is considered dead when it fails to send heartbeats.

Installation

Add off_broadway_redis_stream to your list of dependencies in mix.exs:

def deps do
  [
    {:off_broadway_redis_stream, "~> x.x.x"}
  ]
end

Usage

Basic Setup

Here's a complete example demonstrating how to set up a Broadway pipeline with Redis Stream:

defmodule MyBroadway do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module:
          {OffBroadwayRedisStream.Producer,
           [
             redis_client_opts: [host: "localhost"],
             stream: "orders",
             group: "processor-group",
             consumer_name: hostname()
           ]}
      ],
      processors: [
        default: [min_demand: 5, max_demand: 1000]
      ]
    )
  end

  def handle_message(_, message, _) do
    [_id, key_value_list] = message.data
    IO.inspect(key_value_list, label: "Got message")
    message
  end

  @max_attempts 5

  def handle_failed(messages, _) do
    for message <- messages do
      if message.metadata.attempt < @max_attempts do
        Broadway.Message.configure_ack(message, retry: true)
      else
        [id, _] = message.data
        IO.inspect(id, label: "Dropping")
      end
    end
  end

  defp hostname do
    {:ok, host} = :inet.gethostname()
    to_string(host)
  end
end

Check documentation for more details.

License

Copyright (c) 2021 Akash Hiremath.

OffBroadwayRedisStream source code is released under Apache License 2.0. Check LICENSE for more information.