Skip to content

Commit

Permalink
feat: implement handle_cancelled/3 worker callback
Browse files Browse the repository at this point in the history
  • Loading branch information
oliveigah committed Mar 21, 2024
1 parent deae8ff commit ed72a94
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 2 deletions.
43 changes: 41 additions & 2 deletions lib/nimble_pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -297,14 +297,38 @@ defmodule NimblePool do
pool_state
) :: :ok

@doc """
Handle cancelled checkout requests.
This callback is executed when a checkout request is cancelled unexpectedly.
The context argument may be `:queued` or `:checked_out`:
* `:queued` means the cancellation happened before resource checkout. This may happen
when the pool is starving under load and can not serve resources. Since no checkout
happened the worker_state argument will be `nil`.
* `:checked_out` means the cancellation happened after resource checkout. This may happen
when the function given to `checkout!/4` raises.
This callback is optional.
"""
@doc callback: :worker
@callback handle_cancelled(
worker_state :: worker_state | nil,
pool_state,
context :: :queued | :checked_out
) :: :ok

@optional_callbacks init_pool: 1,
handle_checkin: 4,
handle_info: 2,
handle_enqueue: 2,
handle_update: 3,
handle_ping: 2,
terminate_worker: 3,
terminate_pool: 2
terminate_pool: 2,
handle_cancelled: 3

@doc """
Defines a pool to be started under the supervision tree.
Expand Down Expand Up @@ -745,14 +769,29 @@ defmodule NimblePool do
{:noreply, %{state | resources: resources, async: async, state: pool_state}}
end

defp cancel_request_ref(ref, reason, %{requests: requests} = state) do
defp cancel_request_ref(
ref,
reason,
%{requests: requests, worker: worker, state: pool_state} = state
) do

case requests do
# Exited or timed out before we could serve it
%{^ref => {_, mon_ref, :command, _command, _deadline}} ->
if function_exported?(worker, :handle_cancelled, 3) do
args = [nil, pool_state, :queued]
apply_worker_callback(worker, :handle_cancelled, args)
end

{:noreply, remove_request(state, ref, mon_ref)}

# Exited or errored during client processing
%{^ref => {_, mon_ref, :state, worker_server_state}} ->
if function_exported?(worker, :handle_cancelled, 3) do
args = [worker_server_state, pool_state, :checked_out]
apply_worker_callback(worker, :handle_cancelled, args)
end

state = remove_request(state, ref, mon_ref)
{:noreply, remove_worker(reason, worker_server_state, state)}

Expand Down
91 changes: 91 additions & 0 deletions test/nimble_pool_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ defmodule NimblePoolTest do
def terminate_pool(reason, pool_state) do
TestAgent.next(pool_state.state, :terminate_pool, [reason, pool_state])
end

def handle_cancelled(worker_state, pool_state, context) do
TestAgent.next(pool_state, :handle_cancelled, [worker_state, pool_state, context])
end
end

defp stateless_pool!(instructions, opts \\ []) do
Expand Down Expand Up @@ -1019,6 +1023,7 @@ defmodule NimblePoolTest do
handle_checkout: fn :checkout, _from, _next, pool_state ->
{:ok, :client_state_out, :server_state_out, pool_state}
end,
handle_cancelled: fn :server_state_out, _pool_state, :checked_out -> :ok end,
terminate_worker: fn reason, :server_state_out, state ->
send(parent, {:terminate, reason})
{:ok, state}
Expand Down Expand Up @@ -1054,6 +1059,7 @@ defmodule NimblePoolTest do
handle_update: fn :update, _next, pool_state ->
{:ok, :updated_state, pool_state}
end,
handle_cancelled: fn :server_state_out, _pool_state, :checked_out -> :ok end,
terminate_worker: fn reason, :updated_state, pool_state ->
send(parent, {:terminate, reason})
{:ok, pool_state}
Expand Down Expand Up @@ -1680,4 +1686,89 @@ defmodule NimblePoolTest do
assert termination_time_pool > termination_time_worker
end
end

describe "handle_cancelled" do
test "should run when client raise after checkout" do
parent = self()

{_, pool} =
stateful_pool!(
[
init_worker: fn next -> {:ok, :worker1, next} end,
handle_checkout: fn :checkout, _from, worker_state, pool_state ->
{:ok, :client_state_out, worker_state, pool_state}
end,
handle_cancelled: fn worker_state, _pool_state, :checked_out ->
send(parent, {:ping, worker_state})
:ok
end,
terminate_worker: fn _reason, _, state -> {:ok, state} end
],
pool_size: 1
)

assert_raise(
RuntimeError,
fn ->
NimblePool.checkout!(pool, :checkout, fn _ref, :client_state_out ->
raise "unexpected error"
end)
end
)

assert_receive({:ping, :worker1})

NimblePool.stop(pool, :shutdown)
end

test "should run when checkout timeout" do
parent = self()

{_, pool} =
stateful_pool!(
[
init_worker: fn next -> {:ok, :worker1, next} end,
handle_checkout: fn :checkout, _from, worker_state, pool_state ->
{:ok, :client_state_out, worker_state, pool_state}
end,
handle_cancelled: fn nil, _pool_state, :queued ->
send(parent, {:ping, nil})
:ok
end,
handle_checkin: fn :client_state_in, _from, next, pool_state ->
{:ok, next, pool_state}
end,
terminate_worker: fn _reason, _, state -> {:ok, state} end
],
pool_size: 1
)

task1 =
Task.async(fn ->
NimblePool.checkout!(pool, :checkout, fn _ref, :client_state_out ->
send(parent, :lock)
assert_receive :release
{:result, :client_state_in}
end)
end)

assert_receive :lock

assert {:timeout, {NimblePool, :checkout, _}} =
catch_exit(
NimblePool.checkout!(
pool,
:checkout,
fn _ref, :client_state_out -> raise "should never execute this line" end,
1
)
)

send(task1.pid, :release)

assert_receive({:ping, nil})

NimblePool.stop(pool, :shutdown)
end
end
end

0 comments on commit ed72a94

Please sign in to comment.