@@ -52,9 +52,12 @@ end_per_group(_Group, _Config) ->
52
52
ok .
53
53
54
54
init_per_testcase (_TestCase , Config ) ->
55
+ ok = meck :new (rabbit_feature_flags ),
56
+ meck :expect (rabbit_feature_flags , is_enabled , fun (_ ) -> true end ),
55
57
Config .
56
58
57
59
end_per_testcase (_TestCase , _Config ) ->
60
+ meck :unload (),
58
61
ok .
59
62
60
63
simple_sac_test (_ ) ->
@@ -71,7 +74,7 @@ simple_sac_test(_) ->
71
74
rabbit_stream_sac_coordinator :apply (Command0 , State0 ),
72
75
? assert (Active1 ),
73
76
? assertEqual ([consumer (ConnectionPid , 0 , true )], Consumers1 ),
74
- assertSendMessageEffect (ConnectionPid , 0 , true , Effects1 ),
77
+ assertSendMessageEffect (ConnectionPid , 0 , Stream , ConsumerName , true , Effects1 ),
75
78
76
79
Command1 =
77
80
register_consumer_command (Stream , - 1 , ConsumerName , ConnectionPid , 1 ),
@@ -107,7 +110,7 @@ simple_sac_test(_) ->
107
110
? assertEqual ([consumer (ConnectionPid , 1 , true ),
108
111
consumer (ConnectionPid , 2 , false )],
109
112
Consumers4 ),
110
- assertSendMessageEffect (ConnectionPid , 1 , true , Effects4 ),
113
+ assertSendMessageEffect (ConnectionPid , 1 , Stream , ConsumerName , true , Effects4 ),
111
114
112
115
Command4 =
113
116
unregister_consumer_command (Stream , ConsumerName , ConnectionPid , 1 ),
@@ -116,7 +119,7 @@ simple_sac_test(_) ->
116
119
ok , Effects5 } =
117
120
rabbit_stream_sac_coordinator :apply (Command4 , State4 ),
118
121
? assertEqual ([consumer (ConnectionPid , 2 , true )], Consumers5 ),
119
- assertSendMessageEffect (ConnectionPid , 2 , true , Effects5 ),
122
+ assertSendMessageEffect (ConnectionPid , 2 , Stream , ConsumerName , true , Effects5 ),
120
123
121
124
Command5 =
122
125
unregister_consumer_command (Stream , ConsumerName , ConnectionPid , 2 ),
@@ -141,7 +144,7 @@ super_stream_partition_sac_test(_) ->
141
144
rabbit_stream_sac_coordinator :apply (Command0 , State0 ),
142
145
? assert (Active1 ),
143
146
? assertEqual ([consumer (ConnectionPid , 0 , true )], Consumers1 ),
144
- assertSendMessageEffect (ConnectionPid , 0 , true , Effects1 ),
147
+ assertSendMessageEffect (ConnectionPid , 0 , Stream , ConsumerName , true , Effects1 ),
145
148
146
149
Command1 =
147
150
register_consumer_command (Stream , 1 , ConsumerName , ConnectionPid , 1 ),
@@ -155,7 +158,7 @@ super_stream_partition_sac_test(_) ->
155
158
? assertEqual ([consumer (ConnectionPid , 0 , false ),
156
159
consumer (ConnectionPid , 1 , false )],
157
160
Consumers2 ),
158
- assertSendMessageSteppingDownEffect (ConnectionPid , 0 , Effects2 ),
161
+ assertSendMessageSteppingDownEffect (ConnectionPid , 0 , Stream , ConsumerName , Effects2 ),
159
162
160
163
Command2 = activate_consumer_command (Stream , ConsumerName ),
161
164
{#? STATE {groups = #{GroupId := # group {consumers = Consumers3 }}} =
@@ -167,7 +170,7 @@ super_stream_partition_sac_test(_) ->
167
170
? assertEqual ([consumer (ConnectionPid , 0 , false ),
168
171
consumer (ConnectionPid , 1 , true )],
169
172
Consumers3 ),
170
- assertSendMessageEffect (ConnectionPid , 1 , true , Effects3 ),
173
+ assertSendMessageEffect (ConnectionPid , 1 , Stream , ConsumerName , true , Effects3 ),
171
174
172
175
Command3 =
173
176
register_consumer_command (Stream , 1 , ConsumerName , ConnectionPid , 2 ),
@@ -197,7 +200,7 @@ super_stream_partition_sac_test(_) ->
197
200
consumer (ConnectionPid , 2 , false )],
198
201
Consumers5 ),
199
202
200
- assertSendMessageSteppingDownEffect (ConnectionPid , 1 , Effects5 ),
203
+ assertSendMessageSteppingDownEffect (ConnectionPid , 1 , Stream , ConsumerName , Effects5 ),
201
204
202
205
Command5 = activate_consumer_command (Stream , ConsumerName ),
203
206
{#? STATE {groups = #{GroupId := # group {consumers = Consumers6 }}} =
@@ -208,7 +211,7 @@ super_stream_partition_sac_test(_) ->
208
211
? assertEqual ([consumer (ConnectionPid , 1 , false ),
209
212
consumer (ConnectionPid , 2 , true )],
210
213
Consumers6 ),
211
- assertSendMessageEffect (ConnectionPid , 2 , true , Effects6 ),
214
+ assertSendMessageEffect (ConnectionPid , 2 , Stream , ConsumerName , true , Effects6 ),
212
215
213
216
Command6 =
214
217
unregister_consumer_command (Stream , ConsumerName , ConnectionPid , 1 ),
@@ -310,7 +313,9 @@ ensure_monitors_test(_) ->
310
313
ok .
311
314
312
315
handle_connection_down_test (_ ) ->
313
- GroupId = {<<" /" >>, <<" stream" >>, <<" app" >>},
316
+ Stream = <<" stream" >>,
317
+ ConsumerName = <<" app" >>,
318
+ GroupId = {<<" /" >>, Stream , ConsumerName },
314
319
Pid0 = self (),
315
320
Pid1 = spawn (fun () -> ok end ),
316
321
Group =
@@ -326,7 +331,7 @@ handle_connection_down_test(_) ->
326
331
rabbit_stream_sac_coordinator :handle_connection_down (Pid0 , State0 ),
327
332
assertSize (1 , PidsGroups1 ),
328
333
assertSize (1 , maps :get (Pid1 , PidsGroups1 )),
329
- assertSendMessageEffect (Pid1 , 1 , true , Effects1 ),
334
+ assertSendMessageEffect (Pid1 , 1 , Stream , ConsumerName , true , Effects1 ),
330
335
? assertEqual (#{GroupId => cgroup ([consumer (Pid1 , 1 , true )])},
331
336
Groups1 ),
332
337
{#? STATE {pids_groups = PidsGroups2 , groups = Groups2 } = _State2 ,
@@ -397,22 +402,28 @@ activate_consumer_command(Stream, ConsumerName) ->
397
402
stream = Stream ,
398
403
consumer_name = ConsumerName }.
399
404
400
- assertSendMessageEffect (Pid , SubId , Active , [Effect ]) ->
405
+ assertSendMessageEffect (Pid , SubId , Stream , ConsumerName , Active , [Effect ]) ->
401
406
? assertEqual ({mod_call ,
402
407
rabbit_stream_sac_coordinator ,
403
408
send_message ,
404
409
[Pid ,
405
410
{sac ,
406
- {{subscription_id , SubId }, {active , Active },
407
- {extra , []}}}]},
411
+ #{subscription_id => SubId ,
412
+ stream => Stream ,
413
+ consumer_name => ConsumerName ,
414
+ active => Active }
415
+ }]},
408
416
Effect ).
409
417
410
- assertSendMessageSteppingDownEffect (Pid , SubId , [Effect ]) ->
418
+ assertSendMessageSteppingDownEffect (Pid , SubId , Stream , ConsumerName , [Effect ]) ->
411
419
? assertEqual ({mod_call ,
412
420
rabbit_stream_sac_coordinator ,
413
421
send_message ,
414
422
[Pid ,
415
423
{sac ,
416
- {{subscription_id , SubId }, {active , false },
417
- {extra , [{stepping_down , true }]}}}]},
424
+ #{subscription_id => SubId ,
425
+ stream => Stream ,
426
+ consumer_name => ConsumerName ,
427
+ active => false ,
428
+ stepping_down => true }}]},
418
429
Effect ).
0 commit comments