Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove parallel errors from ZPure #1432

Merged
merged 5 commits into from
Dec 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 2 additions & 16 deletions core-tests/shared/src/test/scala/zio/prelude/fx/ZPureSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ object ZPureSpec extends ZIOBaseSpec {
},
test("providing environment should preserve errors") {
val zPure: ZPure[Nothing, Unit, Unit, (Int, Int), Int, Int] =
ZPure.fail(1).zipPar(ZPure.fail(2)).as(0)
ZPure.fail(1).as(0)
val actual = zPure.provideEnvironment(ZEnvironment((1, 2))).runValidation
val expected = Validation.Failure(Chunk.empty, NonEmptyChunk(1, 2))
val expected = Validation.Failure(Chunk.empty, NonEmptyChunk(1))
assert(actual)(equalTo(expected))
},
test("provideSome") {
Expand Down Expand Up @@ -821,20 +821,6 @@ object ZPureSpec extends ZIOBaseSpec {
}
)
),
test("parallel errors example") {
def validateName(s: String): ZPure[Nothing, Unit, Unit, Any, String, String] =
if (s == "John Doe") ZPure.succeed(s) else ZPure.fail("Wrong name!")
def validateAge(age: Int): ZPure[Nothing, Unit, Unit, Any, String, Int] =
if (age >= 18) ZPure.succeed(age) else ZPure.fail("Under age")
def validateAuthorized(authorized: Boolean): ZPure[Nothing, Unit, Unit, Any, String, Unit] =
if (authorized) ZPure.unit else ZPure.fail("Not authorized")
val validation =
validateName("Jane Doe") zipPar validateAge(17) zipPar validateAuthorized(false)
val result = validation.sandbox.either.run
assert(result)(
isLeft(equalTo(Cause("Wrong name!") && Cause("Under age") && Cause("Not authorized")))
)
},
test("state is restored after failure") {
val foo: ZPure[Nothing, String, Int, Any, Nothing, Unit] = ZPure.set(3)
val bar: ZPure[Nothing, Int, String, Any, Nothing, Unit] = ZPure.set("bar")
Expand Down
18 changes: 10 additions & 8 deletions core/shared/src/main/scala/zio/prelude/Associative.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1453,15 +1453,17 @@ object Associative extends AssociativeLowPriority {

trait AssociativeLowPriority {

implicit def FxCauseProdAssociative[A]: Associative[Prod[fx.Cause[A]]] = new Associative[Prod[fx.Cause[A]]] {
def combine(l: => Prod[fx.Cause[A]], r: => Prod[fx.Cause[A]]): Prod[fx.Cause[A]] =
Prod(Prod.unwrap(l) ++ Prod.unwrap(r))
}
implicit def parSeqProdAssociative[A]: Associative[Prod[ParSeq[Unit, A]]] =
new Associative[Prod[ParSeq[Unit, A]]] {
def combine(l: => Prod[ParSeq[Unit, A]], r: => Prod[ParSeq[Unit, A]]): Prod[ParSeq[Unit, A]] =
Prod(Prod.unwrap(l) ++ Prod.unwrap(r))
}

implicit def FxCauseSumCommutative[A]: Commutative[Sum[fx.Cause[A]]] = new Commutative[Sum[fx.Cause[A]]] {
def combine(l: => Sum[fx.Cause[A]], r: => Sum[fx.Cause[A]]): Sum[fx.Cause[A]] =
Sum(Sum.unwrap(l) && Sum.unwrap(r))
}
implicit def parSeqSumCommutative[A]: Commutative[Sum[ParSeq[Unit, A]]] =
new Commutative[Sum[ParSeq[Unit, A]]] {
def combine(l: => Sum[ParSeq[Unit, A]], r: => Sum[ParSeq[Unit, A]]): Sum[ParSeq[Unit, A]] =
Sum(Sum.unwrap(l) && Sum.unwrap(r))
}

/**
* The `Commutative` and `PartialInverse` instance for the product of `Int` values.
Expand Down
126 changes: 23 additions & 103 deletions core/shared/src/main/scala/zio/prelude/fx/ZPure.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

package zio.prelude.fx

import zio._
import zio.prelude._
import zio.prelude.coherent.CovariantIdentityBoth
import zio.{Cause => _, _}

import java.util.concurrent.atomic.AtomicBoolean
import scala.reflect.ClassTag
Expand All @@ -36,36 +36,12 @@ import scala.util.Try
sealed trait ZPure[+W, -S1, +S2, -R, +E, +A] { self =>
import ZPure._

/**
* A symbolic alias for `zipParRight`.
*/
final def &>[W1 >: W, S3 >: S2 <: S1, R1 <: R, E1 >: E, B, C](
that: ZPure[W1, S3, S3, R1, E1, B]
): ZPure[W1, S3, S3, R1, E1, B] =
self zipParRight that

/**
* A symbolic alias for `zipRight`.
*/
final def *>[W1 >: W, S3, R1 <: R, E1 >: E, B](that: ZPure[W1, S2, S3, R1, E1, B]): ZPure[W1, S1, S3, R1, E1, B] =
self zipRight that

/**
* A symbolic alias for `zipParLeft`.
*/
final def <&[W1 >: W, S3 >: S2 <: S1, R1 <: R, E1 >: E, B, C](
that: ZPure[W1, S3, S3, R1, E1, B]
): ZPure[W1, S3, S3, R1, E1, A] =
self zipParLeft that

/**
* A symbolic alias for `zipPar`.
*/
final def <&>[W1 >: W, S3 >: S2 <: S1, R1 <: R, E1 >: E, B, C](that: ZPure[W1, S3, S3, R1, E1, B])(implicit
zippable: Zippable[A, B]
): ZPure[W1, S3, S3, R1, E1, zippable.Out] =
self zipPar that

/**
* A symbolic alias for `zipLeft`.
*/
Expand Down Expand Up @@ -263,12 +239,6 @@ sealed trait ZPure[+W, -S1, +S2, -R, +E, +A] { self =>
): ZPure[W, S3, S3, R, Nothing, B] =
self.foldM(e => ZPure.succeed(failure(e)), a => ZPure.succeed(success(a)))

final def foldCauseM[W1 >: W, S0 <: S1, S3, R1 <: R, E1, B](
failure: Cause[E] => ZPure[W1, S0, S3, R1, E1, B],
success: A => ZPure[W1, S2, S3, R1, E1, B]
)(implicit ev: CanFail[E]): ZPure[W1, S0, S3, R1, E1, B] =
Fold(self, failure, success)

/**
* Recovers from errors by accepting one computation to execute for the case
* of an error, and one computation to execute for the case of success.
Expand All @@ -277,7 +247,7 @@ sealed trait ZPure[+W, -S1, +S2, -R, +E, +A] { self =>
failure: E => ZPure[W1, S0, S3, R1, E1, B],
success: A => ZPure[W1, S2, S3, R1, E1, B]
)(implicit ev: CanFail[E]): ZPure[W1, S0, S3, R1, E1, B] =
foldCauseM((cause: Cause[E]) => failure(cause.first), success)
Fold(self, failure, success)

/**
* Exposes the output state into the value channel.
Expand Down Expand Up @@ -361,14 +331,6 @@ sealed trait ZPure[+W, -S1, +S2, -R, +E, +A] { self =>
final def mapError[E1](f: E => E1)(implicit ev: CanFail[E]): ZPure[W, S1, S2, R, E1, A] =
catchAll(e => fail(f(e)))

/**
* Returns a computation with its full cause of failure mapped using the
* specified function. This can be users to transform errors while
* preserving the original structure of the `Cause`.
*/
final def mapErrorCause[E2](f: Cause[E] => Cause[E2]): ZPure[W, S1, S2, R, E2, A] =
foldCauseM(cause => ZPure.failCause(f(cause)), ZPure.succeed)

/**
* Transforms the updated state of this computation with the specified
* function.
Expand Down Expand Up @@ -578,29 +540,29 @@ sealed trait ZPure[+W, -S1, +S2, -R, +E, +A] { self =>
* the updated state and the result.
*/
final def run(s: S1)(implicit ev1: Any <:< R, ev2: E <:< Nothing): (S2, A) =
runAll(s)._2.fold(cause => ev2(cause.first), identity)
runAll(s)._2.fold(ev2, identity)

/**
* Runs this computation with the specified initial state, returning both the
* log and either all the failures that occurred or the updated state and the
* result.
*/
final def runAll(s: S1)(implicit ev: Any <:< R): (Chunk[W], Either[Cause[E], (S2, A)]) =
final def runAll(s: S1)(implicit ev: Any <:< R): (Chunk[W], Either[E, (S2, A)]) =
Runner(s, self)

/**
* Runs this computation to produce its result or the first failure to
* occur.
*/
final def runEither(implicit ev1: Unit <:< S1, ev2: Any <:< R): Either[E, A] =
runAll(())._2.fold(cause => Left(cause.first), { case (_, a) => Right(a) })
runAll(())._2.fold(error => Left(error), { case (_, a) => Right(a) })

/**
* Runs this computation to produce its result and the log.
*/
final def runLog(implicit ev1: Unit <:< S1, ev2: Any <:< R, ev3: E <:< Nothing): (Chunk[W], A) = {
val (log, either) = runAll(())
(log, either.fold(cause => ev3(cause.first), { case (_, a) => a }))
(log, either.fold(error => ev3(error), { case (_, a) => a }))
}

/**
Expand All @@ -622,16 +584,10 @@ sealed trait ZPure[+W, -S1, +S2, -R, +E, +A] { self =>
*/
final def runValidation(implicit ev1: Unit <:< S1, ev2: Any <:< R): ZValidation[W, E, A] =
runAll(()) match {
case (log, Left(cause)) => ZValidation.Failure(log, NonEmptyChunk.fromChunk(cause.toChunk).get)
case (log, Left(error)) => ZValidation.Failure(log, NonEmptyChunk.single(error))
case (log, Right((_, a))) => ZValidation.Success(log, a)
}

/**
* Exposes the full cause of failures of this computation.
*/
final def sandbox: ZPure[W, S1, S2, R, Cause[E], A] =
foldCauseM(ZPure.fail, ZPure.succeed)

/**
* Converts an option on values into an option on errors leaving the state unchanged.
*/
Expand Down Expand Up @@ -684,7 +640,7 @@ sealed trait ZPure[+W, -S1, +S2, -R, +E, +A] { self =>
def toZIO(implicit ev: Unit <:< S1): zio.ZIO[R, E, A] =
ZIO.environmentWithZIO[R] { r =>
provideEnvironment(r).runAll(())._2 match {
case Left(cause) => ZIO.failCause(cause.toCause)
case Left(error) => ZIO.fail(error)
case Right((_, a)) => ZIO.succeed(a)
}
}
Expand All @@ -696,7 +652,7 @@ sealed trait ZPure[+W, -S1, +S2, -R, +E, +A] { self =>
ZIO.environmentWithZIO[R] { r =>
val result = provideEnvironment(r).runAll(s1)
result._2 match {
case Left(cause) => ZIO.failCause(cause.toCause)
case Left(error) => ZIO.fail(error)
case Right((_, a)) => ZIO.succeed(a)
}
}
Expand All @@ -708,7 +664,7 @@ sealed trait ZPure[+W, -S1, +S2, -R, +E, +A] { self =>
ZIO.environmentWithZIO[R] { r =>
val result = provideEnvironment(r).runAll(s1)
result._2 match {
case Left(cause) => ZIO.failCause(cause.toCause)
case Left(error) => ZIO.fail(error)
case Right(result) => ZIO.succeed(result)
}
}
Expand All @@ -720,17 +676,11 @@ sealed trait ZPure[+W, -S1, +S2, -R, +E, +A] { self =>
ZIO.environmentWithZIO[R] { r =>
val (log, result) = provideEnvironment(r).runAll(s1)
result match {
case Left(cause) => ZIO.failCause(cause.toCause)
case Left(error) => ZIO.fail(error)
case Right((s2, a)) => ZIO.succeed((log, s2, a))
}
}

/**
* Submerges the full cause of failures of this computation.
*/
def unsandbox[E1](implicit ev: E <:< Cause[E1]): ZPure[W, S1, S2, R, E1, A] =
foldM(e => ZPure.failCause(ev(e)), a => ZPure.succeed(a))

/**
* Combines this computation with the specified computation, passing the
* updated state from this computation to that computation and combining the
Expand Down Expand Up @@ -771,33 +721,6 @@ sealed trait ZPure[+W, -S1, +S2, -R, +E, +A] { self =>
)(f: (A, B) => C): ZPure[W1, S1, S3, R1, E1, C] =
self.flatMap(a => that.map(b => f(a, b)))

final def zipWithPar[W1 >: W, S3 >: S2 <: S1, R1 <: R, E1 >: E, B, C](
that: ZPure[W1, S3, S3, R1, E1, B]
)(f: (A, B) => C): ZPure[W1, S3, S3, R1, E1, C] =
self.foldCauseM(
c1 =>
that.foldCauseM(
c2 => ZPure.failCause(c1 && c2),
_ => ZPure.failCause(c1)
),
a => that.map(b => f(a, b))
)

final def zipPar[W1 >: W, S3 >: S2 <: S1, R1 <: R, E1 >: E, B, C](that: ZPure[W1, S3, S3, R1, E1, B])(implicit
zippable: Zippable[A, B]
): ZPure[W1, S3, S3, R1, E1, zippable.Out] =
self.zipWithPar(that)(zippable.zip(_, _))

final def zipParLeft[W1 >: W, S3 >: S2 <: S1, R1 <: R, E1 >: E, B, C](
that: ZPure[W1, S3, S3, R1, E1, B]
): ZPure[W1, S3, S3, R1, E1, A] =
self.zipWithPar(that)((a, _) => a)

final def zipParRight[W1 >: W, S3 >: S2 <: S1, R1 <: R, E1 >: E, B, C](
that: ZPure[W1, S3, S3, R1, E1, B]
): ZPure[W1, S3, S3, R1, E1, B] =
self.zipWithPar(that)((_, b) => b)

/**
* Returns a successful computation if the value is `Right`, or fails with error `None`.
*/
Expand Down Expand Up @@ -915,10 +838,7 @@ object ZPure {
new EnvironmentWithPurePartiallyApplied

def fail[E](e: E): ZPure[Nothing, Any, Nothing, Any, E, Nothing] =
failCause(Cause(e))

def failCause[E](cause: Cause[E]): ZPure[Nothing, Any, Nothing, Any, E, Nothing] =
ZPure.Fail(cause)
ZPure.Fail(e)

/**
* Constructs a computation from an `Either`.
Expand Down Expand Up @@ -1183,15 +1103,15 @@ object ZPure {
}

private final case class Succeed[+A](value: A) extends ZPure[Nothing, Any, Nothing, Any, Nothing, A]
private final case class Fail[+E](error: Cause[E]) extends ZPure[Nothing, Any, Nothing, Any, E, Nothing]
private final case class Fail[+E](error: E) extends ZPure[Nothing, Any, Nothing, Any, E, Nothing]
private final case class Modify[-S1, +S2, +A](run0: S1 => (A, S2)) extends ZPure[Nothing, S1, S2, Any, Nothing, A]
private final case class FlatMap[+W, -S1, S2, +S3, -R, +E, A, +B](
value: ZPure[W, S1, S2, R, E, A],
continue: A => ZPure[W, S2, S3, R, E, B]
) extends ZPure[W, S1, S3, R, E, B]
private final case class Fold[+W, -S1, S2, +S3, -R, E1, +E2, A, +B](
value: ZPure[W, S1, S2, R, E1, A],
failure: Cause[E1] => ZPure[W, S1, S3, R, E2, B],
failure: E1 => ZPure[W, S1, S3, R, E2, B],
success: A => ZPure[W, S2, S3, R, E2, B]
) extends ZPure[W, S1, S3, R, E2, B]
with Function[A, ZPure[W, S2, S3, R, E2, B]] {
Expand Down Expand Up @@ -1222,7 +1142,7 @@ object ZPure {
def apply[W, S1, S2, R, E, A](
state: S1,
zPure: ZPure[W, S1, S2, R, E, A]
): (Chunk[W], Either[Cause[E], (S2, A)]) = {
): (Chunk[W], Either[E, (S2, A)]) = {
val (runner, running) = pool.get()

if (running.compareAndSet(false, true)) {
Expand All @@ -1237,7 +1157,7 @@ object ZPure {
}
}

final private case class Err(cause: Cause[Any]) extends Exception {
final private case class Err(cause: Any) extends Exception {
override def fillInStackTrace(): Throwable = this
}
}
Expand All @@ -1261,12 +1181,12 @@ object ZPure {
private def run[W, S1, S2, R, E, A](
state: S1,
zPure: ZPure[W, S1, S2, R, E, A]
): (Chunk[W], Either[Cause[E], (S2, A)]) = {
): (Chunk[W], Either[E, (S2, A)]) = {
val result =
try
Right(loop(state, zPure.asInstanceOf[Erased]))
catch {
case Runner.Err(c) => Left(c.asInstanceOf[Cause[E]])
case Runner.Err(c) => Left(c.asInstanceOf[E])
}

(_logs.result().asInstanceOf[Chunk[W]], result)
Expand Down Expand Up @@ -1326,9 +1246,9 @@ object ZPure {

ZPure.Fold(
zPure.value,
(cause: Cause[Any]) => {
(error: Any) => {
_logs = previousLogs
ZPure.set(state) *> zPure.failure(cause)
ZPure.set(state) *> zPure.failure(error)
},
(a: Any) => {
val logs0 = _logs.result()
Expand All @@ -1340,7 +1260,7 @@ object ZPure {
} else {
ZPure.Fold(
zPure.value,
ZPure.set(state) *> zPure.failure(_: Cause[Any]),
ZPure.set(state) *> zPure.failure(_: Any),
zPure.success
)
}
Expand All @@ -1359,8 +1279,8 @@ object ZPure {
val zPure = provide0.asInstanceOf[Provide[Any, Any, Any, Any, Any, Any]]
val previousEnv = _environment
_environment = zPure.r
curZPure = zPure.continue.foldCauseM(
e => { _environment = previousEnv; ZPure.failCause(e) },
curZPure = zPure.continue.foldM(
e => { _environment = previousEnv; ZPure.fail(e) },
a => { _environment = previousEnv; ZPure.succeed(a) }
)

Expand Down
22 changes: 0 additions & 22 deletions core/shared/src/main/scala/zio/prelude/fx/package.scala

This file was deleted.

Loading