|
| 1 | +defmodule Honeybadger.EventsWorker do |
| 2 | + @moduledoc """ |
| 3 | + A GenServer that batches and sends events with retry and throttling logic. |
| 4 | +
|
| 5 | + It accumulates events in a queue, forms batches when the batch size is reached or |
| 6 | + when a flush timeout expires, and then sends these batches to a backend module. |
| 7 | + If a batch fails to send, it will be retried (up to a configurable maximum) or dropped. |
| 8 | + In case of throttling (e.g. receiving a 429), the flush delay is increased. |
| 9 | + """ |
| 10 | + |
| 11 | + @dropped_log_interval 60_000 |
| 12 | + |
| 13 | + use GenServer |
| 14 | + require Logger |
| 15 | + |
| 16 | + defmodule State do |
| 17 | + @typedoc """ |
| 18 | + Function that accepts a list of events to be processed. |
| 19 | + """ |
| 20 | + @type send_events_fn :: ([term()] -> :ok | {:error, :throttled} | {:error, term()}) |
| 21 | + |
| 22 | + @typedoc """ |
| 23 | + State for the event batching GenServer. |
| 24 | + """ |
| 25 | + @type t :: %__MODULE__{ |
| 26 | + # Configuration |
| 27 | + send_events_fn: send_events_fn(), |
| 28 | + batch_size: pos_integer(), |
| 29 | + max_queue_size: pos_integer(), |
| 30 | + timeout: pos_integer(), |
| 31 | + max_batch_retries: non_neg_integer(), |
| 32 | + throttle_wait: pos_integer(), |
| 33 | + |
| 34 | + # Internal state |
| 35 | + timeout_started_at: non_neg_integer(), |
| 36 | + throttling: boolean(), |
| 37 | + dropped_events: non_neg_integer(), |
| 38 | + last_dropped_log: non_neg_integer(), |
| 39 | + queue: [any()], |
| 40 | + batches: :queue.queue() |
| 41 | + } |
| 42 | + |
| 43 | + @enforce_keys [ |
| 44 | + :send_events_fn, |
| 45 | + :batch_size, |
| 46 | + :max_queue_size, |
| 47 | + :max_batch_retries |
| 48 | + ] |
| 49 | + |
| 50 | + defstruct [ |
| 51 | + :send_events_fn, |
| 52 | + :batch_size, |
| 53 | + :max_queue_size, |
| 54 | + :timeout, |
| 55 | + :max_batch_retries, |
| 56 | + :last_dropped_log, |
| 57 | + timeout_started_at: 0, |
| 58 | + throttle_wait: 60000, |
| 59 | + throttling: false, |
| 60 | + dropped_events: 0, |
| 61 | + queue: [], |
| 62 | + batches: :queue.new() |
| 63 | + ] |
| 64 | + end |
| 65 | + |
| 66 | + @spec start_link(Keyword.t()) :: GenServer.on_start() |
| 67 | + def start_link(opts \\ []) do |
| 68 | + if Honeybadger.get_env(:events_worker_enabled) do |
| 69 | + {name, opts} = Keyword.pop(opts, :name, __MODULE__) |
| 70 | + GenServer.start_link(__MODULE__, opts, name: name) |
| 71 | + else |
| 72 | + :ignore |
| 73 | + end |
| 74 | + end |
| 75 | + |
| 76 | + @spec push(event :: map(), GenServer.server()) :: :ok |
| 77 | + def push(event, server \\ __MODULE__) do |
| 78 | + GenServer.cast(server, {:push, event}) |
| 79 | + end |
| 80 | + |
| 81 | + @spec state(GenServer.server()) :: State.t() |
| 82 | + def state(server \\ __MODULE__) do |
| 83 | + GenServer.call(server, {:state}) |
| 84 | + end |
| 85 | + |
| 86 | + @impl true |
| 87 | + def init(opts) do |
| 88 | + config = %{ |
| 89 | + send_events_fn: Keyword.get(opts, :send_events_fn, &Honeybadger.Client.send_events/1), |
| 90 | + batch_size: Keyword.get(opts, :batch_size, Honeybadger.get_env(:events_batch_size)), |
| 91 | + timeout: Keyword.get(opts, :timeout, Honeybadger.get_env(:events_timeout)), |
| 92 | + throttle_wait: |
| 93 | + Keyword.get(opts, :throttle_wait, Honeybadger.get_env(:events_throttle_wait)), |
| 94 | + max_queue_size: |
| 95 | + Keyword.get(opts, :max_queue_size, Honeybadger.get_env(:events_max_queue_size)), |
| 96 | + max_batch_retries: |
| 97 | + Keyword.get(opts, :max_batch_retries, Honeybadger.get_env(:events_max_batch_retries)), |
| 98 | + last_dropped_log: System.monotonic_time(:millisecond) |
| 99 | + } |
| 100 | + |
| 101 | + state = struct!(State, config) |
| 102 | + {:ok, state} |
| 103 | + end |
| 104 | + |
| 105 | + @impl true |
| 106 | + def handle_call({:state}, _from, %State{} = state) do |
| 107 | + {:reply, state, state, current_timeout(state)} |
| 108 | + end |
| 109 | + |
| 110 | + @impl true |
| 111 | + def handle_cast({:push, event}, %State{timeout_started_at: 0} = state) do |
| 112 | + handle_cast({:push, event}, reset_timeout(state)) |
| 113 | + end |
| 114 | + |
| 115 | + def handle_cast({:push, event}, %State{} = state) do |
| 116 | + if total_event_count(state) >= state.max_queue_size do |
| 117 | + {:noreply, %{state | dropped_events: state.dropped_events + 1}, current_timeout(state)} |
| 118 | + else |
| 119 | + queue = [event | state.queue] |
| 120 | + |
| 121 | + if length(queue) >= state.batch_size do |
| 122 | + flush(%{state | queue: queue}) |
| 123 | + else |
| 124 | + {:noreply, %{state | queue: queue}, current_timeout(state)} |
| 125 | + end |
| 126 | + end |
| 127 | + end |
| 128 | + |
| 129 | + @impl true |
| 130 | + def handle_info(:timeout, state), do: flush(state) |
| 131 | + |
| 132 | + @impl true |
| 133 | + def terminate(_reason, %State{} = state) do |
| 134 | + Logger.debug("[Honeybadger] Terminating with #{total_event_count(state)} events unsent") |
| 135 | + _ = flush(state) |
| 136 | + :ok |
| 137 | + end |
| 138 | + |
| 139 | + @spec flush(State.t()) :: {:noreply, State.t(), pos_integer()} |
| 140 | + defp flush(state) do |
| 141 | + cond do |
| 142 | + state.queue == [] and :queue.is_empty(state.batches) -> |
| 143 | + # It's all empty so we stop the timeout and reset the |
| 144 | + # timeout_started_at which will restart on the next push |
| 145 | + {:noreply, %{state | timeout_started_at: 0}} |
| 146 | + |
| 147 | + state.queue == [] -> |
| 148 | + attempt_send(state) |
| 149 | + |
| 150 | + true -> |
| 151 | + batches = :queue.in(%{batch: Enum.reverse(state.queue), attempts: 0}, state.batches) |
| 152 | + attempt_send(%{state | queue: [], batches: batches}) |
| 153 | + end |
| 154 | + end |
| 155 | + |
| 156 | + @spec attempt_send(State.t()) :: {:noreply, State.t(), pos_integer()} |
| 157 | + # Sends pending batches, handling retries and throttling |
| 158 | + defp attempt_send(%State{} = state) do |
| 159 | + {new_batches_list, throttling} = |
| 160 | + Enum.reduce(:queue.to_list(state.batches), {[], false}, fn |
| 161 | + # If already throttled, skip sending and retain the batch. |
| 162 | + b, {acc, true} -> |
| 163 | + {acc ++ [b], true} |
| 164 | + |
| 165 | + %{batch: batch, attempts: attempts} = b, {acc, false} -> |
| 166 | + case state.send_events_fn.(batch) do |
| 167 | + :ok -> |
| 168 | + Logger.debug("[Honeybadger] Sent batch of #{length(batch)} events.") |
| 169 | + {acc, false} |
| 170 | + |
| 171 | + {:error, reason} -> |
| 172 | + throttling = reason == :throttled |
| 173 | + updated_attempts = attempts + 1 |
| 174 | + |
| 175 | + if throttling do |
| 176 | + Logger.warning( |
| 177 | + "[Honeybadger] Rate limited (429) events - (batch attempt #{updated_attempts}) - waiting for #{state.throttle_wait}ms" |
| 178 | + ) |
| 179 | + else |
| 180 | + Logger.debug( |
| 181 | + "[Honeybadger] Failed to send events batch (attempt #{updated_attempts}): #{inspect(reason)}" |
| 182 | + ) |
| 183 | + end |
| 184 | + |
| 185 | + if updated_attempts < state.max_batch_retries do |
| 186 | + {acc ++ [%{b | attempts: updated_attempts}], throttling} |
| 187 | + else |
| 188 | + Logger.debug( |
| 189 | + "[Honeybadger] Dropping events batch after #{updated_attempts} attempts." |
| 190 | + ) |
| 191 | + |
| 192 | + {acc, throttling} |
| 193 | + end |
| 194 | + end |
| 195 | + end) |
| 196 | + |
| 197 | + current_time = System.monotonic_time(:millisecond) |
| 198 | + |
| 199 | + # Log dropped events if present and we haven't logged within the last |
| 200 | + # @dropped_log_interval |
| 201 | + state = |
| 202 | + if state.dropped_events > 0 and |
| 203 | + current_time - state.last_dropped_log >= @dropped_log_interval do |
| 204 | + Logger.info("[Honeybadger] Dropped #{state.dropped_events} events due to max queue limit") |
| 205 | + %{state | dropped_events: 0, last_dropped_log: current_time} |
| 206 | + else |
| 207 | + state |
| 208 | + end |
| 209 | + |
| 210 | + new_state = |
| 211 | + %{state | batches: :queue.from_list(new_batches_list), throttling: throttling} |
| 212 | + |> reset_timeout() |
| 213 | + |
| 214 | + {:noreply, new_state, current_timeout(new_state)} |
| 215 | + end |
| 216 | + |
| 217 | + @spec total_event_count(State.t()) :: non_neg_integer() |
| 218 | + # Counts events in both the queue and pending batches. |
| 219 | + defp total_event_count(%State{batches: batches, queue: queue}) do |
| 220 | + events_count = length(queue) |
| 221 | + |
| 222 | + batch_count = :queue.fold(fn %{batch: b}, acc -> acc + length(b) end, 0, batches) |
| 223 | + |
| 224 | + events_count + batch_count |
| 225 | + end |
| 226 | + |
| 227 | + # Returns the time remaining until the next flush |
| 228 | + defp current_timeout(%State{ |
| 229 | + throttling: throttling, |
| 230 | + timeout: timeout, |
| 231 | + throttle_wait: throttle_wait, |
| 232 | + timeout_started_at: timeout_started_at |
| 233 | + }) do |
| 234 | + elapsed = System.monotonic_time(:millisecond) - timeout_started_at |
| 235 | + timeout = if throttling, do: throttle_wait, else: timeout |
| 236 | + max(1, timeout - elapsed) |
| 237 | + end |
| 238 | + |
| 239 | + defp reset_timeout(state) do |
| 240 | + %{state | timeout_started_at: System.monotonic_time(:millisecond)} |
| 241 | + end |
| 242 | +end |
0 commit comments