Skip to content

Add support for stop(cause:). #388

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 48 additions & 10 deletions lib/async/task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,45 @@
module Async
# Raised when a task is explicitly stopped.
class Stop < Exception
# Represents the source of the stop operation.
class Cause < Exception
if RUBY_VERSION >= "3.4"
# @returns [Array(Thread::Backtrace::Location)] The backtrace of the caller.
def self.backtrace
caller_locations(2..-1)
end
else
# @returns [Array(String)] The backtrace of the caller.
def self.backtrace
caller(2..-1)
end
end

# Create a new cause of the stop operation, with the given message.
#
# @parameter message [String] The error message.
# @returns [Cause] The cause of the stop operation.
def self.for(message = "Task was stopped")
instance = self.new(message)
instance.set_backtrace(self.backtrace)
return instance
end
end

# Create a new stop operation.
def initialize(message = "Task was stopped")
super(message)
end

# Used to defer stopping the current task until later.
class Later
# Create a new stop later operation.
#
# @parameter task [Task] The task to stop later.
def initialize(task)
# @parameter cause [Exception] The cause of the stop operation.
def initialize(task, cause = nil)
@task = task
@cause = cause
end

# @returns [Boolean] Whether the task is alive.
Expand All @@ -34,7 +66,7 @@ def alive?

# Transfer control to the operation - this will stop the task.
def transfer
@task.stop
@task.stop(false, cause: @cause)
end
end
end
Expand Down Expand Up @@ -266,7 +298,13 @@ def wait
# If `later` is false, it means that `stop` has been invoked directly. When `later` is true, it means that `stop` is invoked by `stop_children` or some other indirect mechanism. In that case, if we encounter the "current" fiber, we can't stop it right away, as it's currently performing `#stop`. Stopping it immediately would interrupt the current stop traversal, so we need to schedule the stop to occur later.
#
# @parameter later [Boolean] Whether to stop the task later, or immediately.
def stop(later = false)
# @parameter cause [Exception] The cause of the stop operation.
def stop(later = false, cause: $!)
# If no cause is given, we generate one from the current call stack:
unless cause
cause = Stop::Cause.for("Stopping task!")
end

if self.stopped?
# If the task is already stopped, a `stop` state transition re-enters the same state which is a no-op. However, we will also attempt to stop any running children too. This can happen if the children did not stop correctly the first time around. Doing this should probably be considered a bug, but it's better to be safe than sorry.
return stopped!
Expand All @@ -280,27 +318,27 @@ def stop(later = false)
# If we are deferring stop...
if @defer_stop == false
# Don't stop now... but update the state so we know we need to stop later.
@defer_stop = true
@defer_stop = cause
return false
end

if self.current?
# If the fiber is current, and later is `true`, we need to schedule the fiber to be stopped later, as it's currently invoking `stop`:
if later
# If the fiber is the current fiber and we want to stop it later, schedule it:
Fiber.scheduler.push(Stop::Later.new(self))
Fiber.scheduler.push(Stop::Later.new(self, cause))
else
# Otherwise, raise the exception directly:
raise Stop, "Stopping current task!"
raise Stop, "Stopping current task!", cause: cause
end
else
# If the fiber is not curent, we can raise the exception directly:
begin
# There is a chance that this will stop the fiber that originally called stop. If that happens, the exception handling in `#stopped` will rescue the exception and re-raise it later.
Fiber.scheduler.raise(@fiber, Stop)
Fiber.scheduler.raise(@fiber, Stop, cause: cause)
rescue FiberError
# In some cases, this can cause a FiberError (it might be resumed already), so we schedule it to be stopped later:
Fiber.scheduler.push(Stop::Later.new(self))
Fiber.scheduler.push(Stop::Later.new(self, cause))
end
end
else
Expand Down Expand Up @@ -340,7 +378,7 @@ def defer_stop

# If we were asked to stop, we should do so now:
if defer_stop
raise Stop, "Stopping current task (was deferred)!"
raise Stop, "Stopping current task (was deferred)!", cause: defer_stop
end
end
else
Expand All @@ -351,7 +389,7 @@ def defer_stop

# @returns [Boolean] Whether stop has been deferred.
def stop_deferred?
@defer_stop
!!@defer_stop
end

# Lookup the {Task} for the current fiber. Raise `RuntimeError` if none is available.
Expand Down
22 changes: 21 additions & 1 deletion test/async/task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,26 @@
expect(transient).to be(:running?)
end.wait
end

it "can stop a task and provide a cause" do
error = nil

cause = Async::Stop::Cause.for("boom")

task = reactor.async do |task|
begin
task.stop(cause: cause)
rescue Async::Stop => error
raise
end
end

reactor.run

expect(task).to be(:stopped?)
expect(error).to be_a(Async::Stop)
expect(error.cause).to be == cause
end
end

with "#sleep" do
Expand Down Expand Up @@ -910,7 +930,7 @@ def sleep_forever

reactor.run_once(0)

expect(child_task.stop_deferred?).to be == nil
expect(child_task.stop_deferred?).to be == false
end
end

Expand Down
Loading