21
21
-opaque command () :: # command_register_consumer {} |
22
22
# command_unregister_consumer {} |
23
23
# command_activate_consumer {} |
24
- # command_connection_reconnected {}.
24
+ # command_connection_reconnected {} |
25
+ # command_purge_nodes {}.
25
26
26
27
-opaque state () :: #? MODULE {}.
27
28
47
48
group_consumers /5 ,
48
49
overview /1 ,
49
50
import_state /2 ]).
51
+ -export ([make_purge_nodes /1 ]).
50
52
51
53
% % exported for unit tests only
52
54
-ifdef (TEST ).
@@ -84,25 +86,13 @@ register_consumer(VirtualHost,
84
86
ConnectionPid ,
85
87
Owner ,
86
88
SubscriptionId ) ->
87
- process_command ({sac ,
88
- # command_register_consumer {vhost =
89
- VirtualHost ,
90
- stream =
91
- Stream ,
92
- partition_index
93
- =
94
- PartitionIndex ,
95
- consumer_name
96
- =
97
- ConsumerName ,
98
- connection_pid
99
- =
100
- ConnectionPid ,
101
- owner =
102
- Owner ,
103
- subscription_id
104
- =
105
- SubscriptionId }}).
89
+ process_command (# command_register_consumer {vhost = VirtualHost ,
90
+ stream = Stream ,
91
+ partition_index = PartitionIndex ,
92
+ consumer_name = ConsumerName ,
93
+ connection_pid = ConnectionPid ,
94
+ owner = Owner ,
95
+ subscription_id = SubscriptionId }).
106
96
107
97
-spec unregister_consumer (binary (),
108
98
binary (),
@@ -115,35 +105,24 @@ unregister_consumer(VirtualHost,
115
105
ConsumerName ,
116
106
ConnectionPid ,
117
107
SubscriptionId ) ->
118
- process_command ({sac ,
119
- # command_unregister_consumer {vhost =
120
- VirtualHost ,
121
- stream =
122
- Stream ,
123
- consumer_name
124
- =
125
- ConsumerName ,
126
- connection_pid
127
- =
128
- ConnectionPid ,
129
- subscription_id
130
- =
131
- SubscriptionId }}).
108
+ process_command (# command_unregister_consumer {vhost = VirtualHost ,
109
+ stream = Stream ,
110
+ consumer_name = ConsumerName ,
111
+ connection_pid = ConnectionPid ,
112
+ subscription_id = SubscriptionId }).
132
113
133
114
-spec activate_consumer (binary (), binary (), binary ()) -> ok .
134
115
activate_consumer (VH , Stream , Name ) ->
135
- process_command ({sac ,
136
- # command_activate_consumer {vhost = VH ,
137
- stream = Stream ,
138
- consumer_name = Name }}).
116
+ process_command (# command_activate_consumer {vhost = VH ,
117
+ stream = Stream ,
118
+ consumer_name = Name }).
139
119
140
120
-spec connection_reconnected (connection_pid ()) -> ok .
141
121
connection_reconnected (Pid ) ->
142
- process_command ({sac ,
143
- # command_connection_reconnected {pid = Pid }}).
122
+ process_command (# command_connection_reconnected {pid = Pid }).
144
123
145
124
process_command (Cmd ) ->
146
- case rabbit_stream_coordinator :process_command (Cmd ) of
125
+ case rabbit_stream_coordinator :process_command (wrap_cmd ( Cmd ) ) of
147
126
{ok , Res , _ } ->
148
127
Res ;
149
128
{error , _ } = Err ->
@@ -152,6 +131,10 @@ process_command(Cmd) ->
152
131
Err
153
132
end .
154
133
134
+ -spec wrap_cmd (command ()) -> {sac , command ()}.
135
+ wrap_cmd (Cmd ) ->
136
+ {sac , Cmd }.
137
+
155
138
% % return the current groups for a given virtual host
156
139
-spec consumer_groups (binary (), [atom ()]) ->
157
140
{ok ,
@@ -308,8 +291,31 @@ apply(#command_connection_reconnected{pid = Pid},
308
291
handle_group_connection_reconnected (Pid , St , Eff , G )
309
292
end , {State0 , []}, Groups0 ),
310
293
294
+ {State1 , ok , Eff };
295
+ apply (# command_purge_nodes {nodes = Nodes }, State0 ) ->
296
+ {State1 , Eff } = lists :foldl (fun (N , {S0 , Eff0 }) ->
297
+ {S1 , Eff1 } = purge_node (N , S0 ),
298
+ {S1 , Eff1 ++ Eff0 }
299
+ end , {State0 , []}, Nodes ),
311
300
{State1 , ok , Eff }.
312
301
302
+ purge_node (Node , #? MODULE {groups = Groups0 } = State0 ) ->
303
+ PidsGroups =
304
+ maps :fold (fun (K , # group {consumers = Consumers }, Acc ) ->
305
+ lists :foldl (fun (# consumer {pid = Pid }, AccIn )
306
+ when node (Pid ) =:= Node ->
307
+ PG0 = maps :get (Pid , AccIn , #{}),
308
+ PG1 = PG0 #{K => true },
309
+ AccIn #{Pid => PG1 };
310
+ (_ , AccIn ) ->
311
+ AccIn
312
+ end , Acc , Consumers )
313
+ end , #{}, Groups0 ),
314
+ maps :fold (fun (Pid , Groups , {S0 , Eff0 }) ->
315
+ {S1 , Eff1 } = handle_connection_down0 (Pid , S0 , Groups ),
316
+ {S1 , Eff1 ++ Eff0 }
317
+ end , {State0 , []}, PidsGroups ).
318
+
313
319
handle_group_connection_reconnected (Pid , #? MODULE {groups = Groups0 } = S0 ,
314
320
Eff0 , {VH , S , Name } = K ) ->
315
321
% % TODO sac: handle forgotten_active case (reconciliate state with current active)
@@ -575,6 +581,7 @@ ensure_monitors(#command_connection_reconnected{pid = Pid},
575
581
Monitors #{Pid => sac },
576
582
[{monitor , process , Pid }, {monitor , node , node (Pid )} | Effects ]};
577
583
ensure_monitors (_ , #? MODULE {} = State0 , Monitors , Effects ) ->
584
+ % % TODO sac: ensure the pid-group mapping after purge_nodes?
578
585
{State0 , Monitors , Effects }.
579
586
580
587
-spec handle_connection_down (connection_pid (), state ()) ->
@@ -586,11 +593,14 @@ handle_connection_down(Pid,
586
593
{State0 , []};
587
594
{Groups , PidsGroups1 } ->
588
595
State1 = State0 #? MODULE {pids_groups = PidsGroups1 },
589
- maps :fold (fun (G , _ , Acc ) ->
590
- handle_group_after_connection_down (Pid , Acc , G )
591
- end , {State1 , []}, Groups )
596
+ handle_connection_down0 (Pid , State1 , Groups )
592
597
end .
593
598
599
+ handle_connection_down0 (Pid , State , Groups ) ->
600
+ maps :fold (fun (G , _ , Acc ) ->
601
+ handle_group_after_connection_down (Pid , Acc , G )
602
+ end , {State , []}, Groups ).
603
+
594
604
-spec handle_connection_node_disconnected (connection_pid (), state ()) ->
595
605
{state (), ra_machine :effects ()}.
596
606
handle_connection_node_disconnected (ConnPid ,
@@ -732,6 +742,10 @@ import_state(4, #{<<"groups">> := Groups, <<"pids_groups">> := PidsGroups}) ->
732
742
#? MODULE {groups = map_to_groups (Groups ),
733
743
pids_groups = map_to_pids_groups (PidsGroups )}.
734
744
745
+ - spec make_purge_nodes ([node ()]) -> command ().
746
+ make_purge_nodes (Nodes ) ->
747
+ wrap_cmd (# command_purge_nodes {nodes = Nodes }).
748
+
735
749
map_to_groups (Groups ) when is_map (Groups ) ->
736
750
maps :fold (fun (K , V , Acc ) ->
737
751
Acc #{K => map_to_group (V )}
0 commit comments