Skip to content

Commit

Permalink
Merge pull request #2864 from typelevel/revert-2828-unwind_bind_bind
Browse files Browse the repository at this point in the history
Revert "Pull - Cut ties with BindBind, no need for BuildR"
  • Loading branch information
mpilquist authored Mar 30, 2022
2 parents f504097 + fd1d154 commit 881d22d
Showing 1 changed file with 76 additions and 60 deletions.
136 changes: 76 additions & 60 deletions core/shared/src/main/scala/fs2/Pull.scala
Original file line number Diff line number Diff line change
Expand Up @@ -654,10 +654,9 @@ object Pull extends PullLowPriority {
def cont(r: Terminal[Unit]): Pull[Pure, INothing, Unit] = r
}

private abstract class Bind[+F[_], +O, X, +R](misstep: Pull[F, O, X])
private abstract class Bind[+F[_], +O, X, +R](val step: Pull[F, O, X])
extends Pull[F, O, R]
with ContP[X, F, O, R] {
def step: Pull[F, O, X] = misstep
def cont(r: Terminal[X]): Pull[F, O, R]
def delegate: Bind[F, O, X, R] = this
}
Expand Down Expand Up @@ -694,12 +693,12 @@ object Pull extends PullLowPriority {

// This class is not created by combinators in public Pull API, only during compilation
private class BindBind[F[_], O, X, Y](
var innerBind: Bind[F, O, X, Y],
var endBind: Bind[F, O, Y, Unit]
) extends Bind[F, O, X, Unit](null) {
override def step: Pull[F, O, X] = innerBind.step
def cont(xterm: Terminal[X]): Pull[F, O, Unit] =
try bindBindAux(xterm, this)
step: Pull[F, O, X],
val bb: Bind[F, O, X, Y],
val del: Bind[F, O, Y, Unit]
) extends Bind[F, O, X, Unit](step) {
def cont(tx: Terminal[X]): Pull[F, O, Unit] =
try bindBindAux(bb.cont(tx), del)
catch { case NonFatal(e) => Fail(e) }
}

Expand All @@ -712,12 +711,7 @@ object Pull extends PullLowPriority {
case ty: Terminal[_] =>
del match {
case cici: BindBind[F, O, r, Y] =>
val innerBind = cici.innerBind
val endBind = cici.endBind
cici.innerBind = null
cici.endBind = null
val nextStep = innerBind.cont(ty)
bindBindAux[F, O, r, Y](nextStep, endBind)
bindBindAux[F, O, r, Y](cici.bb.cont(ty), cici.del)
case _ => del.cont(ty)
}
case x => new DelegateBind(x, del)
Expand Down Expand Up @@ -871,7 +865,7 @@ object Pull extends PullLowPriority {
case b: Bind[G, X, y, Unit] =>
b.step match {
case c: Bind[G, X, x, _] =>
viewL(new BindBind[G, X, x, y](c, b.delegate))
viewL(new BindBind[G, X, x, y](c.step, c.delegate, b.delegate))
case e: Action[G, X, y2] =>
contP = b.delegate
e
Expand Down Expand Up @@ -904,22 +898,37 @@ object Pull extends PullLowPriority {

}

trait Run[-G[_], -X] {
def done(scope: Scope[F]): F[B]
def out(head: Chunk[X], scope: Scope[F], tail: Pull[G, X, Unit]): F[B]
def interrupted(inter: Interrupted): F[B]
def fail(e: Throwable): F[B]
trait Run[-G[_], -X, +End] {
def done(scope: Scope[F]): End
def out(head: Chunk[X], scope: Scope[F], tail: Pull[G, X, Unit]): End
def interrupted(inter: Interrupted): End
def fail(e: Throwable): End
}
type CallRun[+G[_], +X, End] = Run[G, X, End] => End

object TheBuildR extends Run[Pure, INothing, F[CallRun[Pure, Nothing, F[INothing]]]] {
type TheRun = Run[Pure, INothing, F[INothing]]
def fail(e: Throwable) = F.raiseError(e)
def done(scope: Scope[F]) =
F.pure((cont: TheRun) => cont.done(scope))
def out(head: Chunk[INothing], scope: Scope[F], tail: Pull[Pure, INothing, Unit]) =
F.pure((cont: TheRun) => cont.out(head, scope, tail))
def interrupted(i: Interrupted) =
F.pure((cont: TheRun) => cont.interrupted(i))
}

def buildR[G[_], X, End]: Run[G, X, F[CallRun[G, X, F[End]]]] =
TheBuildR.asInstanceOf[Run[G, X, F[CallRun[G, X, F[End]]]]]

def go[G[_], X](
def go[G[_], X, End](
scope: Scope[F],
extendedTopLevelScope: Option[Scope[F]],
translation: G ~> F,
runner: Run[G, X],
runner: Run[G, X, F[End]],
stream: Pull[G, X, Unit]
): F[B] = {
): F[End] = {

def interruptGuard(scope: Scope[F], view: Cont[INothing, G, X])(next: => F[B]): F[B] =
def interruptGuard(scope: Scope[F], view: Cont[INothing, G, X])(next: => F[End]): F[End] =
scope.isInterrupted.flatMap {
case None => next
case Some(outcome) =>
Expand All @@ -931,18 +940,18 @@ object Pull extends PullLowPriority {
go(scope, extendedTopLevelScope, translation, runner, view(result))
}

def goErr(err: Throwable, view: Cont[Nothing, G, X]): F[B] =
def goErr(err: Throwable, view: Cont[Nothing, G, X]): F[End] =
go(scope, extendedTopLevelScope, translation, runner, view(Fail(err)))

class ViewRunner(val view: Cont[Unit, G, X]) extends Run[G, X] {
class ViewRunner(val view: Cont[Unit, G, X]) extends Run[G, X, F[End]] {
private val prevRunner = runner

def done(doneScope: Scope[F]): F[B] =
def done(doneScope: Scope[F]): F[End] =
go(doneScope, extendedTopLevelScope, translation, prevRunner, view(unit))

def out(head: Chunk[X], scope: Scope[F], tail: Pull[G, X, Unit]): F[B] = {
def out(head: Chunk[X], scope: Scope[F], tail: Pull[G, X, Unit]): F[End] = {
@tailrec
def outLoop(acc: Pull[G, X, Unit], pred: Run[G, X]): F[B] =
def outLoop(acc: Pull[G, X, Unit], pred: Run[G, X, F[End]]): F[End] =
// bit of an ugly hack to avoid a stack overflow when these accummulate
pred match {
case vrun: ViewRunner => outLoop(bindView(acc, vrun.view), vrun.prevRunner)
Expand All @@ -951,40 +960,40 @@ object Pull extends PullLowPriority {
outLoop(tail, this)
}

def interrupted(inter: Interrupted): F[B] =
def interrupted(inter: Interrupted): F[End] =
go(scope, extendedTopLevelScope, translation, prevRunner, view(inter))

def fail(e: Throwable): F[B] = goErr(e, view)
def fail(e: Throwable): F[End] = goErr(e, view)
}

class TranslateRunner[H[_]](fk: H ~> G, view: Cont[Unit, G, X]) extends Run[H, X] {
def done(doneScope: Scope[F]): F[B] =
class TranslateRunner[H[_]](fk: H ~> G, view: Cont[Unit, G, X]) extends Run[H, X, F[End]] {
def done(doneScope: Scope[F]): F[End] =
go(doneScope, extendedTopLevelScope, translation, runner, view(unit))
def out(head: Chunk[X], scope: Scope[F], tail: Pull[H, X, Unit]): F[B] = {
def out(head: Chunk[X], scope: Scope[F], tail: Pull[H, X, Unit]): F[End] = {
val next = bindView(Translate(tail, fk), view)
runner.out(head, scope, next)
}
def interrupted(inter: Interrupted): F[B] =
def interrupted(inter: Interrupted): F[End] =
go(scope, extendedTopLevelScope, translation, runner, view(inter))
def fail(e: Throwable): F[B] = goErr(e, view)
def fail(e: Throwable): F[End] = goErr(e, view)
}

abstract class StepRunR[Y, S](view: Cont[Option[S], G, X]) extends Run[G, Y] {
def done(scope: Scope[F]): F[B] =
abstract class StepRunR[Y, S](view: Cont[Option[S], G, X]) extends Run[G, Y, F[End]] {
def done(scope: Scope[F]): F[End] =
interruptGuard(scope, view) {
go(scope, extendedTopLevelScope, translation, runner, view(Succeeded(None)))
}

def interrupted(inter: Interrupted): F[B] =
def interrupted(inter: Interrupted): F[End] =
go(scope, extendedTopLevelScope, translation, runner, view(inter))

def fail(e: Throwable): F[B] = goErr(e, view)
def fail(e: Throwable): F[End] = goErr(e, view)
}

class UnconsRunR[Y](view: Cont[Option[(Chunk[Y], Pull[G, Y, Unit])], G, X])
extends StepRunR[Y, (Chunk[Y], Pull[G, Y, Unit])](view) {

def out(head: Chunk[Y], outScope: Scope[F], tail: Pull[G, Y, Unit]): F[B] =
def out(head: Chunk[Y], outScope: Scope[F], tail: Pull[G, Y, Unit]): F[End] =
// For a Uncons, we continue in same Scope at which we ended compilation of inner stream
interruptGuard(outScope, view) {
val result = Succeeded(Some((head, tail)))
Expand All @@ -995,7 +1004,7 @@ object Pull extends PullLowPriority {
class StepLegRunR[Y](view: Cont[Option[Stream.StepLeg[G, Y]], G, X])
extends StepRunR[Y, Stream.StepLeg[G, Y]](view) {

def out(head: Chunk[Y], outScope: Scope[F], tail: Pull[G, Y, Unit]): F[B] =
def out(head: Chunk[Y], outScope: Scope[F], tail: Pull[G, Y, Unit]): F[End] =
// StepLeg: we shift back to the scope at which we were
// before we started to interpret the Leg's inner stream.
interruptGuard(scope, view) {
Expand All @@ -1004,7 +1013,8 @@ object Pull extends PullLowPriority {
}
}

class FlatMapR[Y](view: Cont[Unit, G, X], fun: Y => Pull[G, X, Unit]) extends Run[G, Y] {
class FlatMapR[Y](view: Cont[Unit, G, X], fun: Y => Pull[G, X, Unit])
extends Run[G, Y, F[End]] {
private[this] def unconsed(chunk: Chunk[Y], tail: Pull[G, Y, Unit]): Pull[G, X, Unit] =
if (chunk.size == 1 && tail.isInstanceOf[Succeeded[_]])
// nb: If tl is Pure, there's no need to propagate flatMap through the tail. Hence, we
Expand All @@ -1030,23 +1040,23 @@ object Pull extends PullLowPriority {
go(0)
}

def done(scope: Scope[F]): F[B] =
def done(scope: Scope[F]): F[End] =
interruptGuard(scope, view) {
go(scope, extendedTopLevelScope, translation, runner, view(unit))
}

def out(head: Chunk[Y], outScope: Scope[F], tail: Pull[G, Y, Unit]): F[B] = {
def out(head: Chunk[Y], outScope: Scope[F], tail: Pull[G, Y, Unit]): F[End] = {
val next = bindView(unconsed(head, tail), view)
go(outScope, extendedTopLevelScope, translation, runner, next)
}

def interrupted(inter: Interrupted): F[B] =
def interrupted(inter: Interrupted): F[End] =
go(scope, extendedTopLevelScope, translation, runner, view(inter))

def fail(e: Throwable): F[B] = goErr(e, view)
def fail(e: Throwable): F[End] = goErr(e, view)
}

def goEval[V](eval: Eval[G, V], view: Cont[V, G, X]): F[B] =
def goEval[V](eval: Eval[G, V], view: Cont[V, G, X]): F[End] =
scope.interruptibleEval(translation(eval.value)).flatMap { eitherOutcome =>
val result = eitherOutcome match {
case Right(r) => Succeeded(r)
Expand All @@ -1057,7 +1067,7 @@ object Pull extends PullLowPriority {
go(scope, extendedTopLevelScope, translation, runner, view(result))
}

def goAcquire[R](acquire: Acquire[G, R], view: Cont[R, G, X]): F[B] = {
def goAcquire[R](acquire: Acquire[G, R], view: Cont[R, G, X]): F[End] = {
val onScope = scope.acquireResource[R](
poll =>
if (acquire.cancelable) poll(translation(acquire.resource))
Expand All @@ -1079,7 +1089,7 @@ object Pull extends PullLowPriority {
def goInterruptWhen(
haltOnSignal: F[Either[Throwable, Unit]],
view: Cont[Unit, G, X]
): F[B] = {
): F[End] = {
val onScope = scope.acquireResource(
_ => scope.interruptWhen(haltOnSignal),
(f: Fiber[F, Throwable, Unit], _: ExitCase) => f.cancel
Expand All @@ -1100,7 +1110,7 @@ object Pull extends PullLowPriority {
stream: Pull[G, X, Unit],
useInterruption: Boolean,
view: Cont[Unit, G, X]
): F[B] = {
): F[End] = {
def endScope(scopeId: Unique.Token, result: Terminal[Unit]): Pull[G, X, Unit] =
result match {
case Succeeded(_) => SucceedScope(scopeId)
Expand All @@ -1127,7 +1137,7 @@ object Pull extends PullLowPriority {
interruptGuard(scope, view)(tail)
}

def goCloseScope(close: CloseScope, view: Cont[Unit, G, X]): F[B] = {
def goCloseScope(close: CloseScope, view: Cont[Unit, G, X]): F[End] = {
def addError(err: Throwable, res: Terminal[Unit]): Terminal[Unit] = res match {
case Succeeded(_) => Fail(err)
case Fail(err0) => Fail(CompositeFailure(err, err0, Nil))
Expand Down Expand Up @@ -1187,9 +1197,9 @@ object Pull extends PullLowPriority {

(viewL(stream): @unchecked) match { // unchecked b/c scala 3 erroneously reports exhaustiveness warning
case tst: Translate[h, G, _] @unchecked => // y = Unit
val translateRunner: Run[h, X] = new TranslateRunner(tst.fk, getCont[Unit, G, X])
val translateRunner: Run[h, X, F[End]] = new TranslateRunner(tst.fk, getCont[Unit, G, X])
val composed: h ~> F = translation.compose[h](tst.fk)
go[h, X](scope, extendedTopLevelScope, composed, translateRunner, tst.stream)
go[h, X, End](scope, extendedTopLevelScope, composed, translateRunner, tst.stream)

case output: Output[_] =>
val view = getCont[Unit, G, X]
Expand All @@ -1204,13 +1214,17 @@ object Pull extends PullLowPriority {
case u: Uncons[G, y] @unchecked =>
val v = getCont[Option[(Chunk[y], Pull[G, y, Unit])], G, X]
// a Uncons is run on the same scope, without shifting.
F.unit >> go(scope, extendedTopLevelScope, translation, new UnconsRunR(v), u.stream)
val runr = buildR[G, y, End]
F.unit >> go(scope, extendedTopLevelScope, translation, runr, u.stream).attempt
.flatMap(_.fold(goErr(_, v), _.apply(new UnconsRunR(v))))

case s: StepLeg[G, y] @unchecked =>
val v = getCont[Option[Stream.StepLeg[G, y]], G, X]
val runr = buildR[G, y, End]
scope
.shiftScope(s.scope, s.toString)
.flatMap(go(_, extendedTopLevelScope, translation, new StepLegRunR(v), s.stream))
.flatMap(go(_, extendedTopLevelScope, translation, runr, s.stream).attempt)
.flatMap(_.fold(goErr(_, v), _.apply(new StepLegRunR(v))))

case _: GetScope[_] =>
go(scope, extendedTopLevelScope, translation, runner, getCont(Succeeded(scope)))
Expand All @@ -1230,7 +1244,7 @@ object Pull extends PullLowPriority {

val initFk: F ~> F = cats.arrow.FunctionK.id[F]

class OuterRun(initB: B) extends Run[F, O] { self =>
class OuterRun(initB: B) extends Run[F, O, F[B]] { self =>
private[this] var accB: B = initB

override def done(scope: Scope[F]): F[B] = F.pure(accB)
Expand All @@ -1243,19 +1257,21 @@ object Pull extends PullLowPriority {
override def out(head: Chunk[O], scope: Scope[F], tail: Pull[F, O, Unit]): F[B] =
try {
accB = foldChunk(accB, head)
go(scope, None, initFk, self, tail)
go[F, O, B](scope, None, initFk, self, tail)
} catch {
case NonFatal(e) =>
viewL(tail) match {
case _: Action[F, O, _] => go(scope, None, initFk, self, getCont(Fail(e)))
case _: Action[F, O, _] =>
val v = contP.asInstanceOf[ContP[Unit, F, O, Unit]]
go[F, O, B](scope, None, initFk, self, v(Fail(e)))
case Succeeded(_) => F.raiseError(e)
case Fail(e2) => F.raiseError(CompositeFailure(e2, e))
case Interrupted(_, err) => F.raiseError(err.fold(e)(t => CompositeFailure(e, t)))
}
}
}

go[F, O](initScope, None, initFk, new OuterRun(init), stream)
go[F, O, B](initScope, None, initFk, new OuterRun(init), stream)
}

private[fs2] def flatMapOutput[F[_], F2[x] >: F[x], O, O2](
Expand Down

0 comments on commit 881d22d

Please sign in to comment.