Skip to content

Commit a337684

Browse files
authored
Merge pull request #10088 from rabbitmq/convert-rabbit_writer-to-gen_server
rabbit_writer: Convert to a regular gen_server
2 parents 19a7518 + c607eb0 commit a337684

File tree

3 files changed

+90
-71
lines changed

3 files changed

+90
-71
lines changed

deps/amqp_client/src/amqp_channel.erl

+7-6
Original file line numberDiff line numberDiff line change
@@ -839,7 +839,11 @@ handle_channel_exit(Reason = #amqp_error{name = ErrorName, explanation = Expl},
839839
handle_shutdown({connection_closing, ReportedReason}, State);
840840
handle_channel_exit(Reason, State) ->
841841
%% Unexpected death of a channel infrastructure process
842-
{stop, {infrastructure_died, Reason}, State}.
842+
Reason1 = case Reason of
843+
{shutdown, R} -> R;
844+
_ -> Reason
845+
end,
846+
{stop, {infrastructure_died, Reason1}, State}.
843847

844848
handle_shutdown({_, 200, _}, State) ->
845849
{stop, normal, State};
@@ -872,11 +876,8 @@ do(Method, Content, Flow, #state{driver = direct, writer = W}) ->
872876

873877

874878
flush_writer(#state{driver = network, writer = Writer}) ->
875-
try
876-
rabbit_writer:flush(Writer)
877-
catch
878-
exit:noproc -> ok
879-
end;
879+
_ = catch rabbit_writer:flush(Writer),
880+
ok;
880881
flush_writer(#state{driver = direct}) ->
881882
ok.
882883
amqp_msg(none) ->

deps/amqp_client/test/system_SUITE.erl

+19-3
Original file line numberDiff line numberDiff line change
@@ -1347,7 +1347,13 @@ channel_writer_death(Config) ->
13471347
when ConnType =:= direct -> ok;
13481348

13491349
exit:{{infrastructure_died, {unknown_properties_record, <<>>}}, _}
1350-
when ConnType =:= network -> ok
1350+
when ConnType =:= network -> ok;
1351+
1352+
%% The writer process exited before the call and the amqp_channel_sup
1353+
%% supervisor shut the supervision tree down because the channel is
1354+
%% significant. The call happened at that shutdown time or just after.
1355+
exit:{shutdown, {gen_server, call, _}} -> ok;
1356+
exit:{noproc, {gen_server, call, _}} -> ok
13511357
end,
13521358
wait_for_death(Channel),
13531359
wait_for_death(Connection).
@@ -1435,7 +1441,12 @@ shortstr_overflow_property(Config) ->
14351441
Ret = amqp_channel:call(Channel, QoS),
14361442
throw({unexpected_success, Ret})
14371443
catch
1438-
exit:{{infrastructure_died, content_properties_shortstr_overflow}, _} -> ok
1444+
exit:{{infrastructure_died, content_properties_shortstr_overflow}, _} -> ok;
1445+
%% The writer process exited before the call and the amqp_channel_sup
1446+
%% supervisor shut the supervision tree down because the channel is
1447+
%% significant. The call happened at that shutdown time or just after.
1448+
exit:{shutdown, {gen_server, call, _}} -> ok;
1449+
exit:{noproc, {gen_server, call, _}} -> ok
14391450
end,
14401451
wait_for_death(Channel),
14411452
wait_for_death(Connection).
@@ -1457,7 +1468,12 @@ shortstr_overflow_field(Config) ->
14571468
consumer_tag = SentString}),
14581469
throw({unexpected_success, Ret})
14591470
catch
1460-
exit:{{infrastructure_died, method_field_shortstr_overflow}, _} -> ok
1471+
exit:{{infrastructure_died, method_field_shortstr_overflow}, _} -> ok;
1472+
%% The writer process exited before the call and the amqp_channel_sup
1473+
%% supervisor shut the supervision tree down because the channel is
1474+
%% significant. The call happened at that shutdown time or just after.
1475+
exit:{shutdown, {gen_server, call, _}} -> ok;
1476+
exit:{noproc, {gen_server, call, _}} -> ok
14611477
end,
14621478
wait_for_death(Channel),
14631479
wait_for_death(Connection).

deps/rabbit_common/src/rabbit_writer.erl

+64-62
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
-module(rabbit_writer).
99

10+
-behavior(gen_server).
11+
1012
%% This module backs writer processes ("writers"). The responsibility of
1113
%% a writer is to serialise protocol methods and write them to the socket.
1214
%% Every writer is associated with a channel and normally it's the channel
@@ -27,7 +29,12 @@
2729
-include("rabbit.hrl").
2830
-export([start/6, start_link/6, start/7, start_link/7, start/8, start_link/8]).
2931

30-
-export([system_continue/3, system_terminate/4, system_code_change/4]).
32+
-export([init/1,
33+
handle_call/3,
34+
handle_cast/2,
35+
handle_info/2,
36+
terminate/2,
37+
code_change/3]).
3138

3239
-export([send_command/2, send_command/3,
3340
send_command_sync/2, send_command_sync/3,
@@ -37,9 +44,6 @@
3744
-export([internal_send_command/4, internal_send_command/6]).
3845
-export([msg_size/1, maybe_gc_large_msg/1, maybe_gc_large_msg/2]).
3946

40-
%% internal
41-
-export([enter_mainloop/2, mainloop/2, mainloop1/2]).
42-
4347
-record(wstate, {
4448
%% socket (port)
4549
sock,
@@ -97,10 +101,6 @@
97101
rabbit_types:proc_name(), boolean(), undefined|non_neg_integer()) ->
98102
rabbit_types:ok(pid()).
99103

100-
-spec system_code_change(_,_,_,_) -> {'ok',_}.
101-
-spec system_continue(_,_,#wstate{}) -> any().
102-
-spec system_terminate(_,_,_,_) -> no_return().
103-
104104
-spec send_command(pid(), rabbit_framing:amqp_method_record()) -> 'ok'.
105105
-spec send_command
106106
(pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) ->
@@ -161,13 +161,15 @@ start(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity,
161161
ReaderWantsStats, GCThreshold) ->
162162
State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid,
163163
ReaderWantsStats, GCThreshold),
164-
{ok, proc_lib:spawn(?MODULE, enter_mainloop, [Identity, State])}.
164+
Options = [{hibernate_after, ?HIBERNATE_AFTER}],
165+
gen_server:start(?MODULE, [Identity, State], Options).
165166

166167
start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity,
167168
ReaderWantsStats, GCThreshold) ->
168169
State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid,
169170
ReaderWantsStats, GCThreshold),
170-
{ok, proc_lib:spawn_link(?MODULE, enter_mainloop, [Identity, State])}.
171+
Options = [{hibernate_after, ?HIBERNATE_AFTER}],
172+
gen_server:start_link(?MODULE, [Identity, State], Options).
171173

172174
initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats, GCThreshold) ->
173175
(case ReaderWantsStats of
@@ -182,49 +184,57 @@ initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats, GC
182184
writer_gc_threshold = GCThreshold},
183185
#wstate.stats_timer).
184186

185-
system_continue(Parent, Deb, State) ->
186-
mainloop(Deb, State#wstate{reader = Parent}).
187-
188-
system_terminate(Reason, _Parent, _Deb, _State) ->
189-
exit(Reason).
190-
191-
system_code_change(Misc, _Module, _OldVsn, _Extra) ->
192-
{ok, Misc}.
193-
194-
enter_mainloop(Identity, State) ->
187+
init([Identity, State]) ->
195188
?LG_PROCESS_TYPE(writer),
196-
Deb = sys:debug_options([]),
197189
?store_proc_name(Identity),
198-
mainloop(Deb, State).
190+
{ok, State}.
199191

200-
mainloop(Deb, State) ->
192+
handle_call({send_command_sync, MethodRecord}, _From, State) ->
201193
try
202-
mainloop1(Deb, State)
194+
State1 = internal_flush(
195+
internal_send_command_async(MethodRecord, State)),
196+
{reply, ok, State1, 0}
203197
catch
204-
exit:Error -> #wstate{reader = ReaderPid, channel = Channel} = State,
205-
ReaderPid ! {channel_exit, Channel, Error}
206-
end,
207-
done.
208-
209-
mainloop1(Deb, State = #wstate{pending = []}) ->
210-
receive
211-
Message -> {Deb1, State1} = handle_message(Deb, Message, State),
212-
?MODULE:mainloop1(Deb1, State1)
213-
after ?HIBERNATE_AFTER ->
214-
erlang:hibernate(?MODULE, mainloop, [Deb, State])
198+
_Class:Reason ->
199+
{stop, {shutdown, Reason}, State}
215200
end;
216-
mainloop1(Deb, State) ->
217-
receive
218-
Message -> {Deb1, State1} = handle_message(Deb, Message, State),
219-
?MODULE:mainloop1(Deb1, State1)
220-
after 0 ->
221-
?MODULE:mainloop1(Deb, internal_flush(State))
201+
handle_call({send_command_sync, MethodRecord, Content}, _From, State) ->
202+
try
203+
State1 = internal_flush(
204+
internal_send_command_async(MethodRecord, Content, State)),
205+
{reply, ok, State1, 0}
206+
catch
207+
_Class:Reason ->
208+
{stop, {shutdown, Reason}, State}
209+
end;
210+
handle_call(flush, _From, State) ->
211+
try
212+
State1 = internal_flush(State),
213+
{reply, ok, State1, 0}
214+
catch
215+
_Class:Reason ->
216+
{stop, {shutdown, Reason}, State}
222217
end.
223218

224-
handle_message(Deb, {system, From, Req}, State = #wstate{reader = Parent}) ->
225-
sys:handle_system_msg(Req, From, Parent, ?MODULE, Deb, State);
226-
handle_message(Deb, Message, State) ->
227-
{Deb, handle_message(Message, State)}.
219+
handle_cast(_Message, State) ->
220+
{noreply, State, 0}.
221+
222+
handle_info(timeout, State) ->
223+
try
224+
State1 = internal_flush(State),
225+
{noreply, State1}
226+
catch
227+
_Class:Reason ->
228+
{stop, {shutdown, Reason}, State}
229+
end;
230+
handle_info(Message, State) ->
231+
try
232+
State1 = handle_message(Message, State),
233+
{noreply, State1, 0}
234+
catch
235+
_Class:Reason ->
236+
{stop, {shutdown, Reason}, State}
237+
end.
228238

229239
handle_message({send_command, MethodRecord}, State) ->
230240
internal_send_command_async(MethodRecord, State);
@@ -236,21 +246,6 @@ handle_message({send_command_flow, MethodRecord, Sender}, State) ->
236246
handle_message({send_command_flow, MethodRecord, Content, Sender}, State) ->
237247
credit_flow:ack(Sender),
238248
internal_send_command_async(MethodRecord, Content, State);
239-
handle_message({'$gen_call', From, {send_command_sync, MethodRecord}}, State) ->
240-
State1 = internal_flush(
241-
internal_send_command_async(MethodRecord, State)),
242-
gen_server:reply(From, ok),
243-
State1;
244-
handle_message({'$gen_call', From, {send_command_sync, MethodRecord, Content}},
245-
State) ->
246-
State1 = internal_flush(
247-
internal_send_command_async(MethodRecord, Content, State)),
248-
gen_server:reply(From, ok),
249-
State1;
250-
handle_message({'$gen_call', From, flush}, State) ->
251-
State1 = internal_flush(State),
252-
gen_server:reply(From, ok),
253-
State1;
254249
handle_message({send_command_and_notify, QPid, ChPid, MethodRecord}, State) ->
255250
State1 = internal_send_command_async(MethodRecord, State),
256251
rabbit_amqqueue_common:notify_sent(QPid, ChPid),
@@ -277,6 +272,14 @@ handle_message({ok, _Ref} = Msg, State) ->
277272
handle_message(Message, _State) ->
278273
exit({writer, message_not_understood, Message}).
279274

275+
terminate(Reason, State) ->
276+
#wstate{reader = ReaderPid, channel = Channel} = State,
277+
ReaderPid ! {channel_exit, Channel, Reason},
278+
ok.
279+
280+
code_change(_OldVsn, State, _Extra) ->
281+
{ok, State}.
282+
280283
%%---------------------------------------------------------------------------
281284

282285
send_command(W, MethodRecord) ->
@@ -316,8 +319,7 @@ flush(W) -> call(W, flush).
316319
%%---------------------------------------------------------------------------
317320

318321
call(Pid, Msg) ->
319-
{ok, Res} = gen:call(Pid, '$gen_call', Msg, infinity),
320-
Res.
322+
gen_server:call(Pid, Msg, infinity).
321323

322324
%%---------------------------------------------------------------------------
323325

0 commit comments

Comments
 (0)