Skip to content

Unblock group of consumers on super stream partition #7765

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,9 @@ rabbitmq_suite(
deps = [
"//deps/rabbit_common:erlang_app",
],
runtime_deps = [
"@meck//:erlang_app",
],
)

rabbitmq_integration_suite(
Expand Down
9 changes: 9 additions & 0 deletions deps/rabbit/src/rabbit_core_ff.erl
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,12 @@
stability => stable,
depends_on => [stream_queue]
}}).


%% Declares the `stream_sac_coordinator_unblock_group` feature flag.
%% It is marked stable and depends on `stream_single_active_consumer`.
%% rabbit_stream_sac_coordinator:has_unblock_group_support/0 queries this
%% flag by name to decide which notification message format to emit.
-rabbit_feature_flag(
{stream_sac_coordinator_unblock_group,
#{desc => "Bug fix to unblock a group of consumers in a super stream partition",
doc_url => "https://github.com/rabbitmq/rabbitmq-server/issues/7743",
stability => stable,
depends_on => [stream_single_active_consumer]
}}).
103 changes: 82 additions & 21 deletions deps/rabbit/src/rabbit_stream_sac_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ apply(#command_unregister_consumer{vhost = VirtualHost,
of
{value, Consumer} ->
G1 = remove_from_group(Consumer, Group0),
handle_consumer_removal(G1, Consumer);
handle_consumer_removal(G1, Consumer, Stream, ConsumerName);
false ->
{Group0, []}
end,
Expand All @@ -247,19 +247,24 @@ apply(#command_activate_consumer{vhost = VirtualHost,
stream = Stream,
consumer_name = ConsumerName},
#?MODULE{groups = StreamGroups0} = State0) ->
rabbit_log:debug("Activating consumer on ~tp, group ~p",
[Stream, ConsumerName]),
{G, Eff} =
case lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups0) of
undefined ->
rabbit_log:warning("trying to activate consumer in group ~tp, but "
rabbit_log:warning("Trying to activate consumer in group ~tp, but "
"the group does not longer exist",
[{VirtualHost, Stream, ConsumerName}]),
{undefined, []};
Group ->
#consumer{pid = Pid, subscription_id = SubId} =
evaluate_active_consumer(Group),
rabbit_log:debug("New active consumer on ~tp, group ~tp " ++
"is ~tp from ~tp",
[Stream, ConsumerName, SubId, Pid]),
Group1 =
update_consumer_state_in_group(Group, Pid, SubId, true),
{Group1, [notify_consumer_effect(Pid, SubId, true)]}
{Group1, [notify_consumer_effect(Pid, SubId, Stream, ConsumerName, true)]}
end,
StreamGroups1 =
update_groups(VirtualHost, Stream, ConsumerName, G, StreamGroups0),
Expand Down Expand Up @@ -499,7 +504,8 @@ do_register_consumer(VirtualHost,
Effects =
case Active of
true ->
[notify_consumer_effect(ConnectionPid, SubscriptionId, Active)];
[notify_consumer_effect(ConnectionPid, SubscriptionId,
Stream, ConsumerName, Active)];
_ ->
[]
end,
Expand Down Expand Up @@ -527,7 +533,8 @@ do_register_consumer(VirtualHost,
active = true},
G1 = add_to_group(Consumer0, Group0),
{G1,
[notify_consumer_effect(ConnectionPid, SubscriptionId, true)]};
[notify_consumer_effect(ConnectionPid, SubscriptionId,
Stream, ConsumerName, true)]};
_G ->
%% whatever the current state is, the newcomer will be passive
Consumer0 =
Expand All @@ -546,18 +553,28 @@ do_register_consumer(VirtualHost,
%% the current active stays the same
{G1, []};
_ ->
rabbit_log:debug("SAC consumer registration: " ++
"active consumer change on stream ~tp, group ~tp. " ++
"Notifying ~tp from ~tp it is no longer active.",
[Stream, ConsumerName, ActSubId, ActPid]),
%% there's a change, telling the active it's no longer active
{update_consumer_state_in_group(G1,
ActPid,
ActSubId,
false),
[notify_consumer_effect(ActPid,
ActSubId,
Stream,
ConsumerName,
false,
true)]}
end;
false ->
%% no active consumer in the (non-empty) group, we are waiting for the reply of a former active
rabbit_log:debug("SAC consumer registration: no active consumer on stream ~tp, group ~tp. " ++
"Likely waiting for a response from former active consumer.",
[Stream, ConsumerName]),
%% no active consumer in the (non-empty) group,
%% we are waiting for the reply of a former active
{G1, []}
end
end,
Expand All @@ -571,27 +588,27 @@ do_register_consumer(VirtualHost,
lookup_consumer(ConnectionPid, SubscriptionId, Group1),
{State#?MODULE{groups = StreamGroups1}, {ok, Active}, Effects}.

handle_consumer_removal(#group{consumers = []} = G, _) ->
handle_consumer_removal(#group{consumers = []} = G, _, _, _) ->
{G, []};
handle_consumer_removal(#group{partition_index = -1} = Group0,
Consumer) ->
Consumer, Stream, ConsumerName) ->
case Consumer of
#consumer{active = true} ->
%% this is the active consumer we remove, computing the new one
Group1 = compute_active_consumer(Group0),
case lookup_active_consumer(Group1) of
{value, #consumer{pid = Pid, subscription_id = SubId}} ->
%% creating the side effect to notify the new active consumer
{Group1, [notify_consumer_effect(Pid, SubId, true)]};
{Group1, [notify_consumer_effect(Pid, SubId, Stream, ConsumerName, true)]};
_ ->
%% no active consumer found in the group, nothing to do
{Group1, []}
end;
#consumer{active = false} ->
%% not the active consumer, nothing to do."),
%% not the active consumer, nothing to do.
{Group0, []}
end;
handle_consumer_removal(Group0, Consumer) ->
handle_consumer_removal(Group0, Consumer, Stream, ConsumerName) ->
case lookup_active_consumer(Group0) of
{value,
#consumer{pid = ActPid, subscription_id = ActSubId} =
Expand All @@ -601,40 +618,81 @@ handle_consumer_removal(Group0, Consumer) ->
%% the current active stays the same
{Group0, []};
_ ->
rabbit_log:debug("SAC consumer removal: " ++
"active consumer change on stream ~tp, group ~tp. " ++
"Notifying ~tp from ~tp it is no longer active.",
[Stream, ConsumerName, ActSubId, ActPid]),

%% there's a change, telling the active it's no longer active
{update_consumer_state_in_group(Group0,
ActPid,
ActSubId,
false),
[notify_consumer_effect(ActPid, ActSubId, false, true)]}
[notify_consumer_effect(ActPid, ActSubId,
Stream, ConsumerName, false, true)]}
end;
false ->
case Consumer#consumer.active of
true ->
%% the active one is going away, picking a new one
#consumer{pid = P, subscription_id = SID} =
evaluate_active_consumer(Group0),
rabbit_log:debug("SAC consumer removal: " ++
"active consumer change on stream ~tp, group ~tp. " ++
"Notifying ~tp from ~tp it is the new active consumer.",
[Stream, ConsumerName, SID, P]),
{update_consumer_state_in_group(Group0, P, SID, true),
[notify_consumer_effect(P, SID, true)]};
[notify_consumer_effect(P, SID,
Stream, ConsumerName, true)]};
false ->
%% no active consumer in the (non-empty) group, we are waiting for the reply of a former active
rabbit_log:debug("SAC consumer removal: no active consumer on stream ~tp, group ~tp. " ++
"Likely waiting for a response from former active consumer.",
[Stream, ConsumerName]),
%% no active consumer in the (non-empty) group,
%% we are waiting for the reply of a former active
{Group0, []}
end
end.

notify_consumer_effect(Pid, SubId, Active) ->
notify_consumer_effect(Pid, SubId, Active, false).
%% Chooses the format of the SAC notification message:
%% `map` when has_unblock_group_support/0 returns true, `tuple` when it
%% returns false.
message_type() ->
    UnblockSupported = has_unblock_group_support(),
    case UnblockSupported of
        false -> tuple;
        true -> map
    end.

%% Shorthand for notify_consumer_effect/6 for a consumer that is not
%% stepping down: the stepping-down argument is fixed to `false`.
notify_consumer_effect(ConnPid, SubscriptionId, Stream, ConsumerName, IsActive) ->
    NotSteppingDown = false,
    notify_consumer_effect(ConnPid, SubscriptionId, Stream, ConsumerName,
                           IsActive, NotSteppingDown).

notify_consumer_effect(Pid, SubId, Active, false = _SteppingDown) ->
%% Resolves the message format via message_type/0 and delegates to
%% notify_consumer_effect/7, which builds the effect in that format.
notify_consumer_effect(ConnPid, SubscriptionId, Stream, ConsumerName,
                       IsActive, IsSteppingDown) ->
    Format = message_type(),
    notify_consumer_effect(ConnPid, SubscriptionId, Stream, ConsumerName,
                           IsActive, IsSteppingDown, Format).

notify_consumer_effect(Pid, SubId, _Stream, _Name, Active, false = _SteppingDown, tuple) ->
mod_call_effect(Pid,
{sac,
{{subscription_id, SubId}, {active, Active},
{{subscription_id, SubId},
{active, Active},
{extra, []}}});
notify_consumer_effect(Pid, SubId, Active, true = _SteppingDown) ->
notify_consumer_effect(Pid, SubId, _Stream, _Name, Active, true = _SteppingDown, tuple) ->
mod_call_effect(Pid,
{sac,
{{subscription_id, SubId}, {active, Active},
{extra, [{stepping_down, true}]}}}).
{{subscription_id, SubId},
{active, Active},
{extra, [{stepping_down, true}]}}});
notify_consumer_effect(Pid, SubId, Stream, Name, Active, false = _SteppingDown, map) ->
mod_call_effect(Pid,
{sac, #{subscription_id => SubId,
stream => Stream,
consumer_name => Name,
active => Active}});
notify_consumer_effect(Pid, SubId, Stream, Name, Active, true = _SteppingDown, map) ->
mod_call_effect(Pid,
{sac, #{subscription_id => SubId,
stream => Stream,
consumer_name => Name,
active => Active,
stepping_down => true}}).

maybe_create_group(VirtualHost,
Stream,
Expand Down Expand Up @@ -743,3 +801,6 @@ mod_call_effect(Pid, Msg) ->
%% Sends Msg to ConnectionPid and returns `ok` regardless of what the
%% send expression itself evaluates to.
send_message(ConnectionPid, Msg) ->
    _Sent = erlang:send(ConnectionPid, Msg),
    ok.

%% Returns whatever rabbit_feature_flags:is_enabled/1 reports for the
%% `stream_sac_coordinator_unblock_group` feature flag.
has_unblock_group_support() ->
    FeatureName = stream_sac_coordinator_unblock_group,
    rabbit_feature_flags:is_enabled(FeatureName).
43 changes: 27 additions & 16 deletions deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,12 @@ end_per_group(_Group, _Config) ->
ok.

%% Per-testcase setup: mocks rabbit_feature_flags with meck so that
%% is_enabled/1 answers `true` for any argument, then hands Config back
%% unchanged.
init_per_testcase(_TestCase, Config) ->
    MockedModule = rabbit_feature_flags,
    ok = meck:new(MockedModule),
    meck:expect(MockedModule, is_enabled, fun (_) -> true end),
    Config.

%% Per-testcase teardown: unloads the meck mocks and returns `ok`.
end_per_testcase(_FinishedTestCase, _TestConfig) ->
    meck:unload(),
    ok.

simple_sac_test(_) ->
Expand All @@ -71,7 +74,7 @@ simple_sac_test(_) ->
rabbit_stream_sac_coordinator:apply(Command0, State0),
?assert(Active1),
?assertEqual([consumer(ConnectionPid, 0, true)], Consumers1),
assertSendMessageEffect(ConnectionPid, 0, true, Effects1),
assertSendMessageEffect(ConnectionPid, 0, Stream, ConsumerName, true, Effects1),

Command1 =
register_consumer_command(Stream, -1, ConsumerName, ConnectionPid, 1),
Expand Down Expand Up @@ -107,7 +110,7 @@ simple_sac_test(_) ->
?assertEqual([consumer(ConnectionPid, 1, true),
consumer(ConnectionPid, 2, false)],
Consumers4),
assertSendMessageEffect(ConnectionPid, 1, true, Effects4),
assertSendMessageEffect(ConnectionPid, 1, Stream, ConsumerName, true, Effects4),

Command4 =
unregister_consumer_command(Stream, ConsumerName, ConnectionPid, 1),
Expand All @@ -116,7 +119,7 @@ simple_sac_test(_) ->
ok, Effects5} =
rabbit_stream_sac_coordinator:apply(Command4, State4),
?assertEqual([consumer(ConnectionPid, 2, true)], Consumers5),
assertSendMessageEffect(ConnectionPid, 2, true, Effects5),
assertSendMessageEffect(ConnectionPid, 2, Stream, ConsumerName, true, Effects5),

Command5 =
unregister_consumer_command(Stream, ConsumerName, ConnectionPid, 2),
Expand All @@ -141,7 +144,7 @@ super_stream_partition_sac_test(_) ->
rabbit_stream_sac_coordinator:apply(Command0, State0),
?assert(Active1),
?assertEqual([consumer(ConnectionPid, 0, true)], Consumers1),
assertSendMessageEffect(ConnectionPid, 0, true, Effects1),
assertSendMessageEffect(ConnectionPid, 0, Stream, ConsumerName, true, Effects1),

Command1 =
register_consumer_command(Stream, 1, ConsumerName, ConnectionPid, 1),
Expand All @@ -155,7 +158,7 @@ super_stream_partition_sac_test(_) ->
?assertEqual([consumer(ConnectionPid, 0, false),
consumer(ConnectionPid, 1, false)],
Consumers2),
assertSendMessageSteppingDownEffect(ConnectionPid, 0, Effects2),
assertSendMessageSteppingDownEffect(ConnectionPid, 0, Stream, ConsumerName, Effects2),

Command2 = activate_consumer_command(Stream, ConsumerName),
{#?STATE{groups = #{GroupId := #group{consumers = Consumers3}}} =
Expand All @@ -167,7 +170,7 @@ super_stream_partition_sac_test(_) ->
?assertEqual([consumer(ConnectionPid, 0, false),
consumer(ConnectionPid, 1, true)],
Consumers3),
assertSendMessageEffect(ConnectionPid, 1, true, Effects3),
assertSendMessageEffect(ConnectionPid, 1, Stream, ConsumerName, true, Effects3),

Command3 =
register_consumer_command(Stream, 1, ConsumerName, ConnectionPid, 2),
Expand Down Expand Up @@ -197,7 +200,7 @@ super_stream_partition_sac_test(_) ->
consumer(ConnectionPid, 2, false)],
Consumers5),

assertSendMessageSteppingDownEffect(ConnectionPid, 1, Effects5),
assertSendMessageSteppingDownEffect(ConnectionPid, 1, Stream, ConsumerName, Effects5),

Command5 = activate_consumer_command(Stream, ConsumerName),
{#?STATE{groups = #{GroupId := #group{consumers = Consumers6}}} =
Expand All @@ -208,7 +211,7 @@ super_stream_partition_sac_test(_) ->
?assertEqual([consumer(ConnectionPid, 1, false),
consumer(ConnectionPid, 2, true)],
Consumers6),
assertSendMessageEffect(ConnectionPid, 2, true, Effects6),
assertSendMessageEffect(ConnectionPid, 2, Stream, ConsumerName, true, Effects6),

Command6 =
unregister_consumer_command(Stream, ConsumerName, ConnectionPid, 1),
Expand Down Expand Up @@ -310,7 +313,9 @@ ensure_monitors_test(_) ->
ok.

handle_connection_down_test(_) ->
GroupId = {<<"/">>, <<"stream">>, <<"app">>},
Stream = <<"stream">>,
ConsumerName = <<"app">>,
GroupId = {<<"/">>, Stream, ConsumerName},
Pid0 = self(),
Pid1 = spawn(fun() -> ok end),
Group =
Expand All @@ -326,7 +331,7 @@ handle_connection_down_test(_) ->
rabbit_stream_sac_coordinator:handle_connection_down(Pid0, State0),
assertSize(1, PidsGroups1),
assertSize(1, maps:get(Pid1, PidsGroups1)),
assertSendMessageEffect(Pid1, 1, true, Effects1),
assertSendMessageEffect(Pid1, 1, Stream, ConsumerName, true, Effects1),
?assertEqual(#{GroupId => cgroup([consumer(Pid1, 1, true)])},
Groups1),
{#?STATE{pids_groups = PidsGroups2, groups = Groups2} = _State2,
Expand Down Expand Up @@ -397,22 +402,28 @@ activate_consumer_command(Stream, ConsumerName) ->
stream = Stream,
consumer_name = ConsumerName}.

assertSendMessageEffect(Pid, SubId, Active, [Effect]) ->
assertSendMessageEffect(Pid, SubId, Stream, ConsumerName, Active, [Effect]) ->
?assertEqual({mod_call,
rabbit_stream_sac_coordinator,
send_message,
[Pid,
{sac,
{{subscription_id, SubId}, {active, Active},
{extra, []}}}]},
#{subscription_id => SubId,
stream => Stream,
consumer_name => ConsumerName,
active => Active}
}]},
Effect).

assertSendMessageSteppingDownEffect(Pid, SubId, [Effect]) ->
assertSendMessageSteppingDownEffect(Pid, SubId, Stream, ConsumerName, [Effect]) ->
?assertEqual({mod_call,
rabbit_stream_sac_coordinator,
send_message,
[Pid,
{sac,
{{subscription_id, SubId}, {active, false},
{extra, [{stepping_down, true}]}}}]},
#{subscription_id => SubId,
stream => Stream,
consumer_name => ConsumerName,
active => false,
stepping_down => true}}]},
Effect).
Loading