Skip to content

Commit

Permalink
chore: enhance producer tests
Browse files Browse the repository at this point in the history
  • Loading branch information
oliveigah committed Apr 1, 2024
1 parent 03862bd commit b16b477
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 22 deletions.
2 changes: 1 addition & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ config :klife,
%{
name: :my_batch_producer,
client_id: "my_custom_client_id",
linger_ms: 100
linger_ms: 1_000
},
%{
name: :benchmark_producer,
Expand Down
3 changes: 2 additions & 1 deletion lib/klife/connection/message_versions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ defmodule Klife.Connection.MessageVersions do
{M.CreateTopics, %{min: 0, max: 0, should_raise?: false}},
{M.Metadata, %{min: 1, max: 1, should_raise?: true}},
{M.Produce, %{min: 0, max: 0, should_raise?: false}},
{M.InitProducerId, %{min: 0, max: 0, should_raise?: false}}
{M.InitProducerId, %{min: 0, max: 0, should_raise?: false}},
{M.Fetch, %{min: 4, max: 4, should_raise?: true}}
]
end

Expand Down
3 changes: 2 additions & 1 deletion lib/klife/producer/dispatcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ defmodule Klife.Producer.Dispatcher do
|> Enum.each(fn {topic, partition, 0, base_offset} ->
delivery_confirmation_pids
|> Map.get({topic, partition}, [])
|> Enum.reverse()
|> Enum.each(fn {pid, batch_offset} ->
send(pid, {:klife_produce_sync, :ok, base_offset + batch_offset})
end)
Expand Down Expand Up @@ -487,7 +488,7 @@ defmodule Klife.Producer.Dispatcher do
Enum.map(partitions_list, fn {partition, batch} ->
%{
index: partition,
records: Map.replace(batch, :records, Enum.reverse(batch.records))
records: Map.replace!(batch, :records, Enum.reverse(batch.records))
}
end)
}
Expand Down
78 changes: 78 additions & 0 deletions lib/klife/utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,82 @@ defmodule Klife.Utils do
:error
end
end

def get_record_by_offset(cluster_name, topic, partition, offset) do
content = %{
replica_id: -1,
max_wait_ms: 1000,
min_bytes: 1,
max_bytes: 100_000,
isolation_level: 0,
topics: [
%{
topic: topic,
partitions: [
%{
partition: partition,
fetch_offset: offset,
# 1 guarantees that only the first record batch will
# be retrieved
partition_max_bytes: 1
}
]
}
]
}

broker = Klife.Producer.Controller.get_broker_id(cluster_name, topic, partition)

{:ok, %{content: content}} =
Klife.Connection.Broker.send_message(
KlifeProtocol.Messages.Fetch,
cluster_name,
broker,
content
)

topic_resp = Enum.find(content.responses, &(&1.topic == topic))
partition_resp = Enum.find(topic_resp.partitions, &(&1.partition_index == partition))
[%{base_offset: base_offset, records: records}] = partition_resp.records
Enum.find(records, &(&1.offset_delta + base_offset == offset))
end

def get_record_batch_by_offset(cluster_name, topic, partition, offset) do
content = %{
replica_id: -1,
max_wait_ms: 1000,
min_bytes: 1,
max_bytes: 100_000,
isolation_level: 0,
topics: [
%{
topic: topic,
partitions: [
%{
partition: partition,
fetch_offset: offset,
# 1 guarantees that only the first record batch will
# be retrieved
partition_max_bytes: 1
}
]
}
]
}

broker = Klife.Producer.Controller.get_broker_id(cluster_name, topic, partition)

{:ok, %{content: content}} =
Klife.Connection.Broker.send_message(
KlifeProtocol.Messages.Fetch,
cluster_name,
broker,
content
)

topic_resp = Enum.find(content.responses, &(&1.topic == topic))
partition_resp = Enum.find(topic_resp.partitions, &(&1.partition_index == partition))
[%{records: records}] = partition_resp.records
records
end
end
2 changes: 1 addition & 1 deletion test/connection/system_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Klife.Connection.SystemTest do
alias Klife.Connection.Broker
alias KlifeProtocol.Messages.ApiVersions

def check_broker_connection(cluster_name, broker_id) do
defp check_broker_connection(cluster_name, broker_id) do
parent = self()

assert {:ok, response} = Broker.send_message(ApiVersions, cluster_name, broker_id)
Expand Down
60 changes: 42 additions & 18 deletions test/producer/producer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,37 @@ defmodule Klife.ProducerTest do
use ExUnit.Case

alias Klife.Producer
alias Klife.Utils

defp assert_offset(expected_record, cluster, topic, partition, offset) do
stored_record = Utils.get_record_by_offset(cluster, topic, partition, offset)

Enum.each(expected_record, fn {k, v} ->
assert v == Map.get(stored_record, k)
end)
end

test "produce message sync no batch" do
record = %{
value: "abc",
key: "some_key",
headers: [%{key: "header_1", value: "header_val_1"}]
value: :rand.bytes(1_000),
key: :rand.bytes(1_000),
headers: [%{key: :rand.bytes(1_000), value: :rand.bytes(1_000)}]
}

topic = "my_no_batch_topic"

assert {:ok, offset} = Producer.produce_sync(record, topic, 1, :my_test_cluster_1)

assert is_number(offset)
assert_offset(record, :my_test_cluster_1, topic, 1, offset)
record_batch = Utils.get_record_batch_by_offset(:my_test_cluster_1, topic, 1, offset)
assert length(record_batch) == 1
end

test "produce message sync using not default producer" do
record = %{
value: "abc",
key: "some_key",
headers: [%{key: "header_1", value: "header_val_1"}]
value: :rand.bytes(1_000),
key: :rand.bytes(1_000),
headers: [%{key: :rand.bytes(1_000), value: :rand.bytes(1_000)}]
}

topic = "my_no_batch_topic"
Expand All @@ -31,16 +42,18 @@ defmodule Klife.ProducerTest do
producer: :benchmark_producer
)

assert is_number(offset)
assert_offset(record, :my_test_cluster_1, topic, 1, offset)
record_batch = Utils.get_record_batch_by_offset(:my_test_cluster_1, topic, 1, offset)
assert length(record_batch) == 1
end

test "produce message sync with batch" do

Check failure on line 50 in test/producer/producer_test.exs

View workflow job for this annotation

GitHub Actions / test

test produce message sync with batch (Klife.ProducerTest)
topic = "my_batch_topic"

rec_1 = %{
value: "some_val_1",
key: "some_key_1",
headers: [%{key: "header_1", value: "header_val_1"}]
value: :rand.bytes(1_000),
key: :rand.bytes(1_000),
headers: [%{key: :rand.bytes(1_000), value: :rand.bytes(1_000)}]
}

task_1 =
Expand All @@ -49,9 +62,9 @@ defmodule Klife.ProducerTest do
end)

rec_2 = %{
value: "some_val_2",
key: "some_key_2",
headers: [%{key: "header_2", value: "header_val_2"}]
value: :rand.bytes(1_000),
key: :rand.bytes(1_000),
headers: [%{key: :rand.bytes(1_000), value: :rand.bytes(1_000)}]
}

Process.sleep(5)
Expand All @@ -62,9 +75,9 @@ defmodule Klife.ProducerTest do
end)

rec_3 = %{
value: "some_val_3",
key: "some_key_3",
headers: [%{key: "header_3", value: "header_val_3"}]
value: :rand.bytes(1_000),
key: :rand.bytes(1_000),
headers: [%{key: :rand.bytes(1_000), value: :rand.bytes(1_000)}]
}

Process.sleep(5)
Expand All @@ -75,9 +88,20 @@ defmodule Klife.ProducerTest do
end)

assert [{:ok, offset_1}, {:ok, offset_2}, {:ok, offset_3}] =
Task.await_many([task_1, task_2, task_3], 1_000)
Task.await_many([task_1, task_2, task_3], 2_000)

assert offset_2 - offset_1 == 1
assert offset_3 - offset_2 == 1

assert_offset(rec_1, :my_test_cluster_1, topic, 1, offset_1)
assert_offset(rec_2, :my_test_cluster_1, topic, 1, offset_2)
assert_offset(rec_3, :my_test_cluster_1, topic, 1, offset_3)

batch_1 = Utils.get_record_batch_by_offset(:my_test_cluster_1, topic, 1, offset_1)
batch_2 = Utils.get_record_batch_by_offset(:my_test_cluster_1, topic, 1, offset_2)
batch_3 = Utils.get_record_batch_by_offset(:my_test_cluster_1, topic, 1, offset_3)

assert length(batch_1) == 3
assert batch_1 == batch_2 and batch_2 == batch_3
end
end

0 comments on commit b16b477

Please sign in to comment.