Skip to content

Commit

Permalink
Remove parallel errors from ZPure (#1432)
Browse files Browse the repository at this point in the history
  • Loading branch information
ghostdogpr authored Dec 27, 2024
1 parent 14e7654 commit b7b1f09
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 201 deletions.
18 changes: 2 additions & 16 deletions core-tests/shared/src/test/scala/zio/prelude/fx/ZPureSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ object ZPureSpec extends ZIOBaseSpec {
},
test("providing environment should preserve errors") {
val zPure: ZPure[Nothing, Unit, Unit, (Int, Int), Int, Int] =
ZPure.fail(1).zipPar(ZPure.fail(2)).as(0)
ZPure.fail(1).as(0)
val actual = zPure.provideEnvironment(ZEnvironment((1, 2))).runValidation
val expected = Validation.Failure(Chunk.empty, NonEmptyChunk(1, 2))
val expected = Validation.Failure(Chunk.empty, NonEmptyChunk(1))
assert(actual)(equalTo(expected))
},
test("provideSome") {
Expand Down Expand Up @@ -821,20 +821,6 @@ object ZPureSpec extends ZIOBaseSpec {
}
)
),
test("parallel errors example") {
def validateName(s: String): ZPure[Nothing, Unit, Unit, Any, String, String] =
if (s == "John Doe") ZPure.succeed(s) else ZPure.fail("Wrong name!")
def validateAge(age: Int): ZPure[Nothing, Unit, Unit, Any, String, Int] =
if (age >= 18) ZPure.succeed(age) else ZPure.fail("Under age")
def validateAuthorized(authorized: Boolean): ZPure[Nothing, Unit, Unit, Any, String, Unit] =
if (authorized) ZPure.unit else ZPure.fail("Not authorized")
val validation =
validateName("Jane Doe") zipPar validateAge(17) zipPar validateAuthorized(false)
val result = validation.sandbox.either.run
assert(result)(
isLeft(equalTo(Cause("Wrong name!") && Cause("Under age") && Cause("Not authorized")))
)
},
test("state is restored after failure") {
val foo: ZPure[Nothing, String, Int, Any, Nothing, Unit] = ZPure.set(3)
val bar: ZPure[Nothing, Int, String, Any, Nothing, Unit] = ZPure.set("bar")
Expand Down
18 changes: 10 additions & 8 deletions core/shared/src/main/scala/zio/prelude/Associative.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1453,15 +1453,17 @@ object Associative extends AssociativeLowPriority {

trait AssociativeLowPriority {

implicit def FxCauseProdAssociative[A]: Associative[Prod[fx.Cause[A]]] = new Associative[Prod[fx.Cause[A]]] {
def combine(l: => Prod[fx.Cause[A]], r: => Prod[fx.Cause[A]]): Prod[fx.Cause[A]] =
Prod(Prod.unwrap(l) ++ Prod.unwrap(r))
}
implicit def parSeqProdAssociative[A]: Associative[Prod[ParSeq[Unit, A]]] =
new Associative[Prod[ParSeq[Unit, A]]] {
def combine(l: => Prod[ParSeq[Unit, A]], r: => Prod[ParSeq[Unit, A]]): Prod[ParSeq[Unit, A]] =
Prod(Prod.unwrap(l) ++ Prod.unwrap(r))
}

implicit def FxCauseSumCommutative[A]: Commutative[Sum[fx.Cause[A]]] = new Commutative[Sum[fx.Cause[A]]] {
def combine(l: => Sum[fx.Cause[A]], r: => Sum[fx.Cause[A]]): Sum[fx.Cause[A]] =
Sum(Sum.unwrap(l) && Sum.unwrap(r))
}
implicit def parSeqSumCommutative[A]: Commutative[Sum[ParSeq[Unit, A]]] =
new Commutative[Sum[ParSeq[Unit, A]]] {
def combine(l: => Sum[ParSeq[Unit, A]], r: => Sum[ParSeq[Unit, A]]): Sum[ParSeq[Unit, A]] =
Sum(Sum.unwrap(l) && Sum.unwrap(r))
}

/**
* The `Commutative` and `PartialInverse` instance for the product of `Int` values.
Expand Down
126 changes: 23 additions & 103 deletions core/shared/src/main/scala/zio/prelude/fx/ZPure.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

package zio.prelude.fx

import zio._
import zio.prelude._
import zio.prelude.coherent.CovariantIdentityBoth
import zio.{Cause => _, _}

import java.util.concurrent.atomic.AtomicBoolean
import scala.reflect.ClassTag
Expand All @@ -36,36 +36,12 @@ import scala.util.Try
sealed trait ZPure[+W, -S1, +S2, -R, +E, +A] { self =>
import ZPure._

/**
* A symbolic alias for `zipParRight`.
*/
final def &>[W1 >: W, S3 >: S2 <: S1, R1 <: R, E1 >: E, B, C](
that: ZPure[W1, S3, S3, R1, E1, B]
): ZPure[W1, S3, S3, R1, E1, B] =
self zipParRight that

/**
* A symbolic alias for `zipRight`.
*/
final def *>[W1 >: W, S3, R1 <: R, E1 >: E, B](that: ZPure[W1, S2, S3, R1, E1, B]): ZPure[W1, S1, S3, R1, E1, B] =
self zipRight that

/**
* A symbolic alias for `zipParLeft`.
*/
final def <&[W1 >: W, S3 >: S2 <: S1, R1 <: R, E1 >: E, B, C](
that: ZPure[W1, S3, S3, R1, E1, B]
): ZPure[W1, S3, S3, R1, E1, A] =
self zipParLeft that

/**
* A symbolic alias for `zipPar`.
*/
final def <&>[W1 >: W, S3 >: S2 <: S1, R1 <: R, E1 >: E, B, C](that: ZPure[W1, S3, S3, R1, E1, B])(implicit
zippable: Zippable[A, B]
): ZPure[W1, S3, S3, R1, E1, zippable.Out] =
self zipPar that

/**
* A symbolic alias for `zipLeft`.
*/
Expand Down Expand Up @@ -263,12 +239,6 @@ sealed trait ZPure[+W, -S1, +S2, -R, +E, +A] { self =>
): ZPure[W, S3, S3, R, Nothing, B] =
self.foldM(e => ZPure.succeed(failure(e)), a => ZPure.succeed(success(a)))

final def foldCauseM[W1 >: W, S0 <: S1, S3, R1 <: R, E1, B](
failure: Cause[E] => ZPure[W1, S0, S3, R1, E1, B],
success: A => ZPure[W1, S2, S3, R1, E1, B]
)(implicit ev: CanFail[E]): ZPure[W1, S0, S3, R1, E1, B] =
Fold(self, failure, success)

/**
* Recovers from errors by accepting one computation to execute for the case
* of an error, and one computation to execute for the case of success.
Expand All @@ -277,7 +247,7 @@ sealed trait ZPure[+W, -S1, +S2, -R, +E, +A] { self =>
failure: E => ZPure[W1, S0, S3, R1, E1, B],
success: A => ZPure[W1, S2, S3, R1, E1, B]
)(implicit ev: CanFail[E]): ZPure[W1, S0, S3, R1, E1, B] =
foldCauseM((cause: Cause[E]) => failure(cause.first), success)
Fold(self, failure, success)

/**
* Exposes the output state into the value channel.
Expand Down Expand Up @@ -361,14 +331,6 @@ sealed trait ZPure[+W, -S1, +S2, -R, +E, +A] { self =>
final def mapError[E1](f: E => E1)(implicit ev: CanFail[E]): ZPure[W, S1, S2, R, E1, A] =
catchAll(e => fail(f(e)))

/**
* Returns a computation with its full cause of failure mapped using the
* specified function. This can be users to transform errors while
* preserving the original structure of the `Cause`.
*/
final def mapErrorCause[E2](f: Cause[E] => Cause[E2]): ZPure[W, S1, S2, R, E2, A] =
foldCauseM(cause => ZPure.failCause(f(cause)), ZPure.succeed)

/**
* Transforms the updated state of this computation with the specified
* function.
Expand Down Expand Up @@ -578,29 +540,29 @@ sealed trait ZPure[+W, -S1, +S2, -R, +E, +A] { self =>
* the updated state and the result.
*/
final def run(s: S1)(implicit ev1: Any <:< R, ev2: E <:< Nothing): (S2, A) =
runAll(s)._2.fold(cause => ev2(cause.first), identity)
runAll(s)._2.fold(ev2, identity)

/**
* Runs this computation with the specified initial state, returning both the
* log and either all the failures that occurred or the updated state and the
* result.
*/
final def runAll(s: S1)(implicit ev: Any <:< R): (Chunk[W], Either[Cause[E], (S2, A)]) =
final def runAll(s: S1)(implicit ev: Any <:< R): (Chunk[W], Either[E, (S2, A)]) =
Runner(s, self)

/**
* Runs this computation to produce its result or the first failure to
* occur.
*/
final def runEither(implicit ev1: Unit <:< S1, ev2: Any <:< R): Either[E, A] =
runAll(())._2.fold(cause => Left(cause.first), { case (_, a) => Right(a) })
runAll(())._2.fold(error => Left(error), { case (_, a) => Right(a) })

/**
* Runs this computation to produce its result and the log.
*/
final def runLog(implicit ev1: Unit <:< S1, ev2: Any <:< R, ev3: E <:< Nothing): (Chunk[W], A) = {
val (log, either) = runAll(())
(log, either.fold(cause => ev3(cause.first), { case (_, a) => a }))
(log, either.fold(error => ev3(error), { case (_, a) => a }))
}

/**
Expand All @@ -622,16 +584,10 @@ sealed trait ZPure[+W, -S1, +S2, -R, +E, +A] { self =>
*/
final def runValidation(implicit ev1: Unit <:< S1, ev2: Any <:< R): ZValidation[W, E, A] =
runAll(()) match {
case (log, Left(cause)) => ZValidation.Failure(log, NonEmptyChunk.fromChunk(cause.toChunk).get)
case (log, Left(error)) => ZValidation.Failure(log, NonEmptyChunk.single(error))
case (log, Right((_, a))) => ZValidation.Success(log, a)
}

/**
* Exposes the full cause of failures of this computation.
*/
final def sandbox: ZPure[W, S1, S2, R, Cause[E], A] =
foldCauseM(ZPure.fail, ZPure.succeed)

/**
* Converts an option on values into an option on errors leaving the state unchanged.
*/
Expand Down Expand Up @@ -684,7 +640,7 @@ sealed trait ZPure[+W, -S1, +S2, -R, +E, +A] { self =>
def toZIO(implicit ev: Unit <:< S1): zio.ZIO[R, E, A] =
ZIO.environmentWithZIO[R] { r =>
provideEnvironment(r).runAll(())._2 match {
case Left(cause) => ZIO.failCause(cause.toCause)
case Left(error) => ZIO.fail(error)
case Right((_, a)) => ZIO.succeed(a)
}
}
Expand All @@ -696,7 +652,7 @@ sealed trait ZPure[+W, -S1, +S2, -R, +E, +A] { self =>
ZIO.environmentWithZIO[R] { r =>
val result = provideEnvironment(r).runAll(s1)
result._2 match {
case Left(cause) => ZIO.failCause(cause.toCause)
case Left(error) => ZIO.fail(error)
case Right((_, a)) => ZIO.succeed(a)
}
}
Expand All @@ -708,7 +664,7 @@ sealed trait ZPure[+W, -S1, +S2, -R, +E, +A] { self =>
ZIO.environmentWithZIO[R] { r =>
val result = provideEnvironment(r).runAll(s1)
result._2 match {
case Left(cause) => ZIO.failCause(cause.toCause)
case Left(error) => ZIO.fail(error)
case Right(result) => ZIO.succeed(result)
}
}
Expand All @@ -720,17 +676,11 @@ sealed trait ZPure[+W, -S1, +S2, -R, +E, +A] { self =>
ZIO.environmentWithZIO[R] { r =>
val (log, result) = provideEnvironment(r).runAll(s1)
result match {
case Left(cause) => ZIO.failCause(cause.toCause)
case Left(error) => ZIO.fail(error)
case Right((s2, a)) => ZIO.succeed((log, s2, a))
}
}

/**
* Submerges the full cause of failures of this computation.
*/
def unsandbox[E1](implicit ev: E <:< Cause[E1]): ZPure[W, S1, S2, R, E1, A] =
foldM(e => ZPure.failCause(ev(e)), a => ZPure.succeed(a))

/**
* Combines this computation with the specified computation, passing the
* updated state from this computation to that computation and combining the
Expand Down Expand Up @@ -771,33 +721,6 @@ sealed trait ZPure[+W, -S1, +S2, -R, +E, +A] { self =>
)(f: (A, B) => C): ZPure[W1, S1, S3, R1, E1, C] =
self.flatMap(a => that.map(b => f(a, b)))

final def zipWithPar[W1 >: W, S3 >: S2 <: S1, R1 <: R, E1 >: E, B, C](
that: ZPure[W1, S3, S3, R1, E1, B]
)(f: (A, B) => C): ZPure[W1, S3, S3, R1, E1, C] =
self.foldCauseM(
c1 =>
that.foldCauseM(
c2 => ZPure.failCause(c1 && c2),
_ => ZPure.failCause(c1)
),
a => that.map(b => f(a, b))
)

final def zipPar[W1 >: W, S3 >: S2 <: S1, R1 <: R, E1 >: E, B, C](that: ZPure[W1, S3, S3, R1, E1, B])(implicit
zippable: Zippable[A, B]
): ZPure[W1, S3, S3, R1, E1, zippable.Out] =
self.zipWithPar(that)(zippable.zip(_, _))

final def zipParLeft[W1 >: W, S3 >: S2 <: S1, R1 <: R, E1 >: E, B, C](
that: ZPure[W1, S3, S3, R1, E1, B]
): ZPure[W1, S3, S3, R1, E1, A] =
self.zipWithPar(that)((a, _) => a)

final def zipParRight[W1 >: W, S3 >: S2 <: S1, R1 <: R, E1 >: E, B, C](
that: ZPure[W1, S3, S3, R1, E1, B]
): ZPure[W1, S3, S3, R1, E1, B] =
self.zipWithPar(that)((_, b) => b)

/**
* Returns a successful computation if the value is `Right`, or fails with error `None`.
*/
Expand Down Expand Up @@ -915,10 +838,7 @@ object ZPure {
new EnvironmentWithPurePartiallyApplied

def fail[E](e: E): ZPure[Nothing, Any, Nothing, Any, E, Nothing] =
failCause(Cause(e))

def failCause[E](cause: Cause[E]): ZPure[Nothing, Any, Nothing, Any, E, Nothing] =
ZPure.Fail(cause)
ZPure.Fail(e)

/**
* Constructs a computation from an `Either`.
Expand Down Expand Up @@ -1183,15 +1103,15 @@ object ZPure {
}

private final case class Succeed[+A](value: A) extends ZPure[Nothing, Any, Nothing, Any, Nothing, A]
private final case class Fail[+E](error: Cause[E]) extends ZPure[Nothing, Any, Nothing, Any, E, Nothing]
private final case class Fail[+E](error: E) extends ZPure[Nothing, Any, Nothing, Any, E, Nothing]
private final case class Modify[-S1, +S2, +A](run0: S1 => (A, S2)) extends ZPure[Nothing, S1, S2, Any, Nothing, A]
private final case class FlatMap[+W, -S1, S2, +S3, -R, +E, A, +B](
value: ZPure[W, S1, S2, R, E, A],
continue: A => ZPure[W, S2, S3, R, E, B]
) extends ZPure[W, S1, S3, R, E, B]
private final case class Fold[+W, -S1, S2, +S3, -R, E1, +E2, A, +B](
value: ZPure[W, S1, S2, R, E1, A],
failure: Cause[E1] => ZPure[W, S1, S3, R, E2, B],
failure: E1 => ZPure[W, S1, S3, R, E2, B],
success: A => ZPure[W, S2, S3, R, E2, B]
) extends ZPure[W, S1, S3, R, E2, B]
with Function[A, ZPure[W, S2, S3, R, E2, B]] {
Expand Down Expand Up @@ -1222,7 +1142,7 @@ object ZPure {
def apply[W, S1, S2, R, E, A](
state: S1,
zPure: ZPure[W, S1, S2, R, E, A]
): (Chunk[W], Either[Cause[E], (S2, A)]) = {
): (Chunk[W], Either[E, (S2, A)]) = {
val (runner, running) = pool.get()

if (running.compareAndSet(false, true)) {
Expand All @@ -1237,7 +1157,7 @@ object ZPure {
}
}

final private case class Err(cause: Cause[Any]) extends Exception {
final private case class Err(cause: Any) extends Exception {
override def fillInStackTrace(): Throwable = this
}
}
Expand All @@ -1261,12 +1181,12 @@ object ZPure {
private def run[W, S1, S2, R, E, A](
state: S1,
zPure: ZPure[W, S1, S2, R, E, A]
): (Chunk[W], Either[Cause[E], (S2, A)]) = {
): (Chunk[W], Either[E, (S2, A)]) = {
val result =
try
Right(loop(state, zPure.asInstanceOf[Erased]))
catch {
case Runner.Err(c) => Left(c.asInstanceOf[Cause[E]])
case Runner.Err(c) => Left(c.asInstanceOf[E])
}

(_logs.result().asInstanceOf[Chunk[W]], result)
Expand Down Expand Up @@ -1326,9 +1246,9 @@ object ZPure {

ZPure.Fold(
zPure.value,
(cause: Cause[Any]) => {
(error: Any) => {
_logs = previousLogs
ZPure.set(state) *> zPure.failure(cause)
ZPure.set(state) *> zPure.failure(error)
},
(a: Any) => {
val logs0 = _logs.result()
Expand All @@ -1340,7 +1260,7 @@ object ZPure {
} else {
ZPure.Fold(
zPure.value,
ZPure.set(state) *> zPure.failure(_: Cause[Any]),
ZPure.set(state) *> zPure.failure(_: Any),
zPure.success
)
}
Expand All @@ -1359,8 +1279,8 @@ object ZPure {
val zPure = provide0.asInstanceOf[Provide[Any, Any, Any, Any, Any, Any]]
val previousEnv = _environment
_environment = zPure.r
curZPure = zPure.continue.foldCauseM(
e => { _environment = previousEnv; ZPure.failCause(e) },
curZPure = zPure.continue.foldM(
e => { _environment = previousEnv; ZPure.fail(e) },
a => { _environment = previousEnv; ZPure.succeed(a) }
)

Expand Down
22 changes: 0 additions & 22 deletions core/shared/src/main/scala/zio/prelude/fx/package.scala

This file was deleted.

Loading

0 comments on commit b7b1f09

Please sign in to comment.