Skip to content

Commit

Permalink
Unify info messages from ered_client and ered_cluster
Browse files Browse the repository at this point in the history
Make the messages sent by a standalone ered_client match those sent
by a cluster client for each client, i.e. a map with the keys
msg_type, reason, client_id, addr and the optional fields
cluster_id and master.
  • Loading branch information
zuiderkwast committed Mar 20, 2024
1 parent 7d5dde3 commit f84f0e9
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 79 deletions.
62 changes: 46 additions & 16 deletions src/ered_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,29 @@

-type host() :: ered_connection:host().
-type addr() :: {host(), inet:port_number()}.
-type node_id() :: binary() | undefined.
-type client_info() :: {pid(), addr(), node_id()}.
-type status() :: connection_up | {connection_down, down_reason()} | queue_ok | queue_full.
-type reason() :: term(). % ssl reasons are of type any so no point being more specific
-type down_reason() :: node_down_timeout | node_deactivated |
{client_stopped | connect_error | init_error | socket_closed,
reason()}.
-type info_msg() :: {connection_status, client_info(), status()}.
-type info_msg(MsgType, Reason) ::
#{msg_type := MsgType,
reason := Reason,
master => boolean(), % Optional. Added by ered_cluster.
addr := addr(),
client_id := pid(),
cluster_id => binary() % Optional. Used by ered_cluster.
}.
-type info_msg() ::
info_msg(connected, none) |
info_msg(socket_closed, any()) |
info_msg(connect_error, any()) |
info_msg(init_error, any()) |
info_msg(node_down_timeout, none) |
info_msg(node_deactivated, none) |
info_msg(queue_ok, none) |
info_msg(queue_full, none) |
info_msg(client_stopped, any()).
-type server_ref() :: pid().

-type opt() ::
Expand Down Expand Up @@ -413,13 +428,11 @@ reply_command({command, _, Fun}, Reply) ->
get_command_payload({command, Command, _Fun}) ->
Command.

-spec report_connection_status(status(), #st{}) -> #st{}.
report_connection_status(Status, State = #st{last_status = Status}) ->
State;
report_connection_status(Status, State) ->
#opts{host = Host, port = Port} = State#st.opts,
ClusterId = State#st.cluster_id,
Msg = {connection_status, {self(), {Host, Port}, ClusterId}, Status},
send_info(Msg, State),
send_info(Status, State),
case Status of
%% Skip saving the last_status in this to avoid an extra connect_error event.
%% The usual case is that there is a connect_error and then node_down and then
Expand All @@ -431,15 +444,32 @@ report_connection_status(Status, State) ->
end.


-spec send_info(info_msg(), #st{}) -> ok.
send_info(Msg, State) ->
Pid = State#st.opts#opts.info_pid,
case Pid of
none ->
ok;
_ ->
Pid ! Msg
end,
-spec send_info(status(), #st{}) -> ok.
send_info(Status, #st{opts = #opts{info_pid = Pid,
host = Host,
port = Port},
cluster_id = ClusterId}) when is_pid(Pid) ->
{MsgType, Reason} =
case Status of
connection_up -> {connected, none};
{connection_down, R} when is_atom(R) -> {R, none};
{connection_down, R} -> R;
queue_full -> {queue_full, none};
queue_ok -> {queue_ok, none}
end,
Msg0 = #{msg_type => MsgType,
reason => Reason,
addr => {Host, Port},
client_id => self()},
Msg = case ClusterId of
undefined ->
Msg0;
Id when is_binary(Id) ->
Msg0#{cluster_id => ClusterId}
end,
Pid ! Msg,
ok;
send_info(_Msg, _State) ->
ok.


Expand Down
16 changes: 10 additions & 6 deletions src/ered_cluster.erl
Original file line number Diff line number Diff line change
Expand Up @@ -298,12 +298,12 @@ handle_info({command_try_again, Command, Slot, From, AttemptsLeft}, State) ->
State1 = send_command_to_slot(Command, Slot, From, State, AttemptsLeft),
{noreply, State1};

handle_info(Msg = {connection_status, {_Pid, Addr, _Id}, Status}, State) ->
handle_info(Msg = #{msg_type := MsgType, client_id := _Pid, addr := Addr}, State) ->
IsMaster = sets:is_element(Addr, State#st.masters),
ered_info_msg:connection_status(Msg, IsMaster, State#st.info_pid),
State1 = case Status of
{connection_down, {Reason, _}} when Reason =:= socket_closed;
Reason =:= connect_error ->
State1 = case MsgType of
_ when MsgType =:= socket_closed;
MsgType =:= connect_error ->
%% Avoid triggering the alarm for a socket closed by the
%% peer. The cluster will be marked down on the node down
%% timeout.
Expand All @@ -321,11 +321,15 @@ handle_info(Msg = {connection_status, {_Pid, Addr, _Id}, Status}, State) ->
false ->
NewState
end;
{connection_down,_} ->
_ when MsgType =:= node_down_timeout;
MsgType =:= node_deactivated;
MsgType =:= init_error;
MsgType =:= client_stopped ->
%% Client is down.
State#st{up = sets:del_element(Addr, State#st.up),
pending = sets:del_element(Addr, State#st.pending),
reconnecting = sets:del_element(Addr, State#st.reconnecting)};
connection_up ->
connected ->
State#st{up = sets:add_element(Addr, State#st.up),
pending = sets:del_element(Addr, State#st.pending),
reconnecting = sets:del_element(Addr, State#st.reconnecting)};
Expand Down
44 changes: 3 additions & 41 deletions src/ered_info_msg.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,8 @@
%%% Definitions
%%%===================================================================

-type node_info(MsgType, Reason) ::
#{msg_type := MsgType,
reason := Reason,
master := boolean(),
addr := addr(),
client_id := pid(),
node_id := string()
}.

-type info_msg() ::
node_info(connected, none) |

node_info(socket_closed, any()) |

node_info(connect_error, any()) |

node_info(init_error, any()) |

node_info(node_down_timeout, none) |

node_info(queue_ok, none) |

node_info(queue_full, none) |

node_info(client_stopped, any()) |
ered_client:info_msg() |

#{msg_type := slot_map_updated,
slot_map := ClusterSlotsReply :: any(),
Expand Down Expand Up @@ -73,23 +50,8 @@
%% Client connection goes up or down.
%% Client queue full or queue recovered to OK level.
%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
connection_status(ClientInfo, IsMaster, Pids) ->
{connection_status, {Pid, Addr, Id} , Status} = ClientInfo,
{MsgType, Reason} =
case Status of
connection_up -> {connected, none};
{connection_down, R} when is_atom(R) -> {R, none};
{connection_down, R} -> R;
queue_full -> {queue_full, none};
queue_ok -> {queue_ok, none}
end,
send_info(#{msg_type => MsgType,
reason => Reason,
master => IsMaster,
addr => Addr,
client_id => Pid,
cluster_id => Id},
Pids).
connection_status(Msg, IsMaster, Pids) ->
send_info(Msg#{master => IsMaster}, Pids).

%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-spec slot_map_updated(ered_lib:slot_map(), non_neg_integer(), addr(), [pid()]) -> ok.
Expand Down
33 changes: 17 additions & 16 deletions test/ered_client_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ fail_parse_t() ->
end),
expect_connection_up(Client),
Reason = {recv_exit, {parse_error,{invalid_data,<<"&pong">>}}},
receive {connection_status, _ClientInfo, {connection_down, {socket_closed, Reason}}} -> ok end,
receive #{msg_type := socket_closed, reason := Reason} -> ok end,
expect_connection_up(Client),
{ok, <<"pong">>} = get_msg().

Expand All @@ -84,7 +84,7 @@ server_close_socket_t() ->
end),
Client = start_client(Port),
expect_connection_up(Client),
receive {connection_status, _ClientInfo, {connection_down, {socket_closed, {recv_exit, closed}}}} -> ok end,
receive #{msg_type := socket_closed, reason := {recv_exit, closed}} -> ok end,
expect_connection_up(Client).


Expand Down Expand Up @@ -131,9 +131,9 @@ server_buffer_full_t() ->

Pid = self(),
[ered_client:command_async(Client, [<<"ping">>], fun(Reply) -> Pid ! {N, Reply} end) || N <- lists:seq(1,11)],
receive {connection_status, _, queue_full} -> ok end,
receive #{msg_type := queue_full} -> ok end,
{6, {error, queue_overflow}} = get_msg(),
receive {connection_status, _, queue_ok} -> ok end,
receive #{msg_type := queue_ok} -> ok end,
[{N, {ok, <<"pong">>}} = get_msg()|| N <- [1,2,3,4,5,7,8,9,10,11]],
no_more_msgs().

Expand Down Expand Up @@ -168,14 +168,14 @@ server_buffer_full_reconnect_t() ->
Pid = self(),
%% 5 messages will be pending, 5 messages in queue
[ered_client:command_async(Client, [<<"ping">>], fun(Reply) -> Pid ! {N, Reply} end) || N <- lists:seq(1,11)],
receive {connection_status, _ClientInfo1, queue_full} -> ok end,
receive #{msg_type := queue_full} -> ok end,
%% 1 message over the limit, first one in queue gets kicked out
{6, {error, queue_overflow}} = get_msg(),
receive {connection_status, _ClientInfo2, {connection_down, {socket_closed, {recv_exit, closed}}}} -> ok end,
receive #{msg_type := socket_closed, reason := {recv_exit, closed}} -> ok end,
%% when connection goes down the pending messages will be put in the queue and the queue
%% will overflow kicking out the oldest first
[{N, {error, queue_overflow}} = get_msg() || N <- [1,2,3,4,5]],
receive {connection_status, _ClientInfo3, queue_ok} -> ok end,
receive #{msg_type := queue_ok} -> ok end,
expect_connection_up(Client),
[{N, {ok, <<"pong">>}} = get_msg() || N <- [7,8,9,10,11]],
no_more_msgs().
Expand All @@ -199,13 +199,13 @@ server_buffer_full_node_goes_down_t() ->

Pid = self(),
[ered_client:command_async(Client, [<<"ping">>], fun(Reply) -> Pid ! {N, Reply} end) || N <- lists:seq(1,11)],
receive {connection_status, _ClientInfo1, queue_full} -> ok end,
receive #{msg_type := queue_full} -> ok end,
{6, {error, queue_overflow}} = get_msg(),
receive {connection_status, _ClientInfo2, {connection_down, {socket_closed, {recv_exit, closed}}}} -> ok end,
receive #{msg_type := socket_closed, reason := {recv_exit, closed}} -> ok end,
[{N, {error, queue_overflow}} = get_msg() || N <- [1,2,3,4,5]],
receive {connection_status, _ClientInfo3, queue_ok} -> ok end,
receive {connection_status, _ClientInfo4, {connection_down, {connect_error,econnrefused}}} -> ok end,
receive {connection_status, _ClientInfo5, {connection_down, node_down_timeout}} -> ok end,
receive #{msg_type := queue_ok} -> ok end,
receive #{msg_type := connect_error, reason := econnrefused} -> ok end,
receive #{msg_type := node_down_timeout} -> ok end,
[{N, {error, node_down}} = get_msg() || N <- [7,8,9,10,11]],

%% additional commands should get a node down
Expand Down Expand Up @@ -241,7 +241,7 @@ send_timeout_t() ->
Pid = self(),
ered_client:command_async(Client, [<<"ping">>], fun(Reply) -> Pid ! {reply, Reply} end),
%% this should come after max 1000ms
receive {connection_status, _ClientInfo, {connection_down, {socket_closed, {recv_exit, timeout}}}} -> ok after 2000 -> timeout_error() end,
receive #{msg_type := socket_closed, reason := {recv_exit, timeout}} -> ok after 2000 -> timeout_error() end,
expect_connection_up(Client),
{reply, {ok, <<"pong">>}} = get_msg(),
no_more_msgs().
Expand Down Expand Up @@ -373,16 +373,17 @@ expect_connection_up(Client) ->
expect_connection_up(Client, infinity).

expect_connection_up(Client, Timeout) ->
{connection_status, {Client, _Addr, _undefined}, connection_up} =
#{msg_type := connected, client_id := Client} =
get_msg(Timeout).

expect_connection_down(Client) ->
expect_connection_down(Client, infinity).

expect_connection_down(Client, Timeout) ->
{connection_status, {Client, _Addr, _undefined}, {connection_down, Reason}} =
#{msg_type := MsgType, reason := Reason, client_id := Client} =
get_msg(Timeout),
Reason.
if Reason =/= none -> ok end,
{MsgType, Reason}.

get_msg() ->
get_msg(infinity).
Expand Down

0 comments on commit f84f0e9

Please sign in to comment.