Skip to content

QQ grow to a target quorum cluster size #13873

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 58 additions & 15 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1528,25 +1528,14 @@ shrink_all(Node) ->
grow(Node, VhostSpec, QueueSpec, Strategy) ->
grow(Node, VhostSpec, QueueSpec, Strategy, promotable).

-spec grow(node(), binary(), binary(), all | even, membership()) ->
-spec grow(node() | integer(), binary(), binary(), all | even, membership()) ->
[{rabbit_amqqueue:name(),
{ok, pos_integer()} | {error, pos_integer(), term()}}].
grow(Node, VhostSpec, QueueSpec, Strategy, Membership) ->
grow(Node, VhostSpec, QueueSpec, Strategy, Membership) when is_atom(Node) ->
Running = rabbit_nodes:list_running(),
[begin
Size = length(get_nodes(Q)),
QName = amqqueue:get_name(Q),
rabbit_log:info("~ts: adding a new member (replica) on node ~w",
[rabbit_misc:rs(QName), Node]),
case add_member(Q, Node, Membership) of
ok ->
{QName, {ok, Size + 1}};
{error, Err} ->
rabbit_log:warning(
"~ts: failed to add member (replica) on node ~w, error: ~w",
[rabbit_misc:rs(QName), Node, Err]),
{QName, {error, Size, Err}}
end
maybe_grow(Q, Node, Membership, Size)
end
|| Q <- rabbit_amqqueue:list(),
amqqueue:get_type(Q) == ?MODULE,
Expand All @@ -1556,7 +1545,61 @@ grow(Node, VhostSpec, QueueSpec, Strategy, Membership) ->
lists:member(Node, Running),
matches_strategy(Strategy, get_nodes(Q)),
is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ].
is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ];

grow(QuorumClusterSize, VhostSpec, QueueSpec, Strategy, Membership)
when is_integer(QuorumClusterSize), QuorumClusterSize > 0 ->
Running = rabbit_nodes:list_running(),
TotalRunning = length(Running),

TargetQuorumClusterSize =
if QuorumClusterSize > TotalRunning ->
%% we cant grow beyond total running nodes
TotalRunning;
true ->
QuorumClusterSize
end,

lists:flatten(
[begin
QNodes = get_nodes(Q),
case length(QNodes) of
Size when Size < TargetQuorumClusterSize ->
TargetAvailableNodes = Running -- QNodes,
N = length(TargetAvailableNodes),
Node = lists:nth(rand:uniform(N), TargetAvailableNodes),
maybe_grow(Q, Node, Membership, Size);
_ ->
[]
end
end
|| _ <- lists:seq(1, TargetQuorumClusterSize),
Q <- rabbit_amqqueue:list(),
amqqueue:get_type(Q) == ?MODULE,
matches_strategy(Strategy, get_nodes(Q)),
is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)]);

grow(QuorumClusterSize, _VhostSpec, _QueueSpec, _Strategy, _Membership)
when is_integer(QuorumClusterSize) ->
rabbit_log:warning(
"cannot grow queues to a quorum cluster size less than zero (~tp)",
[QuorumClusterSize]),
{error, bad_quorum_cluster_size}.

maybe_grow(Q, Node, Membership, Size) ->
QName = amqqueue:get_name(Q),
rabbit_log:info("~ts: adding a new member (replica) on node ~w",
[rabbit_misc:rs(QName), Node]),
case add_member(Q, Node, Membership) of
ok ->
{QName, {ok, Size + 1}};
{error, Err} ->
rabbit_log:warning(
"~ts: failed to add member (replica) on node ~w, error: ~w",
[rabbit_misc:rs(QName), Node, Err]),
{QName, {error, Size, Err}}
end.

-spec transfer_leadership(amqqueue:amqqueue(), node()) -> {migrated, node()} | {not_migrated, atom()}.
transfer_leadership(Q, Destination) ->
Expand Down
83 changes: 82 additions & 1 deletion deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ groups() ->
node_removal_is_not_quorum_critical,
select_nodes_with_least_replicas,
select_nodes_with_least_replicas_node_down,
subscribe_from_each
subscribe_from_each,
grow_queue


]},
Expand Down Expand Up @@ -1536,6 +1537,86 @@ subscribe_from_each(Config) ->

ok.

grow_queue(Config) ->
[Server0, Server1, _Server2, _Server3, _Server4] =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
QQ = ?config(queue_name, Config),
AQ = ?config(alt_queue_name, Config),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-quorum-initial-group-size">>, long, 5}])),
?assertEqual({'queue.declare_ok', AQ, 0, 0},
declare(Ch, AQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-quorum-initial-group-size">>, long, 5}])),

QQs = [QQ, AQ],
MsgCount = 3,

[begin
RaName = ra_name(Q),
rabbit_ct_client_helpers:publish(Ch, Q, MsgCount),
wait_for_messages_ready([Server0], RaName, MsgCount),
{ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, <<"/">>]),
#{nodes := Nodes0} = amqqueue:get_type_state(Q0),
?assertEqual(5, length(Nodes0))
end || Q <- QQs],

rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
force_all_queues_shrink_member_to_current_member, []),

TargetClusterSize_1 = 1,
assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount),

%% grow queues to node 'Server1'
TargetClusterSize_2 = 2,
rpc:call(Server0, rabbit_quorum_queue, grow, [Server1, <<"/">>, <<".*">>, all]),
assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount),

%% grow queues to quorum cluster size '2' has no effect
rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_2, <<"/">>, <<".*">>, all]),
assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount),

%% grow queues to quorum cluster size '3'
TargetClusterSize_3 = 3,
rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_3, <<"/">>, <<".*">>, all]),
assert_grown_queues(QQs, Server0, TargetClusterSize_3, MsgCount),

%% grow queues to quorum cluster size '5'
TargetClusterSize_5 = 5,
rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_5, <<"/">>, <<".*">>, all]),
assert_grown_queues(QQs, Server0, TargetClusterSize_5, MsgCount),

%% shrink all queues again
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
force_all_queues_shrink_member_to_current_member, []),
assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount),

%% grow queues to quorum cluster size > '5' (limit = 5).
TargetClusterSize_10 = 10,
rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_10, <<"/">>, <<".*">>, all]),
assert_grown_queues(QQs, Server0, TargetClusterSize_5, MsgCount),

%% shrink all queues again
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
force_all_queues_shrink_member_to_current_member, []),
assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount),

%% attempt to grow queues to quorum cluster size < '0'.
BadTargetClusterSize = -5,
?assertEqual({error, bad_quorum_cluster_size},
rpc:call(Server0, rabbit_quorum_queue, grow, [BadTargetClusterSize, <<"/">>, <<".*">>, all])).

assert_grown_queues(Qs, Node, TargetClusterSize, MsgCount) ->
[begin
RaName = ra_name(Q),
wait_for_messages_ready([Node], RaName, MsgCount),
{ok, Q0} = rpc:call(Node, rabbit_amqqueue, lookup, [Q, <<"/">>]),
#{nodes := Nodes0} = amqqueue:get_type_state(Q0),
?assertEqual(TargetClusterSize, length(Nodes0))
end || Q <- Qs].

gh_12635(Config) ->
% https://github.com/rabbitmq/rabbitmq-server/issues/12635
[Server0, _Server1, Server2] =
Expand Down
21 changes: 17 additions & 4 deletions deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do
{:validation_failure, "strategy '#{s}' is not recognised."}
end

def validate([n, _], _)
when (is_integer(n) and n <= 0) do
{:validation_failure, "target quorum cluster size '#{n}' must be greater than 0."}
end

def validate(_, %{membership: m})
when not (m == "promotable" or
m == "non_voter" or
Expand All @@ -60,14 +65,22 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do
)
end

def run([node, strategy], %{
def run([node_or_quorum_cluster_size, strategy], %{
node: node_name,
vhost_pattern: vhost_pat,
queue_pattern: queue_pat,
membership: membership,
errors_only: errors_only
}) do
args = [to_atom(node), vhost_pat, queue_pat, to_atom(strategy)]

node_or_quorum_cluster_size =
if is_integer(node_or_quorum_cluster_size) do
node_or_quorum_cluster_size
else
to_atom(node_or_quorum_cluster_size)
end

args = [node_or_quorum_cluster_size, vhost_pat, queue_pat, to_atom(strategy)]

args =
case to_atom(membership) do
Expand Down Expand Up @@ -108,11 +121,11 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do

def usage,
do:
"grow <node> <all | even> [--vhost-pattern <pattern>] [--queue-pattern <pattern>] [--membership <promotable|voter>]"
"grow <node | quorum_cluster_size> <all | even> [--vhost-pattern <pattern>] [--queue-pattern <pattern>] [--membership <promotable|voter>]"

def usage_additional do
[
["<node>", "node name to place replicas on"],
["<node | quorum_cluster_size>", "node name to place replicas on or desired quorum cluster size"],
[
"<all | even>",
"add a member for all matching queues or just those whose membership count is an even number"
Expand Down
49 changes: 37 additions & 12 deletions deps/rabbitmq_cli/test/queues/grow_command_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -44,51 +44,76 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommandTest do
end

test "validate: when one argument is provided, returns a failure" do
assert @command.validate(["quorum-queue-a"], %{}) == {:validation_failure, :not_enough_args}
assert @command.validate(["target@node"], %{}) == {:validation_failure, :not_enough_args}
end

test "validate: when a node and even are provided, returns a success" do
assert @command.validate(["quorum-queue-a", "even"], %{}) == :ok
assert @command.validate(["target@node", "even"], %{}) == :ok
end

test "validate: when a node and all are provided, returns a success" do
assert @command.validate(["quorum-queue-a", "all"], %{}) == :ok
assert @command.validate(["target@node", "all"], %{}) == :ok
end

test "validate: when a node and something else is provided, returns a failure" do
assert @command.validate(["quorum-queue-a", "banana"], %{}) ==
assert @command.validate(["target@node", "banana"], %{}) ==
{:validation_failure, "strategy 'banana' is not recognised."}
end

test "validate: when three arguments are provided, returns a failure" do
assert @command.validate(["quorum-queue-a", "extra-arg", "another-extra-arg"], %{}) ==
assert @command.validate(["target@node", "extra-arg", "another-extra-arg"], %{}) ==
{:validation_failure, :too_many_args}
end

test "validate: when membership promotable is provided, returns a success" do
assert @command.validate(["quorum-queue-a", "all"], %{membership: "promotable"}) == :ok
assert @command.validate(["target@node", "all"], %{membership: "promotable", queue_pattern: "qq.*"}) == :ok
end

test "validate: when membership voter is provided, returns a success" do
assert @command.validate(["quorum-queue-a", "all"], %{membership: "voter"}) == :ok
assert @command.validate(["target@node", "all"], %{membership: "voter", queue_pattern: "qq.*"}) == :ok
end

test "validate: when membership non_voter is provided, returns a success" do
assert @command.validate(["quorum-queue-a", "all"], %{membership: "non_voter"}) == :ok
assert @command.validate(["target@node", "all"], %{membership: "non_voter", queue_pattern: "qq.*"}) == :ok
end

test "validate: when wrong membership is provided, returns failure" do
assert @command.validate(["quorum-queue-a", "all"], %{membership: "banana"}) ==
assert @command.validate(["target@node", "all"], %{membership: "banana", queue_pattern: "qq.*"}) ==
{:validation_failure, "voter status 'banana' is not recognised."}
end

test "validate: when target quorum cluster size greater than zero, returns a success" do
assert @command.validate([7, "all"], %{membership: "voter", queue_pattern: "qq.*"}) == :ok
end

test "validate: when target quorum cluster size is zero, returns failure" do
assert @command.validate([0, "all"], %{membership: "voter", queue_pattern: "qq.*"}) ==
{:validation_failure, "target quorum cluster size '0' must be greater than 0."}
end

test "validate: when target quorum cluster size is less than zero, returns failure" do
assert @command.validate([-1, "all"], %{membership: "voter", queue_pattern: "qq.*"}) ==
{:validation_failure, "target quorum cluster size '-1' must be greater than 0."}
end

@tag test_timeout: 3000
test "run: targeting an unreachable node throws a badrpc when growing to a target node", context do
assert match?(
{:badrpc, _},
@command.run(
["target@node", "all"],
Map.merge(context[:opts], %{node: :jake@thedog, queue_pattern: "qq.*"})
)
)
end

@tag test_timeout: 3000
test "run: targeting an unreachable node throws a badrpc", context do
test "run: targeting an unreachable node throws a badrpc when growing to a target quorum cluster size", context do
assert match?(
{:badrpc, _},
@command.run(
["quorum-queue-a", "all"],
Map.merge(context[:opts], %{node: :jake@thedog})
[5, "all"],
Map.merge(context[:opts], %{node: :jake@thedog, queue_pattern: "qq.*"})
)
)
end
Expand Down
Loading