diff --git a/core/shared/src/main/scala/fs2/Pull.scala b/core/shared/src/main/scala/fs2/Pull.scala index 4bdb9bf7fa..a31e02cf14 100644 --- a/core/shared/src/main/scala/fs2/Pull.scala +++ b/core/shared/src/main/scala/fs2/Pull.scala @@ -861,17 +861,11 @@ object Pull extends PullLowPriority { scope: Scope[F], extendedTopLevelScope: Option[Scope[F]], translation: G ~> F, - endRunner: Run[G, X, F[End]], + runner: Run[G, X, F[End]], stream: Pull[G, X, Unit] ): F[End] = { - def interruptGuard[Mid]( - scope: Scope[F], - view: Cont[Nothing, G, X], - runner: Run[G, X, F[Mid]] - )( - next: => F[Mid] - ): F[Mid] = + def interruptGuard(scope: Scope[F], view: Cont[Nothing, G, X])(next: => F[End]): F[End] = scope.isInterrupted.flatMap { case None => next case Some(outcome) => @@ -912,13 +906,13 @@ object Pull extends PullLowPriority { } def goErr(err: Throwable, view: Cont[Nothing, G, X]): F[End] = - go(scope, extendedTopLevelScope, translation, endRunner, view(Fail(err))) + go(scope, extendedTopLevelScope, translation, runner, view(Fail(err))) class ViewRunner(val view: Cont[Unit, G, X]) extends Run[G, X, F[End]] { - private val prevRunner = endRunner + private val prevRunner = runner def done(doneScope: Scope[F]): F[End] = - go(doneScope, extendedTopLevelScope, translation, endRunner, view(unit)) + go(doneScope, extendedTopLevelScope, translation, prevRunner, view(unit)) def out(head: Chunk[X], scope: Scope[F], tail: Pull[G, X, Unit]): F[End] = { @tailrec @@ -933,7 +927,7 @@ object Pull extends PullLowPriority { def interrupted(tok: Unique.Token, err: Option[Throwable]): F[End] = { val next = view(Interrupted(tok, err)) - go(scope, extendedTopLevelScope, translation, endRunner, next) + go(scope, extendedTopLevelScope, translation, prevRunner, next) } def fail(e: Throwable): F[End] = goErr(e, view) @@ -946,13 +940,13 @@ object Pull extends PullLowPriority { abstract class StepRunR[Y, S](view: Cont[Option[S], G, X]) extends Run[G, Y, F[End]] { def done(scope: Scope[F]): F[End] = - interruptGuard(scope, view, endRunner) { - go(scope, extendedTopLevelScope, translation, endRunner, view(Succeeded(None))) + interruptGuard(scope, view) { + go(scope, extendedTopLevelScope, translation, runner, view(Succeeded(None))) } def interrupted(scopeId: Unique.Token, err: Option[Throwable]): F[End] = { val next = view(Interrupted(scopeId, err)) - go(scope, extendedTopLevelScope, translation, endRunner, next) + go(scope, extendedTopLevelScope, translation, runner, next) } def fail(e: Throwable): F[End] = goErr(e, view) @@ -963,9 +957,9 @@ object Pull extends PullLowPriority { def out(head: Chunk[Y], outScope: Scope[F], tail: Pull[G, Y, Unit]): F[End] = // For a Uncons, we continue in same Scope at which we ended compilation of inner stream - interruptGuard(outScope, view, endRunner) { + interruptGuard(outScope, view) { val result = Succeeded(Some((head, tail))) - go(outScope, extendedTopLevelScope, translation, endRunner, view(result)) + go(outScope, extendedTopLevelScope, translation, runner, view(result)) } } @@ -975,9 +969,9 @@ object Pull extends PullLowPriority { def out(head: Chunk[Y], outScope: Scope[F], tail: Pull[G, Y, Unit]): F[End] = // StepLeg: we shift back to the scope at which we were // before we started to interpret the Leg's inner stream. - interruptGuard(scope, view, endRunner) { + interruptGuard(scope, view) { val result = Succeeded(Some(new Stream.StepLeg(head, outScope.id, tail))) - go(scope, extendedTopLevelScope, translation, endRunner, view(result)) + go(scope, extendedTopLevelScope, translation, runner, view(result)) } } @@ -1014,18 +1008,18 @@ object Pull extends PullLowPriority { } def done(scope: Scope[F]): F[End] = - interruptGuard(scope, outView, endRunner) { - go(scope, extendedTopLevelScope, translation, endRunner, outView(unit)) + interruptGuard(scope, outView) { + go(scope, extendedTopLevelScope, translation, runner, outView(unit)) } def out(head: Chunk[Y], outScope: Scope[F], tail: Pull[G, Y, Unit]): F[End] = { val next = bindView(unconsed(head, tail), outView) - go(outScope, extendedTopLevelScope, translation, endRunner, next) + go(outScope, extendedTopLevelScope, translation, runner, next) } def interrupted(scopeId: Unique.Token, err: Option[Throwable]): F[End] = { val next = outView(Interrupted(scopeId, err)) - go(scope, extendedTopLevelScope, translation, endRunner, next) + go(scope, extendedTopLevelScope, translation, runner, next) } def fail(e: Throwable): F[End] = goErr(e, outView) @@ -1039,7 +1033,7 @@ object Pull extends PullLowPriority { case Left(Outcome.Canceled()) => Interrupted(scope.id, None) case Left(Outcome.Succeeded(token)) => Interrupted(token, None) } - go(scope, extendedTopLevelScope, translation, endRunner, view(result)) + go(scope, extendedTopLevelScope, translation, runner, view(result)) } def goAcquire[R](acquire: Acquire[G, R], view: Cont[R, G, X]): F[End] = { @@ -1049,7 +1043,6 @@ object Pull extends PullLowPriority { else translation(acquire.resource), (resource, exit) => translation(acquire.release(resource, exit)) ) - val cont = onScope.flatMap { outcome => val result = outcome match { case Outcome.Succeeded(Right(r)) => Succeeded(r) @@ -1057,9 +1050,9 @@ object Pull extends PullLowPriority { case Outcome.Canceled() => Interrupted(scope.id, None) case Outcome.Errored(err) => Fail(err) } - go(scope, extendedTopLevelScope, translation, endRunner, view(result)) + go(scope, extendedTopLevelScope, translation, runner, view(result)) } - interruptGuard(scope, view, endRunner)(cont) + interruptGuard(scope, view)(cont) } def goInterruptWhen( @@ -1077,9 +1070,9 @@ object Pull extends PullLowPriority { case Outcome.Canceled() => Interrupted(scope.id, None) case Outcome.Errored(err) => Fail(err) } - go(scope, extendedTopLevelScope, translation, endRunner, view(result)) + go(scope, extendedTopLevelScope, translation, runner, view(result)) } - interruptGuard(scope, view, endRunner)(cont) + interruptGuard(scope, view)(cont) } def goInScope( @@ -1102,16 +1095,15 @@ object Pull extends PullLowPriority { else F.pure(extendedTopLevelScope) - val vrun = new ViewRunner(view) val tail = maybeCloseExtendedScope.flatMap { newExtendedScope => scope.open(useInterruption).rethrow.flatMap { childScope => val bb = new Bind[G, X, Unit, Unit](stream) { def cont(r: Terminal[Unit]): Pull[G, X, Unit] = endScope(childScope.id, r) } - go(childScope, newExtendedScope, translation, vrun, bb) + go(childScope, newExtendedScope, translation, new ViewRunner(view), bb) } } - interruptGuard(scope, view, vrun)(tail) + interruptGuard(scope, view)(tail) } def goCloseScope(close: CloseScope, view: Cont[Unit, G, X]): F[End] = { @@ -1147,41 +1139,41 @@ object Pull extends PullLowPriority { scope.findInLineage(close.scopeId).flatMap { case Some(toClose) if toClose.isRoot => // Impossible - don't close root scope as a result of a `CloseScope` call - go(scope, extendedTopLevelScope, translation, endRunner, viewCont(unit)) + go(scope, extendedTopLevelScope, translation, runner, viewCont(unit)) case Some(toClose) if extendLastTopLevelScope && toClose.level == 1 => // Request to close the current top-level scope - if we're supposed to extend // it instead, leave the scope open and pass it to the continuation extendedTopLevelScope.traverse_(_.close(ExitCase.Succeeded).rethrow) *> toClose.openAncestor.flatMap { ancestor => - go(ancestor, Some(toClose), translation, endRunner, viewCont(unit)) + go(ancestor, Some(toClose), translation, runner, viewCont(unit)) } case Some(toClose) => toClose.close(close.exitCase).flatMap { r => toClose.openAncestor.flatMap { ancestor => val res = closeTerminal(r, ancestor) - go(ancestor, extendedTopLevelScope, translation, endRunner, viewCont(res)) + go(ancestor, extendedTopLevelScope, translation, runner, viewCont(res)) } } case None => // scope already closed, continue with current scope val result = close.interruption.getOrElse(unit) - go(scope, extendedTopLevelScope, translation, endRunner, viewCont(result)) + go(scope, extendedTopLevelScope, translation, runner, viewCont(result)) } } viewL(stream) match { - case _: Succeeded[_] => endRunner.done(scope) - case failed: Fail => endRunner.fail(failed.error) - case int: Interrupted => endRunner.interrupted(int.context, int.deferredError) + case _: Succeeded[_] => runner.done(scope) + case failed: Fail => runner.fail(failed.error) + case int: Interrupted => runner.interrupted(int.context, int.deferredError) case view: View[G, X, y] => view.step match { case output: Output[_] => - interruptGuard(scope, view, endRunner)( - endRunner.out(output.values, scope, view(unit)) + interruptGuard(scope, view)( + runner.out(output.values, scope, view(unit)) ) case fmout: FlatMapOutput[g, z, _] => // y = Unit @@ -1191,13 +1183,13 @@ object Pull extends PullLowPriority { val composed: h ~> F = translation.asInstanceOf[g ~> F].compose[h](tst.fk) val translateRunner: Run[h, X, F[End]] = new Run[h, X, F[End]] { - def done(scope: Scope[F]): F[End] = endRunner.done(scope) + def done(scope: Scope[F]): F[End] = runner.done(scope) def out(head: Chunk[X], scope: Scope[F], tail: Pull[h, X, Unit]): F[End] = - endRunner.out(head, scope, Translate(tail, tst.fk)) + runner.out(head, scope, Translate(tail, tst.fk)) def interrupted(scopeId: Unique.Token, err: Option[Throwable]): F[End] = - endRunner.interrupted(scopeId, err) + runner.interrupted(scopeId, err) def fail(e: Throwable): F[End] = - endRunner.fail(e) + runner.fail(e) } go[h, X, End](scope, extendedTopLevelScope, composed, translateRunner, tst.stream) @@ -1216,7 +1208,7 @@ object Pull extends PullLowPriority { case _: GetScope[_] => val result = Succeeded(scope.asInstanceOf[y]) - go(scope, extendedTopLevelScope, translation, endRunner, view(result)) + go(scope, extendedTopLevelScope, translation, runner, view(result)) case mout: MapOutput[g, z, _] => goMapOutput[z](mout, view) case eval: Eval[G, r] => goEval[r](eval, view)