Skip to content

Commit

Permalink
Support timeout in return value
Browse files Browse the repository at this point in the history
  • Loading branch information
maartenvanvliet committed May 12, 2023
1 parent 180a441 commit 1a0f4a6
Showing 1 changed file with 44 additions and 5 deletions.
49 changes: 44 additions & 5 deletions lib/gen_stage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -858,8 +858,9 @@ defmodule GenStage do
end
The returned tuple may also contain 3 or 4 elements. The third
element may be the `:hibernate` atom or a set of options defined
below.
element may be a set of options defined below. The fourth element
is a timeout, the `:hibernate` atom or a `:continue` tuple. See
the return values for `c:GenServer.init/1` for more information.
Returning `:ignore` will cause `start_link/3` to return `:ignore`
and the process will exit normally without entering the loop or
Expand Down Expand Up @@ -910,14 +911,14 @@ defmodule GenStage do
@callback init(args :: term) ::
{:producer, state}
| {:producer, state, [producer_option]}
| {:producer, state, [producer_option], {:continue, term} | :hibernate}
| {:producer, state, [producer_option], timeout() | {:continue, term} | :hibernate}
| {:producer_consumer, state}
| {:producer_consumer, state, [producer_consumer_option]}
| {:producer_consumer, state, [producer_consumer_option],
{:continue, term} | :hibernate}
timeout() | {:continue, term} | :hibernate}
| {:consumer, state}
| {:consumer, state, [consumer_option]}
| {:consumer, state, [consumer_option], {:continue, term} | :hibernate}
| {:consumer, state, [consumer_option], timeout() | {:continue, term} | :hibernate}
| :ignore
| {:stop, reason :: any}
when state: any
Expand Down Expand Up @@ -999,6 +1000,7 @@ defmodule GenStage do
"""
@callback handle_demand(demand :: pos_integer, state :: term) ::
{:noreply, [event], new_state}
| {:noreply, [event], new_state, timeout()}
| {:noreply, [event], new_state, :hibernate}
| {:noreply, [event], new_state, {:continue, term}}
| {:stop, reason, new_state}
Expand Down Expand Up @@ -1079,6 +1081,7 @@ defmodule GenStage do
state :: term
) ::
{:noreply, [event], new_state}
| {:noreply, [event], new_state, timeout()}
| {:noreply, [event], new_state, :hibernate}
| {:noreply, [event], new_state, {:continue, term}}
| {:stop, reason, new_state}
Expand All @@ -1093,6 +1096,7 @@ defmodule GenStage do
"""
@callback handle_events(events :: [event], from, state :: term) ::
{:noreply, [event], new_state}
| {:noreply, [event], new_state, timeout()}
| {:noreply, [event], new_state, :hibernate}
| {:noreply, [event], new_state, {:continue, term}}
| {:stop, reason, new_state}
Expand Down Expand Up @@ -1135,9 +1139,11 @@ defmodule GenStage do
"""
@callback handle_call(request :: term, from :: GenServer.from(), state :: term) ::
{:reply, reply, [event], new_state}
| {:reply, reply, [event], new_state, timeout()}
| {:reply, reply, [event], new_state, :hibernate}
| {:reply, reply, [event], new_state, {:continue, term}}
| {:noreply, [event], new_state}
| {:noreply, [event], new_state, timeout()}
| {:noreply, [event], new_state, :hibernate}
| {:noreply, [event], new_state, {:continue, term}}
| {:stop, reason, reply, new_state}
Expand All @@ -1154,12 +1160,21 @@ defmodule GenStage do
the loop with new state `new_state`. Only `:producer` and `:producer_consumer`
stages can return a non-empty list of events.
Returning `{:noreply, [event], state, timeout}` is similar to `{:noreply, state}`
, except that it also sets a timeout. See the "Timeouts" section in the
`GenServer` documentation for more information.
Returning `{:noreply, [event], new_state, :hibernate}` is similar to
`{:noreply, new_state}` except the process is hibernated before continuing the
loop. See the return values for `c:GenServer.handle_call/3` for more information
on hibernation. Only `:producer` and `:producer_consumer` stages can return a
non-empty list of events.
Returning `{:noreply, [event], new_state, {:continue, continue_arg}}` is similar
to `{:noreply, new_state}` except that immediately after entering the loop, the
`c:handle_continue/2` callback will be invoked with `continue_arg` as the first
argument and `state` as the second one.
Returning `{:stop, reason, new_state}` stops the loop and `terminate/2` is
called with the reason `reason` and state `new_state`. The process exits with
reason `reason`.
Expand All @@ -1169,6 +1184,7 @@ defmodule GenStage do
"""
@callback handle_cast(request :: term, state :: term) ::
{:noreply, [event], new_state}
| {:noreply, [event], new_state, timeout()}
| {:noreply, [event], new_state, :hibernate}
| {:noreply, [event], new_state, {:continue, term}}
| {:stop, reason :: term, new_state}
Expand All @@ -1190,6 +1206,7 @@ defmodule GenStage do
"""
@callback handle_info(message :: term, state :: term) ::
{:noreply, [event], new_state}
| {:noreply, [event], new_state, timeout()}
| {:noreply, [event], new_state, :hibernate}
| {:noreply, [event], new_state, {:continue, term}}
| {:stop, reason :: term, new_state}
Expand All @@ -1212,6 +1229,7 @@ defmodule GenStage do
"""
@callback handle_continue(continue :: term, state :: term) ::
{:noreply, [event], new_state}
| {:noreply, [event], new_state, timeout()}
| {:noreply, [event], new_state, :hibernate}
| {:noreply, [event], new_state, {:continue, term}}
| {:stop, reason :: term, new_state}
Expand Down Expand Up @@ -1995,12 +2013,20 @@ defmodule GenStage do
{:noreply, stage, {:continue, _term} = continue} ->
{:ok, stage, continue}

{:noreply, stage, timeout} ->
{:ok, stage, timeout}

{:stop, reason, stage} ->
{:stop, reason, stage}
end
end

defp handle_gen_server_init_args(:hibernate, stage), do: {:ok, stage, :hibernate}

defp handle_gen_server_init_args(timeout, stage)
when (is_integer(timeout) and timeout >= 0) or timeout == :infinity,
do: {:ok, stage, timeout}

defp handle_gen_server_init_args(nil, stage), do: {:ok, stage}

@doc false
Expand Down Expand Up @@ -2041,6 +2067,10 @@ defmodule GenStage do
stage = dispatch_events(events, length(events), %{stage | state: state})
{:reply, reply, stage, continue}

{:reply, reply, events, state, timeout} ->
stage = dispatch_events(events, length(events), %{stage | state: state})
{:reply, reply, stage, timeout}

{:stop, reason, reply, state} ->
{:stop, reason, reply, %{stage | state: state}}

Expand Down Expand Up @@ -2338,6 +2368,10 @@ defmodule GenStage do
stage = dispatch_events(events, length(events), %{stage | state: state})
{:noreply, stage, continue}

{:noreply, events, state, timeout} when is_list(events) ->
stage = dispatch_events(events, length(events), %{stage | state: state})
{:noreply, stage, timeout}

{:stop, reason, state} ->
{:stop, reason, %{stage | state: state}}

Expand Down Expand Up @@ -2699,6 +2733,11 @@ defmodule GenStage do
ask(from, ask, [:noconnect])
consumer_dispatch(batches, from, mod, state, stage, continue)

{:noreply, events, state, timeout} ->
stage = dispatch_events(events, length(events), stage)
ask(from, ask, [:noconnect])
consumer_dispatch(batches, from, mod, state, stage, timeout)

{:stop, reason, state} ->
{:stop, reason, %{stage | state: state}}

Expand Down

0 comments on commit 1a0f4a6

Please sign in to comment.