Skip to content

Commit

Permalink
feat(dispatcher): handle producer epoch bumps per topic/partition
Browse files Browse the repository at this point in the history
  • Loading branch information
oliveigah committed Apr 14, 2024
1 parent 99fb3bc commit 97a5d41
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 75 deletions.
195 changes: 122 additions & 73 deletions lib/klife/producer/dispatcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ defmodule Klife.Producer.Dispatcher do
defstruct [
:producer_config,
:producer_id,
:producer_epoch,
:producer_epochs,
:base_sequences,
:broker_id,
:current_batch,
Expand Down Expand Up @@ -72,8 +72,7 @@ defmodule Klife.Producer.Dispatcher do
next_send_msg_ref: next_send_msg_ref,
producer_id: producer_id,
base_sequences: %{},
producer_epoch:
get_producer_epoch(cluster_name, broker_id, producer_config.producer_name, dispatcher_id),
producer_epochs: %{},
broker_id: broker_id,
dispatcher_id: dispatcher_id,
producer_config: producer_config
Expand All @@ -82,21 +81,6 @@ defmodule Klife.Producer.Dispatcher do
{:ok, state}
end

# TODO: Use an unique producer epoch for each topic partition
defp get_producer_epoch(cluster_name, broker_id, producer_name, dispatcher_id) do
key = {__MODULE__, cluster_name, broker_id, producer_name, dispatcher_id, :epoch}

case :persistent_term.get(key, nil) do
nil ->
counter = :atomics.new(1, [])
:persistent_term.put(key, counter)
:atomics.add_get(counter, 1, 1)

counter ->
:atomics.add_get(counter, 1, 1)
end
end

defp get_producer_id(cluster_name, broker_id) do
content = %{
transactional_id: nil,
Expand Down Expand Up @@ -227,27 +211,24 @@ defmodule Klife.Producer.Dispatcher do
end
end

def handle_info({:broker_delivery_success, pool_idx}, %__MODULE__{} = state) do
%__MODULE__{
in_flight_pool: in_flight_pool
} = state
def handle_info({:bump_epoch, topics_partitions_list}, %__MODULE__{} = state) do
%{producer_epochs: pe, base_sequences: bs} = state

{:noreply, %{state | in_flight_pool: List.replace_at(in_flight_pool, pool_idx, nil)}}
{new_pe, new_bs} =
Enum.reduce(topics_partitions_list, {pe, bs}, fn key, {acc_pe, acc_bs} ->
new_pe = Map.put(acc_pe, key, Map.get(acc_pe, key, 0) + 1)
new_bs = Map.replace!(acc_bs, key, 0)
{new_pe, new_bs}
end)

{:noreply, %{state | producer_epochs: new_pe, base_sequences: new_bs}}
end

def handle_info({:broker_delivery_error, pool_idx, :timeout}, %__MODULE__{} = state) do
def handle_info({:request_completed, pool_idx}, %__MODULE__{} = state) do
%__MODULE__{
in_flight_pool: in_flight_pool
} = state

Logger.error("""
Timeout error while produce to broker.
cluster: #{state.producer_config.cluster_name}
broker_id: #{state.broker_id}
producer_name: #{state.producer_config.producer_name}
""")

{:noreply, %{state | in_flight_pool: List.replace_at(in_flight_pool, pool_idx, nil)}}
end

Expand Down Expand Up @@ -351,6 +332,10 @@ defmodule Klife.Producer.Dispatcher do
current_base_time: batch_base_time
} = data_to_send

# TODO: When in flight > 1 messages can be written on socket
# out of order leading to idempotency errors and added latency
# for idempotent producers. One way to fix this is to serialize
# socket writes and move the response handler to other process.
{:ok, task_pid} =
Task.Supervisor.start_child(
via_tuple({Klife.Producer.DispatcherTaskSupervisor, cluster_name}),
Expand Down Expand Up @@ -429,29 +414,37 @@ defmodule Klife.Producer.Dispatcher do
base_sequences: bs,
producer_id: p_id,
producer_config: %{enable_idempotence: idempotent?} = pconfig,
producer_epoch: p_epoch
producer_epochs: p_epochs
} = _state,
topic,
partition
) do
{p_epoch, base_seq} =
if idempotent? do
key = {topic, partition}
{Map.get(p_epochs, key, 0), Map.get(bs, key, 0)}
else
{-1, -1}
end

%{
base_offset: 0,
partition_leader_epoch: -1,
magic: 2,
# TODO: Handle different attributes opts
attributes: get_attributes_byte(pconfig, []),
last_offset_delta: -1,
base_timestamp: nil,
max_timestamp: nil,
producer_id: p_id,
producer_epoch: p_epoch,
base_sequence: if(idempotent?, do: Map.get(bs, {topic, partition}, 0), else: -1),
base_sequence: base_seq,
records: [],
records_length: 0
}
end

defp get_attributes_byte(%Producer{} = pconfig, _opts) do
# TODO: Handle different attributes opts
[
compression: pconfig.compression_type
]
Expand Down Expand Up @@ -517,6 +510,8 @@ defmodule Klife.Producer.Dispatcher do
end
end

@delivery_success_codes [0, 46]
@delivery_discard_codes [18, 47]
def do_dispatch_to_broker(
pconfig,
batch_to_send,
Expand Down Expand Up @@ -547,23 +542,22 @@ defmodule Klife.Producer.Dispatcher do
topic_data: parse_batch_before_send(batch_to_send)
}

# Check if it is safe to send the batch
if now + req_timeout - base_time < delivery_timeout - :timer.seconds(5) do
{:ok, resp} =
Broker.send_message(M.Produce, cluster_name, broker_id, content, headers)
before_deadline? = now + req_timeout - base_time < delivery_timeout - :timer.seconds(2)

with {:before_deadline?, true} <- {:before_deadline?, before_deadline?},
{:ok, resp} <- Broker.send_message(M.Produce, cluster_name, broker_id, content, headers) do
grouped_results =
for %{name: topic_name, partition_responses: partition_resps} <- resp.content.responses,
%{error_code: error_code, index: p_index} = p <- partition_resps do
{topic_name, p_index, error_code, p[:base_offset]}
end
|> Enum.group_by(&(elem(&1, 2) == 0))
|> Enum.group_by(&(elem(&1, 2) in @delivery_success_codes))

success_list = grouped_results[true] || []
failure_list = grouped_results[false] || []

success_list
|> Enum.each(fn {topic, partition, 0, base_offset} ->
|> Enum.each(fn {topic, partition, _code, base_offset} ->
delivery_confirmation_pids
|> Map.get({topic, partition}, [])
|> Enum.reverse()
Expand All @@ -574,47 +568,102 @@ defmodule Klife.Producer.Dispatcher do

case failure_list do
[] ->
send(callback_pid, {:broker_delivery_success, pool_idx})
send(callback_pid, {:request_completed, pool_idx})

error_list ->
Enum.each(error_list, fn {topic, partition, error_code, _base_offset} ->
Logger.warning("""
Error while producing message. Retrying...
topic: #{topic}
partition: #{partition}
error_code: #{error_code}
cluster: #{cluster_name}
broker_id: #{broker_id}
producer_name: #{producer_name}
""")
# TODO: Enhance specific code error handling
# TODO: One major problem with the current implementantion is that
# one bad topic can hold the in flight request spot for a long time
# can it be handled better without a new producer?
grouped_errors =
Enum.group_by(error_list, fn {topic, partition, error_code, _base_offset} ->
cond do
error_code in @delivery_discard_codes ->
Logger.warning("""
Fatal error while producing message. Message will be discarded!
topic: #{topic}
partition: #{partition}
error_code: #{error_code}
cluster: #{cluster_name}
broker_id: #{broker_id}
producer_name: #{producer_name}
""")

:discard

true ->
Logger.warning("""
Error while producing message. Message will be retried!
topic: #{topic}
partition: #{partition}
error_code: #{error_code}
cluster: #{cluster_name}
broker_id: #{broker_id}
producer_name: #{producer_name}
""")

:retry
end
end)

to_discard = grouped_errors[:discard] || []

Enum.each(to_discard, fn {topic, partition, error_code, _base_offset} ->
delivery_confirmation_pids
|> Map.get({topic, partition}, [])
|> Enum.reverse()
|> Enum.each(fn {pid, _batch_offset} ->
send(pid, {:klife_produce_sync, :error, error_code})
end)
end)

# TODO: Handle specific errors by code, not just retry all not success
# TODO: Handle out of order error for max_in_flight > 1
success_keys = Enum.map(success_list, fn {t, p, _, _} -> {t, p} end)
new_batch_to_send = Map.drop(batch_to_send, success_keys)

Process.sleep(retry_ms)

do_dispatch_to_broker(
pconfig,
new_batch_to_send,
broker_id,
callback_pid,
delivery_confirmation_pids,
pool_idx,
base_time
)
to_drop_list = List.flatten([success_list, to_discard])
to_drop_keys = Enum.map(to_drop_list, fn {t, p, _, _} -> {t, p} end)
new_batch_to_send = Map.drop(batch_to_send, to_drop_keys)

if Map.keys(new_batch_to_send) == [] do
send(callback_pid, {:request_completed, pool_idx})
else
Process.sleep(retry_ms)

do_dispatch_to_broker(
pconfig,
new_batch_to_send,
broker_id,
callback_pid,
delivery_confirmation_pids,
pool_idx,
base_time
)
end
end
else
send(callback_pid, {:broker_delivery_error, pool_idx, :timeout})
{:before_deadline?, false} ->
failed_topic_partitions = Map.keys(batch_to_send)
send(callback_pid, {:bump_epoch, failed_topic_partitions})
send(callback_pid, {:request_completed, pool_idx})

{:error, _reason} ->
Process.sleep(retry_ms)

do_dispatch_to_broker(
pconfig,
batch_to_send,
broker_id,
callback_pid,
delivery_confirmation_pids,
pool_idx,
base_time
)
end
end

defp parse_batch_before_send(current_batch) do
current_batch
defp parse_batch_before_send(batch_to_send) do
batch_to_send
|> Enum.group_by(fn {k, _} -> elem(k, 0) end, fn {k, v} -> {elem(k, 1), v} end)
|> Enum.map(fn {topic, partitions_list} ->
%{
Expand Down
9 changes: 7 additions & 2 deletions lib/klife/producer/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,13 @@ defmodule Klife.Producer do
dispatcher_id
)

# TODO: Should we handle cluster change errors here by retrying after a cluster check?
receive do
{:klife_produce_sync, :ok, offset} -> {:ok, offset}
{:klife_produce_sync, :ok, offset} ->
{:ok, offset}

{:klife_produce_sync, :error, err} ->
{:error, err}
after
delivery_timeout_ms ->
{:error, :timeout}
Expand Down Expand Up @@ -140,7 +145,7 @@ defmodule Klife.Producer do
|> Enum.each(fn %{topic_name: t_name, partition_idx: p_idx, dispatcher_id: d_id} ->
# Used when a record is produced by a non default producer
# in this case the proper dispatcher_id won't be present at
# main metadata ets table, therefore we need a way to
# main metadata ets table, therefore we need a way to
# find out it's value.
put_dispatcher_id(cluster_name, producer_name, t_name, p_idx, d_id)

Expand Down
1 change: 1 addition & 0 deletions test/producer/dispatcher_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ defmodule Klife.Producer.DispatcherTest do
next_send_msg_ref: nil,
batch_queue: :queue.new(),
base_sequences: %{},
producer_epochs: %{},
producer_id: 123
}

Expand Down

0 comments on commit 97a5d41

Please sign in to comment.