Skip to content

Commit 7c60367

Browse files
committed
Fix conflicts
References #7743
1 parent ef0be70 commit 7c60367

File tree

4 files changed

+19
-49
lines changed

4 files changed

+19
-49
lines changed

deps/rabbit/src/rabbit_core_ff.erl

+8-11
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,14 @@
111111
depends_on => [stream_queue]
112112
}}).
113113

114-
<<<<<<< HEAD
114+
-rabbit_feature_flag(
115+
{stream_sac_coordinator_unblock_group,
116+
#{desc => "Bug fix to unblock a group of consumers in a super stream partition",
117+
doc_url => "https://github.com/rabbitmq/rabbitmq-server/issues/7743",
118+
stability => stable,
119+
depends_on => [stream_single_active_consumer]
120+
}}).
121+
115122
%% -------------------------------------------------------------------
116123
%% Direct exchange routing v2.
117124
%% -------------------------------------------------------------------
@@ -231,13 +238,3 @@ delete_table(FeatureName, Tab) ->
231238
%% adheres to the callback interface
232239
ok
233240
end.
234-
=======
235-
236-
-rabbit_feature_flag(
237-
{stream_sac_coordinator_unblock_group,
238-
#{desc => "Bug fix to unblock a group of consumers in a super stream partition",
239-
doc_url => "https://github.com/rabbitmq/rabbitmq-server/issues/7743",
240-
stability => stable,
241-
depends_on => [stream_single_active_consumer]
242-
}}).
243-
>>>>>>> 221f10d2d9 (Unblock group of consumers on super stream partition)

deps/rabbit/src/rabbit_stream_sac_coordinator.erl

+1-7
Original file line numberDiff line numberDiff line change
@@ -274,11 +274,7 @@ apply(#command_activate_consumer{vhost = VirtualHost,
274274
{G, Eff} =
275275
case lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups0) of
276276
undefined ->
277-
<<<<<<< HEAD
278-
rabbit_log:warning("trying to activate consumer in group ~p, but "
279-
=======
280277
rabbit_log:warning("Trying to activate consumer in group ~tp, but "
281-
>>>>>>> 221f10d2d9 (Unblock group of consumers on super stream partition)
282278
"the group does not longer exist",
283279
[{VirtualHost, Stream, ConsumerName}]),
284280
{undefined, []};
@@ -828,10 +824,8 @@ send_message(ConnectionPid, Msg) ->
828824
ConnectionPid ! Msg,
829825
ok.
830826

831-
<<<<<<< HEAD
832827
is_ff_enabled() ->
833828
rabbit_feature_flags:is_enabled(stream_single_active_consumer).
834-
=======
829+
835830
has_unblock_group_support() ->
836831
rabbit_feature_flags:is_enabled(stream_sac_coordinator_unblock_group).
837-
>>>>>>> 221f10d2d9 (Unblock group of consumers on super stream partition)

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

-16
Original file line numberDiff line numberDiff line change
@@ -971,8 +971,6 @@ open(info, emit_stats,
971971
StatemData) ->
972972
Connection1 = emit_stats(Connection, State),
973973
{keep_state, StatemData#statem_data{connection = Connection1}};
974-
<<<<<<< HEAD
975-
=======
976974
open(info, check_outstanding_requests,
977975
#statem_data{connection = #stream_connection{outstanding_requests = Requests,
978976
request_timeout = Timeout} = Connection0} =
@@ -1012,7 +1010,6 @@ open(info, {shutdown, Explanation} = Reason,
10121010
[self(), Explanation]),
10131011
_ = demonitor_all_streams(Connection),
10141012
{stop, Reason};
1015-
>>>>>>> 62d016d3c5 (Introduce timeout for stream server-side requests)
10161013
open(info, Unknown, _StatemData) ->
10171014
rabbit_log_connection:warning("Received unknown message ~p in state ~s",
10181015
[Unknown, ?FUNCTION_NAME]),
@@ -2971,33 +2968,20 @@ maybe_send_consumer_update(Transport,
29712968
correlation_id_sequence = CorrIdSeq},
29722969
Consumer,
29732970
Active,
2974-
<<<<<<< HEAD
2975-
true = _Sac,
2976-
Extra) ->
2977-
rabbit_log:debug("SAC subscription ~p, active = ~p",
2978-
[SubscriptionId, Active]),
2979-
=======
29802971
Msg) ->
2981-
<<<<<<< HEAD
29822972
#consumer{configuration = #consumer_configuration{
29832973
stream = Stream,
29842974
subscription_id = SubscriptionId
29852975
}} = Consumer,
29862976
rabbit_log:debug("SAC subscription ~tp on ~tp, active = ~tp, " ++
29872977
"sending consumer update with correlation ID ~tp",
29882978
[SubscriptionId, Stream, Active, CorrIdSeq]),
2989-
>>>>>>> 221f10d2d9 (Unblock group of consumers on super stream partition)
2990-
Frame =
2991-
rabbit_stream_core:frame({request, CorrIdSeq,
2992-
{consumer_update, SubscriptionId, Active}}),
2993-
=======
29942979
#consumer{configuration =
29952980
#consumer_configuration{subscription_id = SubscriptionId}} = Consumer,
29962981
Frame = rabbit_stream_core:frame({request, CorrIdSeq,
29972982
{consumer_update, SubscriptionId, Active}}),
29982983

29992984
Connection1 = register_request(Connection, Msg),
3000-
>>>>>>> 62d016d3c5 (Introduce timeout for stream server-side requests)
30012985

30022986
send(Transport, S, Frame),
30032987
Connection1.

deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl

+10-15
Original file line numberDiff line numberDiff line change
@@ -131,20 +131,6 @@ end_per_group(_, Config) ->
131131
rabbit_ct_helpers:run_steps(Config,
132132
rabbit_ct_broker_helpers:teardown_steps()).
133133

134-
<<<<<<< HEAD
135-
init_per_testcase(_TestCase, Config) ->
136-
Config.
137-
138-
end_per_testcase(sac_ff, Config) ->
139-
rabbit_ct_broker_helpers:rpc(Config,
140-
0,
141-
rabbit_feature_flags,
142-
enable,
143-
[stream_single_active_consumer]),
144-
ok;
145-
end_per_testcase(_Test, _Config) ->
146-
ok.
147-
=======
148134
init_per_testcase(close_connection_on_consumer_update_timeout = TestCase, Config) ->
149135
ok = rabbit_ct_broker_helpers:rpc(Config,
150136
0,
@@ -161,10 +147,19 @@ end_per_testcase(close_connection_on_consumer_update_timeout = TestCase, Config)
161147
application,
162148
set_env,
163149
[rabbitmq_stream, request_timeout, 60000]),
150+
_ = rabbit_ct_broker_helpers:rpc(Config,
151+
0,
152+
rabbit_feature_flags,
153+
enable,
154+
[stream_single_active_consumer]),
164155
rabbit_ct_helpers:testcase_finished(Config, TestCase);
165156
end_per_testcase(TestCase, Config) ->
157+
_ = rabbit_ct_broker_helpers:rpc(Config,
158+
0,
159+
rabbit_feature_flags,
160+
enable,
161+
[stream_single_active_consumer]),
166162
rabbit_ct_helpers:testcase_finished(Config, TestCase).
167-
>>>>>>> 62d016d3c5 (Introduce timeout for stream server-side requests)
168163

169164
test_global_counters(Config) ->
170165
Stream = atom_to_binary(?FUNCTION_NAME, utf8),

0 commit comments

Comments
 (0)