Skip to content

Commit

Permalink
evalOnExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
homycdev committed Jun 14, 2023
1 parent e9aeb8c commit 237a2ba
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 1 deletion.
27 changes: 27 additions & 0 deletions core/shared/src/main/scala/cats/effect/IO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -359,11 +359,38 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] {
*/
def evalOn(ec: ExecutionContext): IO[A] = IO.EvalOn(this, ec)

/**
* Shifts the execution of the current IO to the specified [[java.util.concurrent.Executor]].
*
* @param executor
* @return
*/
def evalOnExecutor(executor: Executor): IO[A] = {
require(executor != null, "Cannot pass undefined Executor as an argument")
executor match {
case ec: ExecutionContext =>
evalOn(ec: ExecutionContext)
case executor =>
IO.executionContext.flatMap { refEc =>
val newEc: ExecutionContext =
ExecutionContext.fromExecutor(executor, refEc.reportFailure)
evalOn(newEc)
}
}
}

def startOn(ec: ExecutionContext): IO[FiberIO[A @uncheckedVariance]] = start.evalOn(ec)

def startOnExecutor(executor: Executor): IO[FiberIO[A @uncheckedVariance]] =
start.evalOnExecutor(executor)

def backgroundOn(ec: ExecutionContext): ResourceIO[IO[OutcomeIO[A @uncheckedVariance]]] =
Resource.make(startOn(ec))(_.cancel).map(_.join)

def backgroundOnExecutor(
executor: Executor): ResourceIO[IO[OutcomeIO[A @uncheckedVariance]]] =
Resource.make(startOnExecutor(executor))(_.cancel).map(_.join)

/**
* Given an effect which might be [[uncancelable]] and a finalizer, produce an effect which
* can be canceled by running the finalizer. This combinator is useful for handling scenarios
Expand Down
43 changes: 43 additions & 0 deletions kernel/shared/src/main/scala/cats/effect/kernel/Async.scala
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,23 @@ trait Async[F[_]] extends AsyncPlatform[F] with Sync[F] with Temporal[F] {
*/
def evalOn[A](fa: F[A], ec: ExecutionContext): F[A]

/**
* [[Async.evalOn]] with provided [[java.util.concurrent.Executor]]
*/
def evalOnExecutor[A](fa: F[A], executor: Executor): F[A] = {
require(executor != null, "Cannot pass undefined Executor as an argument")
executor match {
case ec: ExecutionContext =>
evalOn[A](fa, ec: ExecutionContext)
case executor =>
flatMap(executionContext) { refEc =>
val newEc: ExecutionContext =
ExecutionContext.fromExecutor(executor, refEc.reportFailure)
evalOn[A](fa, newEc)
}
}
}

/**
* [[Async.evalOn]] as a natural transformation.
*/
Expand All @@ -174,6 +191,14 @@ trait Async[F[_]] extends AsyncPlatform[F] with Sync[F] with Temporal[F] {
def apply[A](fa: F[A]): F[A] = evalOn(fa, ec)
}

/**
* [[Async.evalOnExecutor]] as a natural transformation.
*/
def evalOnExecutorK(executor: Executor): F ~> F =
new (F ~> F) {
def apply[A](fa: F[A]): F[A] = evalOnExecutor(fa, executor)
}

/**
* Start a new fiber on a different execution context.
*
Expand All @@ -182,6 +207,14 @@ trait Async[F[_]] extends AsyncPlatform[F] with Sync[F] with Temporal[F] {
def startOn[A](fa: F[A], ec: ExecutionContext): F[Fiber[F, Throwable, A]] =
evalOn(start(fa), ec)

/**
* Start a new fiber on a different executor.
*
* See [[GenSpawn.start]] for more details.
*/
def startOnExecutor[A](fa: F[A], executor: Executor): F[Fiber[F, Throwable, A]] =
evalOnExecutor(start(fa), executor)

/**
* Start a new background fiber on a different execution context.
*
Expand All @@ -192,6 +225,16 @@ trait Async[F[_]] extends AsyncPlatform[F] with Sync[F] with Temporal[F] {
ec: ExecutionContext): Resource[F, F[Outcome[F, Throwable, A]]] =
Resource.make(startOn(fa, ec))(_.cancel)(this).map(_.join)

/**
* Start a new background fiber on a different executor.
*
* See [[GenSpawn.background]] for more details.
*/
def backgroundOnExecutor[A](
fa: F[A],
executor: Executor): Resource[F, F[Outcome[F, Throwable, A]]] =
Resource.make(startOnExecutor(fa, executor))(_.cancel)(this).map(_.join)

/**
* Obtain a reference to the current execution context.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@

package cats.effect.kernel.syntax

import cats.effect.kernel.{Async, Fiber, Outcome, Resource, Sync}
import cats.effect.kernel._

import java.util.concurrent.Executor
import scala.concurrent.ExecutionContext

trait AsyncSyntax {
Expand All @@ -30,13 +31,23 @@ final class AsyncOps[F[_], A] private[syntax] (private[syntax] val wrapped: F[A]
def evalOn(ec: ExecutionContext)(implicit F: Async[F]): F[A] =
Async[F].evalOn(wrapped, ec)

def evalOnExecutor(executor: Executor)(implicit F: Async[F]): F[A] =
Async[F].evalOnExecutor(wrapped, executor)

def startOn(ec: ExecutionContext)(implicit F: Async[F]): F[Fiber[F, Throwable, A]] =
Async[F].startOn(wrapped, ec)

def startOnExecutor(executor: Executor)(implicit F: Async[F]): F[Fiber[F, Throwable, A]] =
Async[F].startOnExecutor(wrapped, executor)

def backgroundOn(ec: ExecutionContext)(
implicit F: Async[F]): Resource[F, F[Outcome[F, Throwable, A]]] =
Async[F].backgroundOn(wrapped, ec)

def backgroundOnExecutor(executor: Executor)(
implicit F: Async[F]): Resource[F, F[Outcome[F, Throwable, A]]] =
Async[F].backgroundOnExecutor(wrapped, executor)

def syncStep[G[_]: Sync](limit: Int)(implicit F: Async[F]): G[Either[F[A], A]] =
Async[F].syncStep[G, A](wrapped, limit)
}

0 comments on commit 237a2ba

Please sign in to comment.