Skip to content

Commit 1886cae

Browse files
authored
Merge pull request #13092 from rabbitmq/stream-consumer-cancel-event
Emit cancellation event only when stream consumer is cancelled
2 parents 9978212 + 69d0382 commit 1886cae

File tree

3 files changed

+23
-18
lines changed

3 files changed

+23
-18
lines changed

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -976,11 +976,7 @@ close(#stream_client{readers = Readers,
976976
name = QName}) ->
977977
maps:foreach(fun (CTag, #stream{log = Log}) ->
978978
close_log(Log),
979-
rabbit_core_metrics:consumer_deleted(self(), CTag, QName),
980-
rabbit_event:notify(consumer_deleted,
981-
[{consumer_tag, CTag},
982-
{channel, self()},
983-
{queue, QName}])
979+
rabbit_core_metrics:consumer_deleted(self(), CTag, QName)
984980
end, Readers).
985981

986982
update(Q, State)

deps/rabbitmq_stream/src/rabbit_stream_metrics.erl

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
-export([init/0]).
2323
-export([consumer_created/9,
2424
consumer_updated/9,
25-
consumer_cancelled/3]).
25+
consumer_cancelled/4]).
2626
-export([publisher_created/4,
2727
publisher_updated/7,
2828
publisher_deleted/3]).
@@ -104,16 +104,20 @@ consumer_updated(Connection,
104104

105105
ok.
106106

107-
consumer_cancelled(Connection, StreamResource, SubscriptionId) ->
107+
consumer_cancelled(Connection, StreamResource, SubscriptionId, Notify) ->
108108
ets:delete(?TABLE_CONSUMER,
109109
{StreamResource, Connection, SubscriptionId}),
110110
rabbit_global_counters:consumer_deleted(stream),
111111
rabbit_core_metrics:consumer_deleted(Connection,
112112
consumer_tag(SubscriptionId),
113113
StreamResource),
114-
rabbit_event:notify(consumer_deleted,
115-
[{consumer_tag, consumer_tag(SubscriptionId)},
116-
{channel, self()}, {queue, StreamResource}]),
114+
case Notify of
115+
true ->
116+
rabbit_event:notify(consumer_deleted,
117+
[{consumer_tag, consumer_tag(SubscriptionId)},
118+
{channel, self()}, {queue, StreamResource}]);
119+
_ -> ok
120+
end,
117121
ok.
118122

119123
publisher_created(Connection,

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
%% The Original Code is RabbitMQ.
1010
%%
1111
%% The Initial Developer of the Original Code is Pivotal Software, Inc.
12-
%% Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
12+
%% Copyright (c) 2020-2025 Broadcom. All Rights Reserved.
1313
%% The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
1414
%%
1515

@@ -2249,7 +2249,7 @@ handle_frame_post_auth(Transport,
22492249
{Connection, State};
22502250
true ->
22512251
{Connection1, State1} =
2252-
remove_subscription(SubscriptionId, Connection, State),
2252+
remove_subscription(SubscriptionId, Connection, State, true),
22532253
response_ok(Transport, Connection, unsubscribe, CorrelationId),
22542254
{Connection1, State1}
22552255
end;
@@ -3081,7 +3081,7 @@ evaluate_state_after_secret_update(Transport,
30813081
_ ->
30823082
{C1, S1} =
30833083
lists:foldl(fun(SubId, {Conn, St}) ->
3084-
remove_subscription(SubId, Conn, St)
3084+
remove_subscription(SubId, Conn, St, false)
30853085
end, {C0, S0}, Subs),
30863086
{Acc#{Str => ok}, C1, S1}
30873087
end
@@ -3216,7 +3216,8 @@ notify_connection_closed(#statem_data{connection =
32163216
ConnectionState}) ->
32173217
rabbit_core_metrics:connection_closed(self()),
32183218
[rabbit_stream_metrics:consumer_cancelled(self(),
3219-
stream_r(S, Connection), SubId)
3219+
stream_r(S, Connection),
3220+
SubId, false)
32203221
|| #consumer{configuration =
32213222
#consumer_configuration{stream = S,
32223223
subscription_id = SubId}}
@@ -3304,7 +3305,8 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
33043305
rabbit_stream_metrics:consumer_cancelled(self(),
33053306
stream_r(Stream,
33063307
C0),
3307-
SubId),
3308+
SubId,
3309+
false),
33083310
maybe_unregister_consumer(
33093311
VirtualHost, Consumer,
33103312
single_active_consumer(Consumer),
@@ -3314,7 +3316,8 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
33143316
rabbit_stream_metrics:consumer_cancelled(self(),
33153317
stream_r(Stream,
33163318
C0),
3317-
SubId),
3319+
SubId,
3320+
false),
33183321
maybe_unregister_consumer(
33193322
VirtualHost, Consumer,
33203323
single_active_consumer(Consumer),
@@ -3431,7 +3434,8 @@ remove_subscription(SubscriptionId,
34313434
stream_subscriptions =
34323435
StreamSubscriptions} =
34333436
Connection,
3434-
#stream_connection_state{consumers = Consumers} = State) ->
3437+
#stream_connection_state{consumers = Consumers} = State,
3438+
Notify) ->
34353439
#{SubscriptionId := Consumer} = Consumers,
34363440
#consumer{log = Log,
34373441
configuration = #consumer_configuration{stream = Stream, member_pid = MemberPid}} =
@@ -3457,7 +3461,8 @@ remove_subscription(SubscriptionId,
34573461
Connection2 = maybe_clean_connection_from_stream(MemberPid, Stream, Connection1),
34583462
rabbit_stream_metrics:consumer_cancelled(self(),
34593463
stream_r(Stream, Connection2),
3460-
SubscriptionId),
3464+
SubscriptionId,
3465+
Notify),
34613466

34623467
Requests1 = maybe_unregister_consumer(
34633468
VirtualHost, Consumer,

0 commit comments

Comments
 (0)