Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

graceful shutdown #178

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/mochiweb_http.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

-module(mochiweb_http).
-author('[email protected]').
-export([start/1, start_link/1, stop/0, stop/1]).
-export([start/1, start_link/1, stop/0, stop/1, stop/2]).
-export([loop/3]).
-export([after_response/2, reentry/1]).
-export([parse_range_request/1, range_skip_length/2]).
Expand Down Expand Up @@ -53,6 +53,9 @@ stop() ->
stop(Name) ->
mochiweb_socket_server:stop(Name).

stop(Name, Timeout) ->
mochiweb_socket_server:stop(Name, Timeout).

%% @spec start(Options) -> ServerRet
%% Options = [option()]
%% Option = {name, atom()} | {ip, string() | tuple()} | {backlog, integer()}
Expand Down
54 changes: 23 additions & 31 deletions src/mochiweb_socket_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

-include("internal.hrl").

-export([start/1, start_link/1, stop/1]).
-export([start/1, start_link/1, stop/1, stop/2]).
-export([init/1, handle_call/3, handle_cast/2, terminate/2, code_change/3,
handle_info/2]).
-export([get/2, set/3]).
Expand All @@ -30,7 +30,7 @@
ssl_opts=[{ssl_imp, new}],
acceptor_pool=sets:new(),
profile_fun=undefined,
shutdown_delay=0}).
shutdown_notify_pid=undefined}).

-define(is_old_state(State), not is_record(State, mochiweb_socket_server)).

Expand Down Expand Up @@ -61,14 +61,15 @@ set(Name, Property, _Value) ->
[Name, Property]).

stop(Name) when is_atom(Name) orelse is_pid(Name) ->
ShutdownDelay = get(Name, shutdown_delay),
graceful_shutdown(Name, ShutdownDelay),
gen_server:call(Name, stop);
gen_server:call(Name, stop);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was this change on purpose?

stop({Scope, Name}) when Scope =:= local orelse Scope =:= global ->
stop(Name);
stop(Options) ->
State = parse_options(Options),
stop(State#mochiweb_socket_server.name).
stop(Name, Timeout) when is_atom(Name) orelse is_pid(Name) andalso is_integer(Timeout) ->

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

andalso has precedence over orelse. Therefore, this guard will be true for:
is_atom(Name) (independently of Timeout's type)
or
is_pid(Name) andalso is_integer(Timeout)

Is that the expected behaviour? I believe it should be:

Suggested change
stop(Name, Timeout) when is_atom(Name) orelse is_pid(Name) andalso is_integer(Timeout) ->
stop(Name, Timeout) when (is_atom(Name) orelse is_pid(Name)) andalso is_integer(Timeout) ->

gen_server:call(Name, prep_stop, Timeout),
gen_server:call(Name, stop).

%% Internal API

Expand Down Expand Up @@ -149,9 +150,8 @@ parse_options([{ssl_opts, SslOpts} | Rest], State) when is_list(SslOpts) ->
parse_options(Rest, State#mochiweb_socket_server{ssl_opts=SslOpts1});
parse_options([{profile_fun, ProfileFun} | Rest], State) when is_function(ProfileFun) ->
parse_options(Rest, State#mochiweb_socket_server{profile_fun=ProfileFun});
parse_options([{shutdown_delay, ShutdownDelay} | Rest], State) ->
ShutdownDelayInt = ensure_int(ShutdownDelay),
parse_options(Rest, State#mochiweb_socket_server{shutdown_delay=ShutdownDelayInt}).
parse_options([{shutdown_notify_pid, NotifyPid} | Rest], State) when is_pid(NotifyPid) ->
parse_options(Rest, State#mochiweb_socket_server{shutdown_notify_pid=NotifyPid}).


start_server(F, State=#mochiweb_socket_server{ssl=Ssl, name=Name}) ->
Expand Down Expand Up @@ -242,9 +242,7 @@ do_get(port, #mochiweb_socket_server{port=Port}) ->
do_get(waiting_acceptors, #mochiweb_socket_server{acceptor_pool=Pool}) ->
sets:size(Pool);
do_get(active_sockets, #mochiweb_socket_server{active_sockets=ActiveSockets}) ->
ActiveSockets;
do_get(shutdown_delay, #mochiweb_socket_server{shutdown_delay=ShutdownDelay}) ->
ShutdownDelay.
ActiveSockets.


state_to_proplist(#mochiweb_socket_server{name=Name,
Expand Down Expand Up @@ -273,8 +271,11 @@ handle_call({get, Property}, _From, State) ->
{reply, Res, State};
handle_call(stop, _From, State) ->
{stop, normal, ok, State};
handle_call(prep_stop, _From, State) ->
{reply, close_listen_socket(State), State};
handle_call(prep_stop, From, State) ->
close_listen_socket(State),
State1 = State#mochiweb_socket_server{shutdown_notify_pid=From, acceptor_pool_size=0},
% Reply will be given when active_socket count goes to 0
{noreply, State1};
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the polling logic by using a noreply response and a reply when active_sockets become 0.

handle_call(_Message, _From, State) ->
Res = error,
{reply, Res, State}.
Expand Down Expand Up @@ -317,7 +318,8 @@ recycle_acceptor(Pid, State=#mochiweb_socket_server{
acceptor_pool=Pool,
acceptor_pool_size=PoolSize,
max=Max,
active_sockets=ActiveSockets}) ->
active_sockets=ActiveSockets,
shutdown_notify_pid=NotifyPid}) ->
%% A socket is considered to be active from immediately after it
%% has been accepted (see the {accepted, Pid, Timing} cast above).
%% This function will be called when an acceptor is transitioning
Expand All @@ -335,6 +337,12 @@ recycle_acceptor(Pid, State=#mochiweb_socket_server{
State1 = State#mochiweb_socket_server{
acceptor_pool=Pool1,
active_sockets=ActiveSockets1},
case NotifyPid of
undefined -> ok;
_ -> if ActiveSockets1 =< 0 -> gen_server:reply(NotifyPid, ok);
true -> error_logger:info_msg("~p clients outstanding",[ActiveSockets1])
end
end,
%% Spawn a new acceptor only if it will not overrun the maximum socket
%% count or the maximum pool size.
case NewSize + ActiveSockets1 < Max andalso NewSize < PoolSize of
Expand Down Expand Up @@ -376,22 +384,6 @@ handle_info(Info, State) ->
error_logger:info_report([{'INFO', Info}, {'State', State}]),
{noreply, State}.

graceful_shutdown(_, 0) ->
ok;
graceful_shutdown(Name, ShutdownDelay) ->
gen_server:call(Name, prep_stop),
WaitLoop = fun (_, Delay) when Delay =< 0 ->
ok;
(Loop, Delay) ->
case mochiweb_socket_server:get(Name, active_sockets) of
0 -> ok;
X -> error_logger:info_msg("Waiting for ~p clients to finish~n", [X]),
timer:sleep(min(5, Delay)),
Loop(Loop, Delay - 5)
end
end,
WaitLoop(WaitLoop, ShutdownDelay).

%%
%% Tests
%%
Expand All @@ -416,7 +408,7 @@ upgrade_state_test() ->
ssl=ssl, ssl_opts=ssl_opts,
acceptor_pool=acceptor_pool,
profile_fun=undefined,
shutdown_delay=0},
shutdown_notify_pid=undefined},
?assertEqual(CmpState, State).

-endif.
16 changes: 5 additions & 11 deletions test/mochiweb_socket_server_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ client_fun(Socket, [{send_pid, To} | Cmds]) ->
To ! {client, self()},
client_fun(Socket, Cmds);
client_fun(Socket, [{send, Data, Tester} | Cmds]) ->
client_fun(Socket, [{send, Data, Tester, 0} | Cmds]);
client_fun(Socket, [{send, Data, Tester, Delay} | Cmds]) ->
timer:sleep(Delay),
case gen_tcp:send(Socket, Data) of
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed usage of timer:sleep in unit test

ok -> ok;
{error, E} -> Tester ! {client_send_error, self(), E}
Expand Down Expand Up @@ -143,13 +140,10 @@ normal_acceptor_test_fun() ->
?assertEqual(Expected, Result)
end || {Max, PoolSize, NumClients, Expected} <- Tests].

graceful_shutdown_test_fun() ->
graceful_shutdown_test_fun(ShutDownDelay) ->
Tester = self(),
NumClients = 2,
ClientSendDelay = 10,
BufferTime = 5,
ShutDownDelay = (NumClients * ClientSendDelay) + BufferTime,
ServerOpts = [{max, NumClients}, {acceptor_pool_size, NumClients}, {shutdown_delay, ShutDownDelay}],
ServerOpts = [{max, NumClients}, {acceptor_pool_size, NumClients}],
ServerLoop =
fun (Socket, _Opts) ->
Tester ! {server_accepted, self()},
Expand All @@ -159,7 +153,7 @@ graceful_shutdown_test_fun() ->
{Server, Port} = socket_server(ServerOpts, ServerLoop),
Data = <<"data">>,
ClientCmds = [{send_pid, Tester}, {wait_msg, go},
{send, Data, Tester, ClientSendDelay},
{send, Data, Tester},
{close_sock}, {send_msg, done, Tester}],
start_client_conns(Port, NumClients, fun client_fun/2, ClientCmds, Tester),

Expand All @@ -181,7 +175,7 @@ graceful_shutdown_test_fun() ->
end,
{Connected, _} = ConnectLoop(ConnectLoop, [], [], 0),

spawn(mochiweb_socket_server, stop, [Server]),
spawn(mochiweb_socket_server, stop, [Server, ShutDownDelay]),

WaitLoop =
fun (_Loop, Done, Error, []) ->
Expand Down Expand Up @@ -209,6 +203,6 @@ normal_acceptor_test_() ->


graceful_shutdown_test_() ->
{timeout, ?LARGE_TIMEOUT, [fun() -> graceful_shutdown_test_fun() end]}.
{timeout, ?LARGE_TIMEOUT, [fun() -> graceful_shutdown_test_fun(?LARGE_TIMEOUT - 1) end]}.

-endif.