Skip to content

Commit

Permalink
chore(connection): add some docs and logs
Browse files Browse the repository at this point in the history
  • Loading branch information
oliveigah committed Mar 29, 2024
1 parent 95cbc90 commit 9e18b1b
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 24 deletions.
82 changes: 58 additions & 24 deletions lib/klife/connection/broker.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
defmodule Klife.Connection.Broker do
@moduledoc """
Represents a connection to a specific broker
Responsible for housekeeping the connection and
forward responses to waiting pids
"""
use GenServer

import Klife.ProcessRegistry
Expand Down Expand Up @@ -135,9 +142,14 @@ defmodule Klife.Connection.Broker do
broker_id = get_broker_id(broker_id, cluster_name)
conn = get_connection(cluster_name, broker_id)

if is_function(callback) or match?({_m, _f, _a}, callback) do
true = Controller.insert_in_flight(cluster_name, correlation_id, callback)
end
in_flight_data =
cond do
is_function(callback) -> callback
match?({_m, _f, _a}, callback) -> callback
true -> :noop
end

true = Controller.insert_in_flight(cluster_name, correlation_id, in_flight_data)

case Connection.write(raw_data, conn) do
:ok ->
Expand All @@ -152,17 +164,38 @@ defmodule Klife.Connection.Broker do
end
end

def reply_message(<<correlation_id::32-signed, _rest::binary>> = reply, cluster_name, conn) do
defp reply_message(<<correlation_id::32-signed, _rest::binary>> = reply, cluster_name, conn) do
case Controller.take_from_in_flight(cluster_name, correlation_id) do
# sync send
{^correlation_id, waiting_pid} when is_pid(waiting_pid) ->
Process.send(waiting_pid, {:broker_response, reply}, [])

# async send function callback
{^correlation_id, callback} when is_function(callback) ->
Task.Supervisor.start_child(
via_tuple({Klife.Connection.CallbackSupervisor, cluster_name}),
fn -> callback.(reply) end
)

# async send mfa callback
{^correlation_id, {mod, fun, args}} ->
Task.Supervisor.start_child(
via_tuple({Klife.Connection.CallbackSupervisor, cluster_name}),
mod,
fun,
[reply, args]
)

# async send with no callback
{^correlation_id, :noop} ->
:noop

nil ->
# TODO: HOW TO HANDLE THIS?
# There are 2 possibilities to this case
#
# 1 - An async message without callback was sent which does not populate the inflight table
# 2 - A sync message was sent but the caller gave up waiting the response
# A sync message was sent but the caller gave up waiting the response
#
# The only problematic case is the second one since the caller will assume
# that the message was not delivered and may send it again.
# The caller will assume that the message was not delivered and may send it again.
#
# Dependeing on the message being sent and the idempotency configuration
# this may not be a problem.
Expand All @@ -174,29 +207,30 @@ defmodule Klife.Connection.Broker do
# wont give up in the middle of a request. The rule is:
# now + req_timeout - base_time < delivery_timeout - :timer.seconds(5)
#
nil
Logger.warning("""
Unkown correlation id received from cluster #{inspect(cluster_name)}.
{^correlation_id, waiting_pid} when is_pid(waiting_pid) ->
Process.send(waiting_pid, {:broker_response, reply}, [])
correlation_id: #{inspect(correlation_id)}
{^correlation_id, callback} when is_function(callback) ->
Task.Supervisor.start_child(
via_tuple({Klife.Connection.CallbackSupervisor, cluster_name}),
fn -> callback.(reply) end
)
conn: #{inspect(conn)}
""")

{^correlation_id, {mod, fun, args}} ->
Task.Supervisor.start_child(
via_tuple({Klife.Connection.CallbackSupervisor, cluster_name}),
mod,
fun,
[reply, args]
)
nil
end

Connection.set_opts(conn, active: :once)
end

defp reply_message(_, cluster_name, conn) do
Logger.warning("""
Unkown message received from cluster #{inspect(cluster_name)}.
conn: #{inspect(conn)}
""")

:ok
end

defp get_reconnect_delay(%__MODULE__{reconnect_attempts: attempts}) do
max_idx = length(@reconnect_delays_seconds) - 1
base_delay = Enum.at(@reconnect_delays_seconds, min(attempts, max_idx))
Expand Down
9 changes: 9 additions & 0 deletions lib/klife/connection/controller.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
defmodule Klife.Connection.Controller do
@moduledoc """
Controller of the connection system.
Responsible for starting broker connections, housekeeping
common resources that are not broker specific such as
in flight message ets, correlation id counter and
cluster controller.
"""
use GenServer

import Klife.ProcessRegistry
Expand Down

0 comments on commit 9e18b1b

Please sign in to comment.