diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel index f5361a74bc9f..49323983e040 100644 --- a/deps/rabbit/BUILD.bazel +++ b/deps/rabbit/BUILD.bazel @@ -800,6 +800,9 @@ rabbitmq_suite( deps = [ "//deps/rabbit_common:erlang_app", ], + runtime_deps = [ + "@meck//:erlang_app", + ], ) rabbitmq_integration_suite( diff --git a/deps/rabbit/src/rabbit_core_ff.erl b/deps/rabbit/src/rabbit_core_ff.erl index 56d328d71f91..34e00e8aa156 100644 --- a/deps/rabbit/src/rabbit_core_ff.erl +++ b/deps/rabbit/src/rabbit_core_ff.erl @@ -104,3 +104,12 @@ stability => stable, depends_on => [stream_queue] }}). + + +-rabbit_feature_flag( + {stream_sac_coordinator_unblock_group, + #{desc => "Bug fix to unblock a group of consumers in a super stream partition", + doc_url => "https://github.com/rabbitmq/rabbitmq-server/issues/7743", + stability => stable, + depends_on => [stream_single_active_consumer] + }}). diff --git a/deps/rabbit/src/rabbit_stream_sac_coordinator.erl b/deps/rabbit/src/rabbit_stream_sac_coordinator.erl index c761b9fa6008..b637845cbd61 100644 --- a/deps/rabbit/src/rabbit_stream_sac_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_sac_coordinator.erl @@ -231,7 +231,7 @@ apply(#command_unregister_consumer{vhost = VirtualHost, of {value, Consumer} -> G1 = remove_from_group(Consumer, Group0), - handle_consumer_removal(G1, Consumer); + handle_consumer_removal(G1, Consumer, Stream, ConsumerName); false -> {Group0, []} end, @@ -247,19 +247,24 @@ apply(#command_activate_consumer{vhost = VirtualHost, stream = Stream, consumer_name = ConsumerName}, #?MODULE{groups = StreamGroups0} = State0) -> + rabbit_log:debug("Activating consumer on ~tp, group ~p", + [Stream, ConsumerName]), {G, Eff} = case lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups0) of undefined -> - rabbit_log:warning("trying to activate consumer in group ~tp, but " + rabbit_log:warning("Trying to activate consumer in group ~tp, but " "the group does not longer exist", [{VirtualHost, Stream, ConsumerName}]), {undefined, []}; Group -> #consumer{pid = Pid, subscription_id = SubId} = evaluate_active_consumer(Group), + rabbit_log:debug("New active consumer on ~tp, group ~tp " ++ + "is ~tp from ~tp", + [Stream, ConsumerName, SubId, Pid]), Group1 = update_consumer_state_in_group(Group, Pid, SubId, true), - {Group1, [notify_consumer_effect(Pid, SubId, true)]} + {Group1, [notify_consumer_effect(Pid, SubId, Stream, ConsumerName, true)]} end, StreamGroups1 = update_groups(VirtualHost, Stream, ConsumerName, G, StreamGroups0), @@ -499,7 +504,8 @@ do_register_consumer(VirtualHost, Effects = case Active of true -> - [notify_consumer_effect(ConnectionPid, SubscriptionId, Active)]; + [notify_consumer_effect(ConnectionPid, SubscriptionId, + Stream, ConsumerName, Active)]; _ -> [] end, @@ -527,7 +533,8 @@ do_register_consumer(VirtualHost, active = true}, G1 = add_to_group(Consumer0, Group0), {G1, - [notify_consumer_effect(ConnectionPid, SubscriptionId, true)]}; + [notify_consumer_effect(ConnectionPid, SubscriptionId, + Stream, ConsumerName, true)]}; _G -> %% whatever the current state is, the newcomer will be passive Consumer0 = @@ -546,6 +553,10 @@ do_register_consumer(VirtualHost, %% the current active stays the same {G1, []}; _ -> + rabbit_log:debug("SAC consumer registration: " ++ + "active consumer change on stream ~tp, group ~tp. " ++ + "Notifying ~tp from ~tp it is no longer active.", + [Stream, ConsumerName, ActSubId, ActPid]), %% there's a change, telling the active it's not longer active {update_consumer_state_in_group(G1, ActPid, @@ -553,11 +564,17 @@ do_register_consumer(VirtualHost, false), [notify_consumer_effect(ActPid, ActSubId, + Stream, + ConsumerName, false, true)]} end; false -> - %% no active consumer in the (non-empty) group, we are waiting for the reply of a former active + rabbit_log:debug("SAC consumer registration: no active consumer on stream ~tp, group ~tp. " ++ + "Likely waiting for a response from former active consumer.", + [Stream, ConsumerName]), + %% no active consumer in the (non-empty) group, + %% we are waiting for the reply of a former active {G1, []} end end, @@ -571,10 +588,10 @@ do_register_consumer(VirtualHost, lookup_consumer(ConnectionPid, SubscriptionId, Group1), {State#?MODULE{groups = StreamGroups1}, {ok, Active}, Effects}. -handle_consumer_removal(#group{consumers = []} = G, _) -> +handle_consumer_removal(#group{consumers = []} = G, _, _, _) -> {G, []}; handle_consumer_removal(#group{partition_index = -1} = Group0, - Consumer) -> + Consumer, Stream, ConsumerName) -> case Consumer of #consumer{active = true} -> %% this is the active consumer we remove, computing the new one @@ -582,16 +599,16 @@ handle_consumer_removal(#group{partition_index = -1} = Group0, case lookup_active_consumer(Group1) of {value, #consumer{pid = Pid, subscription_id = SubId}} -> %% creating the side effect to notify the new active consumer - {Group1, [notify_consumer_effect(Pid, SubId, true)]}; + {Group1, [notify_consumer_effect(Pid, SubId, Stream, ConsumerName, true)]}; _ -> %% no active consumer found in the group, nothing to do {Group1, []} end; #consumer{active = false} -> - %% not the active consumer, nothing to do."), + %% not the active consumer, nothing to do. {Group0, []} end; -handle_consumer_removal(Group0, Consumer) -> +handle_consumer_removal(Group0, Consumer, Stream, ConsumerName) -> case lookup_active_consumer(Group0) of {value, #consumer{pid = ActPid, subscription_id = ActSubId} = @@ -601,12 +618,18 @@ handle_consumer_removal(Group0, Consumer) -> %% the current active stays the same {Group0, []}; _ -> + rabbit_log:debug("SAC consumer removal: " ++ + "active consumer change on stream ~tp, group ~tp. " ++ + "Notifying ~tp from ~tp it is no longer active.", + [Stream, ConsumerName, ActSubId, ActPid]), + %% there's a change, telling the active it's not longer active {update_consumer_state_in_group(Group0, ActPid, ActSubId, false), - [notify_consumer_effect(ActPid, ActSubId, false, true)]} + [notify_consumer_effect(ActPid, ActSubId, + Stream, ConsumerName, false, true)]} end; false -> case Consumer#consumer.active of @@ -614,27 +637,62 @@ handle_consumer_removal(Group0, Consumer) -> %% the active one is going away, picking a new one #consumer{pid = P, subscription_id = SID} = evaluate_active_consumer(Group0), + rabbit_log:debug("SAC consumer removal: " ++ + "active consumer change on stream ~tp, group ~tp. " ++ + "Notifying ~tp from ~tp it is the new active consumer.", + [Stream, ConsumerName, SID, P]), {update_consumer_state_in_group(Group0, P, SID, true), - [notify_consumer_effect(P, SID, true)]}; + [notify_consumer_effect(P, SID, + Stream, ConsumerName, true)]}; false -> - %% no active consumer in the (non-empty) group, we are waiting for the reply of a former active + rabbit_log:debug("SAC consumer removal: no active consumer on stream ~tp, group ~tp. " ++ + "Likely waiting for a response from former active consumer.", + [Stream, ConsumerName]), + %% no active consumer in the (non-empty) group, + %% we are waiting for the reply of a former active {Group0, []} end end. -notify_consumer_effect(Pid, SubId, Active) -> - notify_consumer_effect(Pid, SubId, Active, false). +message_type() -> + case has_unblock_group_support() of + true -> + map; + false -> + tuple + end. + +notify_consumer_effect(Pid, SubId, Stream, Name, Active) -> + notify_consumer_effect(Pid, SubId, Stream, Name, Active, false). -notify_consumer_effect(Pid, SubId, Active, false = _SteppingDown) -> +notify_consumer_effect(Pid, SubId, Stream, Name, Active, SteppingDown) -> + notify_consumer_effect(Pid, SubId, Stream, Name, Active, SteppingDown, message_type()). + +notify_consumer_effect(Pid, SubId, _Stream, _Name, Active, false = _SteppingDown, tuple) -> mod_call_effect(Pid, {sac, - {{subscription_id, SubId}, {active, Active}, + {{subscription_id, SubId}, + {active, Active}, {extra, []}}}); -notify_consumer_effect(Pid, SubId, Active, true = _SteppingDown) -> +notify_consumer_effect(Pid, SubId, _Stream, _Name, Active, true = _SteppingDown, tuple) -> mod_call_effect(Pid, {sac, - {{subscription_id, SubId}, {active, Active}, - {extra, [{stepping_down, true}]}}}). + {{subscription_id, SubId}, + {active, Active}, + {extra, [{stepping_down, true}]}}}); +notify_consumer_effect(Pid, SubId, Stream, Name, Active, false = _SteppingDown, map) -> + mod_call_effect(Pid, + {sac, #{subscription_id => SubId, + stream => Stream, + consumer_name => Name, + active => Active}}); +notify_consumer_effect(Pid, SubId, Stream, Name, Active, true = _SteppingDown, map) -> + mod_call_effect(Pid, + {sac, #{subscription_id => SubId, + stream => Stream, + consumer_name => Name, + active => Active, + stepping_down => true}}). maybe_create_group(VirtualHost, Stream, @@ -743,3 +801,6 @@ mod_call_effect(Pid, Msg) -> send_message(ConnectionPid, Msg) -> ConnectionPid ! Msg, ok. + +has_unblock_group_support() -> + rabbit_feature_flags:is_enabled(stream_sac_coordinator_unblock_group). diff --git a/deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl b/deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl index bd82ed681f01..5dd546b2deb9 100644 --- a/deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl @@ -52,9 +52,12 @@ end_per_group(_Group, _Config) -> ok. init_per_testcase(_TestCase, Config) -> + ok = meck:new(rabbit_feature_flags), + meck:expect(rabbit_feature_flags, is_enabled, fun (_) -> true end), Config. end_per_testcase(_TestCase, _Config) -> + meck:unload(), ok. simple_sac_test(_) -> @@ -71,7 +74,7 @@ simple_sac_test(_) -> rabbit_stream_sac_coordinator:apply(Command0, State0), ?assert(Active1), ?assertEqual([consumer(ConnectionPid, 0, true)], Consumers1), - assertSendMessageEffect(ConnectionPid, 0, true, Effects1), + assertSendMessageEffect(ConnectionPid, 0, Stream, ConsumerName, true, Effects1), Command1 = register_consumer_command(Stream, -1, ConsumerName, ConnectionPid, 1), @@ -107,7 +110,7 @@ simple_sac_test(_) -> ?assertEqual([consumer(ConnectionPid, 1, true), consumer(ConnectionPid, 2, false)], Consumers4), - assertSendMessageEffect(ConnectionPid, 1, true, Effects4), + assertSendMessageEffect(ConnectionPid, 1, Stream, ConsumerName, true, Effects4), Command4 = unregister_consumer_command(Stream, ConsumerName, ConnectionPid, 1), @@ -116,7 +119,7 @@ simple_sac_test(_) -> ok, Effects5} = rabbit_stream_sac_coordinator:apply(Command4, State4), ?assertEqual([consumer(ConnectionPid, 2, true)], Consumers5), - assertSendMessageEffect(ConnectionPid, 2, true, Effects5), + assertSendMessageEffect(ConnectionPid, 2, Stream, ConsumerName, true, Effects5), Command5 = unregister_consumer_command(Stream, ConsumerName, ConnectionPid, 2), @@ -141,7 +144,7 @@ super_stream_partition_sac_test(_) -> rabbit_stream_sac_coordinator:apply(Command0, State0), ?assert(Active1), ?assertEqual([consumer(ConnectionPid, 0, true)], Consumers1), - assertSendMessageEffect(ConnectionPid, 0, true, Effects1), + assertSendMessageEffect(ConnectionPid, 0, Stream, ConsumerName, true, Effects1), Command1 = register_consumer_command(Stream, 1, ConsumerName, ConnectionPid, 1), @@ -155,7 +158,7 @@ super_stream_partition_sac_test(_) -> ?assertEqual([consumer(ConnectionPid, 0, false), consumer(ConnectionPid, 1, false)], Consumers2), - assertSendMessageSteppingDownEffect(ConnectionPid, 0, Effects2), + assertSendMessageSteppingDownEffect(ConnectionPid, 0, Stream, ConsumerName, Effects2), Command2 = activate_consumer_command(Stream, ConsumerName), {#?STATE{groups = #{GroupId := #group{consumers = Consumers3}}} = @@ -167,7 +170,7 @@ super_stream_partition_sac_test(_) -> ?assertEqual([consumer(ConnectionPid, 0, false), consumer(ConnectionPid, 1, true)], Consumers3), - assertSendMessageEffect(ConnectionPid, 1, true, Effects3), + assertSendMessageEffect(ConnectionPid, 1, Stream, ConsumerName, true, Effects3), Command3 = register_consumer_command(Stream, 1, ConsumerName, ConnectionPid, 2), @@ -197,7 +200,7 @@ super_stream_partition_sac_test(_) -> consumer(ConnectionPid, 2, false)], Consumers5), - assertSendMessageSteppingDownEffect(ConnectionPid, 1, Effects5), + assertSendMessageSteppingDownEffect(ConnectionPid, 1, Stream, ConsumerName, Effects5), Command5 = activate_consumer_command(Stream, ConsumerName), {#?STATE{groups = #{GroupId := #group{consumers = Consumers6}}} = @@ -208,7 +211,7 @@ super_stream_partition_sac_test(_) -> ?assertEqual([consumer(ConnectionPid, 1, false), consumer(ConnectionPid, 2, true)], Consumers6), - assertSendMessageEffect(ConnectionPid, 2, true, Effects6), + assertSendMessageEffect(ConnectionPid, 2, Stream, ConsumerName, true, Effects6), Command6 = unregister_consumer_command(Stream, ConsumerName, ConnectionPid, 1), @@ -310,7 +313,9 @@ ensure_monitors_test(_) -> ok. handle_connection_down_test(_) -> - GroupId = {<<"/">>, <<"stream">>, <<"app">>}, + Stream = <<"stream">>, + ConsumerName = <<"app">>, + GroupId = {<<"/">>, Stream, ConsumerName}, Pid0 = self(), Pid1 = spawn(fun() -> ok end), Group = @@ -326,7 +331,7 @@ handle_connection_down_test(_) -> rabbit_stream_sac_coordinator:handle_connection_down(Pid0, State0), assertSize(1, PidsGroups1), assertSize(1, maps:get(Pid1, PidsGroups1)), - assertSendMessageEffect(Pid1, 1, true, Effects1), + assertSendMessageEffect(Pid1, 1, Stream, ConsumerName, true, Effects1), ?assertEqual(#{GroupId => cgroup([consumer(Pid1, 1, true)])}, Groups1), {#?STATE{pids_groups = PidsGroups2, groups = Groups2} = _State2, @@ -397,22 +402,28 @@ activate_consumer_command(Stream, ConsumerName) -> stream = Stream, consumer_name = ConsumerName}. -assertSendMessageEffect(Pid, SubId, Active, [Effect]) -> +assertSendMessageEffect(Pid, SubId, Stream, ConsumerName, Active, [Effect]) -> ?assertEqual({mod_call, rabbit_stream_sac_coordinator, send_message, [Pid, {sac, - {{subscription_id, SubId}, {active, Active}, - {extra, []}}}]}, + #{subscription_id => SubId, + stream => Stream, + consumer_name => ConsumerName, + active => Active} + }]}, Effect). -assertSendMessageSteppingDownEffect(Pid, SubId, [Effect]) -> +assertSendMessageSteppingDownEffect(Pid, SubId, Stream, ConsumerName, [Effect]) -> ?assertEqual({mod_call, rabbit_stream_sac_coordinator, send_message, [Pid, {sac, - {{subscription_id, SubId}, {active, false}, - {extra, [{stepping_down, true}]}}}]}, + #{subscription_id => SubId, + stream => Stream, + consumer_name => ConsumerName, + active => false, + stepping_down => true}}]}, Effect). diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index d9209b7babf1..b163a48ee00a 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -47,6 +47,9 @@ send_limit :: non_neg_integer(), log :: undefined | osiris_log:state(), last_listener_offset = undefined :: undefined | osiris:offset()}). +-record(request, + {start :: integer(), + content :: term()}). -record(stream_connection_state, {data :: rabbit_stream_core:state(), blocked :: boolean(), consumers :: #{subscription_id() => #consumer{}}}). @@ -89,8 +92,10 @@ transport :: tcp | ssl, proxy_socket :: undefined | ranch_transport:socket(), correlation_id_sequence :: integer(), - outstanding_requests :: #{integer() => term()}, - deliver_version :: rabbit_stream_core:command_version()}). + outstanding_requests :: #{integer() => #request{}}, + deliver_version :: rabbit_stream_core:command_version(), + request_timeout :: pos_integer(), + outstanding_requests_timer :: undefined | erlang:reference()}). -record(configuration, {initial_credits :: integer(), credits_required_for_unblocking :: integer(), @@ -223,6 +228,8 @@ init([KeepaliveSup, socket_op(Sock, fun(S) -> rabbit_net:socket_ends(S, inbound) end), DeliverVersion = ?VERSION_1, + RequestTimeout = application:get_env(rabbitmq_stream, + request_timeout, 60_000), Connection = #stream_connection{name = rabbit_data_coercion:to_binary(ConnStr), @@ -249,6 +256,7 @@ init([KeepaliveSup, rabbit_net:maybe_get_proxy_socket(Sock), correlation_id_sequence = 0, outstanding_requests = #{}, + request_timeout = RequestTimeout, deliver_version = DeliverVersion}, State = #stream_connection_state{consumers = #{}, @@ -654,11 +662,14 @@ augment_infos_with_user_provided_connection_name(Infos, end. close(Transport, - #stream_connection{socket = S, virtual_host = VirtualHost}, + #stream_connection{socket = S, virtual_host = VirtualHost, + outstanding_requests = Requests}, #stream_connection_state{consumers = Consumers}) -> [begin + %% we discard the result (updated requests) because they are no longer used _ = maybe_unregister_consumer(VirtualHost, Consumer, - single_active_consumer(Properties)), + single_active_consumer(Properties), + Requests), case Log of undefined -> ok; %% segment may not be defined on subscription (single active consumer) @@ -784,15 +795,36 @@ open(info, {OK, S, Data}, connection_state = State2}} end; open(info, - {sac, {{subscription_id, SubId}, {active, Active}, {extra, Extra}}}, + {sac, {{subscription_id, SubId}, + {active, Active}, {extra, Extra}}}, + State) -> + Msg0 = #{subscription_id => SubId, + active => Active}, + Msg1 = case Extra of + [{stepping_down, true}] -> + Msg0#{stepping_down => true}; + _ -> + Msg0 + end, + open(info, {sac, Msg1}, State); +open(info, + {sac, #{subscription_id := SubId, + active := Active} = Msg}, #statem_data{transport = Transport, - connection = Connection0, + connection = #stream_connection{virtual_host = VirtualHost} = Connection0, connection_state = ConnState0} = State) -> - rabbit_log:debug("Subscription ~tp instructed to become active: " - "~tp", - [SubId, Active]), #stream_connection_state{consumers = Consumers0} = ConnState0, + Stream = case Msg of + #{stream := S} -> + S; + _ -> + stream_from_consumers(SubId, Consumers0) + end, + + rabbit_log:debug("Subscription ~tp on ~tp instructed to become active: " + "~tp", + [SubId, Stream, Active]), {Connection1, ConnState1} = case Consumers0 of #{SubId := @@ -827,10 +859,9 @@ open(info, Conn1 = maybe_send_consumer_update(Transport, Connection0, - SubId, + Consumer1, Active, - true, - Extra), + Msg), {Conn1, ConnState0#stream_connection_state{consumers = Consumers0#{SubId @@ -843,6 +874,21 @@ open(info, {Connection0, ConnState0} end; _ -> + rabbit_log:debug("Subscription ~tp on ~tp has been deleted.", + [SubId, Stream]), + rabbit_log:debug("Active ~tp, message ~tp", [Active, Msg]), + case {Active, Msg} of + {false, #{stepping_down := true, + stream := St, + consumer_name := ConsumerName}} -> + rabbit_log:debug("Former active consumer gone, activating consumer " ++ + "on stream ~tp, group ~tp", [St, ConsumerName]), + _ = rabbit_stream_sac_coordinator:activate_consumer(VirtualHost, + St, + ConsumerName); + _ -> + ok + end, {Connection0, ConnState0} end, {keep_state, @@ -924,6 +970,37 @@ open(info, emit_stats, StatemData) -> Connection1 = emit_stats(Connection, State), {keep_state, StatemData#statem_data{connection = Connection1}}; +open(info, check_outstanding_requests, + #statem_data{connection = #stream_connection{outstanding_requests = Requests, + request_timeout = Timeout} = Connection0} = + StatemData) -> + Time = erlang:monotonic_time(millisecond), + rabbit_log:debug("Checking outstanding requests at ~tp: ~tp", [Time, Requests]), + HasTimedOut = maps:fold(fun(_, #request{}, true) -> + true; + (K, #request{content = Ctnt, start = Start}, false) -> + case (Time - Start) > Timeout of + true -> + rabbit_log:debug("Request ~tp with content ~tp has timed out", + [K, Ctnt]), + + true; + false -> + false + end + end, false, Requests), + case HasTimedOut of + true -> + rabbit_log_connection:info("Forcing stream connection ~tp closing: request to client timed out", + [self()]), + _ = demonitor_all_streams(Connection0), + {stop, {request_timeout, <<"Request timeout">>}}; + false -> + Connection1 = ensure_outstanding_requests_timer( + Connection0#stream_connection{outstanding_requests_timer = undefined} + ), + {keep_state, StatemData#statem_data{connection = Connection1}} + end; open(info, {shutdown, Explanation} = Reason, #statem_data{connection = Connection}) -> %% rabbitmq_management or rabbitmq_stream_management plugin @@ -2493,10 +2570,11 @@ handle_frame_post_auth(Transport, [RC]) end, case maps:take(CorrelationId, Requests0) of - {{{subscription_id, SubscriptionId}, {extra, Extra}}, Rs} -> + {#request{content = #{subscription_id := SubscriptionId} = Msg}, Rs} -> + Stream = stream_from_consumers(SubscriptionId, Consumers), rabbit_log:debug("Received consumer update response for subscription " - "~tp", - [SubscriptionId]), + "~tp on stream ~tp, correlation ID ~tp", + [SubscriptionId, Stream, CorrelationId]), Consumers1 = case Consumers of #{SubscriptionId := @@ -2511,9 +2589,7 @@ handle_frame_post_auth(Transport, member_pid = LocalMemberPid, offset = - SubscriptionOffsetSpec, - stream = - Stream}} = + SubscriptionOffsetSpec}} = Consumer, OffsetSpec = @@ -2593,9 +2669,12 @@ handle_frame_post_auth(Transport, Properties}}} -> rabbit_log:debug("Not an active consumer"), - case Extra of - [{stepping_down, true}] -> + case Msg of + #{stepping_down := true} -> ConsumerName = consumer_name(Properties), + rabbit_log:debug("Subscription ~tp on stream ~tp, group ~tp " ++ + "has stepped down, activating consumer", + [SubscriptionId, Stream, ConsumerName]), _ = rabbit_stream_sac_coordinator:activate_consumer(VirtualHost, Stream, ConsumerName), @@ -2745,6 +2824,10 @@ init_reader(ConnectionTransport, [SubscriptionId, osiris_log:next_offset(Segment)]), Segment. + +single_active_consumer(#consumer{configuration = + #consumer_configuration{properties = Properties}}) -> + single_active_consumer(Properties); single_active_consumer(#{<<"single-active-consumer">> := <<"true">>}) -> true; @@ -2769,8 +2852,8 @@ maybe_dispatch_on_subscription(Transport, SendFileOct, false = _Sac) -> rabbit_log:debug("Distributing existing messages to subscription " - "~tp", - [SubscriptionId]), + "~tp on ~tp", + [SubscriptionId, Stream]), case send_chunks(DeliverVersion, Transport, ConsumerState, @@ -2792,9 +2875,9 @@ maybe_dispatch_on_subscription(Transport, ConsumerOffset = osiris_log:next_offset(Log1), ConsumerOffsetLag = consumer_i(offset_lag, ConsumerState1), - rabbit_log:debug("Subscription ~tp is now at offset ~tp with ~tp " + rabbit_log:debug("Subscription ~tp on ~tp is now at offset ~tp with ~tp " "message(s) distributed after subscription", - [SubscriptionId, ConsumerOffset, + [SubscriptionId, Stream, ConsumerOffset, messages_consumed(ConsumerCounters1)]), rabbit_stream_metrics:consumer_created(self(), @@ -2859,39 +2942,62 @@ maybe_register_consumer(VirtualHost, SubscriptionId), Active. -%% NOTE: Never called with SAC = false, but adding an explicit type -%% still doesn't convince dialyzer. Keeping this clause commented out -%% instead of disabling some dialyzer checks for this function: -%% -%% maybe_send_consumer_update(_, Connection, _, _, false = _Sac, _) -> -%% Connection; maybe_send_consumer_update(Transport, - #stream_connection{socket = S, - correlation_id_sequence = - CorrIdSeq, - outstanding_requests = - OutstandingRequests0} = - Connection, - SubscriptionId, + Connection = #stream_connection{ + socket = S, + correlation_id_sequence = CorrIdSeq}, + Consumer, Active, - true = _Sac, - Extra) -> - rabbit_log:debug("SAC subscription ~tp, active = ~tp", - [SubscriptionId, Active]), - Frame = - rabbit_stream_core:frame({request, CorrIdSeq, - {consumer_update, SubscriptionId, Active}}), + Msg) -> + #consumer{configuration = + #consumer_configuration{subscription_id = SubscriptionId}} = Consumer, + Frame = rabbit_stream_core:frame({request, CorrIdSeq, + {consumer_update, SubscriptionId, Active}}), + + Connection1 = register_request(Connection, Msg), - OutstandingRequests1 = - maps:put(CorrIdSeq, - {{subscription_id, SubscriptionId}, {extra, Extra}}, - OutstandingRequests0), send(Transport, S, Frame), - Connection#stream_connection{correlation_id_sequence = CorrIdSeq + 1, - outstanding_requests = OutstandingRequests1}. + Connection1. -maybe_unregister_consumer(_, _, false = _Sac) -> - ok; +register_request(#stream_connection{outstanding_requests = Requests0, + correlation_id_sequence = CorrIdSeq} = C, + RequestContent) -> + rabbit_log:debug("Registering RPC request ~tp with correlation ID ~tp", + [RequestContent, CorrIdSeq]), + + Requests1 = maps:put(CorrIdSeq, request(RequestContent), Requests0), + + ensure_outstanding_requests_timer( + C#stream_connection{correlation_id_sequence = CorrIdSeq + 1, + outstanding_requests = Requests1}). + +request(Content) -> + #request{start = erlang:monotonic_time(millisecond), + content = Content}. + +ensure_outstanding_requests_timer(#stream_connection{ + outstanding_requests = Requests, + outstanding_requests_timer = undefined + } = C) when map_size(Requests) =:= 0 -> + C; +ensure_outstanding_requests_timer(#stream_connection{ + outstanding_requests = Requests, + outstanding_requests_timer = TRef + } = C) when map_size(Requests) =:= 0 -> + _ = erlang:cancel_timer(TRef, [{async, true}, {info, false}]), + C#stream_connection{outstanding_requests_timer = undefined}; +ensure_outstanding_requests_timer(#stream_connection{ + outstanding_requests = Requests, + outstanding_requests_timer = undefined, + request_timeout = Timeout + } = C) when map_size(Requests) > 0 -> + TRef = erlang:send_after(Timeout, self(), check_outstanding_requests), + C#stream_connection{outstanding_requests_timer = TRef}; +ensure_outstanding_requests_timer(C) -> + C. + +maybe_unregister_consumer(_, _, false = _Sac, Requests) -> + Requests; maybe_unregister_consumer(VirtualHost, #consumer{configuration = #consumer_configuration{stream = Stream, @@ -2900,13 +3006,32 @@ maybe_unregister_consumer(VirtualHost, subscription_id = SubscriptionId}}, - true = _Sac) -> + true = _Sac, + Requests) -> ConsumerName = consumer_name(Properties), - rabbit_stream_sac_coordinator:unregister_consumer(VirtualHost, - Stream, - ConsumerName, - self(), - SubscriptionId). + + Requests1 = maps:fold( + fun(_, #request{content = + #{active := false, + subscription_id := SubId, + stepping_down := true}}, Acc) when SubId =:= SubscriptionId -> + _ = rabbit_stream_sac_coordinator:activate_consumer(VirtualHost, + Stream, + ConsumerName), + rabbit_log:debug("Outstanding SAC activation request for stream '~tp', " ++ + "group '~tp', sending activation.", + [Stream, ConsumerName]), + Acc; + (K, V, Acc) -> + Acc#{K => V} + end, maps:new(), Requests), + + _ = rabbit_stream_sac_coordinator:unregister_consumer(VirtualHost, + Stream, + ConsumerName, + self(), + SubscriptionId), + Requests1. partition_index(VirtualHost, Stream, Properties) -> case Properties of @@ -2984,7 +3109,8 @@ clean_state_after_stream_deletion_or_failure(Stream, = PublisherToIds, stream_leaders = - Leaders} = + Leaders, + outstanding_requests = Requests0} = C0, #stream_connection_state{consumers = @@ -2994,20 +3120,24 @@ clean_state_after_stream_deletion_or_failure(Stream, case stream_has_subscriptions(Stream, C0) of true -> #{Stream := SubscriptionIds} = StreamSubscriptions, - _ = [begin - rabbit_stream_metrics:consumer_cancelled(self(), - stream_r(Stream, - C0), - SubId), - #{SubId := Consumer} = Consumers, - maybe_unregister_consumer(VirtualHost, Consumer, - single_active_consumer(Consumer#consumer.configuration#consumer_configuration.properties)) - end - || SubId <- SubscriptionIds], + Requests1 = lists:foldl( + fun(SubId, Rqsts0) -> + rabbit_stream_metrics:consumer_cancelled(self(), + stream_r(Stream, + C0), + SubId), + #{SubId := Consumer} = Consumers, + Rqsts1 = maybe_unregister_consumer( + VirtualHost, Consumer, + single_active_consumer(Consumer), + Rqsts0), + Rqsts1 + end, Requests0, SubscriptionIds), {true, C0#stream_connection{stream_subscriptions = maps:remove(Stream, - StreamSubscriptions)}, + StreamSubscriptions), + outstanding_requests = Requests1}, S0#stream_connection_state{consumers = maps:without(SubscriptionIds, Consumers)}}; @@ -3084,6 +3214,7 @@ lookup_leader_from_manager(VirtualHost, Stream) -> remove_subscription(SubscriptionId, #stream_connection{virtual_host = VirtualHost, + outstanding_requests = Requests0, stream_subscriptions = StreamSubscriptions} = Connection, @@ -3092,6 +3223,8 @@ remove_subscription(SubscriptionId, #consumer{log = Log, configuration = #consumer_configuration{stream = Stream}} = Consumer, + rabbit_log:debug("Deleting subscription ~tp (stream ~tp)", + [SubscriptionId, Stream]), close_log(Log), #{Stream := SubscriptionsForThisStream} = StreamSubscriptions, SubscriptionsForThisStream1 = @@ -3112,9 +3245,14 @@ remove_subscription(SubscriptionId, rabbit_stream_metrics:consumer_cancelled(self(), stream_r(Stream, Connection2), SubscriptionId), - _ = maybe_unregister_consumer(VirtualHost, Consumer, - single_active_consumer(Consumer#consumer.configuration#consumer_configuration.properties)), - {Connection2, State#stream_connection_state{consumers = Consumers1}}. + + Requests1 = maybe_unregister_consumer( + VirtualHost, Consumer, + single_active_consumer( + Consumer#consumer.configuration#consumer_configuration.properties), + Requests0), + {Connection2#stream_connection{outstanding_requests = Requests1}, + State#stream_connection_state{consumers = Consumers1}}. maybe_clean_connection_from_stream(Stream, #stream_connection{stream_leaders = @@ -3636,3 +3774,11 @@ close_log(Log) -> setopts(Transport, Sock, Opts) -> ok = Transport:setopts(Sock, Opts). +stream_from_consumers(SubId, Consumers) -> + case Consumers of + #{SubId := #consumer{configuration = #consumer_configuration{stream = S}}} -> + S; + _ -> + undefined + end. + diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index 054f1e8beff6..c03d2a86d403 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -44,7 +44,8 @@ groups() -> unauthenticated_client_rejected_authenticating, timeout_authenticating, timeout_close_sent, - max_segment_size_bytes_validation]}, + max_segment_size_bytes_validation, + close_connection_on_consumer_update_timeout]}, %% Run `test_global_counters` on its own so the global metrics are %% initialised to 0 for each testcase {single_node_1, [], [test_global_counters]}, @@ -110,6 +111,26 @@ end_per_group(_, Config) -> rabbit_ct_helpers:run_steps(Config, rabbit_ct_broker_helpers:teardown_steps()). +init_per_testcase(close_connection_on_consumer_update_timeout = TestCase, Config) -> + ok = rabbit_ct_broker_helpers:rpc(Config, + 0, + application, + set_env, + [rabbitmq_stream, request_timeout, 2000]), + rabbit_ct_helpers:testcase_started(Config, TestCase); +init_per_testcase(TestCase, Config) -> + rabbit_ct_helpers:testcase_started(Config, TestCase). + +end_per_testcase(close_connection_on_consumer_update_timeout = TestCase, Config) -> + ok = rabbit_ct_broker_helpers:rpc(Config, + 0, + application, + set_env, + [rabbitmq_stream, request_timeout, 60000]), + rabbit_ct_helpers:testcase_finished(Config, TestCase); +end_per_testcase(TestCase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, TestCase). + test_global_counters(Config) -> Stream = atom_to_binary(?FUNCTION_NAME, utf8), test_server(gen_tcp, Stream, Config), @@ -350,6 +371,37 @@ max_segment_size_bytes_validation(Config) -> test_close(Transport, S, C3), ok. +close_connection_on_consumer_update_timeout(Config) -> + Transport = gen_tcp, + Port = get_stream_port(Config), + {ok, S} = + Transport:connect("localhost", Port, + [{active, false}, {mode, binary}]), + C0 = rabbit_stream_core:init(0), + C1 = test_peer_properties(Transport, S, C0), + C2 = test_authenticate(Transport, S, C1), + Stream = atom_to_binary(?FUNCTION_NAME, utf8), + C3 = test_create_stream(Transport, S, Stream, C2), + + SubId = 42, + C4 = test_subscribe(Transport, S, SubId, Stream, + #{<<"single-active-consumer">> => <<"true">>, + <<"name">> => <<"foo">>}, + C3), + {Cmd, _C5} = receive_commands(Transport, S, C4), + ?assertMatch({request, _, {consumer_update, SubId, true}}, Cmd), + closed = wait_for_socket_close(Transport, S, 10), + {ok, Sb} = + Transport:connect("localhost", Port, + [{active, false}, {mode, binary}]), + Cb0 = rabbit_stream_core:init(0), + Cb1 = test_peer_properties(Transport, Sb, Cb0), + Cb2 = test_authenticate(Transport, Sb, Cb1), + Cb3 = test_delete_stream(Transport, Sb, Stream, Cb2, false), + _Cb4 = test_close(Transport, Sb, Cb3), + closed = wait_for_socket_close(Transport, Sb, 10), + ok. + consumer_count(Config) -> ets_count(Config, ?TABLE_CONSUMER). @@ -525,12 +577,21 @@ test_create_stream(Transport, S, Stream, C0) -> C. test_delete_stream(Transport, S, Stream, C0) -> + test_delete_stream(Transport, S, Stream, C0, true). + +test_delete_stream(Transport, S, Stream, C0, false) -> + do_test_delete_stream(Transport, S, Stream, C0); +test_delete_stream(Transport, S, Stream, C0, true) -> + C1 = do_test_delete_stream(Transport, S, Stream, C0), + test_metadata_update_stream_deleted(Transport, S, Stream, C1). + +do_test_delete_stream(Transport, S, Stream, C0) -> DeleteStreamFrame = rabbit_stream_core:frame({request, 1, {delete_stream, Stream}}), ok = Transport:send(S, DeleteStreamFrame), {Cmd, C1} = receive_commands(Transport, S, C0), ?assertMatch({response, 1, {delete_stream, ?RESPONSE_CODE_OK}}, Cmd), - test_metadata_update_stream_deleted(Transport, S, Stream, C1). + C1. test_metadata_update_stream_deleted(Transport, S, Stream, C0) -> {Meta, C1} = receive_commands(Transport, S, C0),