Skip to content

Commit

Permalink
chore: major api refactor, flaky tests fixes and overall stability
Browse files Browse the repository at this point in the history
  • Loading branch information
oliveigah committed Jun 9, 2024
1 parent ce166fc commit bb80edb
Show file tree
Hide file tree
Showing 16 changed files with 689 additions and 477 deletions.
12 changes: 12 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ config :klife,
client_id: "my_custom_client_id",
max_in_flight_requests: 10
},
%{
name: :benchmark_producer_in_flight_linger,
client_id: "my_custom_client_id",
max_in_flight_requests: 10,
linger_ms: 1
},
%{
name: :test_batch_producer,
client_id: "my_custom_client_id",
Expand Down Expand Up @@ -66,6 +72,12 @@ config :klife,
num_partitions: 30,
replication_factor: 2
},
%{
name: "benchmark_topic_in_flight_linger",
producer: :benchmark_producer_in_flight_linger,
num_partitions: 30,
replication_factor: 2
},
%{
name: "test_batch_topic",
enable_produce: true,
Expand Down
62 changes: 27 additions & 35 deletions lib/klife.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,14 @@ defmodule Klife do
alias Klife.TxnProducerPool
alias Klife.Producer.Controller, as: PController

def produce(record_or_records, opts \\ [])

def produce(%Record{} = record, opts) do
case produce([record], opts) do
[resp] ->
resp

resp ->
resp
@doc """
Produces a single record.

The record is wrapped in a one-element list and handed to `produce_batch/2`.
When the batch result is a one-element list, that lone response is returned;
any other result is returned unchanged.
"""
def produce(%Record{} = record, opts \\ []) do
  [record]
  |> produce_batch(opts)
  |> case do
    [single_resp] -> single_resp
    other -> other
  end
end

def produce([%Record{} | _] = records, opts) do
def produce_batch([%Record{} | _] = records, opts \\ []) do
cluster = get_cluster(opts)

records =
Expand All @@ -29,38 +24,35 @@ defmodule Klife do
|> maybe_add_partition(cluster, opts)
end)

in_txn? = TxnProducerPool.in_txn?(cluster)
with_txn_opt = Keyword.get(opts, :with_txn, false)
if TxnProducerPool.in_txn?(cluster),
do: TxnProducerPool.produce(records, cluster, opts),
else: Producer.produce(records, cluster, opts)
end

cond do
in_txn? ->
TxnProducerPool.produce(records, cluster, opts)
@doc """
Produces a non-empty list of records inside a transaction.

The function passed to `transaction/2` calls `produce_batch/2` and feeds its
result to `verify_batch/1`; the return value is whatever `transaction/2` returns.
"""
def produce_batch_txn([%Record{} | _] = records, opts \\ []) do
  produce_and_verify = fn ->
    batch_resps = produce_batch(records, opts)
    verify_batch(batch_resps)
  end

  transaction(produce_and_verify, opts)
end

with_txn_opt ->
transaction(
fn ->
resp = produce(records, opts)
@doc """
Runs `fun` through `TxnProducerPool.run_txn/3`.

The cluster and the transactional pool are both resolved from `opts`.
"""
def transaction(fun, opts \\ []) do
  cluster = get_cluster(opts)
  txn_pool = get_txn_pool(opts)

  TxnProducerPool.run_txn(cluster, txn_pool, fun)
end

# Do we really need this match?
if Enum.all?(resp, &match?({:ok, _}, &1)),
do: {:ok, resp},
else: {:error, :abort}
end,
opts
)
|> case do
{:ok, resp} -> resp
err -> err
end
def verify_batch(produce_resps) do
case Enum.group_by(produce_resps, &elem(&1, 0), &elem(&1, 1)) do
%{error: error_list} ->
{:error, error_list}

true ->
Producer.produce(records, cluster, opts)
%{ok: resp} ->
{:ok, resp}
end
end

def transaction(fun, opts \\ []) do
cluster = get_cluster(opts)
TxnProducerPool.run_txn(cluster, get_txn_pool(opts), fun)
@doc """
Raising variant of `verify_batch/1`.

Returns the unwrapped value of an `{:ok, value}` verification and raises a
`RuntimeError` carrying the inspected errors on `{:error, errors}`.
"""
def verify_batch!(produce_resps) do
  produce_resps
  |> verify_batch()
  |> case do
    {:ok, verified} -> verified
    {:error, errors} -> raise "Error on batch verification. #{inspect(errors)}"
  end
end

defp default_cluster(), do: :persistent_term.get(:klife_default_cluster)
Expand Down
2 changes: 1 addition & 1 deletion lib/klife/connection/broker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ defmodule Klife.Connection.Broker do
Error while connecting to broker #{state.broker_id} on host #{state.url}. Reason: #{inspect(res)}
""")

:ok = Controller.trigger_brokers_verification(state.cluster_name)
:ok = Controller.trigger_brokers_verification_async(state.cluster_name)

Process.send_after(self(), :connect, get_reconnect_delay(state))

Expand Down
19 changes: 19 additions & 0 deletions lib/klife/connection/controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,21 @@ defmodule Klife.Connection.Controller do
end
end

# Fire-and-forget request to run a cluster check.
#
# When no pid is registered in `check_cluster_waiting_pids`, the currently
# scheduled timer is cancelled and `:check_cluster` is sent to this process
# immediately (0 ms delay); the new timer reference is stored in the state.
@impl true
def handle_cast(
      :trigger_check_cluster,
      %__MODULE__{check_cluster_waiting_pids: []} = state
    ) do
  Process.cancel_timer(state.check_cluster_timer_ref)
  new_ref = Process.send_after(self(), :check_cluster, 0)

  {:noreply, %__MODULE__{state | check_cluster_timer_ref: new_ref}}
end

# Pids are already registered in `check_cluster_waiting_pids`, so the request is
# ignored. Previously this case had no matching clause and the cast crashed the
# controller with a CaseClauseError.
# NOTE(review): presumably a check is already pending whenever pids are waiting;
# confirm against the synchronous `:trigger_check_cluster` handle_call clause.
def handle_cast(:trigger_check_cluster, %__MODULE__{} = state) do
  {:noreply, state}
end

## PUBLIC INTERFACE

def insert_in_flight(cluster_name, correlation_id) do
Expand Down Expand Up @@ -224,6 +239,10 @@ defmodule Klife.Connection.Controller do
GenServer.call(via_tuple({__MODULE__, cluster_name}), :trigger_check_cluster)
end

@doc """
Casts `:trigger_check_cluster` to the controller registered for `cluster_name`.

Being a `GenServer.cast/2`, it returns `:ok` immediately without waiting for
the controller to process the message.
"""
def trigger_brokers_verification_async(cluster_name) do
  {__MODULE__, cluster_name}
  |> via_tuple()
  |> GenServer.cast(:trigger_check_cluster)
end

def get_cluster_controller(cluster_name),
do: :persistent_term.get({:cluster_controller, cluster_name})

Expand Down
4 changes: 2 additions & 2 deletions lib/klife/producer/controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ defmodule Klife.Producer.Controller do
)
|> case do
{:ok, _pid} -> :ok
{:error, {:already_started, pid}} -> send(pid, :handle_batchers)
{:error, {:already_started, pid}} -> send(pid, :handle_change)
end
end

Expand Down Expand Up @@ -175,7 +175,7 @@ defmodule Klife.Producer.Controller do
)
|> case do
{:ok, _pid} -> :ok
{:error, {:already_started, pid}} -> send(pid, :handle_batchers)
{:error, {:already_started, pid}} -> send(pid, :handle_change)
end
end

Expand Down
27 changes: 18 additions & 9 deletions lib/klife/producer/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ defmodule Klife.Producer do
cluster_name = Keyword.fetch!(validated_args, :cluster_name)

GenServer.start_link(__MODULE__, validated_args,
name: via_tuple({__MODULE__, cluster_name, producer_name})
name: via_tuple(get_process_name(cluster_name, producer_name))
)
end

# Builds the registry key under which a producer process is registered:
# `{module, cluster, producer}`.
defp get_process_name(cluster, producer) do
  {__MODULE__, cluster, producer}
end

def init(validated_args) do
args_map = Map.new(validated_args)

Expand All @@ -61,7 +63,7 @@ defmodule Klife.Producer do
|> Map.merge(filtered_args)
|> set_producer_id()

:ok = do_handle_batchers(state)
:ok = handle_batchers(state)

{:ok, state}
end
Expand All @@ -78,9 +80,16 @@ defmodule Klife.Producer do
|> GenServer.call(:get_txn_pool_data)
end

def handle_info(:handle_batchers, %__MODULE__{} = state) do
:ok = do_handle_batchers(state)
{:noreply, state}
@doc """
Returns the first entry that `registry_lookup/1` yields for the producer's
process name, or `nil` when the lookup result is an empty list.
"""
def get_pid(cluster_name, producer_name) do
  process_name = get_process_name(cluster_name, producer_name)

  process_name
  |> registry_lookup()
  |> List.first()
end

# Handles the `:handle_change` message: runs `set_producer_id/1` on the state,
# then runs `handle_batchers/1` with the updated state (which must return `:ok`),
# and keeps the updated state.
def handle_info(:handle_change, %__MODULE__{} = state) do
  refreshed_state = set_producer_id(state)

  :ok = handle_batchers(refreshed_state)

  {:noreply, refreshed_state}
end

def handle_call(:get_txn_pool_data, _from, %__MODULE__{txn_id: nil} = state) do
Expand Down Expand Up @@ -122,7 +131,7 @@ defmodule Klife.Producer do
end
end

defp set_producer_id(%__MODULE__{enable_idempotence: false}), do: nil
defp set_producer_id(%__MODULE__{enable_idempotence: false} = state), do: state

defp set_producer_id(%__MODULE__{enable_idempotence: true} = state) do
broker =
Expand Down Expand Up @@ -223,8 +232,8 @@ defmodule Klife.Producer do
{:ok, offset} ->
{:ok, %{rec | offset: offset}}

err ->
err
{:error, ec} ->
{:error, %{rec | error_code: ec}}
end
end)
else
Expand Down Expand Up @@ -262,7 +271,7 @@ defmodule Klife.Producer do
end
end

defp do_handle_batchers(%__MODULE__{} = state) do
defp handle_batchers(%__MODULE__{} = state) do
known_brokers = ConnController.get_known_brokers(state.cluster_name)
batchers_per_broker = state.batchers_count
:ok = init_batchers(state, known_brokers, batchers_per_broker)
Expand Down
Loading

0 comments on commit bb80edb

Please sign in to comment.