diff --git a/core/shared/src/main/scala/fs2/Pull.scala b/core/shared/src/main/scala/fs2/Pull.scala index 2ea73a9dce..f542280408 100644 --- a/core/shared/src/main/scala/fs2/Pull.scala +++ b/core/shared/src/main/scala/fs2/Pull.scala @@ -669,26 +669,24 @@ object Pull extends PullLowPriority { val del: Bind[F, O, Y, Unit] ) extends Bind[F, O, X, Unit](step) { def cont(tx: Terminal[X]): Pull[F, O, Unit] = - try bindBindAux(this, tx) + try bindBindAux(bb.cont(tx), del) catch { case NonFatal(e) => Fail(e) } } @tailrec @nowarn("cat=unchecked") private def bindBindAux[F[_], O, X, Y]( - bibi: BindBind[F, O, X, Y], - tx: Terminal[X] - ): Pull[F, O, Unit] = { - val py: Pull[F, O, Y] = bibi.bb.cont(tx) + py: Pull[F, O, Y], + del: Bind[F, O, Y, Unit] + ): Pull[F, O, Unit] = py match { case ty: Terminal[_] => - bibi.del match { + del match { case cici: BindBind[F, O, r, Y] => - bindBindAux[F, O, r, Y](cici, ty) - case _ => bibi.del.cont(ty) + bindBindAux[F, O, r, Y](cici.bb.cont(ty), cici.del) + case _ => del.cont(ty) } - case x => new DelegateBind(x, bibi.del) + case x => new DelegateBind(x, del) } - } /* An action is an instruction that can perform effects in `F` * to generate by-product outputs of type `O`. @@ -805,7 +803,7 @@ object Pull extends PullLowPriority { ): Pull[F, INothing, Option[(Chunk[O], Pull[F, O, Unit])]] = Uncons(s) - private type Cont[-Y, +G[_], +X] = Terminal[Y] => Pull[G, X, Unit] + private type Cont[-Y, +G[_], +O] = Terminal[Y] => Pull[G, O, Unit] private[this] type Nought[A] = Any @@ -871,25 +869,28 @@ object Pull extends PullLowPriority { } - trait Run[G[_], X, End] { + trait Run[-G[_], -X, +End] { def done(scope: Scope[F]): End def out(head: Chunk[X], scope: Scope[F], tail: Pull[G, X, Unit]): End def interrupted(inter: Interrupted): End def fail(e: Throwable): End } - type CallRun[G[_], X, End] = Run[G, X, End] => End + type CallRun[+G[_], +X, End] = Run[G, X, End] => End - class BuildR[G[_], X, End] extends Run[G, X, F[CallRun[G, X, F[End]]]] { + object TheBuildR extends Run[Pure, INothing, F[CallRun[Pure, Nothing, F[INothing]]]] { + type TheRun = Run[Pure, INothing, F[INothing]] def fail(e: Throwable) = F.raiseError(e) - def done(scope: Scope[F]) = - F.pure((cont: Run[G, X, F[End]]) => cont.done(scope)) - def out(head: Chunk[X], scope: Scope[F], tail: Pull[G, X, Unit]) = - F.pure((cont: Run[G, X, F[End]]) => cont.out(head, scope, tail)) + F.pure((cont: TheRun) => cont.done(scope)) + def out(head: Chunk[INothing], scope: Scope[F], tail: Pull[Pure, INothing, Unit]) = + F.pure((cont: TheRun) => cont.out(head, scope, tail)) def interrupted(i: Interrupted) = - F.pure((cont: Run[G, X, F[End]]) => cont.interrupted(i)) + F.pure((cont: TheRun) => cont.interrupted(i)) } + def buildR[G[_], X, End]: Run[G, X, F[CallRun[G, X, F[End]]]] = + TheBuildR.asInstanceOf[Run[G, X, F[CallRun[G, X, F[End]]]]] + def go[G[_], X, End]( scope: Scope[F], extendedTopLevelScope: Option[Scope[F]], @@ -1184,13 +1185,13 @@ object Pull extends PullLowPriority { case u: Uncons[G, y] @unchecked => val v = getCont[Option[(Chunk[y], Pull[G, y, Unit])], G, X] // a Uncons is run on the same scope, without shifting. - val runr = new BuildR[G, y, End] + val runr = buildR[G, y, End] F.unit >> go(scope, extendedTopLevelScope, translation, runr, u.stream).attempt .flatMap(_.fold(goErr(_, v), _.apply(new UnconsRunR(v)))) case s: StepLeg[G, y] @unchecked => val v = getCont[Option[Stream.StepLeg[G, y]], G, X] - val runr = new BuildR[G, y, End] + val runr = buildR[G, y, End] scope .shiftScope(s.scope, s.toString) .flatMap(go(_, extendedTopLevelScope, translation, runr, s.stream).attempt)