From d25c4df93900d6402091c2e909b23edb0bc3922c Mon Sep 17 00:00:00 2001 From: Gabriel Oliveira Date: Sun, 17 Sep 2023 11:23:53 -0300 Subject: [PATCH] feat(producer): add batch queue when batch size limit reached --- config/config.exs | 9 +- config/dev.exs | 18 +- lib/klife/producer/controller.ex | 1 - lib/klife/producer/dispatcher.ex | 387 ++++++++++++++++++++---------- lib/klife/producer/producer.ex | 13 +- lib/klife/utils.ex | 6 +- lib/mix/tasks/benchmark.ex | 48 ++-- mix.exs | 3 +- mix.lock | 7 +- test/producer/dispatcher_test.exs | 48 +++- 10 files changed, 370 insertions(+), 170 deletions(-) diff --git a/config/config.exs b/config/config.exs index 10e5e93..be224f1 100644 --- a/config/config.exs +++ b/config/config.exs @@ -21,11 +21,13 @@ config :klife, linger_ms: 100 }, %{ - name: :my_no_batch_producer, - client_id: "my_no_batch_producer", + name: :benchmark_producer, + client_id: "my_custom_client_id", + max_in_flight_requests: 1 } ], topics: [ + %{name: "benchmark_topic", producer: :benchmark_producer}, %{ name: "my_batch_topic", enable_produce: true, @@ -41,8 +43,7 @@ config :klife, }, %{ name: "my_no_batch_topic", - enable_produce: true, - producer: :my_no_batch_producer + enable_produce: true } ] ] diff --git a/config/dev.exs b/config/dev.exs index 597e51e..b133c1b 100644 --- a/config/dev.exs +++ b/config/dev.exs @@ -1,8 +1,24 @@ import Config -# KAFKA EX BENCHMARK +# kafka_ex benchmark config :kafka_ex, brokers: [ {"localhost", 19092}, {"localhost", 29092} + ], + kafka_version: "kayrock" + +# Brod benchmark +config :brod, + clients: [ + kafka_client: [ + endpoints: [localhost: 19092, localhost: 29092], + auto_start_producers: true, + default_producer_config: [ + required_acks: -1, + partition_onwire_limit: 1, + max_linger_ms: 0, + max_batch_size: 512_000 + ] + ] ] diff --git a/lib/klife/producer/controller.ex b/lib/klife/producer/controller.ex index 3a1ff5b..4e28c40 100644 --- a/lib/klife/producer/controller.ex +++ b/lib/klife/producer/controller.ex @@ -3,7 +3,6 @@ defmodule Klife.Producer.Controller do import Klife.ProcessRegistry - require Logger alias KlifeProtocol.Messages alias Klife.Connection.Broker alias Klife.Utils diff --git a/lib/klife/producer/dispatcher.ex b/lib/klife/producer/dispatcher.ex index d186636..44296ec 100644 --- a/lib/klife/producer/dispatcher.ex +++ b/lib/klife/producer/dispatcher.ex @@ -15,8 +15,11 @@ defmodule Klife.Producer.Dispatcher do :current_batch, :current_waiting_pids, :current_base_time, + :current_estimated_size, :last_batch_sent_at, - :in_flight_queue + :in_flight_pool, + :next_send_msg_ref, + :batch_queue ] def start_link(args) do @@ -33,68 +36,86 @@ defmodule Klife.Producer.Dispatcher do def init(args) do args_map = Map.new(args) - max_in_flight = args_map.producer_config.max_inflight_requests + max_in_flight = args_map.producer_config.max_in_flight_requests linger_ms = args_map.producer_config.linger_ms + next_send_msg_ref = + if linger_ms > 0, + do: Process.send_after(self(), :send_to_broker, linger_ms), + else: nil + base = %__MODULE__{ current_batch: %{}, current_waiting_pids: %{}, current_base_time: nil, + current_estimated_size: 0, + batch_queue: :queue.new(), last_batch_sent_at: System.monotonic_time(:millisecond), - in_flight_queue: Enum.map(1..max_in_flight, fn _ -> nil end) + in_flight_pool: Enum.map(1..max_in_flight, fn _ -> nil end), + next_send_msg_ref: next_send_msg_ref } state = %__MODULE__{} = Map.merge(base, args_map) - if linger_ms > 0 do - Process.send_after(self(), :send_to_broker, linger_ms) - end - {:ok, state} end def produce_sync(record, topic, partition, %Producer{} = pconfig, broker_id, cluster_name) do pconfig |> get_process_name(broker_id, topic, partition, cluster_name) - |> GenServer.call({:produce_sync, record, topic, partition}, pconfig.delivery_timeout_ms) + |> GenServer.call( + {:produce_sync, record, topic, partition, estimate_record_size(record)}, + pconfig.delivery_timeout_ms + ) end - def handle_call({:produce_sync, record, topic, partition}, from, %__MODULE__{} = state) do + def handle_call( + {:produce_sync, record, topic, partition, rec_size}, + {pid, _tag}, + %__MODULE__{} = state + ) do %{ - producer_config: %{linger_ms: linger_ms} = pconfig, - current_batch: current_batch, + producer_config: %{linger_ms: linger_ms}, last_batch_sent_at: last_batch_sent_at, - in_flight_queue: in_flight_queue, - current_waiting_pids: current_waiting_pids + in_flight_pool: in_flight_pool, + batch_queue: batch_queue } = state now = System.monotonic_time(:millisecond) - queue_position = Enum.find_index(in_flight_queue, &is_nil/1) + pool_idx = Enum.find_index(in_flight_pool, &is_nil/1) - new_state = - %{ - state - | current_batch: add_record_to_batch(current_batch, record, topic, partition, pconfig), - current_waiting_pids: add_waiting_pid(current_waiting_pids, from, topic, partition) - } - |> case do - %{current_base_time: nil} = state -> - %{state | current_base_time: now} + on_time? = now - last_batch_sent_at >= linger_ms + request_in_flight_available? = is_number(pool_idx) + batch_queue_is_empty? = :queue.is_empty(batch_queue) + + cond do + not on_time? -> + {:reply, :ok, add_record(state, record, topic, partition, pid, rec_size)} - state -> + not request_in_flight_available? -> + new_state = state - end + |> add_record(record, topic, partition, pid, rec_size) + |> schedule_dispatch(20) - on_time = now - last_batch_sent_at >= linger_ms - queue_available = is_number(queue_position) + {:reply, :ok, new_state} - cond do - on_time and queue_available -> - {:noreply, send_batch_to_broker(new_state, queue_position)} + batch_queue_is_empty? -> + new_sate = + state + |> add_record(record, topic, partition, pid, rec_size) + |> dispatch_to_broker(pool_idx) - true -> - Process.send_after(self(), :send_to_broker, :timer.seconds(1)) - {:noreply, new_state} + {:reply, :ok, new_sate} + + not batch_queue_is_empty? -> + new_sate = + state + |> add_record(record, topic, partition, pid, rec_size) + |> dispatch_to_broker(pool_idx) + |> schedule_dispatch(20) + + {:reply, :ok, new_sate} end end @@ -102,58 +123,199 @@ defmodule Klife.Producer.Dispatcher do %{ producer_config: %{linger_ms: linger_ms}, last_batch_sent_at: last_batch_sent_at, - in_flight_queue: in_flight_queue + in_flight_pool: in_flight_pool, + batch_queue: batch_queue } = state now = System.monotonic_time(:millisecond) - queue_position = Enum.find_index(in_flight_queue, &is_nil/1) + pool_idx = Enum.find_index(in_flight_pool, &is_nil/1) - on_time = now - last_batch_sent_at >= linger_ms - queue_available = is_number(queue_position) - batch_is_empty = state.current_batch == %{} + on_time? = now - last_batch_sent_at >= linger_ms + in_flight_available? = is_number(pool_idx) + has_batch_on_queue? = not :queue.is_empty(batch_queue) + should_reschedule? = linger_ms > 0 or has_batch_on_queue? + + new_state = %{state | next_send_msg_ref: nil} cond do - batch_is_empty -> - Process.send_after(self(), :send_to_broker, linger_ms) - {:noreply, state} + not on_time? -> + {:noreply, schedule_dispatch(new_state, linger_ms - (now - last_batch_sent_at))} + + not in_flight_available? -> + {:noreply, schedule_dispatch(new_state, 20)} - not on_time -> - Process.send_after(self(), :send_to_broker, linger_ms - (now - last_batch_sent_at)) - {:noreply, state} + should_reschedule? -> + new_state = + new_state + |> dispatch_to_broker(pool_idx) + |> schedule_dispatch(if has_batch_on_queue?, do: 0, else: linger_ms) - on_time and queue_available -> - new_state = send_batch_to_broker(state, queue_position) - Process.send_after(self(), :send_to_broker, linger_ms) {:noreply, new_state} - not queue_available -> - # TODO: Add warning log here - Process.send_after(self(), :send_to_broker, :timer.seconds(1)) - {:noreply, state} + not should_reschedule? -> + {:noreply, dispatch_to_broker(new_state, pool_idx)} end end - def handle_info({:broker_delivery_success, queue_position, resp}, %__MODULE__{} = state) do + def handle_info({:broker_delivery_success, pool_idx}, %__MODULE__{} = state) do %__MODULE__{ - in_flight_queue: in_flight_queue + in_flight_pool: in_flight_pool } = state - %{waiting_pids: waiting_pids} = Enum.at(in_flight_queue, queue_position) + {:noreply, %{state | in_flight_pool: List.replace_at(in_flight_pool, pool_idx, nil)}} + end + + ## State Operations + + def reset_current_data(%__MODULE__{} = state) do + %{ + state + | current_batch: %{}, + current_waiting_pids: %{}, + current_base_time: nil, + current_estimated_size: 0 + } + end + + def move_current_data_to_batch_queue(%__MODULE__{batch_queue: batch_queue} = state) do + data_to_queue = + Map.take(state, [ + :current_batch, + :current_waiting_pids, + :current_base_time, + :current_estimated_size + ]) + + %{state | batch_queue: :queue.in(data_to_queue, batch_queue)} + |> reset_current_data() + end + + def add_record_to_current_data( + %__MODULE__{ + current_estimated_size: curr_size, + producer_config: pconfig, + current_batch: curr_batch, + current_waiting_pids: curr_pids, + current_base_time: curr_base_time + } = state, + record, + topic, + partition, + pid, + estimated_size + ) do + %{ + state + | current_batch: add_record_to_batch(curr_batch, record, topic, partition, pconfig), + current_waiting_pids: add_waiting_pid(curr_pids, pid, topic, partition), + current_base_time: curr_base_time || System.monotonic_time(:millisecond), + current_estimated_size: curr_size + estimated_size + } + end - topic_resps = resp.content.responses + def add_record( + %__MODULE__{ + producer_config: %{batch_size_bytes: batch_size_bytes}, + current_estimated_size: current_estimated_size + } = state, + record, + topic, + partition, + pid, + rec_estimated_size + ) do + if current_estimated_size + rec_estimated_size > batch_size_bytes, + do: + state + |> move_current_data_to_batch_queue() + |> add_record_to_current_data(record, topic, partition, pid, rec_estimated_size), + else: + state + |> add_record_to_current_data(record, topic, partition, pid, rec_estimated_size) + end - for %{name: topic_name, partition_responses: partition_resps} <- topic_resps, - %{base_offset: base_offset, error_code: 0, index: p_index} <- partition_resps do - waiting_pids - |> Map.get({topic_name, p_index}, []) - |> Enum.each(fn {pid, batch_offset} -> - GenServer.reply(pid, {:ok, base_offset + batch_offset}) - end) + def dispatch_to_broker( + %__MODULE__{ + producer_config: + %{ + cluster_name: cluster_name, + client_id: client_id, + acks: acks, + request_timeout_ms: request_timeout + } = pconfig, + broker_id: broker_id, + in_flight_pool: in_flight_pool, + batch_queue: batch_queue + } = state, + pool_idx + ) do + case :queue.out(batch_queue) do + {{:value, queued_data}, new_queue} -> + {queued_data, new_queue, false} + + {:empty, new_queue} -> + if state.current_estimated_size == 0, + do: :noop, + else: {state, new_queue, true} end + |> case do + :noop -> + state - {:noreply, %{state | in_flight_queue: List.replace_at(in_flight_queue, queue_position, nil)}} + {data_to_send, new_batch_queue, sending_from_current?} -> + %{ + current_batch: batch_to_send, + current_waiting_pids: waiting_pids, + current_base_time: batch_base_time + } = data_to_send + + headers = %{client_id: client_id} + + content = %{ + transactional_id: nil, + acks: if(acks == :all, do: -1, else: acks), + timeout_ms: request_timeout, + topic_data: parse_batch_before_send(batch_to_send) + } + + {:ok, task_pid} = + Task.Supervisor.start_child( + via_tuple({Klife.Producer.DispatcherTaskSupervisor, cluster_name}), + __MODULE__, + :do_dispatch_to_broker, + [ + pconfig, + content, + headers, + broker_id, + self(), + waiting_pids, + pool_idx, + batch_base_time + ] + ) + + new_state = %{ + state + | batch_queue: new_batch_queue, + in_flight_pool: List.replace_at(in_flight_pool, pool_idx, task_pid), + last_batch_sent_at: System.monotonic_time(:millisecond) + } + + if sending_from_current?, + do: reset_current_data(new_state), + else: new_state + end end + def schedule_dispatch(%__MODULE__{next_send_msg_ref: nil} = state, time), + do: %{ + state + | next_send_msg_ref: Process.send_after(self(), :send_to_broker, time) + } + + def schedule_dispatch(%__MODULE__{} = state, _), do: state + ## PRIVATE FUNCTIONS defp add_record_to_batch(current_batch, record, topic, partition, pconfig) do @@ -210,15 +372,9 @@ defmodule Klife.Producer.Dispatcher do | records: [new_rec | batch.records], records_length: batch.records_length + 1, last_offset_delta: batch.last_offset_delta + 1, - max_timestamp: now + max_timestamp: now, + base_timestamp: min(batch.base_timestamp, now) } - |> case do - %{base_timestamp: nil} = new_batch -> - %{new_batch | base_timestamp: now} - - new_batch -> - new_batch - end end defp add_waiting_pid(waiting_pids, new_pid, topic, partition) do @@ -231,62 +387,14 @@ defmodule Klife.Producer.Dispatcher do end end - def send_batch_to_broker(%__MODULE__{} = state, queue_position) do - %__MODULE__{ - producer_config: - %{ - cluster_name: cluster_name, - client_id: client_id, - acks: acks, - request_timeout_ms: request_timeout - } = pconfig, - current_batch: current_batch, - broker_id: broker_id, - in_flight_queue: in_flight_queue, - current_waiting_pids: current_waiting_pids - } = state - - headers = %{client_id: client_id} - - content = %{ - transactional_id: nil, - acks: if(acks == :all, do: -1, else: acks), - timeout_ms: request_timeout, - topic_data: parse_batch_before_send(current_batch) - } - - {:ok, task_pid} = - Task.Supervisor.start_child( - via_tuple({Klife.Producer.DispatcherTaskSupervisor, cluster_name}), - __MODULE__, - :do_send_to_broker, - [pconfig, content, headers, broker_id, self(), queue_position, state.current_base_time] - ) - - new_in_flight_queue = - List.replace_at(in_flight_queue, queue_position, %{ - batch: current_batch, - waiting_pids: current_waiting_pids, - task_pid: task_pid - }) - - %{ - state - | in_flight_queue: new_in_flight_queue, - current_batch: %{}, - current_waiting_pids: %{}, - current_base_time: nil, - last_batch_sent_at: System.monotonic_time(:millisecond) - } - end - - def do_send_to_broker( + def do_dispatch_to_broker( pconfig, content, headers, broker_id, callback_pid, - queue_position, + delivery_confirmation_pids, + pool_idx, base_time ) do now = System.monotonic_time(:millisecond) @@ -303,24 +411,35 @@ defmodule Klife.Producer.Dispatcher do case Broker.send_sync(Messages.Produce, cluster_name, broker_id, content, headers) do {:ok, resp} -> # TODO: check non retryable errors and partial errors - send(callback_pid, {:broker_delivery_success, queue_position, resp}) + topic_resps = resp.content.responses + + for %{name: topic_name, partition_responses: partition_resps} <- topic_resps, + %{base_offset: base_offset, error_code: 0, index: p_index} <- partition_resps do + delivery_confirmation_pids + |> Map.get({topic_name, p_index}, []) + |> Enum.each(fn {pid, batch_offset} -> + send(pid, {:klife_produce_sync, :ok, base_offset + batch_offset}) + end) + end + + send(callback_pid, {:broker_delivery_success, pool_idx}) _err -> Process.sleep(retry_ms) - do_send_to_broker( + do_dispatch_to_broker( pconfig, content, headers, broker_id, callback_pid, - queue_position, + delivery_confirmation_pids, + pool_idx, base_time ) end else - IO.inspect("ERROR") - send(callback_pid, {:broker_delivery_error, queue_position, :timeout}) + send(callback_pid, {:broker_delivery_error, pool_idx, :timeout}) end end @@ -360,4 +479,18 @@ defmodule Klife.Producer.Dispatcher do ) do via_tuple({__MODULE__, cluster_name, broker_id, pname}) end + + defp estimate_record_size(record) do + # 50 extra bytes to account for other fields of the protocol + Enum.reduce(record, 50, fn {k, v}, acc -> + acc + do_estimate_size(k, v) + end) + end + + defp do_estimate_size(_k, nil), do: 0 + + defp do_estimate_size(_k, v) when is_list(v), + do: Enum.reduce(v, 0, fn i, acc -> acc + estimate_record_size(i) end) + + defp do_estimate_size(_k, v) when is_binary(v), do: byte_size(v) end diff --git a/lib/klife/producer/producer.ex b/lib/klife/producer/producer.ex index 5b2caea..d8e362e 100644 --- a/lib/klife/producer/producer.ex +++ b/lib/klife/producer/producer.ex @@ -14,13 +14,13 @@ defmodule Klife.Producer do client_id: [type: :string], acks: [type: {:in, [:all, 0, 1]}, default: :all], linger_ms: [type: :non_neg_integer, default: 0], - batch_size_bytes: [type: :non_neg_integer, default: 32_000], + batch_size_bytes: [type: :non_neg_integer, default: 512_000], delivery_timeout_ms: [type: :non_neg_integer, default: :timer.minutes(1)], request_timeout_ms: [type: :non_neg_integer, default: :timer.seconds(15)], retry_backoff_ms: [type: :non_neg_integer, default: :timer.seconds(1)], max_retries: [type: :timeout, default: :infinity], compression_type: [type: {:in, [:none, :gzip, :snappy]}, default: :none], - max_inflight_requests: [type: :non_neg_integer, default: 1], + max_in_flight_requests: [type: :non_neg_integer, default: 1], enable_idempotence: [type: :boolean, default: true] ] @@ -65,7 +65,14 @@ defmodule Klife.Producer do def produce_sync(record, topic, partition, cluster_name) do broker_id = ProducerController.get_broker_id(cluster_name, topic, partition) pconfig = get_producer_for_topic(cluster_name, topic) - Dispatcher.produce_sync(record, topic, partition, pconfig, broker_id, cluster_name) + :ok = Dispatcher.produce_sync(record, topic, partition, pconfig, broker_id, cluster_name) + + receive do + {:klife_produce_sync, :ok, offset} -> {:ok, offset} + after + pconfig.delivery_timeout_ms -> + {:error, :timeout} + end end defp init_dispatchers(%__MODULE__{linger_ms: 0} = state) do diff --git a/lib/klife/utils.ex b/lib/klife/utils.ex index 6fbb6aa..ea5b402 100644 --- a/lib/klife/utils.ex +++ b/lib/klife/utils.ex @@ -1,7 +1,5 @@ defmodule Klife.Utils do # TODO: Everything that is in here must be moved to a proper place - require Logger - def wait_connection!(cluster_name, timeout \\ :timer.seconds(5)) do deadline = System.monotonic_time() + System.convert_time_unit(timeout, :millisecond, :native) do_wait_connection!(cluster_name, deadline) @@ -49,8 +47,8 @@ defmodule Klife.Utils do Enum.map(cluster_opts[:topics], fn input -> %{ name: input[:name], - num_partitions: 3, - replication_factor: 1, + num_partitions: 12, + replication_factor: 2, assignments: [], configs: [] } diff --git a/lib/mix/tasks/benchmark.ex b/lib/mix/tasks/benchmark.ex index c080241..491ba26 100644 --- a/lib/mix/tasks/benchmark.ex +++ b/lib/mix/tasks/benchmark.ex @@ -8,27 +8,33 @@ if Mix.env() in [:dev] do apply(Mix.Tasks.Benchmark, :do_run_bench, args) end - def do_run_bench("test") do - rec = %{ - value: "1", - key: "key_1", - headers: [%{key: "header_key", value: "header_value"}] - } + defp definition(%{a: a, b: b, c: c}), do: a + b + c + + defp inside(map) do + %{a: a, b: b, c: c} = map + a + b + c + end + + defp no_match(map) do + map.a + map.b + map.c + end - :persistent_term.put({:some, :key}, Application.fetch_env!(:klife, :clusters)) + def do_run_bench("test") do + input = %{a: 1, b: 2, c: 3} Benchee.run( %{ - "app_env" => fn -> Application.fetch_env!(:klife, :clusters) end, - "persistent_term" => fn -> :persistent_term.get({:some, :key}) end + "definition" => fn -> Enum.each(1..1000, fn _ -> definition(input) end) end, + "inside" => fn -> Enum.each(1..1000, fn _ -> inside(input) end) end, + "no_match" => fn -> Enum.each(1..1000, fn _ -> no_match(input) end) end }, time: 10, memory_time: 2 ) end - def do_run_bench("test_producer") do - topic = "my_no_batch_topic" + def do_run_bench("test_producer_sync", parallel) do + topic = "benchmark_topic" val = :rand.bytes(1000) key = "some_key" @@ -39,23 +45,33 @@ if Mix.env() in [:dev] do Benchee.run( %{ - "klife produce_sync" => fn -> + "klife" => fn -> {:ok, offset} = Klife.Producer.produce_sync( record, topic, - Enum.random(0..2), + Enum.random(0..11), :my_test_cluster_1 ) end, "kafka_ex" => fn -> {:ok, offset} = - KafkaEx.produce(topic, Enum.random(0..2), val, key: key, required_acks: -1) + KafkaEx.produce(topic, Enum.random(0..11), val, key: key, required_acks: -1) + end, + "brod" => fn -> + {:ok, offset} = + :brod.produce_sync_offset( + :kafka_client, + topic, + Enum.random(0..11), + key, + val + ) end }, - time: 10, + time: 15, memory_time: 2, - parallel: 10 + parallel: parallel |> String.to_integer() ) end end diff --git a/mix.exs b/mix.exs index 68d7258..d52a636 100644 --- a/mix.exs +++ b/mix.exs @@ -30,7 +30,8 @@ defmodule Klife.MixProject do {:nimble_options, "~> 1.0"}, # Benchmarks and tests {:benchee, "~> 1.0", only: :dev, runtime: false}, - {:kafka_ex, "~> 0.11", only: :dev} + {:kafka_ex, "~> 0.13", only: :dev}, + {:brod, "~> 3.16", only: :dev} ] end end diff --git a/mix.lock b/mix.lock index ae76fed..b0aa8da 100644 --- a/mix.lock +++ b/mix.lock @@ -1,13 +1,16 @@ %{ "benchee": {:hex, :benchee, "1.1.0", "f3a43817209a92a1fade36ef36b86e1052627fd8934a8b937ac9ab3a76c43062", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}, {:statistex, "~> 1.0", [hex: :statistex, repo: "hexpm", optional: false]}], "hexpm", "7da57d545003165a012b587077f6ba90b89210fd88074ce3c60ce239eb5e6d93"}, + "brod": {:hex, :brod, "3.16.5", "c1ef9264bdc8d72e8f5b82234f1ad7f05c82f769851ed85393376a0d4b430812", [:rebar3], [{:kafka_protocol, "4.1.0", [hex: :kafka_protocol, repo: "hexpm", optional: false]}, {:snappyer, "1.2.8", [hex: :snappyer, repo: "hexpm", optional: false]}, {:supervisor3, "1.1.11", [hex: :supervisor3, repo: "hexpm", optional: false]}], "hexpm", "fad2d9c644ccdc6cda25dd96cee78376261f053e28ca0403f787338f4b20cc40"}, "connection": {:hex, :connection, "1.1.0", "ff2a49c4b75b6fb3e674bfc5536451607270aac754ffd1bdfe175abe4a6d7a68", [:mix], [], "hexpm", "722c1eb0a418fbe91ba7bd59a47e28008a189d47e37e0e7bb85585a016b2869c"}, - "crc32cer": {:hex, :crc32cer, "0.1.10", "fb87abbf34b72f180f8c3a908cd1826c6cb9a59787d156a29e05de9e98be385e", [:rebar3], [], "hexpm", "5b1f47efd0a1b4b7411f1f35e14d3c8c6da6e6a2a725ec8f2cf1ab13703e5f38"}, + "crc32cer": {:hex, :crc32cer, "0.1.8", "c6c2275c5fb60a95f4935d414f30b50ee9cfed494081c9b36ebb02edfc2f48db", [:rebar3], [], "hexpm", "251499085482920deb6c9b7aadabf9fb4c432f96add97ab42aee4501e5b6f591"}, "deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"}, "kafka_ex": {:hex, :kafka_ex, "0.13.0", "2bfaf3c81d4ee01ed2088cb09e46c070c245f60f5752ec7043f29e807f6679ec", [:mix], [{:kayrock, "~> 0.1.12", [hex: :kayrock, repo: "hexpm", optional: false]}], "hexpm", "8a806eee5cd8191f45870b2ef4b3f4f52c57d798039f2d3fc602ce47053db7b9"}, + "kafka_protocol": {:hex, :kafka_protocol, "4.1.0", "53fac8866969484f783bff204bd4e41e62a97ce9753c83f802a08d5bfc0e0c4c", [:rebar3], [{:crc32cer, "0.1.8", [hex: :crc32cer, repo: "hexpm", optional: false]}], "hexpm", "61cb8b80199bf95122cf8073e0f4c0ad62f82515b4d44c54f946a5972c3f5fa5"}, "kayrock": {:hex, :kayrock, "0.1.15", "61ce03b65dd2236479357ca4162f43fe3a42923b39fbb6551a16d57cf2b93072", [:mix], [{:connection, "~> 1.1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:crc32cer, "~> 0.1.8", [hex: :crc32cer, repo: "hexpm", optional: false]}, {:varint, "~> 1.2.0", [hex: :varint, repo: "hexpm", optional: false]}], "hexpm", "61d7b3579db68e61c26f316b9246e0231b878148bb1887adc59fecedcbc46c12"}, "klife_protocol": {:hex, :klife_protocol, "0.2.1", "5acf278fddba27f079e4696d0bb93271e173052df3a0bda6544c1a6db9902daa", [:mix], [{:crc32cer, "~> 0.1.8", [hex: :crc32cer, repo: "hexpm", optional: false]}, {:snappyer, "~> 1.2.7", [hex: :snappyer, repo: "hexpm", optional: false]}], "hexpm", "e4bc0053f1f0c66d41ea5007fe3c6bbffa1f3c6d6f3e48f9c4460b24ba453080"}, "nimble_options": {:hex, :nimble_options, "1.0.2", "92098a74df0072ff37d0c12ace58574d26880e522c22801437151a159392270e", [:mix], [], "hexpm", "fd12a8db2021036ce12a309f26f564ec367373265b53e25403f0ee697380f1b8"}, - "snappyer": {:hex, :snappyer, "1.2.9", "9cc58470798648ce34c662ca0aa6daae31367667714c9a543384430a3586e5d3", [:rebar3], [], "hexpm", "18d00ca218ae613416e6eecafe1078db86342a66f86277bd45c95f05bf1c8b29"}, + "snappyer": {:hex, :snappyer, "1.2.8", "201ce9067a33c71a6a5087c0c3a49a010b17112d461e6df696c722dcb6d0934a", [:rebar3], [], "hexpm", "35518e79a28548b56d8fd6aee2f565f12f51c2d3d053f9cfa817c83be88c4f3d"}, "statistex": {:hex, :statistex, "1.0.0", "f3dc93f3c0c6c92e5f291704cf62b99b553253d7969e9a5fa713e5481cd858a5", [:mix], [], "hexpm", "ff9d8bee7035028ab4742ff52fc80a2aa35cece833cf5319009b52f1b5a86c27"}, + "supervisor3": {:hex, :supervisor3, "1.1.11", "d81cdec31d102fde407423e1d05b569572850deebed86b951d5233c387cba80b", [:rebar3], [], "hexpm", "e6c2dedbcabcba24995a218aca12db5e208b80d3252692b22ef0f1a266104b50"}, "varint": {:hex, :varint, "1.2.0", "61bffd9dcc2d5242d59f75694506b4d4013bb103f6a23e34b94f89cebb0c1ab3", [:mix], [], "hexpm", "d94941ed8b9d1a5fdede9103a5e52035bd0aaf35081d44e67713a36799927e47"}, } diff --git a/test/producer/dispatcher_test.exs b/test/producer/dispatcher_test.exs index 02e4101..3abb89e 100644 --- a/test/producer/dispatcher_test.exs +++ b/test/producer/dispatcher_test.exs @@ -21,7 +21,7 @@ defmodule Klife.Producer.DispatcherTest do delivery_timeout_ms: 60000, enable_idempotence: true, linger_ms: 10_000, - max_inflight_requests: 2, + max_in_flight_requests: 2, max_retries: :infinity, producer_name: :my_batch_producer, request_timeout_ms: 15000, @@ -29,14 +29,23 @@ defmodule Klife.Producer.DispatcherTest do }, broker_id: 1002, current_batch: %{}, + current_waiting_pids: %{}, + current_estimated_size: 0, + current_base_time: nil, last_batch_sent_at: System.monotonic_time(:millisecond), - in_flight_queue: [nil, nil], - current_waiting_pids: %{} + in_flight_pool: [nil, nil], + next_send_msg_ref: nil, + batch_queue: :queue.new() } - assert {:noreply, new_state} = - Dispatcher.handle_call({:produce_sync, rec, "my_topic", 0}, self(), state) + assert {:reply, :ok, new_state} = + Dispatcher.handle_call( + {:produce_sync, rec, "my_topic", 0, 100}, + {self(), nil}, + state + ) + assert new_state.current_estimated_size == 100 assert [inserted_rec_1] = new_state.current_batch[{"my_topic", 0}].records assert %{ @@ -55,9 +64,14 @@ defmodule Klife.Producer.DispatcherTest do headers: [%{key: "header_key2", value: "header_value2"}] } - assert {:noreply, new_state} = - Dispatcher.handle_call({:produce_sync, rec, "my_topic", 0}, self(), new_state) + assert {:reply, :ok, new_state} = + Dispatcher.handle_call( + {:produce_sync, rec, "my_topic", 0, 200}, + {self(), nil}, + new_state + ) + assert new_state.current_estimated_size == 300 assert [inserted_rec_2, ^inserted_rec_1] = new_state.current_batch[{"my_topic", 0}].records assert %{ @@ -76,8 +90,14 @@ defmodule Klife.Producer.DispatcherTest do headers: [%{key: "header_key3", value: "header_value3"}] } - assert {:noreply, new_state} = - Dispatcher.handle_call({:produce_sync, rec, "my_topic", 1}, self(), new_state) + assert {:reply, :ok, new_state} = + Dispatcher.handle_call( + {:produce_sync, rec, "my_topic", 1, 300}, + {self(), nil}, + new_state + ) + + assert new_state.current_estimated_size == 600 assert [^inserted_rec_2, ^inserted_rec_1] = new_state.current_batch[{"my_topic", 0}].records @@ -99,8 +119,14 @@ defmodule Klife.Producer.DispatcherTest do headers: [%{key: "header_key4", value: "header_value4"}] } - assert {:noreply, new_state} = - Dispatcher.handle_call({:produce_sync, rec, "topic_b", 0}, self(), new_state) + assert {:reply, :ok, new_state} = + Dispatcher.handle_call( + {:produce_sync, rec, "topic_b", 0, 400}, + {self(), nil}, + new_state + ) + + assert new_state.current_estimated_size == 1000 assert [^inserted_rec_2, ^inserted_rec_1] = new_state.current_batch[{"my_topic", 0}].records