Skip to content

Commit

Permalink
fix(producer): properly handle request timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
oliveigah committed May 5, 2024
1 parent cae25c4 commit 1fc125e
Showing 1 changed file with 37 additions and 16 deletions.
53 changes: 37 additions & 16 deletions lib/klife/producer/dispatcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ defmodule Klife.Producer.Dispatcher do
defstruct [
:batcher_pid,
:broker_id,
:requests
:requests,
:timeouts
]

use GenServer
Expand Down Expand Up @@ -44,6 +45,7 @@ defmodule Klife.Producer.Dispatcher do
def init(args) do
state = %__MODULE__{
requests: %{},
timeouts: %{},
broker_id: args[:broker_id],
batcher_pid: args[:batcher_pid]
}
Expand All @@ -65,12 +67,18 @@ defmodule Klife.Producer.Dispatcher do

%__MODULE__{
batcher_pid: batcher_pid,
requests: requests
requests: requests,
timeouts: timeouts
} = state

case do_dispatch(data, state) do
:ok ->
new_state = %{state | requests: Map.put(requests, request_ref, data)}
{:ok, timeout_ref} ->
new_state = %{
state
| requests: Map.put(requests, request_ref, data),
timeouts: Map.put(timeouts, request_ref, timeout_ref)
}

{:reply, :ok, new_state}

{:error, :retry} ->
Expand Down Expand Up @@ -98,12 +106,12 @@ defmodule Klife.Producer.Dispatcher do

%__MODULE__{
batcher_pid: batcher_pid,
requests: requests
timeouts: timeouts
} = state

case do_dispatch(data, state) do
:ok ->
{:noreply, state}
{:ok, timeout_ref} ->
{:noreply, %{state | timeouts: Map.put(timeouts, request_ref, timeout_ref)}}

{:error, :retry} ->
Process.send_after(self(), {:dispatch, request_ref}, retry_ms)
Expand All @@ -113,9 +121,19 @@ defmodule Klife.Producer.Dispatcher do
failed_topic_partitions = Map.keys(batch_to_send)
send(batcher_pid, {:bump_epoch, failed_topic_partitions})
send(batcher_pid, {:request_completed, pool_idx})
{:noreply, remove_request(state, req_ref)}
end
end

new_state = %{state | requests: Map.delete(requests, request_ref)}
{:noreply, new_state}
def handle_info({:check_timeout, req_ref, timeout_ref}, %__MODULE__{} = state) do
case Map.get(state.timeouts, req_ref) do
^timeout_ref ->
%{producer_config: %{retry_ms: retry_ms}} = Map.fetch!(state.requests, req_ref)
Process.send_after(self(), {:dispatch, req_ref}, retry_ms)
{:noreply, state}

_ ->
{:noreply, state}
end
end

Expand Down Expand Up @@ -165,7 +183,7 @@ defmodule Klife.Producer.Dispatcher do
case failure_list do
[] ->
send(batcher_pid, {:request_completed, pool_idx})
{:noreply, %{state | requests: Map.delete(requests, req_ref)}}
{:noreply, remove_request(state, req_ref)}

error_list ->
# TODO: Enhance specific code error handling
Expand Down Expand Up @@ -224,7 +242,7 @@ defmodule Klife.Producer.Dispatcher do

if Map.keys(new_batch_to_send) == [] do
send(batcher_pid, {:request_completed, pool_idx})
{:noreply, %{state | requests: Map.delete(requests, req_ref)}}
{:noreply, remove_request(state, req_ref)}
else
Process.send_after(self(), {:dispatch, req_ref}, p_config.retry_ms)
new_req_data = %{data | batch_to_send: new_batch_to_send}
Expand Down Expand Up @@ -262,7 +280,9 @@ defmodule Klife.Producer.Dispatcher do

with {:before_deadline?, true} <- {:before_deadline?, before_deadline?},
:ok <- send_to_broker_async(cluster_name, state.broker_id, content, headers, req_ref) do
:ok
timeout_ref = make_ref()
Process.send_after(self(), {:check_timeout, req_ref, timeout_ref}, req_timeout)
{:ok, timeout_ref}
else
{:before_deadline?, false} ->
{:error, :request_deadline}
Expand All @@ -282,10 +302,6 @@ defmodule Klife.Producer.Dispatcher do
Broker.send_message(M.Produce, cluster_name, broker_id, content, headers, opts)
end

def broker_callback(binary_response, req_ref, dispatcher_pid) do
send(dispatcher_pid, {:broker_response, req_ref, binary_response})
end

defp parse_batch_before_send(batch_to_send) do
batch_to_send
|> Enum.group_by(fn {k, _} -> elem(k, 0) end, fn {k, v} -> {elem(k, 1), v} end)
Expand All @@ -302,4 +318,9 @@ defmodule Klife.Producer.Dispatcher do
}
end)
end

defp remove_request(%__MODULE__{} = state, req_ref) do
%__MODULE__{requests: requests, timeouts: timeouts} = state
%{state | requests: Map.delete(requests, req_ref), timeouts: Map.delete(timeouts, req_ref)}
end
end

0 comments on commit 1fc125e

Please sign in to comment.