From 8d881a1524247bfa8f34cd6cee28c1863bf3f27b Mon Sep 17 00:00:00 2001 From: Nikita Gazarov Date: Thu, 27 Jun 2024 00:23:52 -0700 Subject: [PATCH] WIP: Zooming into a Var should not require an Owner (#119) --- .../airstream/state/PureDerivedVar.scala | 58 ++ .../state/PureDerivedVarSignal.scala | 33 + .../scala/com/raquo/airstream/state/Var.scala | 4 + .../airstream/state/PureDerivedVarSpec.scala | 685 ++++++++++++++++++ 4 files changed, 780 insertions(+) create mode 100644 src/main/scala/com/raquo/airstream/state/PureDerivedVar.scala create mode 100644 src/main/scala/com/raquo/airstream/state/PureDerivedVarSignal.scala create mode 100644 src/test/scala/com/raquo/airstream/state/PureDerivedVarSpec.scala diff --git a/src/main/scala/com/raquo/airstream/state/PureDerivedVar.scala b/src/main/scala/com/raquo/airstream/state/PureDerivedVar.scala new file mode 100644 index 00000000..23e42476 --- /dev/null +++ b/src/main/scala/com/raquo/airstream/state/PureDerivedVar.scala @@ -0,0 +1,58 @@ +package com.raquo.airstream.state + +import com.raquo.airstream.core.AirstreamError.VarError +import com.raquo.airstream.core.{AirstreamError, Transaction} +import com.raquo.airstream.ownership.Owner + +import scala.util.{Failure, Success, Try} + +// #nc Update comments + +/** DerivedVar has the same Var contract as SourceVar, but instead of maintaining its own state + * it is essentially a lens on the underlying SourceVar. + * + * This Var is active for as long as its signal has listeners. + * Being a StrictSignal, it already starts out with a subscription owned by `owner`, + * but even if owner kills its subscriptions, this Var's signal might have other listeners. + */ +class PureDerivedVar[A, B]( + parent: Var[A], + zoomIn: A => B, + zoomOut: (A, B) => A, + displayNameSuffix: String +) extends Var[B] { + + override private[state] def underlyingVar: SourceVar[_] = parent.underlyingVar + + private[this] val _varSignal = new PureDerivedVarSignal(parent, zoomIn, displayName) + + // #Note this getCurrentValue implementation is different from SourceVar + // - SourceVar's getCurrentValue looks at an internal currentValue variable + // - That currentValue gets updated immediately before the signal (in an already existing transaction) + // - I hope this doesn't introduce weird transaction related timing glitches + // - But even if it does, I think keeping derived var's current value consistent with its signal value + // is more important, otherwise it would be madness if the derived var was accessed after its owner + // was killed + override private[state] def getCurrentValue: Try[B] = signal.tryNow() + + override private[state] def setCurrentValue(value: Try[B], transaction: Transaction): Unit = { + // #nc Unlike the old DerivedVar, we don't check `_varSignal.isStarted` before updating the parent. + // - Is that "natural" because we don't have an explicit "owner" here, or is that a change in semantics? + parent.tryNow() match { + case Success(parentValue) => + // This can update the parent without causing an infinite loop because + // the parent updates this derived var's signal, it does not call + // setCurrentValue on this var directly. + val nextValue = value.map(zoomOut(parentValue, _)) + // println(s">> parent.setCurrentValue($nextValue)") + parent.setCurrentValue(nextValue, transaction) + + case Failure(err) => + AirstreamError.sendUnhandledError(VarError(s"Unable to zoom out of derived var when the parent var is failed.", cause = Some(err))) + } + } + + override val signal: StrictSignal[B] = _varSignal + + override protected def defaultDisplayName: String = parent.displayName + displayNameSuffix +} diff --git a/src/main/scala/com/raquo/airstream/state/PureDerivedVarSignal.scala b/src/main/scala/com/raquo/airstream/state/PureDerivedVarSignal.scala new file mode 100644 index 00000000..505f6d59 --- /dev/null +++ b/src/main/scala/com/raquo/airstream/state/PureDerivedVarSignal.scala @@ -0,0 +1,33 @@ +package com.raquo.airstream.state + +import com.raquo.airstream.core.{Protected, Signal} +import com.raquo.airstream.misc.MapSignal + +import scala.util.Try + +class PureDerivedVarSignal[I, O]( + parent: Var[I], + zoomIn: I => O, + parentDisplayName: => String +) extends MapSignal[I, O](parent.signal, project = zoomIn, recover = None) with StrictSignal[O] { self => + + // Note that even if owner kills subscription, this signal might remain due to other listeners + // override protected[state] def isStarted: Boolean = super.isStarted + + override protected def defaultDisplayName: String = parentDisplayName + ".signal" + + override def tryNow(): Try[O] = { + val newParentLastUpdateId = Protected.lastUpdateId(parent.signal) + if (newParentLastUpdateId != _parentLastUpdateId) { + // This branch can only run if !isStarted + val nextValue = currentValueFromParent() + updateCurrentValueFromParent(nextValue, newParentLastUpdateId) + nextValue + } else { + super.tryNow() + } + } + + override protected[state] def updateCurrentValueFromParent(nextValue: Try[O], nextParentLastUpdateId: Int): Unit = + super.updateCurrentValueFromParent(nextValue, nextParentLastUpdateId) +} diff --git a/src/main/scala/com/raquo/airstream/state/Var.scala b/src/main/scala/com/raquo/airstream/state/Var.scala index df299b46..8c0a8f05 100644 --- a/src/main/scala/com/raquo/airstream/state/Var.scala +++ b/src/main/scala/com/raquo/airstream/state/Var.scala @@ -85,6 +85,10 @@ trait Var[A] extends SignalSource[A] with Sink[A] with Named { new DerivedVar[A, B](this, in, out, owner, displayNameSuffix = ".zoom") } + def zoomPure[B](in: A => B)(out: (A, B) => A): Var[B] = { + new PureDerivedVar[A, B](this, in, out, displayNameSuffix = ".zoomPure") + } + def setTry(tryValue: Try[A]): Unit = writer.onTry(tryValue) final def set(value: A): Unit = setTry(Success(value)) diff --git a/src/test/scala/com/raquo/airstream/state/PureDerivedVarSpec.scala b/src/test/scala/com/raquo/airstream/state/PureDerivedVarSpec.scala new file mode 100644 index 00000000..28e4a1a1 --- /dev/null +++ b/src/test/scala/com/raquo/airstream/state/PureDerivedVarSpec.scala @@ -0,0 +1,685 @@ +package com.raquo.airstream.state + +import com.raquo.airstream.UnitSpec +import com.raquo.airstream.core.AirstreamError.VarError +import com.raquo.airstream.core.{AirstreamError, Observer} +import com.raquo.airstream.fixtures.{Calculation, Effect, TestableOwner} +import org.scalatest.BeforeAndAfter + +import scala.collection.mutable +import scala.util.{Failure, Success, Try} + +class PureDerivedVarSpec extends UnitSpec with BeforeAndAfter { + + case class Form(int: Int) + + private val errorEffects = mutable.Buffer[Effect[Throwable]]() + + private val errorCallback = (err: Throwable) => { + errorEffects += Effect("unhandled", err) + () + } + + before { + errorEffects.clear() + AirstreamError.registerUnhandledErrorCallback(errorCallback) + AirstreamError.unregisterUnhandledErrorCallback(AirstreamError.consoleErrorCallback) + } + + after { + AirstreamError.registerUnhandledErrorCallback(AirstreamError.consoleErrorCallback) + AirstreamError.unregisterUnhandledErrorCallback(errorCallback) + assert(errorEffects.isEmpty) // #Note this fails the test rather inelegantly + } + + it("lazy eval") { + + val effects: mutable.Buffer[String] = mutable.Buffer() + + val s = Var(Form(10)) + val d = s.zoomPure[Int]( + in = form => { + effects += s"zoomIn-${form.int}" + form.int + } + )( + out = (form, int) => { + val newForm = form.copy(int = int) + effects += s"zoomOut-${newForm}" + newForm + } + ) + + assert(effects.toList == Nil) + + // -- + + assert(s.tryNow() == Success(Form(10))) + assert(s.now() == Form(10)) + assert(s.signal.now() == Form(10)) + + assert(effects.toList == Nil) + + assert(d.tryNow() == Success(10)) + assert(d.now() == 10) + assert(d.signal.now() == 10) + + assert(effects.toList == List( + "zoomIn-10" + )) + effects.clear() + + // -- + + d.writer.onNext(20) + + assert(effects.toList == List( + "zoomOut-Form(20)" + )) + effects.clear() + + assert(s.tryNow() == Success(Form(20))) + assert(s.now() == Form(20)) + assert(s.signal.now() == Form(20)) + + assert(effects.toList == Nil) + + assert(d.tryNow() == Success(20)) + assert(d.now() == 20) + assert(d.signal.now() == 20) + + // #TODO We evaluate zoomIn when accessing d.tryNow(), + // even though we set the value of `d` directly, because + // setting the value on `d` actually sets the value on its + // parent `s`, and we need to derive `d`'s value from the + // value of `s`... even when the value of `s` is coming from + // `d` in the first place. + // This "zoomIn-20" calculation is redundant and thus undesirable, + // but I don't see an elegant way to fix it. zoomPure is + // supposed to be pure, so it's not a big deal I think. + assert(effects.toList == List( + "zoomIn-20" + )) + effects.clear() + + // -- + + d.update(_ + 1) + + assert(effects.toList == List( + "zoomOut-Form(21)" + )) + effects.clear() + + assert(s.now() == Form(21)) + assert(s.signal.now() == Form(21)) + + assert(effects.toList == Nil) + + assert(d.now() == 21) + assert(d.signal.now() == 21) + + assert(effects.toList == List( + "zoomIn-21" // #TODO this one is also undesirable, see TODO comment above. + )) + effects.clear() + + // -- + + d.tryUpdate(currTry => currTry.map(_ + 1)) + + assert(effects.toList == List( + "zoomOut-Form(22)" + )) + effects.clear() + + assert(s.now() == Form(22)) + assert(s.signal.now() == Form(22)) + + assert(effects.toList == Nil) + + assert(d.now() == 22) + assert(d.signal.now() == 22) + + assert(effects.toList == List( + "zoomIn-22" // #TODO this one is also undesirable, see TODO comment above. + )) + effects.clear() + + // -- + + s.writer.onNext(Form(30)) + + assert(s.tryNow() == Success(Form(30))) + assert(s.now() == Form(30)) + assert(s.signal.now() == Form(30)) + + assert(effects.toList == Nil) + + assert(d.tryNow() == Success(30)) + assert(d.now() == 30) + assert(d.signal.now() == 30) + + assert(effects.toList == List( + "zoomIn-30" + )) + effects.clear() + + // -- + + s.update(f => f.copy(int = f.int + 1)) + + assert(s.tryNow() == Success(Form(31))) + assert(s.now() == Form(31)) + assert(s.signal.now() == Form(31)) + + assert(effects.toList == Nil) + + assert(d.tryNow() == Success(31)) + assert(d.now() == 31) + assert(d.signal.now() == 31) + + assert(effects.toList == List( + "zoomIn-31" + )) + effects.clear() + + // -- + + s.tryUpdate(currTry => currTry.map(f => f.copy(int = f.int + 1))) + + assert(s.now() == Form(32)) + assert(s.signal.now() == Form(32)) + + assert(effects.toList == Nil) + + assert(d.now() == 32) + assert(d.signal.now() == 32) + + assert(effects.toList == List( + "zoomIn-32" + )) + effects.clear() + } + + it("laziness, updates and errors") { + + // val varOwner = new TestableOwner + val obsOwner = new TestableOwner + + // val s = Var(Form(10)) + // val d = s.zoomPure(_.int)((f, int) => f.copy(int = int)) + + val effects: mutable.Buffer[String] = mutable.Buffer() + + val s = Var(Form(10)) + val d = s.zoomPure[Int]( + in = form => { + effects += s"zoomIn-${form.int}" + form.int + } + )( + out = (form, int) => { + val newForm = form.copy(int = int) + effects += s"zoomOut-${newForm}" + newForm + } + ) + + val err1 = new Exception("Error: err1") + + assert(s.tryNow() == Success(Form(10))) + assert(s.now() == Form(10)) + assert(s.signal.now() == Form(10)) + + assert(effects.toList == Nil) + + assert(d.tryNow() == Success(10)) + assert(d.now() == 10) + assert(d.signal.now() == 10) + + assert(effects.toList == List( + "zoomIn-10" + )) + effects.clear() + + // -- Errors propagate + + s.setTry(Failure(err1)) + + assert(s.tryNow() == Failure(err1)) + assert(d.tryNow() == Failure(err1)) + + assert(effects.toList == Nil) + + assert(errorEffects.toList == List( + Effect("unhandled", err1) + )) + + errorEffects.clear() + + // -- Can't update a failed var + + d.update(_ + 1) + + assert(s.tryNow() == Failure(err1)) + assert(d.tryNow() == Failure(err1)) + + assert(effects.toList == Nil) + + // Remember, a Var without a listener does emit its errors into "unhandled" + errorEffects shouldBe mutable.Buffer( + Effect("unhandled", VarError("Unable to update a failed Var. Consider Var#tryUpdate instead.", cause = Some(err1))) + ) + errorEffects.clear() + + // -- Restore normality + + s.set(Form(0)) + + assert(s.tryNow() == Success(Form(0))) + + assert(effects.toList == Nil) + + assert(d.tryNow() == Success(0)) + + assert(effects.toList == List( + "zoomIn-0" + )) + effects.clear() + + // -- Can't update the same underlying var twice + + Var.set( + s -> Form(1), + d -> 2 + ) + assert(effects.toList == Nil) + errorEffects shouldBe mutable.Buffer( + Effect("unhandled", VarError("Unable to Var.{set,setTry}: the provided list of vars has duplicates. You can't make an observable emit more than one event per transaction.", cause = None)) + ) + errorEffects.clear() + + // -- Update again + + d.set(1) + + assert(s.tryNow() == Success(Form(1))) + assert(s.now() == Form(1)) + assert(s.signal.now() == Form(1)) + + assert(effects.toList == List( + "zoomOut-Form(1)" + )) + effects.clear() + + assert(d.tryNow() == Success(1)) + assert(d.now() == 1) + assert(d.signal.now() == 1) + + assert(effects.toList == List( + "zoomIn-1" + )) + effects.clear() + + // -- + + val obs = Observer[Int](v => effects += s"obs-$v") + + d.signal.addObserver(obs)(obsOwner) + + assert(effects.toList == List("obs-1")) + effects.clear() + + // -- + + // Derived var is still active because of obsOwner. + // This is consistent with StrictSignal behaviour. + // DerivedVar behaves similarly if its owner is killed. + + d.set(2) + + assert(effects.toList == List( + "zoomOut-Form(2)", + "zoomIn-2", + "obs-2" + )) + effects.clear() + + assert(s.tryNow() == Success(Form(2))) + assert(s.now() == Form(2)) + assert(s.signal.now() == Form(2)) + + assert(d.tryNow() == Success(2)) + assert(d.now() == 2) + assert(d.signal.now() == 2) + + assert(effects.toList == Nil) + + // -- + + obsOwner.killSubscriptions() + + assert(effects.isEmpty) + + // Now the derived var is killed + + // -- + + d.set(34) + + // #TODO This is different from DerivedVar, which throws an error in this case – intentional? + assert(effects.toList == List( + "zoomOut-Form(34)" + )) + effects.clear() + + assert(s.now() == Form(34)) + assert(s.signal.now() == Form(34)) + + assert(effects.isEmpty) + + assert(d.now() == 34) + assert(d.signal.now() == 34) + + assert(effects.toList == List( + "zoomIn-34" + )) + effects.clear() + + assert(effects.isEmpty) + assert(errorEffects.isEmpty) + } + + + // it("signal does not glitch") { + // + // val owner = new TestableOwner + // + // val calculations = mutable.Buffer[Calculation[Int]]() + // val effects = mutable.Buffer[Effect[Int]]() + // + // val s = Var(Form(1)) + // val d = s.zoom(_.int)((f, v) => f.copy(int = v))(owner) + // + // val combinedSignal = d.signal.combineWithFn(s.signal)(_ * 1000 + _.int) // e.g. if s.now() is Form(2), this is 2002 + // + // val sourceObs = Observer[Form](f => effects += Effect("source-obs", f.int)) + // val derivedObs = Observer[Int](effects += Effect("derived-obs", _)) + // val combinedObs = Observer[Int](effects += Effect("combined-obs", _)) + // + // assert(s.tryNow() == Success(Form(1))) + // assert(s.now() == Form(1)) + // assert(s.signal.now() == Form(1)) + // + // assert(d.tryNow() == Success(1)) + // assert(d.now() == 1) + // assert(d.signal.now() == 1) + // + // assert(calculations == mutable.Buffer()) + // + // // -- + // + // s.set(Form(2)) + // + // assert(s.tryNow() == Success(Form(2))) + // assert(s.now() == Form(2)) + // assert(s.signal.now() == Form(2)) + // + // assert(d.tryNow() == Success(2)) + // assert(d.now() == 2) + // assert(d.signal.now() == 2) + // + // assert(calculations == mutable.Buffer()) + // assert(effects == mutable.Buffer()) + // + // // -- + // + // // #Note observers are added to Var signals directly + // s.signal.addObserver(sourceObs)(owner) + // d.signal.addObserver(derivedObs)(owner) + // combinedSignal.addObserver(combinedObs)(owner) + // + // assert(effects.toList == List( + // Effect("source-obs", 2), + // Effect("derived-obs", 2), + // Effect("combined-obs", 2002), + // )) + // + // calculations.clear() + // effects.clear() + // + // // -- + // + // // @TODO Why does the order of calculations (source vs derived) switch here? + // // - Probably something to do with previous calculation being the first one after observers were added? + // // - Our contract doesn't really mandate any order since these are derived MapSignal-s, not + // + // s.set(Form(3)) + // + // assert(effects.toList == List( + // Effect("source-obs", 3), + // Effect("derived-obs", 3), + // Effect("combined-obs", 3003) + // )) + // + // calculations.clear() + // effects.clear() + // + // // -- + // + // d.set(4) + // + // assert(effects.toList == List( + // Effect("source-obs", 4), + // Effect("derived-obs", 4), + // Effect("combined-obs", 4004) + // )) + // + // calculations.clear() + // effects.clear() + // + // // -- + // + // val derivedAdder = d.updater[Int](_ + _) + // + // derivedAdder.onNext(10) + // + // assert(effects.toList == List( + // Effect("source-obs", 14), + // Effect("derived-obs", 14), + // Effect("combined-obs", 14014) + // )) + // } + // + // it("error handling") { + // + // val varOwner = new TestableOwner + // val obsOwner = new TestableOwner + // + // val s = Var(Form(10)) + // val d = s.zoom(_.int)((f, int) => f.copy(int = int))(varOwner) + // + // val sub = d.signal.addObserver(Observer.empty)(obsOwner) + // + // val err1 = new Exception("Error: err1") + // val err2 = new Exception("Error: err2") + // + // assert(s.tryNow() == Success(Form(10))) + // assert(s.now() == Form(10)) + // assert(s.signal.now() == Form(10)) + // + // assert(d.tryNow() == Success(10)) + // assert(d.now() == 10) + // assert(d.signal.now() == 10) + // + // // -- Errors propagate + // + // s.setTry(Failure(err1)) + // + // assert(s.tryNow() == Failure(err1)) + // assert(d.tryNow() == Failure(err1)) + // + // assert(errorEffects.toList == List( + // Effect("unhandled", err1), + // Effect("unhandled", err1) // two observers (one built into the derived var) – two errors + // )) + // + // errorEffects.clear() + // + // // -- Can't update a failed var + // + // d.update(_ + 1) + // + // assert(s.tryNow() == Failure(err1)) + // assert(d.tryNow() == Failure(err1)) + // + // // Remember, a Var without a listener does emit its errors into "unhandled" + // errorEffects shouldBe mutable.Buffer( + // Effect("unhandled", VarError("Unable to update a failed Var. Consider Var#tryUpdate instead.", cause = Some(err1))) + // ) + // errorEffects.clear() + // + // // -- Reset + // + // s.set(Form(20)) + // + // assert(s.tryNow() == Success(Form(20))) + // assert(d.tryNow() == Success(20)) + // + // // -- Set error on the derived var + // + // d.setError(err1) + // + // assert(s.tryNow() == Failure(err1)) + // assert(d.tryNow() == Failure(err1)) + // + // assert(errorEffects.toList == List( + // Effect("unhandled", err1), + // Effect("unhandled", err1) // two observers (one built into the derived var) – two errors + // )) + // + // errorEffects.clear() + // + // // -- Update the error in the parent + // + // s.tryUpdate(_ => Failure(err2)) + // + // assert(s.tryNow() == Failure(err2)) + // assert(d.tryNow() == Failure(err2)) + // + // assert(errorEffects.toList == List( + // Effect("unhandled", err2), + // Effect("unhandled", err2) // two observers (one built into the derived var) – two errors + // )) + // + // errorEffects.clear() + // + // // -- Update the error in the derived var + // + // // #TODO[API] Not sure what the desired behaviour here would be. + // // Would we want to replace err1 with err2 in `s` and `d` vars? but we can't, + // // because zooming out currently requires parent's value. Perhaps we should have + // // a separate channel for zooming out errors? But I'm not sure what exactly the API + // // should look like. + // + // d.tryUpdate(_ => Failure(err1)) + // + // assert(s.tryNow() == Failure(err2)) + // assert(d.tryNow() == Failure(err2)) + // + // assert(errorEffects.toList == List( + // Effect("unhandled", VarError("Unable to zoom out of derived var when the parent var is failed.", Some(err2))) + // )) + // + // errorEffects.clear() + // } + // + // it("asymmetric derived vars") { + // + // val varOwner = new TestableOwner + // + // val s = Var(Form(10)) + // val d1 = s.zoom(_.int)((f, int) => f.copy(int = 100 + int))(varOwner) + // val d2 = s.zoom(_.int + 100)((f, int) => f.copy(int = int))(varOwner) + // + // val err1 = new Exception("Error: err1") + // + // assert(s.tryNow() == Success(Form(10))) + // assert(s.now() == Form(10)) + // assert(s.signal.now() == Form(10)) + // + // assert(d1.tryNow() == Success(10)) + // assert(d1.now() == 10) + // assert(d1.signal.now() == 10) + // + // assert(d2.tryNow() == Success(110)) + // assert(d2.now() == 110) + // assert(d2.signal.now() == 110) + // + // // -- Update parent var + // + // s.set(Form(20)) + // + // assert(s.tryNow() == Success(Form(20))) + // assert(s.now() == Form(20)) + // assert(s.signal.now() == Form(20)) + // + // assert(d1.tryNow() == Success(20)) + // assert(d1.now() == 20) + // assert(d1.signal.now() == 20) + // + // assert(d2.tryNow() == Success(120)) + // assert(d2.now() == 120) + // assert(d2.signal.now() == 120) + // + // // -- Update derived var with misaligned zoomOut + // + // d1.set(30) + // + // assert(s.tryNow() == Success(Form(130))) + // assert(s.now() == Form(130)) + // assert(s.signal.now() == Form(130)) + // + // assert(d1.tryNow() == Success(130)) + // assert(d1.now() == 130) + // assert(d1.signal.now() == 130) + // + // assert(d2.tryNow() == Success(230)) + // assert(d2.now() == 230) + // assert(d2.signal.now() == 230) + // + // // -- Reset + // + // s.set(Form(20)) + // + // assert(s.tryNow() == Success(Form(20))) + // assert(s.now() == Form(20)) + // assert(s.signal.now() == Form(20)) + // + // assert(d1.tryNow() == Success(20)) + // assert(d1.now() == 20) + // assert(d1.signal.now() == 20) + // + // assert(d2.tryNow() == Success(120)) + // assert(d2.now() == 120) + // assert(d2.signal.now() == 120) + // + // // -- Update derived var with misaligned zoomIn + // + // d2.set(40) + // + // assert(s.tryNow() == Success(Form(40))) + // assert(s.now() == Form(40)) + // assert(s.signal.now() == Form(40)) + // + // assert(d1.tryNow() == Success(40)) + // assert(d1.now() == 40) + // assert(d1.signal.now() == 40) + // + // assert(d2.tryNow() == Success(140)) + // assert(d2.now() == 140) + // assert(d2.signal.now() == 140) + // + // } +}