Skip to content

Commit

Permalink
chore(producer): refactor controller and producer modules
Browse files Browse the repository at this point in the history
  • Loading branch information
oliveigah committed Mar 31, 2024
1 parent 9e18b1b commit 595b304
Show file tree
Hide file tree
Showing 8 changed files with 187 additions and 116 deletions.
9 changes: 8 additions & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ config :klife,
num_partitions: 30,
replication_factor: 2
},
%{
name: "benchmark_topic_2",
producer: :benchmark_producer,
num_partitions: 6,
replication_factor: 2
},
%{
name: "my_batch_topic",
enable_produce: true,
Expand All @@ -44,7 +50,8 @@ config :klife,
},
%{
name: "topic_c",
enable_produce: true
enable_produce: true,
producer: :my_batch_producer
},
%{
name: "my_no_batch_topic",
Expand Down
7 changes: 1 addition & 6 deletions lib/klife/connection/controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,6 @@ defmodule Klife.Connection.Controller do
socket_opts = Keyword.get(opts, :socket_opts, [])
cluster_name = Keyword.fetch!(opts, :cluster_name)

:persistent_term.put(
{:in_flight_messages, cluster_name},
String.to_atom("in_flight_messages.#{cluster_name}")
)

:ets.new(get_in_flight_messages_table_name(cluster_name), [
:set,
:public,
Expand Down Expand Up @@ -252,7 +247,7 @@ defmodule Klife.Connection.Controller do
## PRIVATE FUNCTIONS

defp get_in_flight_messages_table_name(cluster_name),
do: :persistent_term.get({:in_flight_messages, cluster_name})
do: :"in_flight_messages.#{cluster_name}"

defp connect_bootstrap_server(servers, socket_opts) do
conn =
Expand Down
91 changes: 65 additions & 26 deletions lib/klife/producer/controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,30 +30,27 @@ defmodule Klife.Producer.Controller do
topics: Enum.filter(topics_list, &Map.get(&1, :enable_produce, true))
}

:persistent_term.put(
{:producer_topics_partitions, cluster_name},
String.to_atom("producer_topics_partitions.#{cluster_name}")
)

:ets.new(get_producer_topics_table(cluster_name), [:bag, :public, :named_table])

Enum.each(topics_list, fn t ->
:persistent_term.put(
{__MODULE__, cluster_name, t.name},
Map.get(t, :producer, @default_producer.name)
)
end)

:ets.new(topics_partitions_metadata_table(cluster_name), [
:set,
:public,
:named_table,
read_concurrency: true
])

Utils.wait_connection!(cluster_name)

send(self(), :check_metadata)

{:ok, state}
end

# TODO: Rethink this code considering brokers leaving the cluster (should stop the process)
# The connection controller already does this for the connection processes but
# I think it could be better.
def handle_info(:init_producers, %__MODULE__{} = state) do
for producer <- state.producers do
opts =
Expand Down Expand Up @@ -86,16 +83,16 @@ defmodule Klife.Producer.Controller do
Broker.send_message(Messages.Metadata, state.cluster_name, :controller, content)

for topic <- Enum.filter(resp.topics, &(&1.error_code == 0)),
producer_name = get_producer_for_topic(state.cluster_name, topic.name),
config_topic = Enum.find(state.topics, &(&1.name == topic.name)),
partition <- topic.partitions do
:persistent_term.put(
{__MODULE__, state.cluster_name, topic.name, partition.partition_index},
partition.leader_id
)

state.cluster_name
|> get_producer_topics_table()
|> :ets.insert({producer_name, {topic.name, partition.partition_index}})
|> topics_partitions_metadata_table()
|> :ets.insert({
{topic.name, partition.partition_index},
partition.leader_id,
config_topic[:producer_name] || @default_producer.name,
nil
})
end

if Enum.any?(resp.topics, &(&1.error_code != 0)) do
Expand All @@ -108,21 +105,63 @@ defmodule Klife.Producer.Controller do
end

# Public Interface

# Returns the metadata stored for a single topic/partition as a map with the
# keys `:broker_id`, `:producer_name` and `:dispatcher_id`.
#
# The row is read from the cluster's topics/partitions metadata ETS table,
# keyed by `{topic, partition}`. Exactly one row is expected; any other lookup
# result (e.g. no row for the key) fails the match and raises `MatchError`.
def get_topics_partitions_metadata(cluster_name, topic, partition) do
  table = topics_partitions_metadata_table(cluster_name)

  [{_topic_partition, leader_id, producer_name, dispatcher_id}] =
    :ets.lookup(table, {topic, partition})

  %{broker_id: leader_id, producer_name: producer_name, dispatcher_id: dispatcher_id}
end

def get_broker_id(cluster_name, topic, partition) do
:persistent_term.get({__MODULE__, cluster_name, topic, partition})
cluster_name
|> topics_partitions_metadata_table()
|> :ets.lookup_element({topic, partition}, 2)
end

def get_topics_and_partitions_for_producer(cluster_name, producer_name) do
def get_default_producer(cluster_name, topic, partition) do
cluster_name
|> get_producer_topics_table()
|> :ets.lookup_element(producer_name, 2)
|> topics_partitions_metadata_table()
|> :ets.lookup_element({topic, partition}, 3)
end

# Private functions
# Reads the dispatcher id (4th tuple element) stored for `{topic, partition}`
# in the cluster's topics/partitions metadata ETS table.
#
# `:ets.lookup_element/3` raises `ArgumentError` when no row exists for the key.
def get_dispatcher_id(cluster_name, topic, partition) do
  table = topics_partitions_metadata_table(cluster_name)
  :ets.lookup_element(table, {topic, partition}, 4)
end

# Overwrites the dispatcher id (4th tuple element) of the `{topic, partition}`
# row in the cluster's topics/partitions metadata ETS table.
#
# Returns the result of `:ets.update_element/3`: `true` when a row with that
# key was found and updated, `false` otherwise.
def update_dispatcher_id(cluster_name, topic, partition, new_dispatcher_id) do
  table = topics_partitions_metadata_table(cluster_name)
  :ets.update_element(table, {topic, partition}, {4, new_dispatcher_id})
end

defp get_producer_topics_table(cluster_name),
do: :persistent_term.get({:producer_topics_partitions, cluster_name})
# Dumps every row of the cluster's topics/partitions metadata ETS table and
# converts each one into a map with the keys `:topic_name`, `:partition_idx`,
# `:leader_id` and `:dispatcher_id`. The stored producer name is not included.
def get_all_topics_partitions_metadata(cluster_name) do
  rows = :ets.tab2list(topics_partitions_metadata_table(cluster_name))

  Enum.map(rows, fn {{topic, partition}, leader, _producer, dispatcher} ->
    %{
      topic_name: topic,
      partition_idx: partition,
      leader_id: leader,
      dispatcher_id: dispatcher
    }
  end)
end

defp get_producer_for_topic(cluster_name, topic),
def get_producer_for_topic(cluster_name, topic),
do: :persistent_term.get({__MODULE__, cluster_name, topic})

# Private functions

# Builds the name (an atom) of the per-cluster ETS table that holds the
# topics/partitions metadata: `topics_partitions_metadata.<cluster_name>`.
defp topics_partitions_metadata_table(cluster_name) do
  String.to_atom("topics_partitions_metadata.#{cluster_name}")
end
end
62 changes: 30 additions & 32 deletions lib/klife/producer/dispatcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ defmodule Klife.Producer.Dispatcher do
defstruct [
:producer_config,
:broker_id,
:topic_name,
:topic_partition,
:current_batch,
:current_waiting_pids,
:current_base_time,
Expand All @@ -26,11 +24,16 @@ defmodule Klife.Producer.Dispatcher do
pconfig = Keyword.fetch!(args, :producer_config)
cluster_name = pconfig.cluster_name
broker_id = Keyword.fetch!(args, :broker_id)
topic_name = Keyword.get(args, :topic_name)
topic_partition = Keyword.get(args, :topic_partition)
dispatcher_id = Keyword.fetch!(args, :id)

GenServer.start_link(__MODULE__, args,
name: get_process_name(pconfig, broker_id, topic_name, topic_partition, cluster_name)
name:
get_process_name(
cluster_name,
broker_id,
pconfig.producer_name,
dispatcher_id
)
)
end

Expand Down Expand Up @@ -60,13 +63,18 @@ defmodule Klife.Producer.Dispatcher do
{:ok, state}
end

def produce_sync(record, topic, partition, %Producer{} = pconfig, broker_id, cluster_name) do
pconfig
|> get_process_name(broker_id, topic, partition, cluster_name)
|> GenServer.call(
{:produce_sync, record, topic, partition, estimate_record_size(record)},
pconfig.delivery_timeout_ms
)
def produce_sync(
record,
topic,
partition,
cluster_name,
broker_id,
producer_name,
dispatcher_id
) do
cluster_name
|> get_process_name(broker_id, producer_name, dispatcher_id)
|> GenServer.call({:produce_sync, record, topic, partition, estimate_record_size(record)})
end

def handle_call(
Expand All @@ -75,7 +83,7 @@ defmodule Klife.Producer.Dispatcher do
%__MODULE__{} = state
) do
%{
producer_config: %{linger_ms: linger_ms},
producer_config: %{linger_ms: linger_ms, delivery_timeout_ms: delivery_timeout},
last_batch_sent_at: last_batch_sent_at,
in_flight_pool: in_flight_pool,
batch_queue: batch_queue
Expand All @@ -90,23 +98,24 @@ defmodule Klife.Producer.Dispatcher do

cond do
not on_time? ->
{:reply, :ok, add_record(state, record, topic, partition, pid, rec_size)}
new_state = add_record(state, record, topic, partition, pid, rec_size)
{:reply, {:ok, delivery_timeout}, new_state}

not request_in_flight_available? ->
new_state =
state
|> add_record(record, topic, partition, pid, rec_size)
|> schedule_dispatch(10)

{:reply, :ok, new_state}
{:reply, {:ok, delivery_timeout}, new_state}

batch_queue_is_empty? ->
new_sate =
state
|> add_record(record, topic, partition, pid, rec_size)
|> dispatch_to_broker(pool_idx)

{:reply, :ok, new_sate}
{:reply, {:ok, delivery_timeout}, new_sate}

not batch_queue_is_empty? ->
new_sate =
Expand All @@ -115,7 +124,7 @@ defmodule Klife.Producer.Dispatcher do
|> dispatch_to_broker(pool_idx)
|> schedule_dispatch(10)

{:reply, :ok, new_sate}
{:reply, {:ok, delivery_timeout}, new_sate}
end
end

Expand Down Expand Up @@ -486,23 +495,12 @@ defmodule Klife.Producer.Dispatcher do
end

defp get_process_name(
%Producer{producer_name: pname, linger_ms: 0},
broker_id,
topic,
partition,
cluster_name
) do
via_tuple({__MODULE__, cluster_name, broker_id, pname, topic, partition})
end

defp get_process_name(
%Producer{producer_name: pname},
cluster_name,
broker_id,
_topic,
_partition,
cluster_name
producer_name,
dispatcher_id
) do
via_tuple({__MODULE__, cluster_name, broker_id, pname})
via_tuple({__MODULE__, cluster_name, broker_id, producer_name, dispatcher_id})
end

defp estimate_record_size(record) do
Expand Down
Loading

0 comments on commit 595b304

Please sign in to comment.