diff --git a/jvm/src/main/scala/async/VThreadSupport.scala b/jvm/src/main/scala/async/VThreadSupport.scala index d31d3812..4190e99d 100644 --- a/jvm/src/main/scala/async/VThreadSupport.scala +++ b/jvm/src/main/scala/async/VThreadSupport.scala @@ -6,6 +6,7 @@ import java.lang.invoke.{MethodHandles, VarHandle} import java.util.concurrent.locks.ReentrantLock import scala.annotation.unchecked.uncheckedVariance import scala.concurrent.duration.FiniteDuration +import scala.annotation.constructorOnly object VThreadScheduler extends Scheduler: private val VTFactory = Thread @@ -21,7 +22,7 @@ object VThreadScheduler extends Scheduler: val sr = ScheduledRunnable(delay, body) () => sr.cancel() - private class ScheduledRunnable(val delay: FiniteDuration, val body: Runnable^) extends Cancellable { + private class ScheduledRunnable(delay: FiniteDuration, body: Runnable^) extends Cancellable { @volatile var interruptGuard = true // to avoid interrupting the body val th = VTFactory.newThread: () => diff --git a/shared/src/main/scala/async/Cancellable.scala b/shared/src/main/scala/async/Cancellable.scala index 15dc07b4..ab9d1cce 100644 --- a/shared/src/main/scala/async/Cancellable.scala +++ b/shared/src/main/scala/async/Cancellable.scala @@ -1,8 +1,12 @@ package gears.async +import language.experimental.captureChecking + +import java.util.concurrent.atomic.AtomicLong + /** A trait for cancellable entities that can be grouped. */ trait Cancellable: - + val id = Cancellable.Id() private var group: CompletionGroup = CompletionGroup.Unlinked /** Issue a cancel request */ @@ -28,6 +32,11 @@ trait Cancellable: end Cancellable object Cancellable: + opaque type Id = Long + private object Id: + private val gen = AtomicLong(0) + def apply(): Id = gen.incrementAndGet() + /** A special [[Cancellable]] object that just tracks whether its linked group was cancelled. */ trait Tracking extends Cancellable: def isCancelled: Boolean diff --git a/shared/src/main/scala/async/CompletionGroup.scala b/shared/src/main/scala/async/CompletionGroup.scala index 0a10b877..7f5700c9 100644 --- a/shared/src/main/scala/async/CompletionGroup.scala +++ b/shared/src/main/scala/async/CompletionGroup.scala @@ -1,14 +1,17 @@ package gears.async +import language.experimental.captureChecking + import scala.collection.mutable import scala.util.Success import Future.Promise +import scala.annotation.unchecked.uncheckedCaptures /** A group of cancellable objects that are completed together. Cancelling the group means cancelling all its * uncompleted members. */ class CompletionGroup extends Cancellable.Tracking: - private val members: mutable.Set[Cancellable] = mutable.Set() + private val members: mutable.Set[(Cancellable^) @uncheckedCaptures] = mutable.Set[(Cancellable^) @uncheckedCaptures]() private var canceled: Boolean = false private var cancelWait: Option[Promise[Unit]] = None @@ -29,14 +32,14 @@ class CompletionGroup extends Cancellable.Tracking: unlink() /** Add given member to the members set. If the group has already been cancelled, cancels that member immediately. */ - def add(member: Cancellable): Unit = + def add(member: Cancellable^): Unit = val alreadyCancelled = synchronized: members += member // Add this member no matter what since we'll wait for it still canceled if alreadyCancelled then member.cancel() /** Remove given member from the members set if it is an element */ - def drop(member: Cancellable): Unit = synchronized: + def drop(member: Cancellable^): Unit = synchronized: members -= member if members.isEmpty && cancelWait.isDefined then cancelWait.get.complete(Success(())) @@ -50,8 +53,8 @@ object CompletionGroup: object Unlinked extends CompletionGroup: override def cancel(): Unit = () override def waitCompletion()(using Async): Unit = () - override def add(member: Cancellable): Unit = () - override def drop(member: Cancellable): Unit = () + override def add(member: Cancellable^): Unit = () + override def drop(member: Cancellable^): Unit = () end Unlinked end CompletionGroup