Skip to content

Commit 60b074a

Browse files
authored
Merge pull request #12801 from rabbitmq/make-peer-disc-nodes-querying-more-resiliant
rabbit_peer_discovery: Retry RPC calls
2 parents 1fa4fe2 + f6314d0 commit 60b074a

File tree

1 file changed

+109
-149
lines changed

1 file changed

+109
-149
lines changed

deps/rabbit/src/rabbit_peer_discovery.erl

Lines changed: 109 additions & 149 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@
2626
normalize/1,
2727
append_node_prefix/1,
2828
node_prefix/0]).
29-
-export([do_query_node_props/1,
30-
group_leader_proxy/2]).
29+
-export([do_query_node_props/2]).
3130

3231
-ifdef(TEST).
3332
-export([query_node_props/1,
@@ -378,7 +377,8 @@ check_discovered_nodes_list_validity(DiscoveredNodes, BadNodeType)
378377
%% @private
379378

380379
query_node_props(Nodes) when Nodes =/= [] ->
381-
{Prefix, Suffix} = rabbit_nodes_common:parts(node()),
380+
ThisNode = node(),
381+
{Prefix, Suffix} = rabbit_nodes_common:parts(ThisNode),
382382
PeerName = peer:random_name(Prefix),
383383
%% We go through a temporary hidden node to query all other discovered
384384
%% peers properties, instead of querying them directly.
@@ -440,7 +440,12 @@ query_node_props(Nodes) when Nodes =/= [] ->
440440
[Peer],
441441
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
442442
try
443-
peer:call(Pid, ?MODULE, do_query_node_props, [Nodes], 180000)
443+
NodesAndProps1 = peer:call(
444+
Pid,
445+
?MODULE, do_query_node_props,
446+
[Nodes, ThisNode], 180000),
447+
NodesAndProps2 = sort_nodes_and_props(NodesAndProps1),
448+
NodesAndProps2
444449
after
445450
peer:stop(Pid)
446451
end;
@@ -563,80 +568,31 @@ maybe_add_tls_arguments(VMArgs) ->
563568
end,
564569
VMArgs2.
565570

566-
do_query_node_props(Nodes) when Nodes =/= [] ->
571+
do_query_node_props(Nodes, FromNode) when Nodes =/= [] ->
567572
%% Make sure all log messages are forwarded from this temporary hidden
568573
%% node to the upstream node, regardless of their level.
569574
_ = logger:set_primary_config(level, debug),
570575

571-
%% The group leader for all processes on this temporary hidden node is the
572-
%% calling process' group leader on the upstream node.
573-
%%
574-
%% When we use `erpc:call/4' (or the multicall equivalent) to execute code
575-
%% on one of the `Nodes', the remotely executed code will also use the
576-
%% calling process' group leader by default.
577-
%%
578-
%% We use this temporary hidden node to ensure the downstream node will
579-
%% not connected to the upstream node. Therefore, we must change the group
580-
%% leader as well, otherwise any I/O from the downstream node will send a
581-
%% message to the upstream node's group leader and thus open a connection.
582-
%% This would defeat the entire purpose of this temporary hidden node.
583-
%%
584-
%% To avoid this, we start a proxy process which we will use as a group
585-
%% leader. This process will send all messages it receives to the group
586-
%% leader on the upstream node.
587-
%%
588-
%% There is one caveat: the logger (local to the temporary hidden node)
589-
%% forwards log messages to the upstream logger (on the upstream node)
590-
%% only if the group leader of that message is a remote PID. Because we
591-
%% set a local PID, it stops forwarding log messages originating from that
592-
%% temporary hidden node. That's why we use `with_group_leader_proxy/2' to
593-
%% set the group leader to our proxy only around the use of `erpc'.
594-
%%
595-
%% That's a lot just to keep logging working while not reveal the upstream
596-
%% node to the downstream node...
597-
Parent = self(),
598-
UpstreamGroupLeader = erlang:group_leader(),
599-
ProxyGroupLeader = spawn_link(
600-
?MODULE, group_leader_proxy,
601-
[Parent, UpstreamGroupLeader]),
602-
603576
%% TODO: Replace with `rabbit_nodes:list_members/0' when the oldest
604577
%% supported version has it.
605-
MembersPerNode = with_group_leader_proxy(
606-
ProxyGroupLeader,
607-
fun() ->
608-
erpc:multicall(Nodes, rabbit_nodes, all, [])
609-
end),
610-
query_node_props1(Nodes, MembersPerNode, [], ProxyGroupLeader).
611-
612-
with_group_leader_proxy(ProxyGroupLeader, Fun) ->
613-
UpstreamGroupLeader = erlang:group_leader(),
614-
try
615-
true = erlang:group_leader(ProxyGroupLeader, self()),
616-
Fun()
617-
after
618-
true = erlang:group_leader(UpstreamGroupLeader, self())
619-
end.
620-
621-
group_leader_proxy(Parent, UpstreamGroupLeader) ->
622-
receive
623-
stop_proxy ->
624-
erlang:unlink(Parent),
625-
Parent ! proxy_stopped;
626-
Message ->
627-
UpstreamGroupLeader ! Message,
628-
group_leader_proxy(Parent, UpstreamGroupLeader)
629-
end.
578+
MembersPerNode = [try
579+
{ok,
580+
erpc_call(Node, rabbit_nodes, all, [], FromNode)}
581+
catch
582+
Class:Reason ->
583+
{Class, Reason}
584+
end || Node <- Nodes],
585+
query_node_props1(Nodes, MembersPerNode, [], FromNode).
630586

631587
query_node_props1(
632588
[Node | Nodes], [{ok, Members} | MembersPerNode], NodesAndProps,
633-
ProxyGroupLeader) ->
589+
FromNode) ->
634590
NodeAndProps = {Node, Members},
635591
NodesAndProps1 = [NodeAndProps | NodesAndProps],
636-
query_node_props1(Nodes, MembersPerNode, NodesAndProps1, ProxyGroupLeader);
592+
query_node_props1(Nodes, MembersPerNode, NodesAndProps1, FromNode);
637593
query_node_props1(
638-
[Node | Nodes], [{error, _} = Error | MembersPerNode], NodesAndProps,
639-
ProxyGroupLeader) ->
594+
[Node | Nodes], [{_, _} = Error | MembersPerNode], NodesAndProps,
595+
FromNode) ->
640596
%% We consider that an error means the remote node is unreachable or not
641597
%% ready. Therefore, we exclude it from the list of discovered nodes as we
642598
%% won't be able to join it anyway.
@@ -645,55 +601,51 @@ query_node_props1(
645601
"Peer discovery: node '~ts' excluded from the discovered nodes",
646602
[Node, Error, Node],
647603
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
648-
query_node_props1(Nodes, MembersPerNode, NodesAndProps, ProxyGroupLeader);
649-
query_node_props1([], [], NodesAndProps, ProxyGroupLeader) ->
604+
query_node_props1(Nodes, MembersPerNode, NodesAndProps, FromNode);
605+
query_node_props1([], [], NodesAndProps, FromNode) ->
650606
NodesAndProps1 = lists:reverse(NodesAndProps),
651-
query_node_props2(NodesAndProps1, [], ProxyGroupLeader).
652-
653-
query_node_props2([{Node, Members} | Rest], NodesAndProps, ProxyGroupLeader) ->
654-
try
655-
erpc:call(
656-
Node, logger, debug,
657-
["Peer discovery: temporary hidden node '~ts' queries properties "
658-
"from node '~ts'", [node(), Node]]),
659-
StartTime = get_node_start_time(Node, microsecond, ProxyGroupLeader),
660-
IsReady = is_node_db_ready(Node, ProxyGroupLeader),
661-
NodeAndProps = {Node, Members, StartTime, IsReady},
662-
NodesAndProps1 = [NodeAndProps | NodesAndProps],
663-
query_node_props2(Rest, NodesAndProps1, ProxyGroupLeader)
664-
catch
665-
_:Error:_ ->
666-
%% If one of the erpc calls we use to get the start time fails,
667-
%% there is something wrong with the remote node because it
668-
%% doesn't depend on RabbitMQ. We exclude it from the discovered
669-
%% nodes.
670-
?LOG_DEBUG(
671-
"Peer discovery: failed to query start time of node '~ts': "
672-
"~0tp~n"
673-
"Peer discovery: node '~ts' excluded from the discovered nodes",
674-
[Node, Error, Node],
675-
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
676-
query_node_props2(Rest, NodesAndProps, ProxyGroupLeader)
677-
end;
678-
query_node_props2([], NodesAndProps, ProxyGroupLeader) ->
607+
query_node_props2(NodesAndProps1, [], FromNode).
608+
609+
query_node_props2([{Node, Members} | Rest], NodesAndProps, FromNode) ->
610+
NodesAndProps2 = try
611+
erpc_call(
612+
Node, logger, debug,
613+
["Peer discovery: temporary hidden node '~ts' "
614+
"queries properties from node '~ts'",
615+
[node(), Node]], FromNode),
616+
StartTime = get_node_start_time(
617+
Node, microsecond, FromNode),
618+
IsReady = is_node_db_ready(Node, FromNode),
619+
NodeAndProps = {Node, Members, StartTime, IsReady},
620+
NodesAndProps1 = [NodeAndProps | NodesAndProps],
621+
NodesAndProps1
622+
catch
623+
_:Error:_ ->
624+
%% If one of the erpc calls we use to get the
625+
%% start time fails, there is something wrong with
626+
%% the remote node because it doesn't depend on
627+
%% RabbitMQ. We exclude it from the discovered
628+
%% nodes.
629+
?LOG_DEBUG(
630+
"Peer discovery: failed to query start time "
631+
"+ DB readyness of node '~ts': ~0tp~n"
632+
"Peer discovery: node '~ts' excluded from the "
633+
"discovered nodes",
634+
[Node, Error, Node],
635+
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
636+
NodesAndProps
637+
end,
638+
query_node_props2(Rest, NodesAndProps2, FromNode);
639+
query_node_props2([], NodesAndProps, _FromNode) ->
679640
NodesAndProps1 = lists:reverse(NodesAndProps),
680-
NodesAndProps2 = sort_nodes_and_props(NodesAndProps1),
681-
%% Wait for the proxy group leader to flush its inbox.
682-
ProxyGroupLeader ! stop_proxy,
683-
receive
684-
proxy_stopped ->
685-
ok
686-
after 120_000 ->
687-
ok
688-
end,
689641
?assertEqual([], nodes()),
690-
?assert(length(NodesAndProps2) =< length(nodes(hidden))),
691-
NodesAndProps2.
642+
?assert(length(NodesAndProps1) =< length(nodes(hidden))),
643+
NodesAndProps1.
692644

693-
-spec get_node_start_time(Node, Unit, ProxyGroupLeader) -> StartTime when
645+
-spec get_node_start_time(Node, Unit, FromNode) -> StartTime when
694646
Node :: node(),
695647
Unit :: erlang:time_unit(),
696-
ProxyGroupLeader :: pid(),
648+
FromNode :: node(),
697649
StartTime :: non_neg_integer().
698650
%% @doc Returns the start time of the given `Node' in `Unit'.
699651
%%
@@ -713,52 +665,60 @@ query_node_props2([], NodesAndProps, ProxyGroupLeader) ->
713665
%%
714666
%% @private
715667

716-
get_node_start_time(Node, Unit, ProxyGroupLeader) ->
717-
with_group_leader_proxy(
718-
ProxyGroupLeader,
719-
fun() ->
720-
NativeStartTime = erpc:call(
721-
Node, erlang, system_info, [start_time]),
722-
TimeOffset = erpc:call(Node, erlang, time_offset, []),
723-
SystemStartTime = NativeStartTime + TimeOffset,
724-
StartTime = erpc:call(
725-
Node, erlang, convert_time_unit,
726-
[SystemStartTime, native, Unit]),
727-
StartTime
728-
end).
729-
730-
-spec is_node_db_ready(Node, ProxyGroupLeader) -> IsReady when
668+
get_node_start_time(Node, Unit, FromNode) ->
669+
NativeStartTime = erpc_call(
670+
Node, erlang, system_info, [start_time], FromNode),
671+
TimeOffset = erpc_call(Node, erlang, time_offset, [], FromNode),
672+
SystemStartTime = NativeStartTime + TimeOffset,
673+
StartTime = erpc_call(
674+
Node, erlang, convert_time_unit,
675+
[SystemStartTime, native, Unit], FromNode),
676+
StartTime.
677+
678+
-spec is_node_db_ready(Node, FromNode) -> IsReady when
731679
Node :: node(),
732-
ProxyGroupLeader :: pid(),
680+
FromNode :: node(),
733681
IsReady :: boolean() | undefined.
734682
%% @doc Returns if the node's DB layer is ready or not.
735683
%%
736684
%% @private
737685

738-
is_node_db_ready(Node, ProxyGroupLeader) ->
739-
%% This code is running from a temporary hidden node. We derive the real
740-
%% node interested in the properties from the group leader.
741-
UpstreamGroupLeader = erlang:group_leader(),
742-
ThisNode = node(UpstreamGroupLeader),
743-
case Node of
744-
ThisNode ->
745-
%% The current node is running peer discovery, thus way before we
746-
%% mark the DB layer as ready. Consider it ready in this case,
747-
%% otherwise if the current node is selected, it will loop forever
748-
%% waiting for itself to be ready.
749-
true;
750-
_ ->
751-
with_group_leader_proxy(
752-
ProxyGroupLeader,
753-
fun() ->
754-
try
755-
erpc:call(Node, rabbit_db, is_init_finished, [])
756-
catch
757-
_:{exception, undef,
758-
[{rabbit_db, is_init_finished, _, _} | _]} ->
759-
undefined
760-
end
761-
end)
686+
is_node_db_ready(FromNode, FromNode) ->
687+
%% The function is called for the current node running peer discovery, thus
688+
%% way before we mark the DB layer as ready. Consider it ready in this
689+
%% case, otherwise if the current node is selected, it will loop forever
690+
%% waiting for itself to be ready.
691+
true;
692+
is_node_db_ready(Node, FromNode) ->
693+
try
694+
erpc_call(Node, rabbit_db, is_init_finished, [], FromNode)
695+
catch
696+
_:{exception, undef, [{rabbit_db, is_init_finished, _, _} | _]} ->
697+
undefined
698+
end.
699+
700+
erpc_call(Node, Mod, Fun, Args, FromNode) ->
701+
erpc_call(Node, Mod, Fun, Args, FromNode, 10000).
702+
703+
erpc_call(Node, Mod, Fun, Args, FromNode, Timeout) when Timeout >= 0 ->
704+
try
705+
erpc:call(Node, Mod, Fun, Args)
706+
catch
707+
error:{erpc, _} = Reason:Stacktrace ->
708+
Peer = node(),
709+
_ = catch erpc:call(
710+
FromNode,
711+
logger, debug,
712+
["Peer discovery: temporary hidden node '~ts' "
713+
"failed to connect to '~ts': ~0p",
714+
[Peer, Node, Reason]]),
715+
Sleep = 1000,
716+
timer:sleep(Sleep),
717+
NewTimeout = Timeout - Sleep,
718+
case NewTimeout >= 0 of
719+
true -> erpc_call(Node, Mod, Fun, Args, FromNode, NewTimeout);
720+
false -> erlang:raise(error, Reason, Stacktrace)
721+
end
762722
end.
763723

764724
-spec sort_nodes_and_props(NodesAndProps) ->

0 commit comments

Comments
 (0)