@@ -231,7 +231,7 @@ apply(#command_unregister_consumer{vhost = VirtualHost,
231
231
of
232
232
{value , Consumer } ->
233
233
G1 = remove_from_group (Consumer , Group0 ),
234
- handle_consumer_removal (G1 , Consumer );
234
+ handle_consumer_removal (G1 , Consumer , Stream , ConsumerName );
235
235
false ->
236
236
{Group0 , []}
237
237
end ,
@@ -247,19 +247,24 @@ apply(#command_activate_consumer{vhost = VirtualHost,
247
247
stream = Stream ,
248
248
consumer_name = ConsumerName },
249
249
#? MODULE {groups = StreamGroups0 } = State0 ) ->
250
+ rabbit_log :debug (" Activating consumer on ~tp , group ~p " ,
251
+ [Stream , ConsumerName ]),
250
252
{G , Eff } =
251
253
case lookup_group (VirtualHost , Stream , ConsumerName , StreamGroups0 ) of
252
254
undefined ->
253
- rabbit_log :warning (" trying to activate consumer in group ~tp , but "
255
+ rabbit_log :warning (" Trying to activate consumer in group ~tp , but "
254
256
" the group does not longer exist" ,
255
257
[{VirtualHost , Stream , ConsumerName }]),
256
258
{undefined , []};
257
259
Group ->
258
260
# consumer {pid = Pid , subscription_id = SubId } =
259
261
evaluate_active_consumer (Group ),
262
+ rabbit_log :debug (" New active consumer on ~tp , group ~tp " ++
263
+ " is ~tp from ~tp " ,
264
+ [Stream , ConsumerName , SubId , Pid ]),
260
265
Group1 =
261
266
update_consumer_state_in_group (Group , Pid , SubId , true ),
262
- {Group1 , [notify_consumer_effect (Pid , SubId , true )]}
267
+ {Group1 , [notify_consumer_effect (Pid , SubId , Stream , ConsumerName , true )]}
263
268
end ,
264
269
StreamGroups1 =
265
270
update_groups (VirtualHost , Stream , ConsumerName , G , StreamGroups0 ),
@@ -499,7 +504,8 @@ do_register_consumer(VirtualHost,
499
504
Effects =
500
505
case Active of
501
506
true ->
502
- [notify_consumer_effect (ConnectionPid , SubscriptionId , Active )];
507
+ [notify_consumer_effect (ConnectionPid , SubscriptionId ,
508
+ Stream , ConsumerName , Active )];
503
509
_ ->
504
510
[]
505
511
end ,
@@ -527,7 +533,8 @@ do_register_consumer(VirtualHost,
527
533
active = true },
528
534
G1 = add_to_group (Consumer0 , Group0 ),
529
535
{G1 ,
530
- [notify_consumer_effect (ConnectionPid , SubscriptionId , true )]};
536
+ [notify_consumer_effect (ConnectionPid , SubscriptionId ,
537
+ Stream , ConsumerName , true )]};
531
538
_G ->
532
539
% % whatever the current state is, the newcomer will be passive
533
540
Consumer0 =
@@ -546,18 +553,28 @@ do_register_consumer(VirtualHost,
546
553
% % the current active stays the same
547
554
{G1 , []};
548
555
_ ->
556
+ rabbit_log :debug (" SAC consumer registration: " ++
557
+ " active consumer change on stream ~tp , group ~tp . " ++
558
+ " Notifying ~tp from ~tp it is no longer active." ,
559
+ [Stream , ConsumerName , ActSubId , ActPid ]),
549
560
% % there's a change, telling the active it's not longer active
550
561
{update_consumer_state_in_group (G1 ,
551
562
ActPid ,
552
563
ActSubId ,
553
564
false ),
554
565
[notify_consumer_effect (ActPid ,
555
566
ActSubId ,
567
+ Stream ,
568
+ ConsumerName ,
556
569
false ,
557
570
true )]}
558
571
end ;
559
572
false ->
560
- % % no active consumer in the (non-empty) group, we are waiting for the reply of a former active
573
+ rabbit_log :debug (" SAC consumer registration: no active consumer on stream ~tp , group ~tp . " ++
574
+ " Likely waiting for a response from former active consumer." ,
575
+ [Stream , ConsumerName ]),
576
+ % % no active consumer in the (non-empty) group,
577
+ % % we are waiting for the reply of a former active
561
578
{G1 , []}
562
579
end
563
580
end ,
@@ -571,27 +588,27 @@ do_register_consumer(VirtualHost,
571
588
lookup_consumer (ConnectionPid , SubscriptionId , Group1 ),
572
589
{State #? MODULE {groups = StreamGroups1 }, {ok , Active }, Effects }.
573
590
574
- handle_consumer_removal (# group {consumers = []} = G , _ ) ->
591
+ handle_consumer_removal (# group {consumers = []} = G , _ , _ , _ ) ->
575
592
{G , []};
576
593
handle_consumer_removal (# group {partition_index = - 1 } = Group0 ,
577
- Consumer ) ->
594
+ Consumer , Stream , ConsumerName ) ->
578
595
case Consumer of
579
596
# consumer {active = true } ->
580
597
% % this is the active consumer we remove, computing the new one
581
598
Group1 = compute_active_consumer (Group0 ),
582
599
case lookup_active_consumer (Group1 ) of
583
600
{value , # consumer {pid = Pid , subscription_id = SubId }} ->
584
601
% % creating the side effect to notify the new active consumer
585
- {Group1 , [notify_consumer_effect (Pid , SubId , true )]};
602
+ {Group1 , [notify_consumer_effect (Pid , SubId , Stream , ConsumerName , true )]};
586
603
_ ->
587
604
% % no active consumer found in the group, nothing to do
588
605
{Group1 , []}
589
606
end ;
590
607
# consumer {active = false } ->
591
- % % not the active consumer, nothing to do."),
608
+ % % not the active consumer, nothing to do.
592
609
{Group0 , []}
593
610
end ;
594
- handle_consumer_removal (Group0 , Consumer ) ->
611
+ handle_consumer_removal (Group0 , Consumer , Stream , ConsumerName ) ->
595
612
case lookup_active_consumer (Group0 ) of
596
613
{value ,
597
614
# consumer {pid = ActPid , subscription_id = ActSubId } =
@@ -601,40 +618,81 @@ handle_consumer_removal(Group0, Consumer) ->
601
618
% % the current active stays the same
602
619
{Group0 , []};
603
620
_ ->
621
+ rabbit_log :debug (" SAC consumer removal: " ++
622
+ " active consumer change on stream ~tp , group ~tp . " ++
623
+ " Notifying ~tp from ~tp it is no longer active." ,
624
+ [Stream , ConsumerName , ActSubId , ActPid ]),
625
+
604
626
% % there's a change, telling the active it's not longer active
605
627
{update_consumer_state_in_group (Group0 ,
606
628
ActPid ,
607
629
ActSubId ,
608
630
false ),
609
- [notify_consumer_effect (ActPid , ActSubId , false , true )]}
631
+ [notify_consumer_effect (ActPid , ActSubId ,
632
+ Stream , ConsumerName , false , true )]}
610
633
end ;
611
634
false ->
612
635
case Consumer # consumer .active of
613
636
true ->
614
637
% % the active one is going away, picking a new one
615
638
# consumer {pid = P , subscription_id = SID } =
616
639
evaluate_active_consumer (Group0 ),
640
+ rabbit_log :debug (" SAC consumer removal: " ++
641
+ " active consumer change on stream ~tp , group ~tp . " ++
642
+ " Notifying ~tp from ~tp it is the new active consumer." ,
643
+ [Stream , ConsumerName , SID , P ]),
617
644
{update_consumer_state_in_group (Group0 , P , SID , true ),
618
- [notify_consumer_effect (P , SID , true )]};
645
+ [notify_consumer_effect (P , SID ,
646
+ Stream , ConsumerName , true )]};
619
647
false ->
620
- % % no active consumer in the (non-empty) group, we are waiting for the reply of a former active
648
+ rabbit_log :debug (" SAC consumer removal: no active consumer on stream ~tp , group ~tp . " ++
649
+ " Likely waiting for a response from former active consumer." ,
650
+ [Stream , ConsumerName ]),
651
+ % % no active consumer in the (non-empty) group,
652
+ % % we are waiting for the reply of a former active
621
653
{Group0 , []}
622
654
end
623
655
end .
624
656
625
- notify_consumer_effect (Pid , SubId , Active ) ->
626
- notify_consumer_effect (Pid , SubId , Active , false ).
657
+ message_type () ->
658
+ case has_unblock_group_support () of
659
+ true ->
660
+ map ;
661
+ false ->
662
+ tuple
663
+ end .
664
+
665
+ notify_consumer_effect (Pid , SubId , Stream , Name , Active ) ->
666
+ notify_consumer_effect (Pid , SubId , Stream , Name , Active , false ).
627
667
628
- notify_consumer_effect (Pid , SubId , Active , false = _SteppingDown ) ->
668
+ notify_consumer_effect (Pid , SubId , Stream , Name , Active , SteppingDown ) ->
669
+ notify_consumer_effect (Pid , SubId , Stream , Name , Active , SteppingDown , message_type ()).
670
+
671
+ notify_consumer_effect (Pid , SubId , _Stream , _Name , Active , false = _SteppingDown , tuple ) ->
629
672
mod_call_effect (Pid ,
630
673
{sac ,
631
- {{subscription_id , SubId }, {active , Active },
674
+ {{subscription_id , SubId },
675
+ {active , Active },
632
676
{extra , []}}});
633
- notify_consumer_effect (Pid , SubId , Active , true = _SteppingDown ) ->
677
+ notify_consumer_effect (Pid , SubId , _Stream , _Name , Active , true = _SteppingDown , tuple ) ->
634
678
mod_call_effect (Pid ,
635
679
{sac ,
636
- {{subscription_id , SubId }, {active , Active },
637
- {extra , [{stepping_down , true }]}}}).
680
+ {{subscription_id , SubId },
681
+ {active , Active },
682
+ {extra , [{stepping_down , true }]}}});
683
+ notify_consumer_effect (Pid , SubId , Stream , Name , Active , false = _SteppingDown , map ) ->
684
+ mod_call_effect (Pid ,
685
+ {sac , #{subscription_id => SubId ,
686
+ stream => Stream ,
687
+ consumer_name => Name ,
688
+ active => Active }});
689
+ notify_consumer_effect (Pid , SubId , Stream , Name , Active , true = _SteppingDown , map ) ->
690
+ mod_call_effect (Pid ,
691
+ {sac , #{subscription_id => SubId ,
692
+ stream => Stream ,
693
+ consumer_name => Name ,
694
+ active => Active ,
695
+ stepping_down => true }}).
638
696
639
697
maybe_create_group (VirtualHost ,
640
698
Stream ,
@@ -743,3 +801,6 @@ mod_call_effect(Pid, Msg) ->
743
801
send_message (ConnectionPid , Msg ) ->
744
802
ConnectionPid ! Msg ,
745
803
ok .
804
+
805
+ has_unblock_group_support () ->
806
+ rabbit_feature_flags :is_enabled (stream_sac_coordinator_unblock_group ).
0 commit comments