diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 4a013bbe70d3..f5efdcf2cddf 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -1528,25 +1528,14 @@ shrink_all(Node) -> grow(Node, VhostSpec, QueueSpec, Strategy) -> grow(Node, VhostSpec, QueueSpec, Strategy, promotable). --spec grow(node(), binary(), binary(), all | even, membership()) -> +-spec grow(node() | integer(), binary(), binary(), all | even, membership()) -> [{rabbit_amqqueue:name(), {ok, pos_integer()} | {error, pos_integer(), term()}}]. -grow(Node, VhostSpec, QueueSpec, Strategy, Membership) -> +grow(Node, VhostSpec, QueueSpec, Strategy, Membership) when is_atom(Node) -> Running = rabbit_nodes:list_running(), [begin Size = length(get_nodes(Q)), - QName = amqqueue:get_name(Q), - rabbit_log:info("~ts: adding a new member (replica) on node ~w", - [rabbit_misc:rs(QName), Node]), - case add_member(Q, Node, Membership) of - ok -> - {QName, {ok, Size + 1}}; - {error, Err} -> - rabbit_log:warning( - "~ts: failed to add member (replica) on node ~w, error: ~w", - [rabbit_misc:rs(QName), Node, Err]), - {QName, {error, Size, Err}} - end + maybe_grow(Q, Node, Membership, Size) end || Q <- rabbit_amqqueue:list(), amqqueue:get_type(Q) == ?MODULE, @@ -1556,7 +1545,61 @@ grow(Node, VhostSpec, QueueSpec, Strategy, Membership) -> lists:member(Node, Running), matches_strategy(Strategy, get_nodes(Q)), is_match(amqqueue:get_vhost(Q), VhostSpec) andalso - is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ]. + is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ]; + +grow(QuorumClusterSize, VhostSpec, QueueSpec, Strategy, Membership) + when is_integer(QuorumClusterSize), QuorumClusterSize > 0 -> + Running = rabbit_nodes:list_running(), + TotalRunning = length(Running), + + TargetQuorumClusterSize = + if QuorumClusterSize > TotalRunning -> + %% we cant grow beyond total running nodes + TotalRunning; + true -> + QuorumClusterSize + end, + + lists:flatten( + [begin + QNodes = get_nodes(Q), + case length(QNodes) of + Size when Size < TargetQuorumClusterSize -> + TargetAvailableNodes = Running -- QNodes, + N = length(TargetAvailableNodes), + Node = lists:nth(rand:uniform(N), TargetAvailableNodes), + maybe_grow(Q, Node, Membership, Size); + _ -> + [] + end + end + || _ <- lists:seq(1, TargetQuorumClusterSize), + Q <- rabbit_amqqueue:list(), + amqqueue:get_type(Q) == ?MODULE, + matches_strategy(Strategy, get_nodes(Q)), + is_match(amqqueue:get_vhost(Q), VhostSpec) andalso + is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)]); + +grow(QuorumClusterSize, _VhostSpec, _QueueSpec, _Strategy, _Membership) + when is_integer(QuorumClusterSize) -> + rabbit_log:warning( + "cannot grow queues to a quorum cluster size less than zero (~tp)", + [QuorumClusterSize]), + {error, bad_quorum_cluster_size}. + +maybe_grow(Q, Node, Membership, Size) -> + QName = amqqueue:get_name(Q), + rabbit_log:info("~ts: adding a new member (replica) on node ~w", + [rabbit_misc:rs(QName), Node]), + case add_member(Q, Node, Membership) of + ok -> + {QName, {ok, Size + 1}}; + {error, Err} -> + rabbit_log:warning( + "~ts: failed to add member (replica) on node ~w, error: ~w", + [rabbit_misc:rs(QName), Node, Err]), + {QName, {error, Size, Err}} + end. -spec transfer_leadership(amqqueue:amqqueue(), node()) -> {migrated, node()} | {not_migrated, atom()}. transfer_leadership(Q, Destination) -> diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 463445b9f474..d3e807e4543d 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -110,7 +110,8 @@ groups() -> node_removal_is_not_quorum_critical, select_nodes_with_least_replicas, select_nodes_with_least_replicas_node_down, - subscribe_from_each + subscribe_from_each, + grow_queue ]}, @@ -1536,6 +1537,86 @@ subscribe_from_each(Config) -> ok. +grow_queue(Config) -> + [Server0, Server1, _Server2, _Server3, _Server4] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + QQ = ?config(queue_name, Config), + AQ = ?config(alt_queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-quorum-initial-group-size">>, long, 5}])), + ?assertEqual({'queue.declare_ok', AQ, 0, 0}, + declare(Ch, AQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-quorum-initial-group-size">>, long, 5}])), + + QQs = [QQ, AQ], + MsgCount = 3, + + [begin + RaName = ra_name(Q), + rabbit_ct_client_helpers:publish(Ch, Q, MsgCount), + wait_for_messages_ready([Server0], RaName, MsgCount), + {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, <<"/">>]), + #{nodes := Nodes0} = amqqueue:get_type_state(Q0), + ?assertEqual(5, length(Nodes0)) + end || Q <- QQs], + + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, + force_all_queues_shrink_member_to_current_member, []), + + TargetClusterSize_1 = 1, + assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount), + + %% grow queues to node 'Server1' + TargetClusterSize_2 = 2, + rpc:call(Server0, rabbit_quorum_queue, grow, [Server1, <<"/">>, <<".*">>, all]), + assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount), + + %% grow queues to quorum cluster size '2' has no effect + rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_2, <<"/">>, <<".*">>, all]), + assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount), + + %% grow queues to quorum cluster size '3' + TargetClusterSize_3 = 3, + rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_3, <<"/">>, <<".*">>, all]), + assert_grown_queues(QQs, Server0, TargetClusterSize_3, MsgCount), + + %% grow queues to quorum cluster size '5' + TargetClusterSize_5 = 5, + rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_5, <<"/">>, <<".*">>, all]), + assert_grown_queues(QQs, Server0, TargetClusterSize_5, MsgCount), + + %% shrink all queues again + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, + force_all_queues_shrink_member_to_current_member, []), + assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount), + + %% grow queues to quorum cluster size > '5' (limit = 5). + TargetClusterSize_10 = 10, + rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_10, <<"/">>, <<".*">>, all]), + assert_grown_queues(QQs, Server0, TargetClusterSize_5, MsgCount), + + %% shrink all queues again + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, + force_all_queues_shrink_member_to_current_member, []), + assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount), + + %% attempt to grow queues to quorum cluster size < '0'. + BadTargetClusterSize = -5, + ?assertEqual({error, bad_quorum_cluster_size}, + rpc:call(Server0, rabbit_quorum_queue, grow, [BadTargetClusterSize, <<"/">>, <<".*">>, all])). + +assert_grown_queues(Qs, Node, TargetClusterSize, MsgCount) -> + [begin + RaName = ra_name(Q), + wait_for_messages_ready([Node], RaName, MsgCount), + {ok, Q0} = rpc:call(Node, rabbit_amqqueue, lookup, [Q, <<"/">>]), + #{nodes := Nodes0} = amqqueue:get_type_state(Q0), + ?assertEqual(TargetClusterSize, length(Nodes0)) + end || Q <- Qs]. + gh_12635(Config) -> % https://github.com/rabbitmq/rabbitmq-server/issues/12635 [Server0, _Server1, Server2] = diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex index f1ada3a383bb..8df0b54a6f0e 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex @@ -39,6 +39,11 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do {:validation_failure, "strategy '#{s}' is not recognised."} end + def validate([n, _], _) + when (is_integer(n) and n <= 0) do + {:validation_failure, "target quorum cluster size '#{n}' must be greater than 0."} + end + def validate(_, %{membership: m}) when not (m == "promotable" or m == "non_voter" or @@ -60,14 +65,22 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do ) end - def run([node, strategy], %{ + def run([node_or_quorum_cluster_size, strategy], %{ node: node_name, vhost_pattern: vhost_pat, queue_pattern: queue_pat, membership: membership, errors_only: errors_only }) do - args = [to_atom(node), vhost_pat, queue_pat, to_atom(strategy)] + + node_or_quorum_cluster_size = + if is_integer(node_or_quorum_cluster_size) do + node_or_quorum_cluster_size + else + to_atom(node_or_quorum_cluster_size) + end + + args = [node_or_quorum_cluster_size, vhost_pat, queue_pat, to_atom(strategy)] args = case to_atom(membership) do @@ -108,11 +121,11 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do def usage, do: - "grow [--vhost-pattern ] [--queue-pattern ] [--membership ]" + "grow [--vhost-pattern ] [--queue-pattern ] [--membership ]" def usage_additional do [ - ["", "node name to place replicas on"], + ["", "node name to place replicas on or desired quorum cluster size"], [ "", "add a member for all matching queues or just those whose membership count is an even number" diff --git a/deps/rabbitmq_cli/test/queues/grow_command_test.exs b/deps/rabbitmq_cli/test/queues/grow_command_test.exs index 2b1aab070317..0b28f3957eef 100644 --- a/deps/rabbitmq_cli/test/queues/grow_command_test.exs +++ b/deps/rabbitmq_cli/test/queues/grow_command_test.exs @@ -44,51 +44,76 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommandTest do end test "validate: when one argument is provided, returns a failure" do - assert @command.validate(["quorum-queue-a"], %{}) == {:validation_failure, :not_enough_args} + assert @command.validate(["target@node"], %{}) == {:validation_failure, :not_enough_args} end test "validate: when a node and even are provided, returns a success" do - assert @command.validate(["quorum-queue-a", "even"], %{}) == :ok + assert @command.validate(["target@node", "even"], %{}) == :ok end test "validate: when a node and all are provided, returns a success" do - assert @command.validate(["quorum-queue-a", "all"], %{}) == :ok + assert @command.validate(["target@node", "all"], %{}) == :ok end test "validate: when a node and something else is provided, returns a failure" do - assert @command.validate(["quorum-queue-a", "banana"], %{}) == + assert @command.validate(["target@node", "banana"], %{}) == {:validation_failure, "strategy 'banana' is not recognised."} end test "validate: when three arguments are provided, returns a failure" do - assert @command.validate(["quorum-queue-a", "extra-arg", "another-extra-arg"], %{}) == + assert @command.validate(["target@node", "extra-arg", "another-extra-arg"], %{}) == {:validation_failure, :too_many_args} end test "validate: when membership promotable is provided, returns a success" do - assert @command.validate(["quorum-queue-a", "all"], %{membership: "promotable"}) == :ok + assert @command.validate(["target@node", "all"], %{membership: "promotable", queue_pattern: "qq.*"}) == :ok end test "validate: when membership voter is provided, returns a success" do - assert @command.validate(["quorum-queue-a", "all"], %{membership: "voter"}) == :ok + assert @command.validate(["target@node", "all"], %{membership: "voter", queue_pattern: "qq.*"}) == :ok end test "validate: when membership non_voter is provided, returns a success" do - assert @command.validate(["quorum-queue-a", "all"], %{membership: "non_voter"}) == :ok + assert @command.validate(["target@node", "all"], %{membership: "non_voter", queue_pattern: "qq.*"}) == :ok end test "validate: when wrong membership is provided, returns failure" do - assert @command.validate(["quorum-queue-a", "all"], %{membership: "banana"}) == + assert @command.validate(["target@node", "all"], %{membership: "banana", queue_pattern: "qq.*"}) == {:validation_failure, "voter status 'banana' is not recognised."} end + test "validate: when target quorum cluster size greater than zero, returns a success" do + assert @command.validate([7, "all"], %{membership: "voter", queue_pattern: "qq.*"}) == :ok + end + + test "validate: when target quorum cluster size is zero, returns failure" do + assert @command.validate([0, "all"], %{membership: "voter", queue_pattern: "qq.*"}) == + {:validation_failure, "target quorum cluster size '0' must be greater than 0."} + end + + test "validate: when target quorum cluster size is less than zero, returns failure" do + assert @command.validate([-1, "all"], %{membership: "voter", queue_pattern: "qq.*"}) == + {:validation_failure, "target quorum cluster size '-1' must be greater than 0."} + end + + @tag test_timeout: 3000 + test "run: targeting an unreachable node throws a badrpc when growing to a target node", context do + assert match?( + {:badrpc, _}, + @command.run( + ["target@node", "all"], + Map.merge(context[:opts], %{node: :jake@thedog, queue_pattern: "qq.*"}) + ) + ) + end + @tag test_timeout: 3000 - test "run: targeting an unreachable node throws a badrpc", context do + test "run: targeting an unreachable node throws a badrpc when growing to a target quorum cluster size", context do assert match?( {:badrpc, _}, @command.run( - ["quorum-queue-a", "all"], - Map.merge(context[:opts], %{node: :jake@thedog}) + [5, "all"], + Map.merge(context[:opts], %{node: :jake@thedog, queue_pattern: "qq.*"}) ) ) end