diff --git a/lib/klife/producer/dispatcher.ex b/lib/klife/producer/dispatcher.ex index 91b7e88..29166d9 100644 --- a/lib/klife/producer/dispatcher.ex +++ b/lib/klife/producer/dispatcher.ex @@ -146,7 +146,8 @@ defmodule Klife.Producer.Dispatcher do %__MODULE__{ batcher_pid: batcher_pid, requests: requests, - broker_id: broker_id + broker_id: broker_id, + timeouts: timeouts } = state data = @@ -180,75 +181,70 @@ defmodule Klife.Producer.Dispatcher do end) end) - case failure_list do - [] -> - send(batcher_pid, {:request_completed, pool_idx}) - {:noreply, remove_request(state, req_ref)} + grouped_errors = + Enum.group_by(failure_list, fn {topic, partition, error_code, _base_offset} -> + cond do + error_code in @delivery_discard_codes -> + Logger.warning(""" + Fatal error while producing message. Message will be discarded! - error_list -> - # TODO: Enhance specific code error handling - # TODO: One major problem with the current implementantion is that - # one bad topic can hold the in flight request spot for a long time - # can it be handled better without a new producer? - grouped_errors = - Enum.group_by(error_list, fn {topic, partition, error_code, _base_offset} -> - cond do - error_code in @delivery_discard_codes -> - Logger.warning(""" - Fatal error while producing message. Message will be discarded! - - topic: #{topic} - partition: #{partition} - error_code: #{error_code} - - cluster: #{cluster_name} - broker_id: #{broker_id} - producer_name: #{producer_name} - """) - - :discard - - true -> - Logger.warning(""" - Error while producing message. Message will be retried! - - topic: #{topic} - partition: #{partition} - error_code: #{error_code} - - cluster: #{cluster_name} - broker_id: #{broker_id} - producer_name: #{producer_name} - """) - - :retry - end - end) + topic: #{topic} + partition: #{partition} + error_code: #{error_code} - to_discard = grouped_errors[:discard] || [] + cluster: #{cluster_name} + broker_id: #{broker_id} + producer_name: #{producer_name} + """) - Enum.each(to_discard, fn {topic, partition, error_code, _base_offset} -> - delivery_confirmation_pids - |> Map.get({topic, partition}, []) - |> Enum.reverse() - |> Enum.each(fn {pid, _batch_offset} -> - send(pid, {:klife_produce_sync, :error, error_code}) - end) - end) - - to_drop_list = List.flatten([success_list, to_discard]) - to_drop_keys = Enum.map(to_drop_list, fn {t, p, _, _} -> {t, p} end) - new_batch_to_send = Map.drop(batch_to_send, to_drop_keys) - - if Map.keys(new_batch_to_send) == [] do - send(batcher_pid, {:request_completed, pool_idx}) - {:noreply, remove_request(state, req_ref)} - else - Process.send_after(self(), {:dispatch, req_ref}, p_config.retry_ms) - new_req_data = %{data | batch_to_send: new_batch_to_send} - new_state = %{state | requests: Map.put(requests, req_ref, new_req_data)} - {:noreply, new_state} + :discard + + true -> + Logger.warning(""" + Error while producing message. Message will be retried! + + topic: #{topic} + partition: #{partition} + error_code: #{error_code} + + cluster: #{cluster_name} + broker_id: #{broker_id} + producer_name: #{producer_name} + """) + + :retry end + end) + + to_discard = grouped_errors[:discard] || [] + to_retry = grouped_errors[:retry] || [] + + Enum.each(to_discard, fn {topic, partition, error_code, _base_offset} -> + delivery_confirmation_pids + |> Map.get({topic, partition}, []) + |> Enum.reverse() + |> Enum.each(fn {pid, _batch_offset} -> + send(pid, {:klife_produce_sync, :error, error_code}) + end) + end) + + to_retry_keys = Enum.map(to_retry, fn {t, p, _, _} -> {t, p} end) + new_batch_to_send = Map.take(batch_to_send, to_retry_keys) + + if new_batch_to_send == %{} do + send(batcher_pid, {:request_completed, pool_idx}) + {:noreply, remove_request(state, req_ref)} + else + Process.send_after(self(), {:dispatch, req_ref}, p_config.retry_ms) + new_req_data = %{data | batch_to_send: new_batch_to_send} + + new_state = %{ + state + | requests: Map.put(requests, req_ref, new_req_data), + timeouts: Map.delete(timeouts, req_ref) + } + + {:noreply, new_state} end end diff --git a/lib/klife/producer/producer.ex b/lib/klife/producer/producer.ex index 4aa2b74..1cc8de7 100644 --- a/lib/klife/producer/producer.ex +++ b/lib/klife/producer/producer.ex @@ -141,15 +141,15 @@ defmodule Klife.Producer do end) end) |> List.flatten() - |> Enum.each(fn %{topic_name: t_name, partition_idx: p_idx, batcher_id: d_id} -> + |> Enum.each(fn %{topic_name: t_name, partition_idx: p_idx, batcher_id: b_id} -> # Used when a record is produced by a non default producer # in this case the proper batcher_id won't be present at # main metadata ets table, therefore we need a way to # find out it's value. - put_batcher_id(cluster_name, producer_name, t_name, p_idx, d_id) + put_batcher_id(cluster_name, producer_name, t_name, p_idx, b_id) if ProducerController.get_default_producer(cluster_name, t_name, p_idx) == producer_name do - ProducerController.update_batcher_id(cluster_name, t_name, p_idx, d_id) + ProducerController.update_batcher_id(cluster_name, t_name, p_idx, b_id) end end) end