diff --git a/core/shared/src/main/scala/fs2/Pull.scala b/core/shared/src/main/scala/fs2/Pull.scala index a4d643c8fb..7b315e5a9b 100644 --- a/core/shared/src/main/scala/fs2/Pull.scala +++ b/core/shared/src/main/scala/fs2/Pull.scala @@ -654,10 +654,9 @@ object Pull extends PullLowPriority { def cont(r: Terminal[Unit]): Pull[Pure, INothing, Unit] = r } - private abstract class Bind[+F[_], +O, X, +R](misstep: Pull[F, O, X]) + private abstract class Bind[+F[_], +O, X, +R](val step: Pull[F, O, X]) extends Pull[F, O, R] with ContP[X, F, O, R] { - def step: Pull[F, O, X] = misstep def cont(r: Terminal[X]): Pull[F, O, R] def delegate: Bind[F, O, X, R] = this } @@ -694,12 +693,12 @@ object Pull extends PullLowPriority { // This class is not created by combinators in public Pull API, only during compilation private class BindBind[F[_], O, X, Y]( - var innerBind: Bind[F, O, X, Y], - var endBind: Bind[F, O, Y, Unit] - ) extends Bind[F, O, X, Unit](null) { - override def step: Pull[F, O, X] = innerBind.step - def cont(xterm: Terminal[X]): Pull[F, O, Unit] = - try bindBindAux(xterm, this) + step: Pull[F, O, X], + val bb: Bind[F, O, X, Y], + val del: Bind[F, O, Y, Unit] + ) extends Bind[F, O, X, Unit](step) { + def cont(tx: Terminal[X]): Pull[F, O, Unit] = + try bindBindAux(bb.cont(tx), del) catch { case NonFatal(e) => Fail(e) } } @@ -712,12 +711,7 @@ object Pull extends PullLowPriority { case ty: Terminal[_] => del match { case cici: BindBind[F, O, r, Y] => - val innerBind = cici.innerBind - val endBind = cici.endBind - cici.innerBind = null - cici.endBind = null - val nextStep = innerBind.cont(ty) - bindBindAux[F, O, r, Y](nextStep, endBind) + bindBindAux[F, O, r, Y](cici.bb.cont(ty), cici.del) case _ => del.cont(ty) } case x => new DelegateBind(x, del) @@ -871,7 +865,7 @@ object Pull extends PullLowPriority { case b: Bind[G, X, y, Unit] => b.step match { case c: Bind[G, X, x, _] => - viewL(new BindBind[G, X, x, y](c, b.delegate)) + viewL(new BindBind[G, X, x, y](c.step, c.delegate, b.delegate)) case e: Action[G, X, y2] => contP = b.delegate e @@ -904,22 +898,37 @@ object Pull extends PullLowPriority { } - trait Run[-G[_], -X] { - def done(scope: Scope[F]): F[B] - def out(head: Chunk[X], scope: Scope[F], tail: Pull[G, X, Unit]): F[B] - def interrupted(inter: Interrupted): F[B] - def fail(e: Throwable): F[B] + trait Run[-G[_], -X, +End] { + def done(scope: Scope[F]): End + def out(head: Chunk[X], scope: Scope[F], tail: Pull[G, X, Unit]): End + def interrupted(inter: Interrupted): End + def fail(e: Throwable): End } + type CallRun[+G[_], +X, End] = Run[G, X, End] => End + + object TheBuildR extends Run[Pure, INothing, F[CallRun[Pure, Nothing, F[INothing]]]] { + type TheRun = Run[Pure, INothing, F[INothing]] + def fail(e: Throwable) = F.raiseError(e) + def done(scope: Scope[F]) = + F.pure((cont: TheRun) => cont.done(scope)) + def out(head: Chunk[INothing], scope: Scope[F], tail: Pull[Pure, INothing, Unit]) = + F.pure((cont: TheRun) => cont.out(head, scope, tail)) + def interrupted(i: Interrupted) = + F.pure((cont: TheRun) => cont.interrupted(i)) + } + + def buildR[G[_], X, End]: Run[G, X, F[CallRun[G, X, F[End]]]] = + TheBuildR.asInstanceOf[Run[G, X, F[CallRun[G, X, F[End]]]]] - def go[G[_], X]( + def go[G[_], X, End]( scope: Scope[F], extendedTopLevelScope: Option[Scope[F]], translation: G ~> F, - runner: Run[G, X], + runner: Run[G, X, F[End]], stream: Pull[G, X, Unit] - ): F[B] = { + ): F[End] = { - def interruptGuard(scope: Scope[F], view: Cont[INothing, G, X])(next: => F[B]): F[B] = + def interruptGuard(scope: Scope[F], view: Cont[INothing, G, X])(next: => F[End]): F[End] = scope.isInterrupted.flatMap { case None => next case Some(outcome) => @@ -931,18 +940,18 @@ object Pull extends PullLowPriority { go(scope, extendedTopLevelScope, translation, runner, view(result)) } - def goErr(err: Throwable, view: Cont[Nothing, G, X]): F[B] = + def goErr(err: Throwable, view: Cont[Nothing, G, X]): F[End] = go(scope, extendedTopLevelScope, translation, runner, view(Fail(err))) - class ViewRunner(val view: Cont[Unit, G, X]) extends Run[G, X] { + class ViewRunner(val view: Cont[Unit, G, X]) extends Run[G, X, F[End]] { private val prevRunner = runner - def done(doneScope: Scope[F]): F[B] = + def done(doneScope: Scope[F]): F[End] = go(doneScope, extendedTopLevelScope, translation, prevRunner, view(unit)) - def out(head: Chunk[X], scope: Scope[F], tail: Pull[G, X, Unit]): F[B] = { + def out(head: Chunk[X], scope: Scope[F], tail: Pull[G, X, Unit]): F[End] = { @tailrec - def outLoop(acc: Pull[G, X, Unit], pred: Run[G, X]): F[B] = + def outLoop(acc: Pull[G, X, Unit], pred: Run[G, X, F[End]]): F[End] = // bit of an ugly hack to avoid a stack overflow when these accummulate pred match { case vrun: ViewRunner => outLoop(bindView(acc, vrun.view), vrun.prevRunner) @@ -951,40 +960,40 @@ object Pull extends PullLowPriority { outLoop(tail, this) } - def interrupted(inter: Interrupted): F[B] = + def interrupted(inter: Interrupted): F[End] = go(scope, extendedTopLevelScope, translation, prevRunner, view(inter)) - def fail(e: Throwable): F[B] = goErr(e, view) + def fail(e: Throwable): F[End] = goErr(e, view) } - class TranslateRunner[H[_]](fk: H ~> G, view: Cont[Unit, G, X]) extends Run[H, X] { - def done(doneScope: Scope[F]): F[B] = + class TranslateRunner[H[_]](fk: H ~> G, view: Cont[Unit, G, X]) extends Run[H, X, F[End]] { + def done(doneScope: Scope[F]): F[End] = go(doneScope, extendedTopLevelScope, translation, runner, view(unit)) - def out(head: Chunk[X], scope: Scope[F], tail: Pull[H, X, Unit]): F[B] = { + def out(head: Chunk[X], scope: Scope[F], tail: Pull[H, X, Unit]): F[End] = { val next = bindView(Translate(tail, fk), view) runner.out(head, scope, next) } - def interrupted(inter: Interrupted): F[B] = + def interrupted(inter: Interrupted): F[End] = go(scope, extendedTopLevelScope, translation, runner, view(inter)) - def fail(e: Throwable): F[B] = goErr(e, view) + def fail(e: Throwable): F[End] = goErr(e, view) } - abstract class StepRunR[Y, S](view: Cont[Option[S], G, X]) extends Run[G, Y] { - def done(scope: Scope[F]): F[B] = + abstract class StepRunR[Y, S](view: Cont[Option[S], G, X]) extends Run[G, Y, F[End]] { + def done(scope: Scope[F]): F[End] = interruptGuard(scope, view) { go(scope, extendedTopLevelScope, translation, runner, view(Succeeded(None))) } - def interrupted(inter: Interrupted): F[B] = + def interrupted(inter: Interrupted): F[End] = go(scope, extendedTopLevelScope, translation, runner, view(inter)) - def fail(e: Throwable): F[B] = goErr(e, view) + def fail(e: Throwable): F[End] = goErr(e, view) } class UnconsRunR[Y](view: Cont[Option[(Chunk[Y], Pull[G, Y, Unit])], G, X]) extends StepRunR[Y, (Chunk[Y], Pull[G, Y, Unit])](view) { - def out(head: Chunk[Y], outScope: Scope[F], tail: Pull[G, Y, Unit]): F[B] = + def out(head: Chunk[Y], outScope: Scope[F], tail: Pull[G, Y, Unit]): F[End] = // For a Uncons, we continue in same Scope at which we ended compilation of inner stream interruptGuard(outScope, view) { val result = Succeeded(Some((head, tail))) @@ -995,7 +1004,7 @@ object Pull extends PullLowPriority { class StepLegRunR[Y](view: Cont[Option[Stream.StepLeg[G, Y]], G, X]) extends StepRunR[Y, Stream.StepLeg[G, Y]](view) { - def out(head: Chunk[Y], outScope: Scope[F], tail: Pull[G, Y, Unit]): F[B] = + def out(head: Chunk[Y], outScope: Scope[F], tail: Pull[G, Y, Unit]): F[End] = // StepLeg: we shift back to the scope at which we were // before we started to interpret the Leg's inner stream. interruptGuard(scope, view) { @@ -1004,7 +1013,8 @@ object Pull extends PullLowPriority { } } - class FlatMapR[Y](view: Cont[Unit, G, X], fun: Y => Pull[G, X, Unit]) extends Run[G, Y] { + class FlatMapR[Y](view: Cont[Unit, G, X], fun: Y => Pull[G, X, Unit]) + extends Run[G, Y, F[End]] { private[this] def unconsed(chunk: Chunk[Y], tail: Pull[G, Y, Unit]): Pull[G, X, Unit] = if (chunk.size == 1 && tail.isInstanceOf[Succeeded[_]]) // nb: If tl is Pure, there's no need to propagate flatMap through the tail. Hence, we @@ -1030,23 +1040,23 @@ object Pull extends PullLowPriority { go(0) } - def done(scope: Scope[F]): F[B] = + def done(scope: Scope[F]): F[End] = interruptGuard(scope, view) { go(scope, extendedTopLevelScope, translation, runner, view(unit)) } - def out(head: Chunk[Y], outScope: Scope[F], tail: Pull[G, Y, Unit]): F[B] = { + def out(head: Chunk[Y], outScope: Scope[F], tail: Pull[G, Y, Unit]): F[End] = { val next = bindView(unconsed(head, tail), view) go(outScope, extendedTopLevelScope, translation, runner, next) } - def interrupted(inter: Interrupted): F[B] = + def interrupted(inter: Interrupted): F[End] = go(scope, extendedTopLevelScope, translation, runner, view(inter)) - def fail(e: Throwable): F[B] = goErr(e, view) + def fail(e: Throwable): F[End] = goErr(e, view) } - def goEval[V](eval: Eval[G, V], view: Cont[V, G, X]): F[B] = + def goEval[V](eval: Eval[G, V], view: Cont[V, G, X]): F[End] = scope.interruptibleEval(translation(eval.value)).flatMap { eitherOutcome => val result = eitherOutcome match { case Right(r) => Succeeded(r) @@ -1057,7 +1067,7 @@ object Pull extends PullLowPriority { go(scope, extendedTopLevelScope, translation, runner, view(result)) } - def goAcquire[R](acquire: Acquire[G, R], view: Cont[R, G, X]): F[B] = { + def goAcquire[R](acquire: Acquire[G, R], view: Cont[R, G, X]): F[End] = { val onScope = scope.acquireResource[R]( poll => if (acquire.cancelable) poll(translation(acquire.resource)) @@ -1079,7 +1089,7 @@ object Pull extends PullLowPriority { def goInterruptWhen( haltOnSignal: F[Either[Throwable, Unit]], view: Cont[Unit, G, X] - ): F[B] = { + ): F[End] = { val onScope = scope.acquireResource( _ => scope.interruptWhen(haltOnSignal), (f: Fiber[F, Throwable, Unit], _: ExitCase) => f.cancel @@ -1100,7 +1110,7 @@ object Pull extends PullLowPriority { stream: Pull[G, X, Unit], useInterruption: Boolean, view: Cont[Unit, G, X] - ): F[B] = { + ): F[End] = { def endScope(scopeId: Unique.Token, result: Terminal[Unit]): Pull[G, X, Unit] = result match { case Succeeded(_) => SucceedScope(scopeId) @@ -1127,7 +1137,7 @@ object Pull extends PullLowPriority { interruptGuard(scope, view)(tail) } - def goCloseScope(close: CloseScope, view: Cont[Unit, G, X]): F[B] = { + def goCloseScope(close: CloseScope, view: Cont[Unit, G, X]): F[End] = { def addError(err: Throwable, res: Terminal[Unit]): Terminal[Unit] = res match { case Succeeded(_) => Fail(err) case Fail(err0) => Fail(CompositeFailure(err, err0, Nil)) @@ -1187,9 +1197,9 @@ object Pull extends PullLowPriority { (viewL(stream): @unchecked) match { // unchecked b/c scala 3 erroneously reports exhaustiveness warning case tst: Translate[h, G, _] @unchecked => // y = Unit - val translateRunner: Run[h, X] = new TranslateRunner(tst.fk, getCont[Unit, G, X]) + val translateRunner: Run[h, X, F[End]] = new TranslateRunner(tst.fk, getCont[Unit, G, X]) val composed: h ~> F = translation.compose[h](tst.fk) - go[h, X](scope, extendedTopLevelScope, composed, translateRunner, tst.stream) + go[h, X, End](scope, extendedTopLevelScope, composed, translateRunner, tst.stream) case output: Output[_] => val view = getCont[Unit, G, X] @@ -1204,13 +1214,17 @@ object Pull extends PullLowPriority { case u: Uncons[G, y] @unchecked => val v = getCont[Option[(Chunk[y], Pull[G, y, Unit])], G, X] // a Uncons is run on the same scope, without shifting. - F.unit >> go(scope, extendedTopLevelScope, translation, new UnconsRunR(v), u.stream) + val runr = buildR[G, y, End] + F.unit >> go(scope, extendedTopLevelScope, translation, runr, u.stream).attempt + .flatMap(_.fold(goErr(_, v), _.apply(new UnconsRunR(v)))) case s: StepLeg[G, y] @unchecked => val v = getCont[Option[Stream.StepLeg[G, y]], G, X] + val runr = buildR[G, y, End] scope .shiftScope(s.scope, s.toString) - .flatMap(go(_, extendedTopLevelScope, translation, new StepLegRunR(v), s.stream)) + .flatMap(go(_, extendedTopLevelScope, translation, runr, s.stream).attempt) + .flatMap(_.fold(goErr(_, v), _.apply(new StepLegRunR(v)))) case _: GetScope[_] => go(scope, extendedTopLevelScope, translation, runner, getCont(Succeeded(scope))) @@ -1230,7 +1244,7 @@ object Pull extends PullLowPriority { val initFk: F ~> F = cats.arrow.FunctionK.id[F] - class OuterRun(initB: B) extends Run[F, O] { self => + class OuterRun(initB: B) extends Run[F, O, F[B]] { self => private[this] var accB: B = initB override def done(scope: Scope[F]): F[B] = F.pure(accB) @@ -1243,11 +1257,13 @@ object Pull extends PullLowPriority { override def out(head: Chunk[O], scope: Scope[F], tail: Pull[F, O, Unit]): F[B] = try { accB = foldChunk(accB, head) - go(scope, None, initFk, self, tail) + go[F, O, B](scope, None, initFk, self, tail) } catch { case NonFatal(e) => viewL(tail) match { - case _: Action[F, O, _] => go(scope, None, initFk, self, getCont(Fail(e))) + case _: Action[F, O, _] => + val v = contP.asInstanceOf[ContP[Unit, F, O, Unit]] + go[F, O, B](scope, None, initFk, self, v(Fail(e))) case Succeeded(_) => F.raiseError(e) case Fail(e2) => F.raiseError(CompositeFailure(e2, e)) case Interrupted(_, err) => F.raiseError(err.fold(e)(t => CompositeFailure(e, t))) @@ -1255,7 +1271,7 @@ object Pull extends PullLowPriority { } } - go[F, O](initScope, None, initFk, new OuterRun(init), stream) + go[F, O, B](initScope, None, initFk, new OuterRun(init), stream) } private[fs2] def flatMapOutput[F[_], F2[x] >: F[x], O, O2](