Skip to content

Commit 66b7692

Browse files
Merge pull request #7835 from rabbitmq/mergify/bp/v3.12.x/pr-7765
Unblock group of consumers on super stream partition (backport #7765)
2 parents 349c484 + ef08b1e commit 66b7692

6 files changed

+403
-112
lines changed

deps/rabbit/BUILD.bazel

+3
Original file line numberDiff line numberDiff line change
@@ -800,6 +800,9 @@ rabbitmq_suite(
800800
deps = [
801801
"//deps/rabbit_common:erlang_app",
802802
],
803+
runtime_deps = [
804+
"@meck//:erlang_app",
805+
],
803806
)
804807

805808
rabbitmq_integration_suite(

deps/rabbit/src/rabbit_core_ff.erl

+9
Original file line numberDiff line numberDiff line change
@@ -104,3 +104,12 @@
104104
stability => stable,
105105
depends_on => [stream_queue]
106106
}}).
107+
108+
109+
-rabbit_feature_flag(
110+
{stream_sac_coordinator_unblock_group,
111+
#{desc => "Bug fix to unblock a group of consumers in a super stream partition",
112+
doc_url => "https://github.com/rabbitmq/rabbitmq-server/issues/7743",
113+
stability => stable,
114+
depends_on => [stream_single_active_consumer]
115+
}}).

deps/rabbit/src/rabbit_stream_sac_coordinator.erl

+82-21
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ apply(#command_unregister_consumer{vhost = VirtualHost,
231231
of
232232
{value, Consumer} ->
233233
G1 = remove_from_group(Consumer, Group0),
234-
handle_consumer_removal(G1, Consumer);
234+
handle_consumer_removal(G1, Consumer, Stream, ConsumerName);
235235
false ->
236236
{Group0, []}
237237
end,
@@ -247,19 +247,24 @@ apply(#command_activate_consumer{vhost = VirtualHost,
247247
stream = Stream,
248248
consumer_name = ConsumerName},
249249
#?MODULE{groups = StreamGroups0} = State0) ->
250+
rabbit_log:debug("Activating consumer on ~tp, group ~p",
251+
[Stream, ConsumerName]),
250252
{G, Eff} =
251253
case lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups0) of
252254
undefined ->
253-
rabbit_log:warning("trying to activate consumer in group ~tp, but "
255+
rabbit_log:warning("Trying to activate consumer in group ~tp, but "
254256
"the group does not longer exist",
255257
[{VirtualHost, Stream, ConsumerName}]),
256258
{undefined, []};
257259
Group ->
258260
#consumer{pid = Pid, subscription_id = SubId} =
259261
evaluate_active_consumer(Group),
262+
rabbit_log:debug("New active consumer on ~tp, group ~tp " ++
263+
"is ~tp from ~tp",
264+
[Stream, ConsumerName, SubId, Pid]),
260265
Group1 =
261266
update_consumer_state_in_group(Group, Pid, SubId, true),
262-
{Group1, [notify_consumer_effect(Pid, SubId, true)]}
267+
{Group1, [notify_consumer_effect(Pid, SubId, Stream, ConsumerName, true)]}
263268
end,
264269
StreamGroups1 =
265270
update_groups(VirtualHost, Stream, ConsumerName, G, StreamGroups0),
@@ -499,7 +504,8 @@ do_register_consumer(VirtualHost,
499504
Effects =
500505
case Active of
501506
true ->
502-
[notify_consumer_effect(ConnectionPid, SubscriptionId, Active)];
507+
[notify_consumer_effect(ConnectionPid, SubscriptionId,
508+
Stream, ConsumerName, Active)];
503509
_ ->
504510
[]
505511
end,
@@ -527,7 +533,8 @@ do_register_consumer(VirtualHost,
527533
active = true},
528534
G1 = add_to_group(Consumer0, Group0),
529535
{G1,
530-
[notify_consumer_effect(ConnectionPid, SubscriptionId, true)]};
536+
[notify_consumer_effect(ConnectionPid, SubscriptionId,
537+
Stream, ConsumerName, true)]};
531538
_G ->
532539
%% whatever the current state is, the newcomer will be passive
533540
Consumer0 =
@@ -546,18 +553,28 @@ do_register_consumer(VirtualHost,
546553
%% the current active stays the same
547554
{G1, []};
548555
_ ->
556+
rabbit_log:debug("SAC consumer registration: " ++
557+
"active consumer change on stream ~tp, group ~tp. " ++
558+
"Notifying ~tp from ~tp it is no longer active.",
559+
[Stream, ConsumerName, ActSubId, ActPid]),
549560
%% there's a change, telling the active it's not longer active
550561
{update_consumer_state_in_group(G1,
551562
ActPid,
552563
ActSubId,
553564
false),
554565
[notify_consumer_effect(ActPid,
555566
ActSubId,
567+
Stream,
568+
ConsumerName,
556569
false,
557570
true)]}
558571
end;
559572
false ->
560-
%% no active consumer in the (non-empty) group, we are waiting for the reply of a former active
573+
rabbit_log:debug("SAC consumer registration: no active consumer on stream ~tp, group ~tp. " ++
574+
"Likely waiting for a response from former active consumer.",
575+
[Stream, ConsumerName]),
576+
%% no active consumer in the (non-empty) group,
577+
%% we are waiting for the reply of a former active
561578
{G1, []}
562579
end
563580
end,
@@ -571,27 +588,27 @@ do_register_consumer(VirtualHost,
571588
lookup_consumer(ConnectionPid, SubscriptionId, Group1),
572589
{State#?MODULE{groups = StreamGroups1}, {ok, Active}, Effects}.
573590

574-
handle_consumer_removal(#group{consumers = []} = G, _) ->
591+
handle_consumer_removal(#group{consumers = []} = G, _, _, _) ->
575592
{G, []};
576593
handle_consumer_removal(#group{partition_index = -1} = Group0,
577-
Consumer) ->
594+
Consumer, Stream, ConsumerName) ->
578595
case Consumer of
579596
#consumer{active = true} ->
580597
%% this is the active consumer we remove, computing the new one
581598
Group1 = compute_active_consumer(Group0),
582599
case lookup_active_consumer(Group1) of
583600
{value, #consumer{pid = Pid, subscription_id = SubId}} ->
584601
%% creating the side effect to notify the new active consumer
585-
{Group1, [notify_consumer_effect(Pid, SubId, true)]};
602+
{Group1, [notify_consumer_effect(Pid, SubId, Stream, ConsumerName, true)]};
586603
_ ->
587604
%% no active consumer found in the group, nothing to do
588605
{Group1, []}
589606
end;
590607
#consumer{active = false} ->
591-
%% not the active consumer, nothing to do."),
608+
%% not the active consumer, nothing to do.
592609
{Group0, []}
593610
end;
594-
handle_consumer_removal(Group0, Consumer) ->
611+
handle_consumer_removal(Group0, Consumer, Stream, ConsumerName) ->
595612
case lookup_active_consumer(Group0) of
596613
{value,
597614
#consumer{pid = ActPid, subscription_id = ActSubId} =
@@ -601,40 +618,81 @@ handle_consumer_removal(Group0, Consumer) ->
601618
%% the current active stays the same
602619
{Group0, []};
603620
_ ->
621+
rabbit_log:debug("SAC consumer removal: " ++
622+
"active consumer change on stream ~tp, group ~tp. " ++
623+
"Notifying ~tp from ~tp it is no longer active.",
624+
[Stream, ConsumerName, ActSubId, ActPid]),
625+
604626
%% there's a change, telling the active it's not longer active
605627
{update_consumer_state_in_group(Group0,
606628
ActPid,
607629
ActSubId,
608630
false),
609-
[notify_consumer_effect(ActPid, ActSubId, false, true)]}
631+
[notify_consumer_effect(ActPid, ActSubId,
632+
Stream, ConsumerName, false, true)]}
610633
end;
611634
false ->
612635
case Consumer#consumer.active of
613636
true ->
614637
%% the active one is going away, picking a new one
615638
#consumer{pid = P, subscription_id = SID} =
616639
evaluate_active_consumer(Group0),
640+
rabbit_log:debug("SAC consumer removal: " ++
641+
"active consumer change on stream ~tp, group ~tp. " ++
642+
"Notifying ~tp from ~tp it is the new active consumer.",
643+
[Stream, ConsumerName, SID, P]),
617644
{update_consumer_state_in_group(Group0, P, SID, true),
618-
[notify_consumer_effect(P, SID, true)]};
645+
[notify_consumer_effect(P, SID,
646+
Stream, ConsumerName, true)]};
619647
false ->
620-
%% no active consumer in the (non-empty) group, we are waiting for the reply of a former active
648+
rabbit_log:debug("SAC consumer removal: no active consumer on stream ~tp, group ~tp. " ++
649+
"Likely waiting for a response from former active consumer.",
650+
[Stream, ConsumerName]),
651+
%% no active consumer in the (non-empty) group,
652+
%% we are waiting for the reply of a former active
621653
{Group0, []}
622654
end
623655
end.
624656

625-
notify_consumer_effect(Pid, SubId, Active) ->
626-
notify_consumer_effect(Pid, SubId, Active, false).
657+
message_type() ->
658+
case has_unblock_group_support() of
659+
true ->
660+
map;
661+
false ->
662+
tuple
663+
end.
664+
665+
notify_consumer_effect(Pid, SubId, Stream, Name, Active) ->
666+
notify_consumer_effect(Pid, SubId, Stream, Name, Active, false).
627667

628-
notify_consumer_effect(Pid, SubId, Active, false = _SteppingDown) ->
668+
notify_consumer_effect(Pid, SubId, Stream, Name, Active, SteppingDown) ->
669+
notify_consumer_effect(Pid, SubId, Stream, Name, Active, SteppingDown, message_type()).
670+
671+
notify_consumer_effect(Pid, SubId, _Stream, _Name, Active, false = _SteppingDown, tuple) ->
629672
mod_call_effect(Pid,
630673
{sac,
631-
{{subscription_id, SubId}, {active, Active},
674+
{{subscription_id, SubId},
675+
{active, Active},
632676
{extra, []}}});
633-
notify_consumer_effect(Pid, SubId, Active, true = _SteppingDown) ->
677+
notify_consumer_effect(Pid, SubId, _Stream, _Name, Active, true = _SteppingDown, tuple) ->
634678
mod_call_effect(Pid,
635679
{sac,
636-
{{subscription_id, SubId}, {active, Active},
637-
{extra, [{stepping_down, true}]}}}).
680+
{{subscription_id, SubId},
681+
{active, Active},
682+
{extra, [{stepping_down, true}]}}});
683+
notify_consumer_effect(Pid, SubId, Stream, Name, Active, false = _SteppingDown, map) ->
684+
mod_call_effect(Pid,
685+
{sac, #{subscription_id => SubId,
686+
stream => Stream,
687+
consumer_name => Name,
688+
active => Active}});
689+
notify_consumer_effect(Pid, SubId, Stream, Name, Active, true = _SteppingDown, map) ->
690+
mod_call_effect(Pid,
691+
{sac, #{subscription_id => SubId,
692+
stream => Stream,
693+
consumer_name => Name,
694+
active => Active,
695+
stepping_down => true}}).
638696

639697
maybe_create_group(VirtualHost,
640698
Stream,
@@ -743,3 +801,6 @@ mod_call_effect(Pid, Msg) ->
743801
send_message(ConnectionPid, Msg) ->
744802
ConnectionPid ! Msg,
745803
ok.
804+
805+
has_unblock_group_support() ->
806+
rabbit_feature_flags:is_enabled(stream_sac_coordinator_unblock_group).

deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl

+27-16
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,12 @@ end_per_group(_Group, _Config) ->
5252
ok.
5353

5454
init_per_testcase(_TestCase, Config) ->
55+
ok = meck:new(rabbit_feature_flags),
56+
meck:expect(rabbit_feature_flags, is_enabled, fun (_) -> true end),
5557
Config.
5658

5759
end_per_testcase(_TestCase, _Config) ->
60+
meck:unload(),
5861
ok.
5962

6063
simple_sac_test(_) ->
@@ -71,7 +74,7 @@ simple_sac_test(_) ->
7174
rabbit_stream_sac_coordinator:apply(Command0, State0),
7275
?assert(Active1),
7376
?assertEqual([consumer(ConnectionPid, 0, true)], Consumers1),
74-
assertSendMessageEffect(ConnectionPid, 0, true, Effects1),
77+
assertSendMessageEffect(ConnectionPid, 0, Stream, ConsumerName, true, Effects1),
7578

7679
Command1 =
7780
register_consumer_command(Stream, -1, ConsumerName, ConnectionPid, 1),
@@ -107,7 +110,7 @@ simple_sac_test(_) ->
107110
?assertEqual([consumer(ConnectionPid, 1, true),
108111
consumer(ConnectionPid, 2, false)],
109112
Consumers4),
110-
assertSendMessageEffect(ConnectionPid, 1, true, Effects4),
113+
assertSendMessageEffect(ConnectionPid, 1, Stream, ConsumerName, true, Effects4),
111114

112115
Command4 =
113116
unregister_consumer_command(Stream, ConsumerName, ConnectionPid, 1),
@@ -116,7 +119,7 @@ simple_sac_test(_) ->
116119
ok, Effects5} =
117120
rabbit_stream_sac_coordinator:apply(Command4, State4),
118121
?assertEqual([consumer(ConnectionPid, 2, true)], Consumers5),
119-
assertSendMessageEffect(ConnectionPid, 2, true, Effects5),
122+
assertSendMessageEffect(ConnectionPid, 2, Stream, ConsumerName, true, Effects5),
120123

121124
Command5 =
122125
unregister_consumer_command(Stream, ConsumerName, ConnectionPid, 2),
@@ -141,7 +144,7 @@ super_stream_partition_sac_test(_) ->
141144
rabbit_stream_sac_coordinator:apply(Command0, State0),
142145
?assert(Active1),
143146
?assertEqual([consumer(ConnectionPid, 0, true)], Consumers1),
144-
assertSendMessageEffect(ConnectionPid, 0, true, Effects1),
147+
assertSendMessageEffect(ConnectionPid, 0, Stream, ConsumerName, true, Effects1),
145148

146149
Command1 =
147150
register_consumer_command(Stream, 1, ConsumerName, ConnectionPid, 1),
@@ -155,7 +158,7 @@ super_stream_partition_sac_test(_) ->
155158
?assertEqual([consumer(ConnectionPid, 0, false),
156159
consumer(ConnectionPid, 1, false)],
157160
Consumers2),
158-
assertSendMessageSteppingDownEffect(ConnectionPid, 0, Effects2),
161+
assertSendMessageSteppingDownEffect(ConnectionPid, 0, Stream, ConsumerName, Effects2),
159162

160163
Command2 = activate_consumer_command(Stream, ConsumerName),
161164
{#?STATE{groups = #{GroupId := #group{consumers = Consumers3}}} =
@@ -167,7 +170,7 @@ super_stream_partition_sac_test(_) ->
167170
?assertEqual([consumer(ConnectionPid, 0, false),
168171
consumer(ConnectionPid, 1, true)],
169172
Consumers3),
170-
assertSendMessageEffect(ConnectionPid, 1, true, Effects3),
173+
assertSendMessageEffect(ConnectionPid, 1, Stream, ConsumerName, true, Effects3),
171174

172175
Command3 =
173176
register_consumer_command(Stream, 1, ConsumerName, ConnectionPid, 2),
@@ -197,7 +200,7 @@ super_stream_partition_sac_test(_) ->
197200
consumer(ConnectionPid, 2, false)],
198201
Consumers5),
199202

200-
assertSendMessageSteppingDownEffect(ConnectionPid, 1, Effects5),
203+
assertSendMessageSteppingDownEffect(ConnectionPid, 1, Stream, ConsumerName, Effects5),
201204

202205
Command5 = activate_consumer_command(Stream, ConsumerName),
203206
{#?STATE{groups = #{GroupId := #group{consumers = Consumers6}}} =
@@ -208,7 +211,7 @@ super_stream_partition_sac_test(_) ->
208211
?assertEqual([consumer(ConnectionPid, 1, false),
209212
consumer(ConnectionPid, 2, true)],
210213
Consumers6),
211-
assertSendMessageEffect(ConnectionPid, 2, true, Effects6),
214+
assertSendMessageEffect(ConnectionPid, 2, Stream, ConsumerName, true, Effects6),
212215

213216
Command6 =
214217
unregister_consumer_command(Stream, ConsumerName, ConnectionPid, 1),
@@ -310,7 +313,9 @@ ensure_monitors_test(_) ->
310313
ok.
311314

312315
handle_connection_down_test(_) ->
313-
GroupId = {<<"/">>, <<"stream">>, <<"app">>},
316+
Stream = <<"stream">>,
317+
ConsumerName = <<"app">>,
318+
GroupId = {<<"/">>, Stream, ConsumerName},
314319
Pid0 = self(),
315320
Pid1 = spawn(fun() -> ok end),
316321
Group =
@@ -326,7 +331,7 @@ handle_connection_down_test(_) ->
326331
rabbit_stream_sac_coordinator:handle_connection_down(Pid0, State0),
327332
assertSize(1, PidsGroups1),
328333
assertSize(1, maps:get(Pid1, PidsGroups1)),
329-
assertSendMessageEffect(Pid1, 1, true, Effects1),
334+
assertSendMessageEffect(Pid1, 1, Stream, ConsumerName, true, Effects1),
330335
?assertEqual(#{GroupId => cgroup([consumer(Pid1, 1, true)])},
331336
Groups1),
332337
{#?STATE{pids_groups = PidsGroups2, groups = Groups2} = _State2,
@@ -397,22 +402,28 @@ activate_consumer_command(Stream, ConsumerName) ->
397402
stream = Stream,
398403
consumer_name = ConsumerName}.
399404

400-
assertSendMessageEffect(Pid, SubId, Active, [Effect]) ->
405+
assertSendMessageEffect(Pid, SubId, Stream, ConsumerName, Active, [Effect]) ->
401406
?assertEqual({mod_call,
402407
rabbit_stream_sac_coordinator,
403408
send_message,
404409
[Pid,
405410
{sac,
406-
{{subscription_id, SubId}, {active, Active},
407-
{extra, []}}}]},
411+
#{subscription_id => SubId,
412+
stream => Stream,
413+
consumer_name => ConsumerName,
414+
active => Active}
415+
}]},
408416
Effect).
409417

410-
assertSendMessageSteppingDownEffect(Pid, SubId, [Effect]) ->
418+
assertSendMessageSteppingDownEffect(Pid, SubId, Stream, ConsumerName, [Effect]) ->
411419
?assertEqual({mod_call,
412420
rabbit_stream_sac_coordinator,
413421
send_message,
414422
[Pid,
415423
{sac,
416-
{{subscription_id, SubId}, {active, false},
417-
{extra, [{stepping_down, true}]}}}]},
424+
#{subscription_id => SubId,
425+
stream => Stream,
426+
consumer_name => ConsumerName,
427+
active => false,
428+
stepping_down => true}}]},
418429
Effect).

0 commit comments

Comments
 (0)