Pull timeout #2418

Draft · wants to merge 4 commits into base: main
1 change: 1 addition & 0 deletions .env.example
@@ -72,6 +72,7 @@
# period if you are using Kubernetes.
# RUN_GRACE_PERIOD_SECONDS=10
# WORKER_MAX_RUN_DURATION_SECONDS=300
# WORKER_MAX_PULL_TIMEOUT_SECONDS=10

# TODO: these aren't specified in Runtime, do they belong to the worker process?
# WORKER_MAX_RUN_MEMORY_MB=500
6 changes: 6 additions & 0 deletions lib/lightning/config.ex
@@ -40,6 +40,11 @@ defmodule Lightning.Config do
Application.get_env(:lightning, :run_grace_period_seconds)
end

@impl true
def max_pull_timeout_seconds do
Application.get_env(:lightning, :max_pull_timeout_seconds)
end

@impl true
def default_max_run_duration do
Application.get_env(:lightning, :max_run_duration_seconds)
@@ -164,6 +169,7 @@ defmodule Lightning.Config do
@callback email_sender_name() :: String.t()
@callback get_extension_mod(key :: atom()) :: any()
@callback grace_period() :: integer()
@callback max_pull_timeout_seconds() :: integer()
@callback instance_admin_email() :: String.t()
@callback kafka_duplicate_tracking_retention_seconds() :: integer()
@callback kafka_number_of_consumers() :: integer()
4 changes: 4 additions & 0 deletions lib/lightning/config/bootstrap.ex
@@ -225,6 +225,10 @@ defmodule Lightning.Config.Bootstrap do
src: env!("PLAUSIBLE_SRC", :string, nil),
data_domain: env!("PLAUSIBLE_DATA_DOMAIN", :string, nil)

config :lightning,
:max_pull_timeout_seconds,
env!("WORKER_MAX_PULL_TIMEOUT_SECONDS", :integer, 30)

config :lightning,
:run_grace_period_seconds,
env!("RUN_GRACE_PERIOD_SECONDS", :integer, 10)
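For orientation, here is a hedged sketch of how the new setting flows from the environment into the application config and back out through the `Lightning.Config` accessor. The values are illustrative; only the 30-second fallback comes from this diff.

```elixir
# Not part of this PR: an IEx-style illustration of the config path.
# With WORKER_MAX_PULL_TIMEOUT_SECONDS unset, Bootstrap falls back to 30,
# so the accessor added in lib/lightning/config.ex returns that default.
iex> Lightning.Config.max_pull_timeout_seconds()
30

# With WORKER_MAX_PULL_TIMEOUT_SECONDS=10 (the value shown in .env.example),
# the same call would return 10.
```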
26 changes: 25 additions & 1 deletion lib/lightning/janitor.ex
@@ -24,7 +24,12 @@ defmodule Lightning.Janitor do
by the Oban cron plugin.
"""
@impl Oban.Worker
- def perform(%Oban.Job{}), do: find_and_update_lost()
+ def perform(%Oban.Job{}), do: chores()

defp chores do
forfeit_expired_claims()
find_and_update_lost()
end

@doc """
The find_and_update_lost function determines the current time, finds all
@@ -44,4 +49,23 @@
|> Stream.run()
end)
end

@doc """
The forfeit_expired_claims function determines the current time, finds all
runs that were claimed before the earliest allowable claim time (the pull
timeout plus the grace period) but never started, and releases them back to
the "available" state so they can be claimed again.
"""
def forfeit_expired_claims do
stream =
Runs.Query.forfeited()
|> Repo.stream()

Repo.transaction(fn ->
stream
|> Stream.each(fn run ->
Runs.forfeit_claim(run)
end)
|> Stream.run()
end)
end
end
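The module doc above says the Janitor is invoked by the Oban cron plugin. As a hedged sketch (the actual schedule lives in Lightning's runtime config and may differ), wiring perform/1, and with it both chores, into the cron plugin would look roughly like this:

```elixir
# Assumed, illustrative schedule; not taken from this PR.
config :lightning, Oban,
  repo: Lightning.Repo,
  plugins: [
    # Run Lightning.Janitor.perform/1 every minute, which now calls
    # forfeit_expired_claims/0 before find_and_update_lost/0.
    {Oban.Plugins.Cron, crontab: [{"* * * * *", Lightning.Janitor}]}
  ]
```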
12 changes: 12 additions & 0 deletions lib/lightning/runs.ex
@@ -319,6 +319,18 @@
end)
end

@spec forfeit_claim(Lightning.Run.t()) ::
{:ok, any()} | {:error, any()}
def forfeit_claim(%Run{} = run) do
Logger.warning(fn ->
"Detected forfeit run: #{inspect(run)}"
end)

run
|> Ecto.Changeset.change(state: "available")
|> Repo.update()
end

defdelegate subscribe(run), to: Events

def get_project_id_for_run(run) do
19 changes: 19 additions & 0 deletions lib/lightning/runs/query.ex
@@ -8,6 +8,25 @@ defmodule Lightning.Runs.Query do

require Lightning.Run

@spec forfeited :: Ecto.Queryable.t()
def forfeited do
now = Lightning.current_time()

max_pull_timeout_seconds = Lightning.Config.max_pull_timeout_seconds()
grace_period_ms = Lightning.Config.grace_period() * 1000

fallback_oldest_claim =
now
|> DateTime.add(-max_pull_timeout_seconds, :second)
|> DateTime.add(-grace_period_ms, :millisecond)

from(r in Run,
where: r.claimed_at < ^fallback_oldest_claim,
where: is_nil(r.started_at),
where: r.state == "claimed"
)
end

@doc """
Return all runs that have been claimed by a worker before the earliest
acceptable start time (determined by the run options and grace period) but are
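To make the forfeited/0 cutoff concrete, here is a hedged worked example using the defaults introduced in this PR (the 30-second pull timeout fallback from Bootstrap and a 10-second grace period); the timestamps are made up.

```elixir
# Illustration only, not part of this PR.
now = ~U[2024-10-01 12:00:00Z]

fallback_oldest_claim =
  now
  |> DateTime.add(-30, :second)             # WORKER_MAX_PULL_TIMEOUT_SECONDS default
  |> DateTime.add(-10 * 1000, :millisecond) # RUN_GRACE_PERIOD_SECONDS as milliseconds
#=> ~U[2024-10-01 11:59:20Z]

# A run with state "claimed", started_at still nil, and claimed_at earlier
# than 11:59:20 matches the query, so the Janitor will reset it to "available".
```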
5 changes: 5 additions & 0 deletions lib/lightning_web/channels/worker_channel.ex
@@ -31,6 +31,11 @@ defmodule LightningWeb.WorkerChannel do
runs =
runs
|> Enum.map(fn run ->
# under heavy load this loop averages over 10 seconds, which causes all
# runs to be marked as lost
# dbg("there was a run and we set it to started")
# :timer.sleep(11_000)

opts = run_options(run)

token =
10 changes: 10 additions & 0 deletions test/lightning/janitor_test.exs
@@ -6,6 +6,16 @@ defmodule Lightning.JanitorTest do
alias Lightning.Repo
alias Lightning.Run

describe "forfeit_expired_claims/0" do
test "releases runs for reclaim if they have not been started after the pull timeout plus grace" do
dbg("eish, i don't love this. what if there was some other reason for them getting lost?")
dbg("like, what if they did start, and did the work, and all that, but we never heard back from them because of network issues?")
dbg("i wouldn't want to re-do the work.")
dbg("i wish there was some way to only mark them as claimed once we know that the worker actually got the run.")
Review comment (Collaborator):
@taylordowns2000 If I may chip in (uninvited):

As you mentioned yesterday, the janitor approach is a band-aid until a better fix is available. If I were applying the band-aid, I would not tie it to the timeout value - I would make it a large multiple of the timeout. E.g. if your timeout is 20 sec, maybe wait 10 min (after all, you wait 2 South African hours to determine if something is lost, right?). And notify Sentry whenever this happens, so that we have an opportunity to look at what is going on and tune things. Regarding the concern about potentially redoing work, you could make this user-controllable (i.e. if a Run is 'LostAfterClaimed', would you like us to automagically resubmit it, or would you rather do that manually?).

A longer-term fix (which I can only assume will mean a fair amount of work) would be to allow a run to be claimed by multiple worker processes but only started by one (whoever gets there first). This obviously creates inefficiency elsewhere. Personally, I would do the band-aid if it helps us satisfy the immediate requirements and then identify the issue - e.g. maybe it is as simple as scaling up web pods alongside worker pods?

dbg("can we check that our reply in the websocket channel was actually received by the ws-worker?")
end
end

describe "find_and_update_lost/0" do
@tag :capture_log
test "updates lost runs and their steps" do
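Regarding the collaborator's suggestion above to notify Sentry when a claim is forfeited: a hedged sketch of what that could look like in Runs.forfeit_claim/1, assuming the sentry Hex package is installed and configured (the message and extra fields are illustrative and not part of this PR).

```elixir
def forfeit_claim(%Run{} = run) do
  Logger.warning(fn ->
    "Detected forfeit run: #{inspect(run)}"
  end)

  # Surface each forfeited claim so the team can investigate and tune the
  # pull timeout / grace period instead of discovering this silently.
  Sentry.capture_message("Run claim forfeited before start",
    extra: %{run_id: run.id, claimed_at: run.claimed_at}
  )

  run
  |> Ecto.Changeset.change(state: "available")
  |> Repo.update()
end
```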