Skip to content

Commit

Permalink
feat(connection): use pubsub to broadcast cluster changes
Browse files Browse the repository at this point in the history
  • Loading branch information
oliveigah committed May 18, 2024
1 parent 5732688 commit 986f0bd
Show file tree
Hide file tree
Showing 9 changed files with 226 additions and 61 deletions.
1 change: 1 addition & 0 deletions lib/klife/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ defmodule Klife.Application do

children = [
Klife.ProcessRegistry,
Klife.PubSub,
handle_clusters()
]

Expand Down
6 changes: 4 additions & 2 deletions lib/klife/connection/broker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,9 @@ defmodule Klife.Connection.Broker do
# Must revisit this later.
#
# For the producer case the mechanism being used to avoid this is to only
# retry a delivery if there is a safe time where the producing process
# retry a delivery if there is enough time where the producing process
# wont give up in the middle of a request. The rule is:
# now + req_timeout - base_time < delivery_timeout - :timer.seconds(5)
# now + req_timeout - base_time < delivery_timeout - :timer.seconds(2)
#
Logger.warning("""
Unkown correlation id received from cluster #{inspect(cluster_name)}.
Expand Down Expand Up @@ -260,6 +260,8 @@ defmodule Klife.Connection.Broker do
Error while connecting to broker #{state.broker_id} on host #{state.url}. Reason: #{inspect(res)}
""")

:ok = Controller.trigger_brokers_verification(state.cluster_name)

Process.send_after(self(), :connect, get_reconnect_delay(state))

%__MODULE__{state | reconnect_attempts: state.reconnect_attempts + 1}
Expand Down
42 changes: 25 additions & 17 deletions lib/klife/connection/controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ defmodule Klife.Connection.Controller do

import Klife.ProcessRegistry

alias Klife.PubSub

alias KlifeProtocol.Messages

alias Klife.Connection
Expand All @@ -24,7 +26,7 @@ defmodule Klife.Connection.Controller do
# in order to avoid reaching this limit.
@max_correlation_counter 200_000_000
@check_correlation_counter_delay :timer.seconds(300)
@check_cluster_delay :timer.seconds(30)
@check_cluster_delay :timer.seconds(10)

defstruct [
:bootstrap_servers,
Expand Down Expand Up @@ -73,7 +75,7 @@ defmodule Klife.Connection.Controller do
def handle_info(:init_bootstrap_conn, %__MODULE__{} = state) do
conn = connect_bootstrap_server(state.bootstrap_servers, state.socket_opts)
negotiate_api_versions(conn, state.cluster_name)
new_ref = Process.send(self(), :check_cluster, [])
new_ref = Process.send_after(self(), :check_cluster, 0)
{:noreply, %__MODULE__{state | bootstrap_conn: conn, check_cluster_timer_ref: new_ref}}
end

Expand All @@ -86,7 +88,7 @@ defmodule Klife.Connection.Controller do
to_remove = old_brokers -- new_brokers_list
to_start = new_brokers_list -- old_brokers

Process.send(self(), {:handle_brokers, to_start, to_remove}, [])
send(self(), {:handle_brokers, to_start, to_remove})
next_ref = Process.send_after(self(), :check_cluster, @check_cluster_delay)

{:noreply,
Expand Down Expand Up @@ -134,6 +136,13 @@ defmodule Klife.Connection.Controller do

:persistent_term.put({:known_brokers_ids, state.cluster_name}, new_brokers)

if to_start != [] or to_remove != [] do
PubSub.publish({:cluster_change, state.cluster_name}, %{
added_brokers: to_start,
removed_brokers: to_remove
})
end

state.check_cluster_waiting_pids
|> Enum.reverse()
|> Enum.each(&GenServer.reply(&1, :ok))
Expand Down Expand Up @@ -219,24 +228,23 @@ defmodule Klife.Connection.Controller do
do: :persistent_term.get({:known_brokers_ids, cluster_name})

def get_cluster_info(%Connection{} = conn) do
%{
req = %{
headers: %{correlation_id: 0},
content: %{include_cluster_authorized_operations: true, topics: []}
}
|> Messages.Metadata.serialize_request(1)
|> Connection.write(conn)
|> case do
:ok ->
{:ok, received_data} = Connection.read(conn)

{:ok, %{content: resp}} = Messages.Metadata.deserialize_response(received_data, 1)

{:ok,
%{
brokers: Enum.map(resp.brokers, fn b -> {b.node_id, "#{b.host}:#{b.port}"} end),
controller: resp.controller_id
}}

serialized_req = Messages.Metadata.serialize_request(req, 1)

with :ok <- Connection.write(serialized_req, conn),
{:ok, received_data} <- Connection.read(conn) do
{:ok, %{content: resp}} = Messages.Metadata.deserialize_response(received_data, 1)

{:ok,
%{
brokers: Enum.map(resp.brokers, fn b -> {b.node_id, "#{b.host}:#{b.port}"} end),
controller: resp.controller_id
}}
else
{:error, _reason} = res ->
res
end
Expand Down
60 changes: 22 additions & 38 deletions lib/klife/producer/controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ defmodule Klife.Producer.Controller do

import Klife.ProcessRegistry

alias Klife.PubSub

alias KlifeProtocol.Messages
alias Klife.Connection.Broker
alias Klife.Connection.Controller, as: ConnController
Expand Down Expand Up @@ -58,10 +60,26 @@ defmodule Klife.Producer.Controller do

Utils.wait_connection!(cluster_name)

:ok = PubSub.subscribe({:cluster_change, cluster_name})

{:ok, state}
end

def handle_info(:init_producers, %__MODULE__{} = state) do
def handle_info(
{{:cluster_change, cluster_name}, _event_data, _callback_data},
%__MODULE__{cluster_name: cluster_name} = state
) do
Process.cancel_timer(state.check_metadata_timer_ref)
new_ref = Process.send_after(self(), :check_metadata, 0)

{:noreply,
%__MODULE__{
state
| check_metadata_timer_ref: new_ref
}}
end

def handle_info(:handle_producers, %__MODULE__{} = state) do
for producer <- state.producers do
opts =
producer
Expand All @@ -76,7 +94,7 @@ defmodule Klife.Producer.Controller do

case result do
{:ok, _pid} -> :ok
{:error, {:already_started, _pid}} -> :ok
{:error, {:already_started, pid}} -> send(pid, :handle_batchers)
end
end

Expand Down Expand Up @@ -111,6 +129,7 @@ defmodule Klife.Producer.Controller do
{topic.name, partition.partition_index},
partition.leader_id,
config_topic[:producer] || @default_producer.name,
# batcher_id will be defined on producer
nil
})

Expand All @@ -132,7 +151,7 @@ defmodule Klife.Producer.Controller do
end

if Enum.any?(results, &(&1 == :new)) do
send(self(), :init_producers)
send(self(), :handle_producers)
end

if Enum.any?(resp.topics, &(&1.error_code != 0)) do
Expand All @@ -151,41 +170,6 @@ defmodule Klife.Producer.Controller do
end
end

@impl true
def handle_call(:trigger_check_metadata, from, %__MODULE__{} = state) do
case state do
%__MODULE__{check_metadata_waiting_pids: []} ->
Process.cancel_timer(state.check_metadata_timer_ref)
new_ref = Process.send_after(self(), :check_cluster, 0)

{:noreply,
%__MODULE__{
state
| check_metadata_waiting_pids: [from],
check_metadata_timer_ref: new_ref
}}

%__MODULE__{} ->
{:noreply,
%__MODULE__{
state
| check_metadata_waiting_pids: [from | state.check_metadata_timer_ref]
}}
end
end

@impl true
def handle_cast(:trigger_check_metadata, %__MODULE__{} = state) do
Process.cancel_timer(state.check_metadata_timer_ref)
new_ref = Process.send_after(self(), :check_cluster, 0)

{:noreply,
%__MODULE__{
state
| check_metadata_timer_ref: new_ref
}}
end

# Public Interface

def trigger_metadata_verification_sync(cluster_name) do
Expand Down
13 changes: 9 additions & 4 deletions lib/klife/producer/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,16 @@ defmodule Klife.Producer do
filtered_args = Map.take(args_map, Map.keys(base))
state = Map.merge(base, filtered_args)

:ok = init_batchers(state)
send(self(), :handle_batchers)

{:ok, state}
end

def handle_info(:handle_batchers, %__MODULE__{} = state) do
:ok = handle_batchers(state)
{:noreply, state}
end

def produce_sync(record, topic, partition, cluster_name, opts \\ []) do
%{
broker_id: broker_id,
Expand Down Expand Up @@ -94,16 +99,16 @@ defmodule Klife.Producer do
end
end

defp init_batchers(%__MODULE__{} = state) do
defp handle_batchers(%__MODULE__{} = state) do
known_brokers = ConnController.get_known_brokers(state.cluster_name)
batchers_per_broker = state.batchers_count
:ok = do_init_batchers(state, known_brokers, batchers_per_broker)
:ok = init_batchers(state, known_brokers, batchers_per_broker)
:ok = update_topic_partition_metadata(state, batchers_per_broker)

:ok
end

defp do_init_batchers(state, known_brokers, batchers_per_broker) do
defp init_batchers(state, known_brokers, batchers_per_broker) do
for broker_id <- known_brokers,
batcher_id <- 0..(batchers_per_broker - 1) do
result =
Expand Down
34 changes: 34 additions & 0 deletions lib/klife/pubsub.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
defmodule Klife.PubSub do
def start_link() do
Registry.start_link(
name: __MODULE__,
keys: :duplicate,
partitions: System.schedulers_online()
)
end

def child_spec(_) do
Supervisor.child_spec(
Registry,
id: __MODULE__,
start: {__MODULE__, :start_link, []}
)
end

def subscribe(event, callback_data \\ nil) do
case Registry.register(__MODULE__, event, callback_data) do
{:ok, _} -> :ok
{:error, {:already_registered, _}} -> :ok
end
end

def unsubscribe(event) do
Registry.unregister(__MODULE__, event)
end

def publish(event, event_data) do
Registry.dispatch(__MODULE__, event, fn pids ->
Enum.each(pids, fn {pid, callback_data} -> send(pid, {event, event_data, callback_data}) end)
end)
end
end
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ defmodule Klife.MixProject do
end

defp elixirc_paths(:test), do: ["lib", "test/support"]
defp elixirc_paths(:dev), do: ["lib", "test/support"]
defp elixirc_paths(_), do: ["lib"]

# Run "mix help deps" to learn about dependencies.
Expand Down
25 changes: 25 additions & 0 deletions test/connection/system_test.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
defmodule Klife.Connection.SystemTest do
use ExUnit.Case
alias Klife.Utils
alias Klife.PubSub
alias Klife.TestUtils
alias Klife.Connection.Broker
alias Klife.Connection.MessageVersions, as: MV
alias KlifeProtocol.Messages.ApiVersions
Expand Down Expand Up @@ -117,4 +119,27 @@ defmodule Klife.Connection.SystemTest do
Enum.each(brokers_list_2, &check_broker_connection(cluster_name_2, &1))
Enum.each(brokers_list_3, &check_broker_connection(cluster_name_3, &1))
end

test "cluster changes events" do
cluster_name = :my_test_cluster_1
brokers = :persistent_term.get({:known_brokers_ids, cluster_name})
broker_id_to_remove = List.first(brokers)
cb_ref = make_ref()

:ok = PubSub.subscribe({:cluster_change, cluster_name}, %{some_data: cb_ref})

{:ok, service_name} = TestUtils.stop_broker(cluster_name, broker_id_to_remove)

assert_received({{:cluster_change, ^cluster_name}, event_data, %{some_data: ^cb_ref}})
assert broker_id_to_remove in Enum.map(event_data.removed_brokers, fn {b, _h} -> b end)
assert broker_id_to_remove not in :persistent_term.get({:known_brokers_ids, cluster_name})

{:ok, broker_id} = TestUtils.start_broker(service_name, cluster_name)

assert_received({{:cluster_change, ^cluster_name}, event_data, %{some_data: ^cb_ref}})
assert broker_id in Enum.map(event_data.added_brokers, fn {b, _h} -> b end)
assert broker_id in :persistent_term.get({:known_brokers_ids, cluster_name})

:ok = PubSub.unsubscribe({:cluster_change, cluster_name})
end
end
Loading

0 comments on commit 986f0bd

Please sign in to comment.