diff --git a/lib/klife/producer/dispatcher.ex b/lib/klife/producer/dispatcher.ex index dff7785..91b7e88 100644 --- a/lib/klife/producer/dispatcher.ex +++ b/lib/klife/producer/dispatcher.ex @@ -13,7 +13,8 @@ defmodule Klife.Producer.Dispatcher do defstruct [ :batcher_pid, :broker_id, - :requests + :requests, + :timeouts ] use GenServer @@ -44,6 +45,7 @@ defmodule Klife.Producer.Dispatcher do def init(args) do state = %__MODULE__{ requests: %{}, + timeouts: %{}, broker_id: args[:broker_id], batcher_pid: args[:batcher_pid] } @@ -65,12 +67,18 @@ defmodule Klife.Producer.Dispatcher do %__MODULE__{ batcher_pid: batcher_pid, - requests: requests + requests: requests, + timeouts: timeouts } = state case do_dispatch(data, state) do - :ok -> - new_state = %{state | requests: Map.put(requests, request_ref, data)} + {:ok, timeout_ref} -> + new_state = %{ + state + | requests: Map.put(requests, request_ref, data), + timeouts: Map.put(timeouts, request_ref, timeout_ref) + } + {:reply, :ok, new_state} {:error, :retry} -> @@ -98,12 +106,12 @@ defmodule Klife.Producer.Dispatcher do %__MODULE__{ batcher_pid: batcher_pid, - requests: requests + timeouts: timeouts } = state case do_dispatch(data, state) do - :ok -> - {:noreply, state} + {:ok, timeout_ref} -> + {:noreply, %{state | timeouts: Map.put(timeouts, request_ref, timeout_ref)}} {:error, :retry} -> Process.send_after(self(), {:dispatch, request_ref}, retry_ms) @@ -113,9 +121,19 @@ defmodule Klife.Producer.Dispatcher do failed_topic_partitions = Map.keys(batch_to_send) send(batcher_pid, {:bump_epoch, failed_topic_partitions}) send(batcher_pid, {:request_completed, pool_idx}) + {:noreply, remove_request(state, req_ref)} + end + end - new_state = %{state | requests: Map.delete(requests, request_ref)} - {:noreply, new_state} + def handle_info({:check_timeout, req_ref, timeout_ref}, %__MODULE__{} = state) do + case Map.get(state.timeouts, req_ref) do + ^timeout_ref -> + %{producer_config: %{retry_ms: retry_ms}} = Map.fetch!(state.requests, req_ref) + Process.send_after(self(), {:dispatch, req_ref}, retry_ms) + {:noreply, state} + + _ -> + {:noreply, state} end end @@ -165,7 +183,7 @@ defmodule Klife.Producer.Dispatcher do case failure_list do [] -> send(batcher_pid, {:request_completed, pool_idx}) - {:noreply, %{state | requests: Map.delete(requests, req_ref)}} + {:noreply, remove_request(state, req_ref)} error_list -> # TODO: Enhance specific code error handling @@ -224,7 +242,7 @@ defmodule Klife.Producer.Dispatcher do if Map.keys(new_batch_to_send) == [] do send(batcher_pid, {:request_completed, pool_idx}) - {:noreply, %{state | requests: Map.delete(requests, req_ref)}} + {:noreply, remove_request(state, req_ref)} else Process.send_after(self(), {:dispatch, req_ref}, p_config.retry_ms) new_req_data = %{data | batch_to_send: new_batch_to_send} @@ -262,7 +280,9 @@ defmodule Klife.Producer.Dispatcher do with {:before_deadline?, true} <- {:before_deadline?, before_deadline?}, :ok <- send_to_broker_async(cluster_name, state.broker_id, content, headers, req_ref) do - :ok + timeout_ref = make_ref() + Process.send_after(self(), {:check_timeout, req_ref, timeout_ref}, req_timeout) + {:ok, timeout_ref} else {:before_deadline?, false} -> {:error, :request_deadline} @@ -282,10 +302,6 @@ defmodule Klife.Producer.Dispatcher do Broker.send_message(M.Produce, cluster_name, broker_id, content, headers, opts) end - def broker_callback(binary_response, req_ref, dispatcher_pid) do - send(dispatcher_pid, {:broker_response, req_ref, binary_response}) - end - defp parse_batch_before_send(batch_to_send) do batch_to_send |> Enum.group_by(fn {k, _} -> elem(k, 0) end, fn {k, v} -> {elem(k, 1), v} end) @@ -302,4 +318,9 @@ defmodule Klife.Producer.Dispatcher do } end) end + + defp remove_request(%__MODULE__{} = state, req_ref) do + %__MODULE__{requests: requests, timeouts: timeouts} = state + %{state | requests: Map.delete(requests, req_ref), timeouts: Map.delete(timeouts, req_ref)} + end end