From 6c3304ad985e4ab19beb48dd876574c477cb0fbc Mon Sep 17 00:00:00 2001 From: "Diego E. Alonso Blas" Date: Sat, 25 Dec 2021 13:15:46 +0100 Subject: [PATCH] Pull Compilation Tweak: reuse BuildR object. In the compilation loop, the BuildR simply represents a sort-of callback or continuation, to connect the result of an inner stream. We use them because directly passing the Uncons or the StepLeg runners currently causes memory leaks. However, since the BuildR object itself has no state, we can just reuse a same object all through the compilation. --- core/shared/src/main/scala/fs2/Pull.scala | 43 ++++++++++++----------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Pull.scala b/core/shared/src/main/scala/fs2/Pull.scala index 856b22b92c..44d1a29188 100644 --- a/core/shared/src/main/scala/fs2/Pull.scala +++ b/core/shared/src/main/scala/fs2/Pull.scala @@ -669,26 +669,24 @@ object Pull extends PullLowPriority { val del: Bind[F, O, Y, Unit] ) extends Bind[F, O, X, Unit](step) { def cont(tx: Terminal[X]): Pull[F, O, Unit] = - try bindBindAux(this, tx) + try bindBindAux(bb.cont(tx), del) catch { case NonFatal(e) => Fail(e) } } @tailrec @nowarn("cat=unchecked") private def bindBindAux[F[_], O, X, Y]( - bibi: BindBind[F, O, X, Y], - tx: Terminal[X] - ): Pull[F, O, Unit] = { - val py: Pull[F, O, Y] = bibi.bb.cont(tx) + py: Pull[F, O, Y], + del: Bind[F, O, Y, Unit] + ): Pull[F, O, Unit] = py match { case ty: Terminal[_] => - bibi.del match { + del match { case cici: BindBind[F, O, r, Y] => - bindBindAux[F, O, r, Y](cici, ty) - case _ => bibi.del.cont(ty) + bindBindAux[F, O, r, Y](cici.bb.cont(ty), cici.del) + case _ => del.cont(ty) } - case x => new DelegateBind(x, bibi.del) + case x => new DelegateBind(x, del) } - } /* An action is an instruction that can perform effects in `F` * to generate by-product outputs of type `O`. @@ -805,7 +803,7 @@ object Pull extends PullLowPriority { ): Pull[F, INothing, Option[(Chunk[O], Pull[F, O, Unit])]] = Uncons(s) - private type Cont[-Y, +G[_], +X] = Terminal[Y] => Pull[G, X, Unit] + private type Cont[-Y, +G[_], +O] = Terminal[Y] => Pull[G, O, Unit] private[this] type Nought[A] = Any @@ -871,25 +869,28 @@ object Pull extends PullLowPriority { } - trait Run[G[_], X, End] { + trait Run[-G[_], -X, +End] { def done(scope: Scope[F]): End def out(head: Chunk[X], scope: Scope[F], tail: Pull[G, X, Unit]): End def interrupted(inter: Interrupted): End def fail(e: Throwable): End } - type CallRun[G[_], X, End] = Run[G, X, End] => End + type CallRun[+G[_], +X, End] = Run[G, X, End] => End - class BuildR[G[_], X, End] extends Run[G, X, F[CallRun[G, X, F[End]]]] { + object TheBuildR extends Run[Pure, INothing, F[CallRun[Pure, Nothing, F[INothing]]]] { + type TheRun = Run[Pure, INothing, F[INothing]] def fail(e: Throwable) = F.raiseError(e) - def done(scope: Scope[F]) = - F.pure((cont: Run[G, X, F[End]]) => cont.done(scope)) - def out(head: Chunk[X], scope: Scope[F], tail: Pull[G, X, Unit]) = - F.pure((cont: Run[G, X, F[End]]) => cont.out(head, scope, tail)) + F.pure((cont: TheRun) => cont.done(scope)) + def out(head: Chunk[INothing], scope: Scope[F], tail: Pull[Pure, INothing, Unit]) = + F.pure((cont: TheRun) => cont.out(head, scope, tail)) def interrupted(i: Interrupted) = - F.pure((cont: Run[G, X, F[End]]) => cont.interrupted(i)) + F.pure((cont: TheRun) => cont.interrupted(i)) } + def buildR[G[_], X, End]: Run[G, X, F[CallRun[G, X, F[End]]]] = + TheBuildR.asInstanceOf[Run[G, X, F[CallRun[G, X, F[End]]]]] + def go[G[_], X, End]( scope: Scope[F], extendedTopLevelScope: Option[Scope[F]], @@ -1184,13 +1185,13 @@ object Pull extends PullLowPriority { case u: Uncons[G, y] @nowarn => val v = getCont[Option[(Chunk[y], Pull[G, y, Unit])], G, X] // a Uncons is run on the same scope, without shifting. - val runr = new BuildR[G, y, End] + val runr = buildR[G, y, End] F.unit >> go(scope, extendedTopLevelScope, translation, runr, u.stream).attempt .flatMap(_.fold(goErr(_, v), _.apply(new UnconsRunR(v)))) case s: StepLeg[G, y] @nowarn => val v = getCont[Option[Stream.StepLeg[G, y]], G, X] - val runr = new BuildR[G, y, End] + val runr = buildR[G, y, End] scope .shiftScope(s.scope, s.toString) .flatMap(go(_, extendedTopLevelScope, translation, runr, s.stream).attempt)