Feature request: deferred-start Futures #108

Open
mbovel opened this issue Nov 25, 2024 · 5 comments
Labels
enhancement New feature or request

Comments

@mbovel
Member

mbovel commented Nov 25, 2024

Could we add a Future constructor in the gears library that allows computations to be defined without starting them immediately?

This would enable defining a DAG of inter-dependent asynchronous tasks in an arbitrary order. For example, in pseudo-code:

t0 = t1.result + t2.result
t1 = t3.result + 4
t2 = t3.result + 2
t3 = 1

The key requirement here is that tasks should not execute until explicitly triggered, allowing dependencies to be set up first.

Below is a working user-land implementation:

// async-lazy.scala
//> using dep "ch.epfl.lamp::gears::0.2.0"

import gears.async.{Future, Async}
import gears.async.default.given
import Async.await

final class AsyncLazy[T](body: (Async, Async.Spawn) ?=> T):
  // @volatile is needed for the double-checked locking in get() to be safe
  @volatile private var _future: Future[T] | Null = null

  def get()(using Async, Async.Spawn): T =
    if _future == null then
      this.synchronized:
        if _future == null then
          _future = Future(body)
    _future.nn.await

@main def main =
  println("starting")
  val a = new Array[AsyncLazy[Int]](4)
  a(0) = AsyncLazy:
    println("compute a(0)")
    a(1).get() + a(2).get()
  a(1) = AsyncLazy:
    println("compute a(1)")
    a(3).get() + 4
  a(2) = AsyncLazy:
    println("compute a(2)")
    a(3).get() + 2
  a(3) = AsyncLazy:
    println("compute a(3)")
    1

  Async.blocking:
    println(a(0).get())

Running this code produces the following output:

$ scala -S 3.5.1 --server=false async-lazy.scala
starting
compute a(0)
compute a(1)
compute a(3)
compute a(2)
8

I believe a dedicated feature in gears could achieve something similar more efficiently, by avoiding the extra synchronization.

A Future constructor could return a deferred-start Future, whose execution begins only when explicitly triggered. In the example above, one could then loop over the Futures to start them after they have all been defined.
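As a rough illustration of the requested semantics, here is a minimal sketch that uses the standard library's `scala.concurrent` rather than gears. The `Deferred` class and `buildAndRun` are hypothetical names, not gears API. A task's `future` can be depended on as soon as the task is defined, but its body only runs once `start()` is called. All tasks can therefore be defined in arbitrary order and then started in a loop.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration.*

// Hypothetical helper (not part of gears): the Future exists as soon as the
// task is defined, but `body` only runs once `start()` is called.
final class Deferred[T](body: () => Future[T])(using ExecutionContext):
  private val started = new AtomicBoolean(false)
  private val promise = Promise[T]()

  // Other tasks may depend on this Future before the task is started.
  val future: Future[T] = promise.future

  // Runs the body at most once; later calls are no-ops.
  def start(): Unit =
    if !started.getAndSet(true) then
      promise.completeWith(Future.delegate(body()))

def buildAndRun(): Int =
  given ExecutionContext = ExecutionContext.global
  // Lazy vals only let the closures name tasks declared further down;
  // creating a Deferred does not run anything.
  lazy val t0: Deferred[Int] = Deferred(() => t1.future.zipWith(t2.future)(_ + _))
  lazy val t1: Deferred[Int] = Deferred(() => t3.future.map(_ + 4))
  lazy val t2: Deferred[Int] = Deferred(() => t3.future.map(_ + 2))
  lazy val t3: Deferred[Int] = Deferred(() => Future.successful(1))
  // Every task is defined; now start them all, in any order.
  List(t0, t1, t2, t3).foreach(_.start())
  Await.result(t0.future, 5.seconds)

@main def deferredDemo(): Unit =
  println(buildAndRun())
```

`buildAndRun()` returns 8, matching the output above. Dependencies are expressed with `map` and `zipWith` instead of blocking awaits. As a result, no thread is held while a task waits for its inputs, and the start order does not matter.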

@mbovel
Member Author

mbovel commented Nov 27, 2024

Discussed today with @natsukagami, @odersky and @EugeneFlesselle.

We could use a lazy val to implement what I have above, if the async context is passed as a class parameter:

//> using dep "ch.epfl.lamp::gears::0.2.0"

import scala.concurrent.duration.given

import gears.async.{Future, Async, AsyncOperations}
import gears.async.default.given
import Async.await

final class AsyncLazy[T](body: (Async, Async.Spawn) ?=> T)(using Async, Async.Spawn):
  lazy val future: Future[T] = Future(body)
  def get(): T = future.await

@main def main =

  Async.blocking:
    println("starting")

    val a = new Array[AsyncLazy[Int]](4)

    a(0) = AsyncLazy:
      println("compute a(0)")
      a(1).get() + a(2).get()
    a(1) = AsyncLazy:
      println("compute a(1)")
      a(3).get() + 4
    a(2) = AsyncLazy:
      println("compute a(2)")
      a(3).get() + 2
    a(3) = AsyncLazy:
      println("compute a(3)")
      1

    println("initialized")
    println(a(0).get())

That's nicer than what I have above, but it still requires extra locking/synchronization compared to what we would get with #109.

Another drawback of the "lazy" approach in both snippets here, where Futures are started on a per-need basis (as opposed to #109, where Futures are started manually), is that it might under-use resources. Since all results are needed anyway and several threads are available, it is better to eagerly start computing as many results as possible.
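To make the resource argument concrete, here is a small sketch with `scala.concurrent` rather than gears. The names `slow`, `elapsedMs`, `onDemand` and `eager` are illustrative only. It compares forcing tasks on demand with starting them all up front. With on-demand forcing, `a2` is only created inside the callback that runs after `a1` completes, so the two independent 300 ms tasks run back to back. Starting both first lets them overlap.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration.*

given ExecutionContext = ExecutionContext.global

// Simulated work: sleeps `ms` milliseconds, then yields `value`.
// `blocking` lets the global pool add threads while we sleep.
def slow(ms: Long, value: Int): Future[Int] =
  Future(blocking { Thread.sleep(ms); value })

// Returns the result of `body` and the wall-clock time it took, in ms.
def elapsedMs[T](body: => T): (T, Long) =
  val start = System.nanoTime()
  val res = body
  (res, (System.nanoTime() - start) / 1_000_000)

// Per-need start: `a2` is only forced inside the callback that runs after
// `a1` completes, so the two independent tasks run one after the other.
def onDemand(): Int =
  lazy val a1 = slow(300, 5)
  lazy val a2 = slow(300, 3)
  val a0 = a1.flatMap(x => a2.map(y => x + y))
  Await.result(a0, 5.seconds)

// Eager start: both tasks are started before anything waits on them.
def eager(): Int =
  val a1 = slow(300, 5)
  val a2 = slow(300, 3)
  val a0 = a1.zipWith(a2)(_ + _)
  Await.result(a0, 5.seconds)

@main def compareStarts(): Unit =
  val (r1, t1) = elapsedMs(onDemand())
  val (r2, t2) = elapsedMs(eager())
  println(s"on-demand: $r1 in $t1 ms, eager: $r2 in $t2 ms")
```

Both versions compute the same result. The on-demand one takes at least the sum of the two delays, while the eager one takes roughly the longer of the two.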

@mbovel
Member Author

mbovel commented Nov 29, 2024

Discussed today with @vkuncak.

For this simple example, we can also tie the knot using a lazy val, which allows using an immutable array:

//> using dep "ch.epfl.lamp::gears::0.2.0"

import scala.concurrent.duration.given

import gears.async.{Future, Async, AsyncOperations}
import gears.async.default.given
import Async.await

final class AsyncLazy[T](body: (Async, Async.Spawn) ?=> T)(using Async, Async.Spawn):
  lazy val future: Future[T] = Future(body)
  def get(): T = future.await

@main def main =

  Async.blocking:
    println("starting")

    lazy val a: IArray[AsyncLazy[Int]] =
      IArray.tabulate(4):
        case 0 => AsyncLazy:
          println("compute a(0)")
          a(1).get() + a(2).get()
        case 1 => AsyncLazy:
          println("compute a(1)")
          a(3).get() + 4
        case 2 => AsyncLazy:
          println("compute a(2)")
          a(3).get() + 2
        case 3 => AsyncLazy:
          println("compute a(3)")
          1
    
    println("initialized")
    println(a(0).get())

Or we can define each task directly as a lazy val, without the AsyncLazy wrapper:

//> using dep "ch.epfl.lamp::gears::0.2.0"

import scala.concurrent.duration.given

import gears.async.{Future, Async, AsyncOperations}
import gears.async.default.given
import Async.await

@main def main =

  Async.blocking:
    println("starting")

    lazy val a0 = Future:
      println("compute a(0)")
      a1.await + a2.await
    lazy val a1 = Future:
      println("compute a(1)")
      a3.await + 4
    lazy val a2 = Future:
      println("compute a(2)")
      a3.await + 2
    lazy val a3 = Future:
      println("compute a(3)")
      1

    println("initialized")
    println(a0.await)

@vkuncak

vkuncak commented Nov 29, 2024

There may be locking that is much more global than what is desired in the above examples.

@mbovel
Member Author

mbovel commented Nov 30, 2024

There may be locking that is much more global than what is desired in the above examples.

Right, one should use awaitAll for the Futures to actually run in parallel:

//> using dep "ch.epfl.lamp::gears::0.2.0"

import scala.concurrent.duration.given

import gears.async.{Future, Async, AsyncOperations}
import gears.async.default.given
import Async.await

@main def main =
  Async.blocking:
    println("starting")
    lazy val a0 = Future:
      trace("a0"):
        a1.await + a2.await
    lazy val a1 = Future:
      trace("a1"):
        AsyncOperations.sleep(3.seconds)
        a3.await + 4
    lazy val a2 = Future:
      trace("a2"):
        AsyncOperations.sleep(3.seconds)
        a3.await + 2
    lazy val a3 = Future:
      trace("a3"):
        1
    
    println("initialized")
    time:
      println(List(a0, a1, a2, a3).awaitAll)

def trace[T](name: String)(body: => T): T =
  println(s"start $name")
  val res = body
  println(s"end $name")
  res

def time[T](body: => T): T =
  val start = System.currentTimeMillis() / 1_000.0
  val res = body
  val end = System.currentTimeMillis() / 1_000.0
  println(f"${end - start}%.0f seconds")
  res

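This runs in about 3 seconds. Building the list forces all four lazy vals up front, so a1 and a2 sleep concurrently. Awaiting only a0 would start a2 only after a1 finishes, for about 6 seconds in total. Better, isn't it?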

@natsukagami
Contributor

I agree that @mbovel's solution is the most correct one, though it would be nice not to have to write .awaitAll at the end. Overall, I think the issue is a bit muddled: there seems to be some overlap between two different requests:

  • "I want a Future that doesn't start until I need it" -- this is what lazy val gives you, but on the other hand it might cause undesired delays (such as when we only do a0.await in the above example).
  • "I want a Future that doesn't start and just holds all listeners until I start it (manually or otherwise)" -- this is what #109 (Decouple Future initialization from its computation) does. This gives you a bit more control than lazy val Futures, at the expense of having to manually start the deferred futures (or risk immediately deadlocking the program). However, given that one would want to .awaitAll the lazy val Futures anyway, I think it's not a big deal.

I was thinking we could do something like a deferrable scope, where Futures spawned in it do not run until some .await is triggered:

@main def main =
  Async.blocking:
    println("starting")
    Async.deferrable:
      val a0 = Future.deferred:
        trace("a0"):
          a1.await + a2.await
      val a1 = Future.deferred:
        trace("a1"):
          AsyncOperations.sleep(3.seconds)
          a3.await + 4
      val a2 = Future.deferred:
        trace("a2"):
          AsyncOperations.sleep(3.seconds)
          a3.await + 2
      val a3 = Future.deferred:
        trace("a3"):
          1
      a0.await // .await called, start all futures

(Note that if a non-deferred Future is created and awaits one of the deferred futures, all futures will start too.)

This would solve the problem of having to .awaitAll all created futures, while relying on the synchronization already built into Futures.
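A rough user-land approximation of the proposed scope can be built on `scala.concurrent` promises. This is only a sketch under assumptions: `DeferrableScope`, `Task`, `deferred` and `Example` are hypothetical names, not gears API. Registering a task only stores its body, and the first `Task.await` starts every registered task. The tasks are declared as members of an `object` because, unlike locals in a block, object members may refer to vals declared later.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration.*

// Hypothetical user-land approximation of the proposed deferrable scope;
// none of these names are gears API.
final class DeferrableScope(using ExecutionContext):
  private val pending = new ConcurrentLinkedQueue[() => Unit]()
  private val triggered = new AtomicBoolean(false)

  final class Task[T](val future: Future[T]):
    // The first await on any task starts every registered task.
    def await(timeout: Duration = 5.seconds): T =
      triggered.set(true)
      drain()
      Await.result(future, timeout)

  // Registers `body` without running it (or runs it right away if the
  // scope has already been triggered).
  def deferred[T](body: => Future[T]): Task[T] =
    val promise = Promise[T]()
    pending.add(() => promise.completeWith(Future.delegate(body)))
    if triggered.get() then drain()
    new Task(promise.future)

  // Starts each pending body exactly once: poll() hands every job to one caller.
  private def drain(): Unit =
    var job = pending.poll()
    while job != null do
      job()
      job = pending.poll()

// Members of an object (unlike locals in a block) may refer to vals declared
// later; the bodies only run after the object is fully initialized.
object Example:
  given ExecutionContext = ExecutionContext.global
  val scope = new DeferrableScope
  val a0: scope.Task[Int] = scope.deferred(a1.future.zipWith(a2.future)(_ + _))
  val a1: scope.Task[Int] = scope.deferred(a3.future.map(_ + 4))
  val a2: scope.Task[Int] = scope.deferred(a3.future.map(_ + 2))
  val a3: scope.Task[Int] = scope.deferred(Future.successful(1))

@main def deferrableDemo(): Unit =
  println(Example.a0.await()) // the first await starts all four tasks
```

The proposal also lets a non-deferred Future that awaits a deferred one trigger the scope. This sketch does not do that: a plain `Future` that merely depends on `task.future` leaves the scope untriggered, and only `Task.await` starts it.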

3 participants