@@ -253,7 +253,7 @@ apply(#command_unregister_consumer{vhost = VirtualHost,
253
253
of
254
254
{value , Consumer } ->
255
255
G1 = remove_from_group (Consumer , Group0 ),
256
- handle_consumer_removal (G1 , Consumer );
256
+ handle_consumer_removal (G1 , Consumer , Stream , ConsumerName );
257
257
false ->
258
258
{Group0 , []}
259
259
end ,
@@ -269,19 +269,28 @@ apply(#command_activate_consumer{vhost = VirtualHost,
269
269
stream = Stream ,
270
270
consumer_name = ConsumerName },
271
271
#? MODULE {groups = StreamGroups0 } = State0 ) ->
272
+ rabbit_log :debug (" Activating consumer on ~tp , group ~p " ,
273
+ [Stream , ConsumerName ]),
272
274
{G , Eff } =
273
275
case lookup_group (VirtualHost , Stream , ConsumerName , StreamGroups0 ) of
274
276
undefined ->
277
+ <<<<<<< HEAD
275
278
rabbit_log :warning (" trying to activate consumer in group ~p , but "
279
+ =======
280
+ rabbit_log :warning (" Trying to activate consumer in group ~tp , but "
281
+ >>>>>>> 221 f10d2d9 (Unblock group of consumers on super stream partition )
276
282
" the group does not longer exist" ,
277
283
[{VirtualHost , Stream , ConsumerName }]),
278
284
{undefined , []};
279
285
Group ->
280
286
# consumer {pid = Pid , subscription_id = SubId } =
281
287
evaluate_active_consumer (Group ),
288
+ rabbit_log :debug (" New active consumer on ~tp , group ~tp " ++
289
+ " is ~tp from ~tp " ,
290
+ [Stream , ConsumerName , SubId , Pid ]),
282
291
Group1 =
283
292
update_consumer_state_in_group (Group , Pid , SubId , true ),
284
- {Group1 , [notify_consumer_effect (Pid , SubId , true )]}
293
+ {Group1 , [notify_consumer_effect (Pid , SubId , Stream , ConsumerName , true )]}
285
294
end ,
286
295
StreamGroups1 =
287
296
update_groups (VirtualHost , Stream , ConsumerName , G , StreamGroups0 ),
@@ -521,7 +530,8 @@ do_register_consumer(VirtualHost,
521
530
Effects =
522
531
case Active of
523
532
true ->
524
- [notify_consumer_effect (ConnectionPid , SubscriptionId , Active )];
533
+ [notify_consumer_effect (ConnectionPid , SubscriptionId ,
534
+ Stream , ConsumerName , Active )];
525
535
_ ->
526
536
[]
527
537
end ,
@@ -549,7 +559,8 @@ do_register_consumer(VirtualHost,
549
559
active = true },
550
560
G1 = add_to_group (Consumer0 , Group0 ),
551
561
{G1 ,
552
- [notify_consumer_effect (ConnectionPid , SubscriptionId , true )]};
562
+ [notify_consumer_effect (ConnectionPid , SubscriptionId ,
563
+ Stream , ConsumerName , true )]};
553
564
_G ->
554
565
% % whatever the current state is, the newcomer will be passive
555
566
Consumer0 =
@@ -568,18 +579,28 @@ do_register_consumer(VirtualHost,
568
579
% % the current active stays the same
569
580
{G1 , []};
570
581
_ ->
582
+ rabbit_log :debug (" SAC consumer registration: " ++
583
+ " active consumer change on stream ~tp , group ~tp . " ++
584
+ " Notifying ~tp from ~tp it is no longer active." ,
585
+ [Stream , ConsumerName , ActSubId , ActPid ]),
571
586
% % there's a change, telling the active it's not longer active
572
587
{update_consumer_state_in_group (G1 ,
573
588
ActPid ,
574
589
ActSubId ,
575
590
false ),
576
591
[notify_consumer_effect (ActPid ,
577
592
ActSubId ,
593
+ Stream ,
594
+ ConsumerName ,
578
595
false ,
579
596
true )]}
580
597
end ;
581
598
false ->
582
- % % no active consumer in the (non-empty) group, we are waiting for the reply of a former active
599
+ rabbit_log :debug (" SAC consumer registration: no active consumer on stream ~tp , group ~tp . " ++
600
+ " Likely waiting for a response from former active consumer." ,
601
+ [Stream , ConsumerName ]),
602
+ % % no active consumer in the (non-empty) group,
603
+ % % we are waiting for the reply of a former active
583
604
{G1 , []}
584
605
end
585
606
end ,
@@ -593,27 +614,27 @@ do_register_consumer(VirtualHost,
593
614
lookup_consumer (ConnectionPid , SubscriptionId , Group1 ),
594
615
{State #? MODULE {groups = StreamGroups1 }, {ok , Active }, Effects }.
595
616
596
- handle_consumer_removal (# group {consumers = []} = G , _ ) ->
617
+ handle_consumer_removal (# group {consumers = []} = G , _ , _ , _ ) ->
597
618
{G , []};
598
619
handle_consumer_removal (# group {partition_index = - 1 } = Group0 ,
599
- Consumer ) ->
620
+ Consumer , Stream , ConsumerName ) ->
600
621
case Consumer of
601
622
# consumer {active = true } ->
602
623
% % this is the active consumer we remove, computing the new one
603
624
Group1 = compute_active_consumer (Group0 ),
604
625
case lookup_active_consumer (Group1 ) of
605
626
{value , # consumer {pid = Pid , subscription_id = SubId }} ->
606
627
% % creating the side effect to notify the new active consumer
607
- {Group1 , [notify_consumer_effect (Pid , SubId , true )]};
628
+ {Group1 , [notify_consumer_effect (Pid , SubId , Stream , ConsumerName , true )]};
608
629
_ ->
609
630
% % no active consumer found in the group, nothing to do
610
631
{Group1 , []}
611
632
end ;
612
633
# consumer {active = false } ->
613
- % % not the active consumer, nothing to do."),
634
+ % % not the active consumer, nothing to do.
614
635
{Group0 , []}
615
636
end ;
616
- handle_consumer_removal (Group0 , Consumer ) ->
637
+ handle_consumer_removal (Group0 , Consumer , Stream , ConsumerName ) ->
617
638
case lookup_active_consumer (Group0 ) of
618
639
{value ,
619
640
# consumer {pid = ActPid , subscription_id = ActSubId } =
@@ -623,40 +644,81 @@ handle_consumer_removal(Group0, Consumer) ->
623
644
% % the current active stays the same
624
645
{Group0 , []};
625
646
_ ->
647
+ rabbit_log :debug (" SAC consumer removal: " ++
648
+ " active consumer change on stream ~tp , group ~tp . " ++
649
+ " Notifying ~tp from ~tp it is no longer active." ,
650
+ [Stream , ConsumerName , ActSubId , ActPid ]),
651
+
626
652
% % there's a change, telling the active it's not longer active
627
653
{update_consumer_state_in_group (Group0 ,
628
654
ActPid ,
629
655
ActSubId ,
630
656
false ),
631
- [notify_consumer_effect (ActPid , ActSubId , false , true )]}
657
+ [notify_consumer_effect (ActPid , ActSubId ,
658
+ Stream , ConsumerName , false , true )]}
632
659
end ;
633
660
false ->
634
661
case Consumer # consumer .active of
635
662
true ->
636
663
% % the active one is going away, picking a new one
637
664
# consumer {pid = P , subscription_id = SID } =
638
665
evaluate_active_consumer (Group0 ),
666
+ rabbit_log :debug (" SAC consumer removal: " ++
667
+ " active consumer change on stream ~tp , group ~tp . " ++
668
+ " Notifying ~tp from ~tp it is the new active consumer." ,
669
+ [Stream , ConsumerName , SID , P ]),
639
670
{update_consumer_state_in_group (Group0 , P , SID , true ),
640
- [notify_consumer_effect (P , SID , true )]};
671
+ [notify_consumer_effect (P , SID ,
672
+ Stream , ConsumerName , true )]};
641
673
false ->
642
- % % no active consumer in the (non-empty) group, we are waiting for the reply of a former active
674
+ rabbit_log :debug (" SAC consumer removal: no active consumer on stream ~tp , group ~tp . " ++
675
+ " Likely waiting for a response from former active consumer." ,
676
+ [Stream , ConsumerName ]),
677
+ % % no active consumer in the (non-empty) group,
678
+ % % we are waiting for the reply of a former active
643
679
{Group0 , []}
644
680
end
645
681
end .
646
682
647
- notify_consumer_effect (Pid , SubId , Active ) ->
648
- notify_consumer_effect (Pid , SubId , Active , false ).
683
+ message_type () ->
684
+ case has_unblock_group_support () of
685
+ true ->
686
+ map ;
687
+ false ->
688
+ tuple
689
+ end .
690
+
691
+ notify_consumer_effect (Pid , SubId , Stream , Name , Active ) ->
692
+ notify_consumer_effect (Pid , SubId , Stream , Name , Active , false ).
693
+
694
+ notify_consumer_effect (Pid , SubId , Stream , Name , Active , SteppingDown ) ->
695
+ notify_consumer_effect (Pid , SubId , Stream , Name , Active , SteppingDown , message_type ()).
649
696
650
- notify_consumer_effect (Pid , SubId , Active , false = _SteppingDown ) ->
697
+ notify_consumer_effect (Pid , SubId , _Stream , _Name , Active , false = _SteppingDown , tuple ) ->
651
698
mod_call_effect (Pid ,
652
699
{sac ,
653
- {{subscription_id , SubId }, {active , Active },
700
+ {{subscription_id , SubId },
701
+ {active , Active },
654
702
{extra , []}}});
655
- notify_consumer_effect (Pid , SubId , Active , true = _SteppingDown ) ->
703
+ notify_consumer_effect (Pid , SubId , _Stream , _Name , Active , true = _SteppingDown , tuple ) ->
656
704
mod_call_effect (Pid ,
657
705
{sac ,
658
- {{subscription_id , SubId }, {active , Active },
659
- {extra , [{stepping_down , true }]}}}).
706
+ {{subscription_id , SubId },
707
+ {active , Active },
708
+ {extra , [{stepping_down , true }]}}});
709
+ notify_consumer_effect (Pid , SubId , Stream , Name , Active , false = _SteppingDown , map ) ->
710
+ mod_call_effect (Pid ,
711
+ {sac , #{subscription_id => SubId ,
712
+ stream => Stream ,
713
+ consumer_name => Name ,
714
+ active => Active }});
715
+ notify_consumer_effect (Pid , SubId , Stream , Name , Active , true = _SteppingDown , map ) ->
716
+ mod_call_effect (Pid ,
717
+ {sac , #{subscription_id => SubId ,
718
+ stream => Stream ,
719
+ consumer_name => Name ,
720
+ active => Active ,
721
+ stepping_down => true }}).
660
722
661
723
maybe_create_group (VirtualHost ,
662
724
Stream ,
@@ -766,5 +828,10 @@ send_message(ConnectionPid, Msg) ->
766
828
ConnectionPid ! Msg ,
767
829
ok .
768
830
831
+ <<<<<<< HEAD
769
832
is_ff_enabled () ->
770
833
rabbit_feature_flags :is_enabled (stream_single_active_consumer ).
834
+ =======
835
+ has_unblock_group_support () ->
836
+ rabbit_feature_flags :is_enabled (stream_sac_coordinator_unblock_group ).
837
+ >>>>>>> 221 f10d2d9 (Unblock group of consumers on super stream partition )
0 commit comments