Skip to content

Commit 27ef97e

Browse files
committed
QQ: handle_tick improvements
Move leader repair earlier in tick function to ensure more timely update of meta data store record after leader change. Also use RPC_TIMEOUT macro for metric/stats multicalls to improve liveness when a node is connected but partitioned / frozen.
1 parent f10e084 commit 27ef97e

File tree

1 file changed

+10
-6
lines changed

1 file changed

+10
-6
lines changed

deps/rabbit/src/rabbit_quorum_queue.erl

+10-6
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,8 @@ start_cluster(Q) ->
266266
#{nodes => [LeaderNode | FollowerNodes]}),
267267

268268
Versions = [V || {ok, V} <- erpc:multicall(FollowerNodes,
269-
rabbit_fifo, version, [])],
269+
rabbit_fifo, version, [],
270+
?RPC_TIMEOUT)],
270271
MinVersion = lists:min([rabbit_fifo:version() | Versions]),
271272

272273
rabbit_log:debug("Will start up to ~w replicas for quorum queue ~ts with "
@@ -583,6 +584,7 @@ handle_tick(QName,
583584
fun() ->
584585
try
585586
{ok, Q} = rabbit_amqqueue:lookup(QName),
587+
ok = repair_leader_record(Q, Name),
586588
Reductions = reductions(Name),
587589
rabbit_core_metrics:queue_stats(QName, NumReadyMsgs,
588590
NumCheckedOut, NumMessages,
@@ -636,12 +638,12 @@ handle_tick(QName,
636638
end}
637639
| Infos0],
638640
rabbit_core_metrics:queue_stats(QName, Infos),
639-
ok = repair_leader_record(Q, Name),
640641
case repair_amqqueue_nodes(Q) of
641642
ok ->
642643
ok;
643644
repaired ->
644-
rabbit_log:debug("Repaired quorum queue ~ts amqqueue record", [rabbit_misc:rs(QName)])
645+
rabbit_log:debug("Repaired quorum queue ~ts amqqueue record",
646+
[rabbit_misc:rs(QName)])
645647
end,
646648
ExpectedNodes = rabbit_nodes:list_members(),
647649
case Nodes -- ExpectedNodes of
@@ -1763,8 +1765,9 @@ i(leader, Q) -> leader(Q);
17631765
i(open_files, Q) when ?is_amqqueue(Q) ->
17641766
{Name, _} = amqqueue:get_pid(Q),
17651767
Nodes = get_connected_nodes(Q),
1766-
{Data, _} = rpc:multicall(Nodes, ?MODULE, open_files, [Name]),
1767-
lists:flatten(Data);
1768+
[Info || {ok, {_, _} = Info} <-
1769+
erpc:multicall(Nodes, ?MODULE, open_files,
1770+
[Name], ?RPC_TIMEOUT)];
17681771
i(single_active_consumer_pid, Q) when ?is_amqqueue(Q) ->
17691772
QPid = amqqueue:get_pid(Q),
17701773
case ra:local_query(QPid, fun rabbit_fifo:query_single_active_consumer/1) of
@@ -1883,7 +1886,8 @@ online(Q) when ?is_amqqueue(Q) ->
18831886
Nodes = get_connected_nodes(Q),
18841887
{Name, _} = amqqueue:get_pid(Q),
18851888
[node(Pid) || {ok, Pid} <-
1886-
erpc:multicall(Nodes, erlang, whereis, [Name]),
1889+
erpc:multicall(Nodes, erlang, whereis,
1890+
[Name], ?RPC_TIMEOUT),
18871891
is_pid(Pid)].
18881892

18891893
format(Q, Ctx) when ?is_amqqueue(Q) ->

0 commit comments

Comments
 (0)