Skip to content

Commit

Permalink
feat(producer): initial idempotency implementantion and compression
Browse files Browse the repository at this point in the history
  • Loading branch information
oliveigah committed Apr 7, 2024
1 parent b2a31ec commit e58d032
Show file tree
Hide file tree
Showing 6 changed files with 252 additions and 75 deletions.
10 changes: 10 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ config :klife,
client_id: "my_custom_client_id",
linger_ms: 1_500
},
%{
name: :my_batch_compressed_producer,
client_id: "my_custom_client_id",
linger_ms: 1_500,
compression_type: :snappy
},
%{
name: :benchmark_producer,
client_id: "my_custom_client_id"
Expand Down Expand Up @@ -47,6 +53,10 @@ config :klife,
num_partitions: 30,
replication_factor: 2
},
%{
name: "comression_topic",
producer: :my_batch_compressed_producer
},
%{
name: "dispatcher_benchmark_topic_1",
producer: :dispatcher_benchmark_producer_1,
Expand Down
121 changes: 87 additions & 34 deletions lib/klife/producer/dispatcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ defmodule Klife.Producer.Dispatcher do
require Logger
alias Klife.Producer
alias Klife.Connection.Broker
alias KlifeProtocol.Messages
alias KlifeProtocol.Messages, as: M

defstruct [
:producer_config,
:producer_id,
:base_sequences,
:broker_id,
:current_batch,
:current_waiting_pids,
Expand Down Expand Up @@ -41,6 +43,13 @@ defmodule Klife.Producer.Dispatcher do
args_map = Map.new(args)
max_in_flight = args_map.producer_config.max_in_flight_requests
linger_ms = args_map.producer_config.linger_ms
idempotent? = args_map.producer_config.enable_idempotence
cluster_name = args_map.producer_config.cluster_name

producer_id =
if idempotent?,
do: get_producer_id(cluster_name, args_map.broker_id),
else: nil

next_send_msg_ref =
if linger_ms > 0,
Expand All @@ -55,14 +64,28 @@ defmodule Klife.Producer.Dispatcher do
batch_queue: :queue.new(),
last_batch_sent_at: System.monotonic_time(:millisecond),
in_flight_pool: Enum.map(1..max_in_flight, fn _ -> nil end),
next_send_msg_ref: next_send_msg_ref
next_send_msg_ref: next_send_msg_ref,
producer_id: producer_id,
base_sequences: %{}
}

state = %__MODULE__{} = Map.merge(base, args_map)

{:ok, state}
end

defp get_producer_id(cluster_name, broker_id) do
content = %{
transactional_id: nil,
transaction_timeout_ms: 0
}

{:ok, %{content: %{error_code: 0, producer_id: producer_id}}} =
Broker.send_message(M.InitProducerId, cluster_name, broker_id, content)

producer_id
end

def produce_sync(
record,
topic,
Expand Down Expand Up @@ -233,23 +256,23 @@ defmodule Klife.Producer.Dispatcher do
def add_record_to_current_data(
%__MODULE__{
current_estimated_size: curr_size,
producer_config: pconfig,
current_batch: curr_batch,
current_waiting_pids: curr_pids,
current_base_time: curr_base_time
current_base_time: curr_base_time,
base_sequences: base_sequences
} = state,
record,
topic,
partition,
pid,
estimated_size
) do
new_batch = add_record_to_batch(curr_batch, record, topic, partition, pconfig)
new_batch = add_record_to_current_batch(state, record, topic, partition)

%{
state
| current_batch: new_batch,
current_waiting_pids: add_waiting_pid(curr_pids, new_batch, pid, topic, partition),
base_sequences: update_base_sequence(base_sequences, new_batch, topic, partition),
current_base_time: curr_base_time || System.monotonic_time(:millisecond),
current_estimated_size: curr_size + estimated_size
}
Expand Down Expand Up @@ -357,23 +380,62 @@ defmodule Klife.Producer.Dispatcher do

## PRIVATE FUNCTIONS

defp add_record_to_batch(current_batch, record, topic, partition, pconfig) do
case Map.get(current_batch, {topic, partition}) do
defp add_record_to_current_batch(
%__MODULE__{current_batch: batch} = state,
record,
topic,
partition
) do
case Map.get(batch, {topic, partition}) do
nil ->
new_batch =
pconfig
state
|> init_partition_data(topic, partition)
|> add_record_to_batch(record)
|> add_record_to_partition_data(record)

Map.put(current_batch, {topic, partition}, new_batch)
Map.put(batch, {topic, partition}, new_batch)

partition_data ->
new_partition_data = add_record_to_batch(partition_data, record)
Map.replace!(current_batch, {topic, partition}, new_partition_data)
new_partition_data = add_record_to_partition_data(partition_data, record)
Map.replace!(batch, {topic, partition}, new_partition_data)
end
end

defp add_record_to_batch(batch, record) do
defp init_partition_data(
%__MODULE__{
base_sequences: bs,
producer_id: p_id,
producer_config: %{enable_idempotence: idempotent?} = pconfig
} = _state,
topic,
partition
) do
%{
base_offset: 0,
partition_leader_epoch: -1,
magic: 2,
# TODO: Handle different attributes opts
attributes: get_attributes_byte(pconfig, []),
last_offset_delta: -1,
base_timestamp: nil,
max_timestamp: nil,
producer_id: p_id,
# TODO: Handle producer epoch
producer_epoch: -1,
base_sequence: if(idempotent?, do: Map.get(bs, {topic, partition}, 0) + 1, else: -1),
records: [],
records_length: 0
}
end

defp get_attributes_byte(%Producer{} = pconfig, _opts) do
[
compression: pconfig.compression_type
]
|> KlifeProtocol.RecordBatch.encode_attributes()
end

defp add_record_to_partition_data(batch, record) do
now = DateTime.to_unix(DateTime.utc_now())

new_offset_delta = batch.last_offset_delta + 1
Expand All @@ -398,22 +460,13 @@ defmodule Klife.Producer.Dispatcher do
}
end

defp init_partition_data(%Producer{} = _pconfig, _topic, _partition) do
# TODO: Use proper values here
%{
base_offset: 0,
partition_leader_epoch: -1,
magic: 2,
attributes: 0,
last_offset_delta: -1,
base_timestamp: nil,
max_timestamp: nil,
producer_id: -1,
producer_epoch: -1,
base_sequence: -1,
records: [],
records_length: 0
}
defp update_base_sequence(curr_base_sequences, new_batch, topic, partition) do
new_base_sequence =
new_batch
|> Map.fetch!({topic, partition})
|> Map.fetch!(:base_sequence)

Map.put(curr_base_sequences, {topic, partition}, new_base_sequence)
end

defp add_waiting_pid(waiting_pids, _new_batch, nil, _topic, _partition), do: waiting_pids
Expand Down Expand Up @@ -465,7 +518,7 @@ defmodule Klife.Producer.Dispatcher do
# Check if it is safe to send the batch
if now + req_timeout - base_time < delivery_timeout - :timer.seconds(5) do
{:ok, resp} =
Broker.send_message(Messages.Produce, cluster_name, broker_id, content, headers)
Broker.send_message(M.Produce, cluster_name, broker_id, content, headers)

grouped_results =
for %{name: topic_name, partition_responses: partition_resps} <- resp.content.responses,
Expand Down Expand Up @@ -493,9 +546,9 @@ defmodule Klife.Producer.Dispatcher do

_list ->
# TODO: Handle specific errors by code, not just retry all not success
new_batch_to_send =
batch_to_send
|> Map.drop(Enum.map(success_list, fn {t, p, _, _} -> {t, p} end))
# TODO: Handle out of order error for max_in_flight > 1
success_keys = Enum.map(success_list, fn {t, p, _, _} -> {t, p} end)
new_batch_to_send = Map.drop(batch_to_send, success_keys)

Process.sleep(retry_ms)

Expand Down
10 changes: 3 additions & 7 deletions lib/klife/producer/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,8 @@ defmodule Klife.Producer do
retry_backoff_ms: [type: :non_neg_integer, default: :timer.seconds(1)],
max_in_flight_requests: [type: :non_neg_integer, default: 1],
dispatchers_count: [type: :pos_integer, default: 1],
# Not implemented
max_retries: [type: :timeout, default: :infinity],
# Not implemented
compression_type: [type: {:in, [:none, :gzip, :snappy]}, default: :none],
# Not implemented
enable_idempotence: [type: :boolean, default: true]
enable_idempotence: [type: :boolean, default: true],
compression_type: [type: {:in, [:none, :gzip, :snappy]}, default: :none]
]

defstruct (Keyword.keys(@producer_options) -- [:name]) ++ [:producer_name]
Expand Down Expand Up @@ -145,7 +141,7 @@ defmodule Klife.Producer do
# Used when a record is produced by a non default producer
# in this case the proper dispatcher_id won't be present at
# main metadata ets table, therefore we need a way to
# find out it's value.
# find out it's value.
put_dispatcher_id(cluster_name, producer_name, t_name, p_idx, d_id)

if ProducerController.get_default_producer(cluster_name, t_name, p_idx) == producer_name do
Expand Down
38 changes: 38 additions & 0 deletions lib/klife/utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -173,4 +173,42 @@ defmodule Klife.Utils do
[%{records: records}] = partition_resp.records
records
end

def get_partition_resp_records_by_offset(cluster_name, topic, partition, offset) do
content = %{
replica_id: -1,
max_wait_ms: 1000,
min_bytes: 1,
max_bytes: 100_000,
isolation_level: 0,
topics: [
%{
topic: topic,
partitions: [
%{
partition: partition,
fetch_offset: offset,
# 1 guarantees that only the first record batch will
# be retrieved
partition_max_bytes: 1
}
]
}
]
}

broker = Klife.Producer.Controller.get_broker_id(cluster_name, topic, partition)

{:ok, %{content: content}} =
Klife.Connection.Broker.send_message(
KlifeProtocol.Messages.Fetch,
cluster_name,
broker,
content
)

topic_resp = Enum.find(content.responses, &(&1.topic == topic))
partition_resp = Enum.find(topic_resp.partitions, &(&1.partition_index == partition))
partition_resp.records
end
end
5 changes: 3 additions & 2 deletions test/producer/dispatcher_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ defmodule Klife.Producer.DispatcherTest do
enable_idempotence: true,
linger_ms: 10_000,
max_in_flight_requests: 2,
max_retries: :infinity,
producer_name: :my_batch_producer,
request_timeout_ms: 15000,
retry_backoff_ms: 1000
Expand All @@ -35,7 +34,9 @@ defmodule Klife.Producer.DispatcherTest do
last_batch_sent_at: System.monotonic_time(:millisecond),
in_flight_pool: [nil, nil],
next_send_msg_ref: nil,
batch_queue: :queue.new()
batch_queue: :queue.new(),
base_sequences: %{},
producer_id: 123
}

assert {:reply, {:ok, 60000}, new_state} =
Expand Down
Loading

0 comments on commit e58d032

Please sign in to comment.