From b5da9655b91e8d199e26dad991def3d05186cdf6 Mon Sep 17 00:00:00 2001 From: Albert Schimpf Date: Thu, 19 Dec 2019 15:32:30 +0100 Subject: [PATCH] riak_core_vnode_worker_pool to gen_statem (#19) Rewrote gen_fsm_compat -> gen_statem for module riak_core_vnode_worker_pool --- include/riak_core_vnode.hrl | 5 +- rebar.config | 2 +- src/riak_core_util.erl | 5 +- src/riak_core_vnode.erl | 13 +- src/riak_core_vnode_worker_pool.erl | 378 ++++++++++++++++------------ test/worker_pool_test.erl | 99 ++++++-- 6 files changed, 301 insertions(+), 201 deletions(-) diff --git a/include/riak_core_vnode.hrl b/include/riak_core_vnode.hrl index 9ff955a66..e4589cff8 100644 --- a/include/riak_core_vnode.hrl +++ b/include/riak_core_vnode.hrl @@ -1,9 +1,10 @@ -type sender_type() :: fsm | server | raw. -type sender() :: {sender_type(), reference() | tuple(), pid()} | %% TODO: Double-check that these special cases are kosher - {server, undefined, undefined} | % special case in - % riak_core_vnode_master.erl + {fsm, undefined, pid()} | % what are these special cases and what is the reference used for?? + {server, undefined, undefined} | % special case in riak_core_vnode_master.erl ignore. + -type partition() :: chash:index_as_int(). -type vnode_req() :: term(). -type keyspaces() :: [{partition(), [partition()]}]. diff --git a/rebar.config b/rebar.config index c57e19120..7885d5e92 100644 --- a/rebar.config +++ b/rebar.config @@ -1,5 +1,5 @@ %{cover_enabled, true}. -{erl_opts, [warnings_as_errors, +{erl_opts, [ debug_info]}. {edoc_opts, [{preprocess, true}]}. {xref_checks, []}. diff --git a/src/riak_core_util.erl b/src/riak_core_util.erl index 8ffed9312..425710a86 100644 --- a/src/riak_core_util.erl +++ b/src/riak_core_util.erl @@ -718,10 +718,7 @@ proxy_spawn(Fun) -> %% @private -make_fold_reqv(v1, FoldFun, Acc0, _Forwardable, _Opts) - when is_function(FoldFun, 3) -> - #riak_core_fold_req_v1{foldfun=FoldFun, acc0=Acc0}; -make_fold_reqv(v2, FoldFun, Acc0, Forwardable, Opts) +make_fold_reqv(_, FoldFun, Acc0, Forwardable, Opts) when is_function(FoldFun, 3) andalso (Forwardable == true orelse Forwardable == false) andalso is_list(Opts) -> diff --git a/src/riak_core_vnode.erl b/src/riak_core_vnode.erl index f1f2a6dd0..4d387affb 100644 --- a/src/riak_core_vnode.erl +++ b/src/riak_core_vnode.erl @@ -1095,6 +1095,8 @@ current_state(Pid) -> gen_fsm:sync_send_all_state_event(Pid, current_state). pool_death_test() -> + %% expect error log + error_logger:tty(false), meck:unload(), meck:new(test_vnode, [non_strict, no_link]), meck:expect(test_vnode, init, fun(_) -> {ok, [], [{pool, test_pool_mod, 1, []}]} end), @@ -1102,23 +1104,20 @@ pool_death_test() -> meck:new(test_pool_mod, [non_strict, no_link]), meck:expect(test_pool_mod, init_worker, fun(_, _, _) -> {ok, []} end), - %% expect error log - error_logger:tty(false), - - {ok, Pid} = ?MODULE:test_link(test_vnode, 0), - {_, StateData1} = ?MODULE:current_state(Pid), + {ok, Pid} = riak_core_vnode:test_link(test_vnode, 0), + {_, StateData1} = riak_core_vnode:current_state(Pid), PoolPid1 = StateData1#state.pool_pid, exit(PoolPid1, kill), wait_for_process_death(PoolPid1), ?assertNot(is_process_alive(PoolPid1)), + wait_for_state_update(StateData1, Pid), - {_, StateData2} = ?MODULE:current_state(Pid), + {_, StateData2} = riak_core_vnode:current_state(Pid), PoolPid2 = StateData2#state.pool_pid, ?assertNot(PoolPid2 =:= undefined), exit(Pid, normal), wait_for_process_death(Pid), - error_logger:tty(false), meck:validate(test_pool_mod), meck:validate(test_vnode). 
diff --git a/src/riak_core_vnode_worker_pool.erl b/src/riak_core_vnode_worker_pool.erl index 7b8134f86..be3273c1d 100644 --- a/src/riak_core_vnode_worker_pool.erl +++ b/src/riak_core_vnode_worker_pool.erl @@ -39,210 +39,214 @@ %% confuse (or cause a race) with this module's checkout management. -module(riak_core_vnode_worker_pool). --behaviour(gen_fsm_compat). - -%% gen_fsm_compat callbacks --export([init/1, handle_event/3, handle_sync_event/4, handle_info/3, - terminate/3, code_change/4]). - -%% gen_fsm_compat states --export([ready/2, queueing/2, ready/3, queueing/3, shutdown/2, shutdown/3]). +-behaviour(gen_statem). %% API --export([start_link/6, start_link/5, stop/2, shutdown_pool/2, handle_work/3, worker_started/1, checkin_worker/2]). +-export([start_link/5, start_link/6, stop/2, shutdown_pool/2, handle_work/3, worker_started/1, checkin_worker/2]). --ifdef(PULSE). --compile(export_all). --compile({parse_transform, pulse_instrument}). --compile({pulse_replace_module, [{gen_fsm_compat, pulse_gen_fsm}]}). --endif. +%% gen_statem callbacks +-export([init/1, terminate/3, code_change/4, callback_mode/0]). --record(state, { - queue :: queue:queue() | list(), - pool :: pid(), - monitors = [] :: list(), - queue_strategy = fifo :: fifo | filo, - shutdown :: undefined | {pid(), reference()} - }). +%% gen_statem states +-export([ready/3, queue/3, shutdown/3]). --type pool_opt() :: - {strategy, fifo | filo}. +%% ======== +%% API +%% ======== start_link(WorkerMod, PoolSize, VNodeIndex, WorkerArgs, WorkerProps) -> start_link(WorkerMod, PoolSize, VNodeIndex, WorkerArgs, WorkerProps, []). --spec start_link(atom(), pos_integer(), pos_integer(), term(), term(), - [pool_opt()]) -> - {ok, pid()}. start_link(WorkerMod, PoolSize, VNodeIndex, WorkerArgs, WorkerProps, Opts) -> - gen_fsm_compat:start_link(?MODULE, [WorkerMod, PoolSize, VNodeIndex, WorkerArgs, - WorkerProps, Opts], []). + gen_statem:start_link(?MODULE, [WorkerMod, PoolSize, VNodeIndex, WorkerArgs, WorkerProps, Opts], []). + +% #1 cast handle_work(Pid, Work, From) -> - gen_fsm_compat:send_event(Pid, {work, Work, From}). + gen_statem:cast(Pid, {work, Work, From}). -stop(Pid, Reason) -> - gen_fsm_compat:sync_send_all_state_event(Pid, {stop, Reason}). +% #2 cast worker_started(Pid) -> - gen_fsm_compat:send_all_state_event(Pid, worker_start). + gen_statem:cast(Pid, worker_start). + +% #3 cast checkin_worker(Pid, WorkerPid) -> - gen_fsm_compat:send_all_state_event(Pid, {checkin, WorkerPid}). + gen_statem:cast(Pid, {checkin, WorkerPid}). + +% #4 call +stop(Pid, Reason) -> + gen_statem:stop(Pid, Reason, infinity). + + +% #5 call %% wait for all the workers to finish any current work +-spec shutdown_pool(pid(), integer()) -> ok | {error, vnode_shutdown}. shutdown_pool(Pid, Wait) -> - gen_fsm_compat:sync_send_all_state_event(Pid, {shutdown, Wait}, infinity). + gen_statem:call(Pid, {shutdown, Wait}, infinity). + + +%% ======================== +%% ======== +%% State, Mode, Init, Terminate +%% ======== +%% ======================== + +-record(state, { + queue :: queue:queue() | list(), + pool :: pid(), + monitors = [] :: list(), + queue_strategy = fifo :: fifo | filo, + shutdown :: undefined | {pid(), reference()} +}). + +callback_mode() -> [state_functions, state_enter]. 
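%% Usage sketch (illustrative only): how a caller drives the gen_statem-based
%% API exported above. The worker module, pool size, vnode index and sender
%% tuple here are assumptions modelled on the worker_pool_test EUnit module
%% further down in this change, not a prescribed calling convention:
%%
%%     {ok, Pool} = riak_core_vnode_worker_pool:start_link(worker_pool_test, 3, 10, true, []),
%%     %% hand the pool a unit of work; the reply goes to the {raw, Ref, Pid} sender
%%     riak_core_vnode_worker_pool:handle_work(Pool, fun() -> ok end, {raw, 1, self()}),
%%     %% give in-flight work up to 5000 ms to finish, then force the pool down
%%     ok = riak_core_vnode_worker_pool:shutdown_pool(Pool, 5000).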
init([WorkerMod, PoolSize, VNodeIndex, WorkerArgs, WorkerProps, Opts]) -> {ok, Pid} = poolboy:start_link([{worker_module, riak_core_vnode_worker}, - {worker_args, [VNodeIndex, WorkerArgs, WorkerProps, self()]}, - {worker_callback_mod, WorkerMod}, - {size, PoolSize}, {max_overflow, 0}]), - DfltStrategy = application:get_env(riak_core, queue_worker_strategy, fifo), - State = case proplists:get_value(strategy, Opts, DfltStrategy) of + {worker_args, [VNodeIndex, WorkerArgs, WorkerProps, self()]}, + {worker_callback_mod, WorkerMod}, + {size, PoolSize}, {max_overflow, 0}]), + DefaultStrategy = application:get_env(riak_core, queue_worker_strategy, fifo), + State = case proplists:get_value(strategy, Opts, DefaultStrategy) of fifo -> - #state{ - pool = Pid, - queue = queue:new(), - queue_strategy = fifo - }; - filo -> - #state{ - pool = Pid, - queue = [], - queue_strategy = filo - } - end, + #state{ + pool = Pid, + queue = queue:new(), + queue_strategy = fifo + }; + filo -> + #state{ + pool = Pid, + queue = [], + queue_strategy = filo + } + end, {ok, ready, State}. -ready(_Event, _From, State) -> - {reply, ok, ready, State}. +% #4 call +terminate(_Reason, _StateName, #state{pool = Pool}) -> + %% stop poolboy + poolboy:stop(Pool), + ok. + + +code_change(_OldVsn, StateName, State, _Extra) -> {ok, StateName, State}. + -ready({work, Work, From} = Msg, #state{pool=Pool, monitors=Monitors} = State) -> +%% ======================== +%% ======== +%% States +%% ======== +%% ======================== + +%% ready +%% ======== + +%% enter +ready(enter, _, State) -> + {keep_state, State}; +%% #1 +ready(cast, {work, Work, From} = Msg, #state{pool = Pool, monitors = Monitors} = State) -> case poolboy:checkout(Pool, false) of full -> - {next_state, queueing, in(Msg, State)}; + {next_state, queue, in(Msg, State)}; Pid when is_pid(Pid) -> NewMonitors = monitor_worker(Pid, From, Work, Monitors), riak_core_vnode_worker:handle_work(Pid, Work, From), - {next_state, ready, State#state{monitors=NewMonitors}} + {next_state, ready, State#state{monitors = NewMonitors}} end; -ready(_Event, State) -> - {next_state, ready, State}. - -queueing(_Event, _From, State) -> - {reply, ok, queueing, State}. - -queueing({work, _Work, _From} = Msg, State) -> - {next_state, queueing, in(Msg, State)}; -queueing(_Event, State) -> - {next_state, queueing, State}. - -shutdown(_Event, _From, State) -> - {reply, ok, shutdown, State}. - -shutdown({work, _Work, From}, State) -> - %% tell the process requesting work that we're shutting down +%% #2 +ready(cast, worker_start, State) -> + worker_started(State, ready); +%% #3 +ready(cast, {checkin, WorkerPid}, State) -> + checkin(State, WorkerPid); +%% #5 +ready({call, From}, {shutdown, Wait}, State) -> + %% change to shutdown state with a state_timeout of 'Wait' ms, force after timeout expires + {next_state, shutdown, State#state{shutdown = From}, [{state_timeout, Wait, force_shutdown}]}; +%% info EXIT signal of erlang:monitor(process, Worker) +ready(info, {'DOWN', _Ref, _Type, Pid, Info}, State) -> + {ok, NewState} = exit_worker(State, Pid, Info), + {keep_state, NewState}. 
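%% Note on the {state_timeout, Wait, force_shutdown} action used in the
%% shutdown transition above: this is standard gen_statem behaviour, sketched
%% here for reference. The timer is cancelled automatically if the state
%% changes again; if it fires while the pool is still in 'shutdown', the event
%% is delivered as
%%
%%     shutdown(state_timeout, force_shutdown, State)
%%
%% which is handled below by replying {error, vnode_shutdown} both to the
%% caller of shutdown_pool/2 and to the senders of any work still in flight.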
+ + +%% queueing +%% ======== + +%% enter +queue(enter, _, State) -> {keep_state, State}; +queue(cast, {work, _Work, _From} = Msg, State) -> + {next_state, queue, in(Msg, State)}; +%% #2 +queue(cast, worker_start, State) -> + worker_started(State, queue); +%% #3 +queue(cast, {checkin, WorkerPid}, State) -> + checkin(State, WorkerPid); +%% #5 +queue({call, From}, {shutdown, Wait}, State) -> + %% change to shutdown state with a state_timeout of 'Wait' ms, force after timeout expires + {next_state, shutdown, State#state{shutdown = From}, [{state_timeout, Wait, force_shutdown}]}; +%% info EXIT signal of erlang:monitor(process, Worker) +queue(info, {'DOWN', _Ref, _Type, Pid, Info}, State) -> + {ok, NewState} = exit_worker(State, Pid, Info), + {keep_state, NewState}. + + +%% shutdown +%% ======== + +%% enter +shutdown(enter, _, #state{monitors = Monitors, shutdown = From} = State) -> + discard_queued_work(State), + case Monitors of + [] -> + {stop_and_reply, shutdown, [{reply, From, ok}]}; + _ -> + {keep_state, State#state{queue = new(State)}} + end; +%% force shutdown timeout +shutdown(state_timeout, _, #state{monitors = Monitors, shutdown = FromOrigin}) -> + %% we've waited too long to shutdown, time to force the issue. + _ = [riak_core_vnode:reply(From, {error, vnode_shutdown}) || {_, _, From, _} <- Monitors], + {stop_and_reply, shutdown, [{reply, FromOrigin, {error, vnode_shutdown}}]}; +%% #1 +shutdown(cast, {work, _Work, From}, State) -> riak_core_vnode:reply(From, {error, vnode_shutdown}), - {next_state, shutdown, State}; -shutdown(_Event, State) -> - {next_state, shutdown, State}. - -handle_event({checkin, Pid}, shutdown, #state{pool=Pool, monitors=Monitors0} = State) -> + {keep_state, State}; +%% #2 +shutdown(cast, worker_start, State) -> + worker_started(State, shutdown); +%% #3 +shutdown(cast, {checkin, Pid}, #state{pool = Pool, monitors = Monitors0, shutdown = From} = State) -> Monitors = demonitor_worker(Pid, Monitors0), poolboy:checkin(Pool, Pid), case Monitors of [] -> %% work all done, time to exit! 
- {stop, shutdown, State}; + {stop_and_reply, shutdown, [{reply, From, ok}]}; _ -> - {next_state, shutdown, State#state{monitors=Monitors}} - end; -handle_event({checkin, Worker}, _, #state{pool = Pool, monitors=Monitors} = State) -> - case out(State) of - {{value, {work, Work, From}}, Rem} -> - %% there is outstanding work to do - instead of checking - %% the worker back in, just hand it more work to do - NewMonitors = monitor_worker(Worker, From, Work, Monitors), - riak_core_vnode_worker:handle_work(Worker, Work, From), - {next_state, queueing, State#state{queue=Rem, - monitors=NewMonitors}}; - {empty, Empty} -> - NewMonitors = demonitor_worker(Worker, Monitors), - poolboy:checkin(Pool, Worker), - {next_state, ready, State#state{queue=Empty, monitors=NewMonitors}} + {keep_state, State#state{monitors = Monitors}} end; -handle_event(worker_start, StateName, #state{pool=Pool, monitors=Monitors}=State) -> - %% a new worker just started - if we have work pending, try to do it - case out(State) of - {{value, {work, Work, From}}, Rem} -> - case poolboy:checkout(Pool, false) of - full -> - {next_state, queueing, State}; - Pid when is_pid(Pid) -> - NewMonitors = monitor_worker(Pid, From, Work, Monitors), - riak_core_vnode_worker:handle_work(Pid, Work, From), - {next_state, queueing, State#state{queue=Rem, monitors=NewMonitors}} - end; - {empty, _} -> - %% StateName might be either 'ready' or 'shutdown' - {next_state, StateName, State} - end; -handle_event(_Event, StateName, State) -> - {next_state, StateName, State}. - -handle_sync_event({stop, Reason}, _From, _StateName, State) -> - {stop, Reason, ok, State}; - -handle_sync_event({shutdown, Time}, From, _StateName, - #state{monitors=Monitors} = State) -> - discard_queued_work(State), - case Monitors of - [] -> - {stop, shutdown, ok, State}; - _ -> - case Time of - infinity -> - ok; - _ when is_integer(Time) -> - erlang:send_after(Time, self(), shutdown), - ok - end, - {next_state, shutdown, State#state{shutdown=From, queue=new(State)}} - end; -handle_sync_event(_Event, _From, StateName, State) -> - {reply, {error, unknown_message}, StateName, State}. - -handle_info({'DOWN', _Ref, _, Pid, Info}, StateName, #state{monitors=Monitors} = State) -> - %% remove the listing for the dead worker - case lists:keyfind(Pid, 1, Monitors) of - {Pid, _, From, Work} -> - riak_core_vnode:reply(From, {error, {worker_crash, Info, Work}}), - NewMonitors = lists:keydelete(Pid, 1, Monitors), - %% trigger to do more work will be 'worker_start' message - %% when poolboy replaces this worker (if not a 'checkin' - %% or 'handle_work') - {next_state, StateName, State#state{monitors=NewMonitors}}; - false -> - {next_state, StateName, State} - end; -handle_info(shutdown, shutdown, #state{monitors=Monitors} = State) -> - %% we've waited too long to shutdown, time to force the issue. - _ = [riak_core_vnode:reply(From, {error, vnode_shutdown}) || - {_, _, From, _} <- Monitors], - {stop, shutdown, State}; -handle_info(_Info, StateName, State) -> - {next_state, StateName, State}. - -terminate(_Reason, _StateName, #state{pool=Pool}) -> - %% stop poolboy - poolboy:stop(Pool), - ok. - -code_change(_OldVsn, StateName, State, _Extra) -> - {ok, StateName, State}. 
+%% #5 +shutdown({call, From}, {shutdown, _Wait}, State) -> + %% duplicate shutdown call + {keep_state, State, [{reply, From, {error, vnode_shutdown}}]}; +%% info EXIT signal of erlang:monitor(process, Worker) +shutdown(info, {'DOWN', _Ref, _, Pid, Info}, State) -> + {ok, NewState} = exit_worker(State, Pid, Info), + {keep_state, NewState}. + +%% ======================== +%% ======== +%% Internal Helper Functions +%% ======== +%% ======================== %% Keep track of which worker we pair with what work/from and monitor the %% worker. Only active workers are tracked @@ -277,10 +281,10 @@ discard_queued_work(State) -> in(Msg, State = #state{queue_strategy = fifo, queue = Q}) -> - State#state{queue=queue:in(Msg, Q)}; + State#state{queue = queue:in(Msg, Q)}; in(Msg, State = #state{queue_strategy = filo, queue = Q}) -> - State#state{queue=[Msg | Q]}. + State#state{queue = [Msg | Q]}. out(#state{queue_strategy = fifo, queue = Q}) -> queue:out(Q); @@ -295,3 +299,49 @@ new(#state{queue_strategy = fifo}) -> new(#state{queue_strategy = filo}) -> []. +worker_started(#state{pool = Pool, monitors = Monitors} = State, StateName) -> + %% a new worker just started - if we have work pending, try to do it + case out(State) of + {{value, {work, Work, From}}, Rem} -> + case poolboy:checkout(Pool, false) of + full -> + {next_state, queue, State}; + Pid when is_pid(Pid) -> + NewMonitors = monitor_worker(Pid, From, Work, Monitors), + riak_core_vnode_worker:handle_work(Pid, Work, From), + {next_state, queue, State#state{queue = Rem, monitors = NewMonitors}} + end; + {empty, _} -> + %% StateName might be either 'ready' or 'shutdown' + {next_state, StateName, State} + end. + + +checkin(#state{pool = Pool, monitors = Monitors} = State, Worker) -> + case out(State) of + {{value, {work, Work, From}}, Rem} -> + %% there is outstanding work to do - instead of checking + %% the worker back in, just hand it more work to do + NewMonitors = monitor_worker(Worker, From, Work, Monitors), + riak_core_vnode_worker:handle_work(Worker, Work, From), + {next_state, queue, State#state{queue = Rem, monitors = NewMonitors}}; + {empty, Empty} -> + NewMonitors = demonitor_worker(Worker, Monitors), + poolboy:checkin(Pool, Worker), + {next_state, ready, State#state{queue = Empty, monitors = NewMonitors}} + end. + + +exit_worker(#state{monitors = Monitors} = State, Pid, Info) -> + %% remove the listing for the dead worker + case lists:keyfind(Pid, 1, Monitors) of + {Pid, _, From, Work} -> + riak_core_vnode:reply(From, {error, {worker_crash, Info, Work}}), + NewMonitors = lists:keydelete(Pid, 1, Monitors), + %% trigger to do more work will be 'worker_start' message + %% when poolboy replaces this worker (if not a 'checkin' or 'handle_work') + {ok, State#state{monitors = NewMonitors}}; + false -> + {ok, State} + end. + diff --git a/test/worker_pool_test.erl b/test/worker_pool_test.erl index c85202aaa..76c85676a 100644 --- a/test/worker_pool_test.erl +++ b/test/worker_pool_test.erl @@ -20,23 +20,23 @@ -module(worker_pool_test). -behaviour(riak_core_vnode_worker). --include_lib("eunit/include/eunit.hrl"). -export([init_worker/3, handle_work/3]). -init_worker(_VnodeIndex, Noreply, _WorkerProps) -> - {ok, Noreply}. +init_worker(_VnodeIndex, DoReply, _WorkerProps) -> + {ok, DoReply}. 
-handle_work(Work, From, true = State) -> +handle_work(Work, _From, false = DoReply) -> Work(), - riak_core_vnode:reply(From, ok), - {noreply, State}; -handle_work(Work, _From, false = State) -> + {noreply, DoReply}; +handle_work(Work, _From, true = DoReply) -> Work(), - {reply, ok, State}. + {reply, ok, DoReply}. -ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + receive_result(N) -> receive {N, ok} when N rem 2 /= 0 -> @@ -48,38 +48,79 @@ receive_result(N) -> timeout end. -simple_worker_pool() -> - {ok, Pool} = riak_core_vnode_worker_pool:start_link(?MODULE, 3, 10, false, []), +simple_reply_worker_pool() -> + {ok, Pool} = riak_core_vnode_worker_pool:start_link(?MODULE, 3, 10, true, []), [ riak_core_vnode_worker_pool:handle_work(Pool, fun() -> - timer:sleep(100), + timer:sleep(10), 1/(N rem 2) end, {raw, N, self()}) || N <- lists:seq(1, 10)], - timer:sleep(1200), + timer:sleep(200), - %% make sure we got all the expected responses + %% make sure we got all replies [ ?assertEqual(true, receive_result(N)) || N <- lists:seq(1, 10)], unlink(Pool), - riak_core_vnode_worker_pool:stop(Pool, normal). + ok = riak_core_vnode_worker_pool:stop(Pool, normal), + ok = wait_for_process_death(Pool). simple_noreply_worker_pool() -> - {ok, Pool} = riak_core_vnode_worker_pool:start_link(?MODULE, 3, 10, true, []), + {ok, Pool} = riak_core_vnode_worker_pool:start_link(?MODULE, 3, 10, false, []), [ riak_core_vnode_worker_pool:handle_work(Pool, fun() -> - timer:sleep(100), + timer:sleep(10), 1/(N rem 2) end, {raw, N, self()}) || N <- lists:seq(1, 10)], - timer:sleep(1200), + timer:sleep(200), - %% make sure we got all the expected responses + %% make sure that the non-crashing work calls receive timeouts + [ ?assertEqual(timeout, receive_result(N)) || N <- lists:seq(1, 10), N rem 2 == 1], + [ ?assertEqual(true, receive_result(N)) || N <- lists:seq(1, 10), N rem 2 == 0], - [ ?assertEqual(true, receive_result(N)) || N <- lists:seq(1, 10)], unlink(Pool), - riak_core_vnode_worker_pool:stop(Pool, normal). + ok = riak_core_vnode_worker_pool:stop(Pool, normal), + ok = wait_for_process_death(Pool). + +shutdown_pool_empty_success() -> + {ok, Pool} = riak_core_vnode_worker_pool:start_link(?MODULE, 3, 10, false, []), + unlink(Pool), + ok = riak_core_vnode_worker_pool:shutdown_pool(Pool, 100), + ok = wait_for_process_death(Pool), + ok. + +shutdown_pool_worker_finish_success() -> + {ok, Pool} = riak_core_vnode_worker_pool:start_link(?MODULE, 3, 10, false, []), + riak_core_vnode_worker_pool:handle_work(Pool, fun() -> timer:sleep(50) end, {raw, 1, self()}), + unlink(Pool), + ok = riak_core_vnode_worker_pool:shutdown_pool(Pool, 100), + ok = wait_for_process_death(Pool), + ok. + +shutdown_pool_force_timeout() -> + {ok, Pool} = riak_core_vnode_worker_pool:start_link(?MODULE, 3, 10, false, []), + riak_core_vnode_worker_pool:handle_work(Pool, fun() -> timer:sleep(100) end, {raw, 1, self()}), + unlink(Pool), + {error, vnode_shutdown} = riak_core_vnode_worker_pool:shutdown_pool(Pool, 50), + ok = wait_for_process_death(Pool), + ok. 
+ +shutdown_pool_duplicate_calls() -> + {ok, Pool} = riak_core_vnode_worker_pool:start_link(?MODULE, 3, 10, false, []), + riak_core_vnode_worker_pool:handle_work(Pool, fun() -> timer:sleep(100) end, {raw, 1, self()}), + unlink(Pool), + + %% request shutdown a bit later a second time + spawn_link(fun() -> + timer:sleep(30), + {error, vnode_shutdown} = riak_core_vnode_worker_pool:shutdown_pool(Pool, 50) + end), + + {error, vnode_shutdown} = riak_core_vnode_worker_pool:shutdown_pool(Pool, 50), + ok = wait_for_process_death(Pool), + ok. pool_test_() -> @@ -87,9 +128,21 @@ pool_test_() -> fun() -> error_logger:tty(false) end, fun(_) -> error_logger:tty(true) end, [ - fun simple_worker_pool/0, - fun simple_noreply_worker_pool/0 - ] + fun simple_reply_worker_pool/0, + fun simple_noreply_worker_pool/0, + fun shutdown_pool_empty_success/0, + fun shutdown_pool_worker_finish_success/0, + fun shutdown_pool_force_timeout/0, + fun shutdown_pool_duplicate_calls/0 + ] }. +wait_for_process_death(Pid) -> + wait_for_process_death(Pid, is_process_alive(Pid)). + +wait_for_process_death(Pid, true) -> + wait_for_process_death(Pid, is_process_alive(Pid)); +wait_for_process_death(_Pid, false) -> + ok. + -endif.
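In normal operation the pool is not started directly; riak_core_vnode starts it
from the {pool, WorkerModule, PoolSize, WorkerArgs} option returned by a vnode
module's init/1, as the mocked test_vnode in pool_death_test above does. A
minimal sketch, assuming a hypothetical my_vnode module with a hypothetical
my_worker callback module implementing riak_core_vnode_worker:

    %% in my_vnode.erl (hypothetical): ask riak_core_vnode to start a pool of
    %% 10 my_worker workers for this vnode, with empty worker args
    init([_Partition]) ->
        ModState = [],
        {ok, ModState, [{pool, my_worker, 10, []}]}.

The new worker_pool_test cases can be run on their own with, for example,
rebar3 eunit --module=worker_pool_test (assuming a rebar3 setup).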