From ed165614c8d3422f9e3765d3b345ac15ed503d92 Mon Sep 17 00:00:00 2001 From: Ilya Surkov Date: Wed, 7 Jun 2023 13:52:35 +0200 Subject: [PATCH] Implement atomic modify function (#210) * Initial modify implementation * Implement tests and allow use in fenced * Remove outdated comments * Modify returns old value release handle, add modification sequencing tests * Renamed things as per PR review * Add helper extension methods and comments. * Further refactoring --------- (cherry picked from commit f43cf2a0218e0f90e834d7663c0209424090ad91) Co-authored-by: Ilya Surkov --- .../scala/com/evolution/scache/Cache.scala | 103 +++++++- .../com/evolution/scache/CacheFenced.scala | 3 + .../com/evolution/scache/CacheMetered.scala | 19 ++ .../com/evolution/scache/CacheMetrics.scala | 30 ++- .../com/evolution/scache/ExpiringCache.scala | 17 +- .../com/evolution/scache/LoadingCache.scala | 164 ++++++++++++ .../evolution/scache/PartitionedCache.scala | 6 + .../com/evolution/scache/CacheSpec.scala | 234 +++++++++++++++++- 8 files changed, 567 insertions(+), 9 deletions(-) diff --git a/src/main/scala/com/evolution/scache/Cache.scala b/src/main/scala/com/evolution/scache/Cache.scala index a7fc3b7..5af3235 100644 --- a/src/main/scala/com/evolution/scache/Cache.scala +++ b/src/main/scala/com/evolution/scache/Cache.scala @@ -3,7 +3,8 @@ package com.evolution.scache import cats.effect.{BracketThrow, Concurrent, Resource, Timer} import cats.syntax.all._ import cats.kernel.{CommutativeMonoid, Hash, Monoid} -import cats.{Functor, Monad, MonadThrow, Parallel, ~>} +import cats.{Applicative, Functor, Monad, MonadThrow, Parallel, ~>} +import com.evolution.scache.Cache.Directive import com.evolutiongaming.catshelper.CatsHelper._ import com.evolutiongaming.catshelper.{MeasureDuration, Runtime} import com.evolutiongaming.smetrics @@ -219,6 +220,30 @@ trait Cache[F[_], K, V] { */ def put(key: K, value: V, release: Option[Release]): F[F[Option[V]]] + /** Atomically modify a value under specific key. + * + * Allows to make a decision regarding value update based on the present value (or its absence), + * and express it as either `Put`, `Ignore`, or `Remove` directive. + * + * It will try to calculate `f` and apply resulting directive until it succeeds. + * + * In case of `Put` directive, it is guaranteed that the value is written in cache, + * and that it replaced exactly the value passed to `f`. + * + * In case of `Remove` directive, it is guaranteed that the key was removed + * when it contained exactly the value passed to `f`. + * + * @param key + * The key to modify value for. + * @param f + * Function that accepts current value found in cache (or None, if it's absent), and returns + * a directive expressing a desired operation on the value, as well as an arbitrary output value of type `A` + * @return + * Output value returned by `f`, and an optional effect representing an ongoing release of the value + * that was removed from cache as a result of the modification (e.g.: in case of `Put` or `Remove` directives). + */ + def modify[A](key: K)(f: Option[V] => (A, Directive[F, V])): F[(A, Option[F[Unit]])] + /** Checks if the value for the key is present in the cache. * * @return @@ -347,6 +372,13 @@ trait Cache[F[_], K, V] { object Cache { + sealed trait Directive[+F[_], +V] + object Directive { + final case class Put[F[_], V](value: V, release: Option[F[Unit]]) extends Directive[F, V] + final case object Remove extends Directive[Nothing, Nothing] + final case object Ignore extends Directive[Nothing, Nothing] + } + /** Creates an always-empty implementation of cache. * * The implementation *almost* always returns [[scala.None]] regardess the @@ -378,6 +410,9 @@ object Cache { def put(key: K, value: V, release: Option[F[Unit]]) = none[V].pure[F].pure[F] + def modify[A](key: K)(f: Option[V] => (A, Directive[F, V])): F[(A, Option[F[Unit]])] = + (f(None)._1, none[F[Unit]]).pure[F] + def contains(key: K) = false.pure[F] def size = 0.pure[F] @@ -693,6 +728,19 @@ object Cache { } } + def modify[A](key: K)(f: Option[V] => (A, Directive[G, V])): G[(A, Option[G[Unit]])] = { + val adaptedF: Option[V] => (A, Directive[F, V]) = f(_) match { + case (a, put: Directive.Put[G, V]) => (a, Directive.Put(put.value, put.release.map(gf(_)))) + case (a, Directive.Ignore) => (a, Directive.Ignore) + case (a, Directive.Remove) => (a, Directive.Remove) + } + fg { + self + .modify(key)(adaptedF) + .map { case (a, release) => (a, release.map(fg(_)))} + } + } + def put(key: K, value: V, release: Release) = { fg { self @@ -930,5 +978,58 @@ object Cache { case None => none[V].pure[F] } } + + /** Like `modify`, but doesn't pass through any return value. + * + * @return + * If this `update` replaced an existing value, + * will return `Some` containing an effect representing release of that value. + */ + def update(key: K)(f: Option[V] => Directive[F, V])(implicit F: Functor[F]): F[Option[F[Unit]]] = + self.modify(key)(() -> f(_)).map(_._2) + + /** Like `modify`, but `f` is only applied if there is a value present in cache, + * and the result is always replacing the old value. + * + * @return + * `true` if value was present, and was subsequently replaced. + * `false` if there was no value present. + */ + def updatePresent(key: K)(f: V => V)(implicit F: Functor[F]): F[Boolean] = + self.modify[Boolean](key) { + case Some(value) => (true, Directive.Put(f(value), None)) + case None => (false, Directive.Ignore) + } map(_._1) + + /** Like `update`, but `f` has an option to return `None`, in which case value will not be changed. + * + * @return + * `true` if value was present and was subsequently replaced. + * `false` if there was no value present, or it was not replaced. + */ + def updatePresentOpt(key: K)(f: V => Option[V])(implicit F: Functor[F]): F[Boolean] = + self.modify[Boolean](key) { + case Some(value) => f(value).fold[(Boolean, Directive[F, V])](false -> Directive.Ignore)(v => true -> Directive.Put(v, None)) + case None => (false, Directive.Ignore) + } map(_._1) + + /** Like `put`, but based on `modify`, and guarantees that as a result of the operation the value was in fact + * written in cache. Will be slower than a regular `put` in situations of high contention. + * + * @return + * If this `putStrict` replaced an existing value, will return `Some` containing the old value + * and an effect representing release of that value. + */ + def putStrict(key: K, value: V)(implicit F: Applicative[F]): F[Option[(V, F[Unit])]] = + self.modify[Option[V]](key)((_, Directive.Put(value, None))).map(_.tupled) + + /** Like `putStrict`, but with `release` part of the new value. + * + * @return + * If this `putStrict` replaced an existing value, will return `Some` containing the old value + * and an effect representing release of that value. + */ + def putStrict(key: K, value: V, release: self.type#Release)(implicit F: Applicative[F]): F[Option[(V, F[Unit])]] = + self.modify[Option[V]](key)((_, Directive.Put(value, release.some))).map(_.tupled) } } diff --git a/src/main/scala/com/evolution/scache/CacheFenced.scala b/src/main/scala/com/evolution/scache/CacheFenced.scala index 950cfaf..c184f05 100644 --- a/src/main/scala/com/evolution/scache/CacheFenced.scala +++ b/src/main/scala/com/evolution/scache/CacheFenced.scala @@ -53,6 +53,9 @@ object CacheFenced { .productR { cache.put(key, value, release) } } + def modify[A](key: K)(f: Option[V] => (A, Cache.Directive[F, V])): F[(A, Option[F[Unit]])] = + fence.flatMap(_ => cache.modify(key)(f)) + def contains(key: K) = cache.contains(key) def size = cache.size diff --git a/src/main/scala/com/evolution/scache/CacheMetered.scala b/src/main/scala/com/evolution/scache/CacheMetered.scala index 68d7c00..ef60cb3 100644 --- a/src/main/scala/com/evolution/scache/CacheMetered.scala +++ b/src/main/scala/com/evolution/scache/CacheMetered.scala @@ -3,6 +3,7 @@ package com.evolution.scache import cats.effect.{Concurrent, Resource, Timer} import cats.kernel.CommutativeMonoid import cats.syntax.all._ +import com.evolution.scache.Cache.Directive import com.evolutiongaming.catshelper.{MeasureDuration, Schedule} import com.evolutiongaming.smetrics @@ -103,6 +104,24 @@ object CacheMetered { } yield value } + def modify[A](key: K)(f: Option[V] => (A, Directive[F, V])): F[(A, Option[F[Unit]])] = + for { + duration <- MeasureDuration[F].start + ((a, entryExisted, directive), release) <- cache.modify(key) { entry => + f(entry) match { + case (a, put: Directive.Put[F, V]) => + ((a, entry.nonEmpty, CacheMetrics.Directive.Put), + Directive.Put(put.value, releaseMetered(duration, put.release.getOrElse(().pure[F])).some)) + case (a, Directive.Ignore) => + ((a, entry.nonEmpty, CacheMetrics.Directive.Ignore), Directive.Ignore) + case (a, Directive.Remove) => + ((a, entry.nonEmpty, CacheMetrics.Directive.Remove), Directive.Remove) + } + } + _ <- metrics.modify(entryExisted, directive) + } yield (a, release) + + def contains(key: K) = cache.contains(key) def size = { diff --git a/src/main/scala/com/evolution/scache/CacheMetrics.scala b/src/main/scala/com/evolution/scache/CacheMetrics.scala index c31219a..0007d1f 100644 --- a/src/main/scala/com/evolution/scache/CacheMetrics.scala +++ b/src/main/scala/com/evolution/scache/CacheMetrics.scala @@ -3,6 +3,7 @@ package com.evolution.scache import cats.{Applicative, Monad} import cats.effect.Resource import cats.syntax.all._ +import com.evolution.scache.CacheMetrics.Directive import com.evolutiongaming.smetrics.MetricsHelper._ import com.evolutiongaming.smetrics.{CollectorRegistry, LabelNames, Quantile, Quantiles} @@ -18,6 +19,8 @@ trait CacheMetrics[F[_]] { def put: F[Unit] + def modify(entryExisted: Boolean, directive: Directive): F[Unit] + def size(size: Int): F[Unit] def size(latency: FiniteDuration): F[Unit] @@ -46,6 +49,8 @@ object CacheMetrics { val put = unit + def modify(entryExisted: Boolean, directive: Directive): F[Unit] = unit + def size(size: Int) = unit def size(latency: FiniteDuration) = unit @@ -59,6 +64,18 @@ object CacheMetrics { def foldMap(latency: FiniteDuration) = unit } + sealed trait Directive { + override def toString: Prefix = this match { + case Directive.Put => "put" + case Directive.Ignore => "ignore" + case Directive.Remove => "remove" + } + } + object Directive { + final case object Put extends Directive + final case object Ignore extends Directive + final case object Remove extends Directive + } type Name = String @@ -84,6 +101,12 @@ object CacheMetrics { help = "Put", labels = LabelNames("name")) + val modifyCounter = collectorRegistry.counter( + name = s"${ prefix }_modify", + help = "Modify, labeled by modification input (entry was present or not), and output (put, keep, or remove)", + labels = LabelNames("existing_entry", "result") + ) + val loadResultCounter = collectorRegistry.counter( name = s"${ prefix }_load_result", help = "Load result: success or failure", @@ -119,6 +142,7 @@ object CacheMetrics { for { getsCounter <- getCounter putCounter <- putCounter + modifyCounter <- modifyCounter loadResultCounter <- loadResultCounter loadTimeSummary <- loadTimeSummary lifeTimeSummary <- lifeTimeSummary @@ -158,7 +182,7 @@ object CacheMetrics { def get(hit: Boolean) = { val counter = if (hit) hitCounter else missCounter counter.inc() - } + } def load(time: FiniteDuration, success: Boolean) = { val resultCounter = if (success) successCounter else failureCounter @@ -175,6 +199,10 @@ object CacheMetrics { val put = putCounter1.inc() + def modify(entryExisted: Boolean, directive: Directive): F[Unit] = { + modifyCounter.labels(entryExisted.toString, directive.toString).inc() + } + def size(size: Int) = { sizeGauge.labels(name).set(size.toDouble) } diff --git a/src/main/scala/com/evolution/scache/ExpiringCache.scala b/src/main/scala/com/evolution/scache/ExpiringCache.scala index d7823c0..d9e31da 100644 --- a/src/main/scala/com/evolution/scache/ExpiringCache.scala +++ b/src/main/scala/com/evolution/scache/ExpiringCache.scala @@ -5,6 +5,7 @@ import cats.effect.{Clock, Concurrent, Resource, Timer} import cats.kernel.CommutativeMonoid import cats.syntax.all._ import cats.{Applicative, Monad, MonadThrow, Monoid, Parallel} +import com.evolution.scache.Cache.Directive import com.evolution.scache.LoadingCache.EntryState import com.evolutiongaming.catshelper.CatsHelper._ import com.evolutiongaming.catshelper.ClockHelper._ @@ -19,7 +20,7 @@ object ExpiringCache { private[scache] def of[F[_]: Concurrent: Timer: Parallel, K, V]( config: Config[F, K, V] ): Resource[F, Cache[F, K, V]] = { - + type E = Entry[V] val cooldown = config.expireAfterRead.toMillis / 5 @@ -270,6 +271,20 @@ object ExpiringCache { } } + // Modifying existing entry creates a new one, since the old one will be released. + def modify[A](key: K)(f: Option[V] => (A, Directive[F, V])): F[(A, Option[F[Unit]])] = + Clock[F] + .millis + .flatMap { timestamp => + val adaptedF: Option[Entry[V]] => (A, Directive[F, Entry[V]]) = entry => f(entry.map(_.value)) match { + case (a, put: Directive.Put[F, V]) => + (a, Directive.Put(Entry(put.value, timestamp, none), put.release)) + case (a, Directive.Ignore) => (a, Directive.Ignore) + case (a, Directive.Remove) => (a, Directive.Remove) + } + cache.modify(key)(adaptedF) + } + def contains(key: K) = cache.contains(key) def size = cache.size diff --git a/src/main/scala/com/evolution/scache/LoadingCache.scala b/src/main/scala/com/evolution/scache/LoadingCache.scala index 8727beb..e44a86b 100644 --- a/src/main/scala/com/evolution/scache/LoadingCache.scala +++ b/src/main/scala/com/evolution/scache/LoadingCache.scala @@ -6,6 +6,7 @@ import cats.effect.syntax.all._ import cats.effect.{Concurrent, Fiber, Resource} import cats.kernel.CommutativeMonoid import cats.syntax.all._ +import com.evolution.scache.Cache.Directive import com.evolutiongaming.catshelper.CatsHelper._ import com.evolutiongaming.catshelper.ParallelHelper._ @@ -483,6 +484,169 @@ private[scache] object LoadingCache { } } + override def modify[A](key: K)(f: Option[V] => (A, Directive[F, V])): F[(A, Option[F[Unit]])] = { + 0.tailRecM { counter => + ref + .access + .flatMap { case (entryRefs, setMap) => + entryRefs + .get(key) + .fold { + f(None) match { + // No entry present in the map, and we want to add a new one + case (a, put: Directive.Put[F, V]) => + Ref[F] + .of[EntryState[F, V]](EntryState.Value(entryOf(put.value, put.release))) + .flatMap { entryRef => + setMap(entryRefs.updated(key, entryRef)).map { + case true => + (a, none[F[Unit]]) + .asRight[Int] + // Failed adding new entry to the map, retrying accessing the map + case false => + (counter + 1) + .asLeft[(A, Option[F[Unit]])] + } + } + // No entry present in the map, and we don't want to have any, so exiting + case (a, Directive.Ignore | Directive.Remove) => + (a, none[F[Unit]]) + .asRight[Int] + .pure[F] + } + } { entryRef => + 0.tailRecM { counter1 => + entryRef + .access + .flatMap { + // A value is already present in the map + case (state: EntryState.Value[F, V], setRef) => + f(state.entry.value.some) match { + case (a, put: Directive.Put[F, V]) => + setRef(EntryState.Value(entryOf(put.value, put.release))) + .flatMap { + // Successfully replaced the entryRef with our value, + // now we are responsible for releasing the old value. + case true => + state + .entry + .release + .traverse { _.start } + .map { release => + (a, release.map(_.join)) + .asRight[Int] + .asRight[Int] + } + // Failed updating entryRef, retrying + case false => + (counter1 + 1) + .asLeft[Either[Int, (A, Option[F[Unit]])]] + .pure[F] + } + // Keeping the value intact and exiting + case (a, Directive.Ignore) => + (a, none[F[Unit]]) + .asRight[Int] + .asRight[Int] + .pure[F] + // Removing the value + case (a, Directive.Remove) => + setRef(EntryState.Removed) + .flatMap { + // Successfully set the entryRef to `Removed` state, now removing it from the map. + // Only removing the key if it still contains this entry, otherwise noop. + case true => + ref + .update { entryRefs => + entryRefs.get(key) match { + case Some(`entryRef`) => entryRefs - key + case _ => entryRefs + } + } + .flatMap { _ => + // Releasing the value regardless of the map update result. + state + .entry + .release + .traverse { _.start } + .map { release => + (a, release.map(_.join)) + .asRight[Int] + .asRight[Int] + } + } + // Failed updating entryRef, retrying + case false => + (counter1 + 1) + .asLeft[Either[Int, (A, Option[F[Unit]])]] + .pure[F] + } + } + + // Entry in the map is still loading + case (state: EntryState.Loading[F, V], setRef) => + f(None) match { + // Trying to replace it with our value + case (a, put: Directive.Put[F, V]) => + val entry = entryOf(put.value, put.release) + state + .deferred + .complete1(entry.asRight) + .flatMap { + // We successfully completed the deferred, now trying to set the value. + case true => + setRef(EntryState.Value(entry)).map { + // We successfully replaced the entry with our value, so we are done. + case true => + (a, none[F[Unit]]) + .asRight[Int] + .asRight[Int] + // Another fiber placed their new value (only Removed should be possible) + // before us so we retry accessing the entry. + case false => + (counter1 + 1) + .asLeft[Either[Int, (A, Option[F[Unit]])]] + } + // Failed to complete the deferred, meaning someone else completed it, and will + // now set the new value in the entryRef. Retrying the lookup. + case false => + (counter1 + 1) + .asLeft[Either[Int, (A, Option[F[Unit]])]] + .pure[F] + } + // Noop decision, exiting + case (a, Directive.Ignore | Directive.Remove) => + (a, none[F[Unit]]) + .asRight[Int] + .asRight[Int] + .pure[F] + } + + // Entry was just removed, it soon will be gone from the map. + case (EntryState.Removed, _) => + f(None) match { + // We want to place the new value; + // Retrying the map lookup, expecting a different result for our key. + case (_, _: Directive.Put[F, V]) => + (counter + 1) + .asLeft[(A, Option[F[Unit]])] + .asRight[Int] + .pure[F] + // Noop decision, exiting + case (a, Directive.Ignore | Directive.Remove) => + (a, none[F[Unit]]) + .asRight[Int] + .asRight[Int] + .pure[F] + } + } + .uncancelable + } + } + } + } + } + def contains(key: K) = { ref .get diff --git a/src/main/scala/com/evolution/scache/PartitionedCache.scala b/src/main/scala/com/evolution/scache/PartitionedCache.scala index bd093cc..169ba6b 100644 --- a/src/main/scala/com/evolution/scache/PartitionedCache.scala +++ b/src/main/scala/com/evolution/scache/PartitionedCache.scala @@ -47,6 +47,12 @@ object PartitionedCache { .put(key, value, release) } + def modify[A](key: K)(f: Option[V] => (A, Cache.Directive[F, V])): F[(A, Option[F[Unit]])] = { + partitions + .get(key) + .modify(key)(f) + } + def contains(key: K) = { partitions .get(key) diff --git a/src/test/scala/com/evolution/scache/CacheSpec.scala b/src/test/scala/com/evolution/scache/CacheSpec.scala index 6d5bdbb..9fb227e 100644 --- a/src/test/scala/com/evolution/scache/CacheSpec.scala +++ b/src/test/scala/com/evolution/scache/CacheSpec.scala @@ -5,6 +5,7 @@ import cats.effect.concurrent.{Deferred, Ref} import cats.effect.{Concurrent, Fiber, IO, Resource} import cats.effect.implicits._ import cats.syntax.all._ +import com.evolution.scache.Cache.Directive import com.evolution.scache.IOSuite._ import com.evolutiongaming.catshelper.CatsHelper._ import org.scalatest.Assertion @@ -1212,10 +1213,18 @@ class CacheSpec extends AsyncFunSuite with Matchers { } yield {} } - check(s"each release performed exactly once during `getOrUpdate1`, `put` and `remove` race: $name") { (cache, _) => + check(s"each release performed exactly once during " + + s"`getOrUpdate1`, `put`, `modify` and `remove` race: $name") { (cache, _) => + + def modify(releaseCounter: Ref[IO, Int]): Option[Int] => (Int, Directive[IO, Int]) = { + case Some(i) => i -> Directive.Put(i, releaseCounter.update(_ + 1).some) + case None => -2 -> Directive.Put(-2, releaseCounter.update(_ + 1).some) + } + for { resultRef1 <- Ref[IO].of(0) resultRef2 <- Ref[IO].of(0) + resultRef3 <- Ref[IO].of(0) n = 100000 range = (1 to n).toList @@ -1228,27 +1237,40 @@ class CacheSpec extends AsyncFunSuite with Matchers { // so we increment on release and check that the final value is equal to the sum of the range. f2 <- range.parTraverse(i => cache.put(0, 0, resultRef2.update(_ + i))).start - f3 <- cache.remove(0).replicateA(n).start + f3 <- range.parTraverse(_ => cache.modify(0)(modify(resultRef3))).start + + f4 <- cache.remove(0).replicateA(n).start expectedResult = range.sum _ <- f1.join.void _ <- f2.join.flatMap(_.sequence) - _ <- f3.join.flatMap(_.sequence) + _ <- f3.join.flatMap(_.flatMap(_._2).sequence_) + _ <- f4.join.flatMap(_.sequence) _ <- cache.clear.flatten result1 <- resultRef1.get result2 <- resultRef2.get + result3 <- resultRef3.get _ <- IO { result1 shouldEqual 0 } _ <- IO { result2 shouldEqual expectedResult } + _ <- IO { result3 shouldEqual n } } yield {} } - check(s"failing loads don't interfere with releases during `getOrUpdate1`, `put` and `remove` race: $name") { (cache, _) => + check(s"failing loads don't interfere with releases during " + + s"`getOrUpdate1`, `put`, `modify` and `remove` race: $name") { (cache, _) => + + def modify(releaseCounter: Ref[IO, Int]): Option[Int] => (Int, Directive[IO, Int]) = { + case Some(i) => i -> Directive.Put(i, releaseCounter.update(_ + 1).some) + case None => -2 -> Directive.Put(-2, releaseCounter.update(_ + 1).some) + } + for { resultRef1 <- Ref[IO].of(0) resultRef2 <- Ref[IO].of(0) resultRef3 <- Ref[IO].of(0) + resultRef4 <- Ref[IO].of(0) n = 100000 range = (1 to n).toList @@ -1269,24 +1291,220 @@ class CacheSpec extends AsyncFunSuite with Matchers { // so we increment on release and check that the final value is equal to the sum of the range. f3 <- range.parTraverse(i => cache.put(0, 0, resultRef3.update(_ + i))).start - f4 <- cache.remove(0).parReplicateA(n).start + f4 <- range.parTraverse(_ => cache.modify(0)(modify(resultRef4))).start + + f5 <- cache.remove(0).parReplicateA(n).start expectedResult = range.sum _ <- f1.join.void _ <- f2.join.void _ <- f3.join.flatMap(_.sequence) - _ <- f4.join.flatMap(_.sequence) + _ <- f4.join.flatMap(_.flatMap(_._2).sequence_) + _ <- f5.join.flatMap(_.sequence) _ <- cache.clear.flatten result1 <- resultRef1.get result2 <- resultRef2.get result3 <- resultRef3.get + result4 <- resultRef4.get _ <- IO { result1 shouldEqual 0 } _ <- IO { result2 shouldEqual 0 } _ <- IO { result3 shouldEqual expectedResult } + _ <- IO { result4 shouldEqual n } + } yield () + } + + check(s"modify modifies existing entry: $name") { (cache, metrics) => + val modify: Option[Int] => (Int, Directive[IO, Int]) = { + case Some(i) => i -> Directive.Put(i + 1, None) + case None => -1 -> Directive.Ignore + } + for { + (a, release1) <- cache.modify(0)(modify) + _ <- IO { a shouldEqual -1 } + _ <- cache.put(0, 1) + (a, release2) <- cache.modify(0)(modify) + _ <- IO { a shouldEqual 1 } + value <- cache.get(0) + _ <- IO { value shouldBe 2.some } + release3 <- cache.remove(0) + + _ <- List(release1, release2, release3.void.some).flatten.sequence_ + + _ <- metrics.expect( + metrics.expectedPut -> 1, + metrics.expectedModify(entryExisted = false, CacheMetrics.Directive.Ignore) -> 1, + metrics.expectedModify(entryExisted = true, CacheMetrics.Directive.Put) -> 1, + metrics.expectedGet(true) -> 1, + metrics.expectedLife -> 2, + ) + } yield () + } + + check(s"modify keeps existing entry: $name") { (cache, metrics) => + val modify: Option[Int] => (Int, Directive[IO, Int]) = i => i.getOrElse(-1) -> Directive.Ignore + for { + (a, release1) <- cache.modify(0)(modify) + _ <- IO { a shouldEqual -1 } + _ <- cache.put(0, 1) + (a, release2) <- cache.modify(0)(modify) + _ <- IO { a shouldEqual 1 } + value <- cache.get(0) + _ <- IO { value shouldBe 1.some } + _ <- List(release1, release2).flatten.sequence_ + _ <- metrics.expect( + metrics.expectedPut -> 1, + metrics.expectedModify(entryExisted = false, CacheMetrics.Directive.Ignore) -> 1, + metrics.expectedModify(entryExisted = true, CacheMetrics.Directive.Ignore) -> 1, + metrics.expectedGet(true) -> 1, + ) + } yield () + } + + check(s"modify removes existing entry: $name") { (cache, metrics) => + val modify: Option[Int] => (Int, Directive[IO, Int]) = { + case Some(i) => i -> Directive.Remove + case None => -1 -> Directive.Ignore + } + for { + (a, release1) <- cache.modify(0)(modify) + _ <- IO { a shouldEqual -1 } + _ <- cache.put(0, 1) + (a, release2) <- cache.modify(0)(modify) + _ <- IO { a shouldEqual 1 } + value <- cache.get(0) + _ <- IO { value shouldBe None } + _ <- List(release1, release2).flatten.sequence_ + _ <- metrics.expect( + metrics.expectedPut -> 1, + metrics.expectedModify(entryExisted = false, CacheMetrics.Directive.Ignore) -> 1, + metrics.expectedModify(entryExisted = true, CacheMetrics.Directive.Remove) -> 1, + metrics.expectedGet(false) -> 1, + metrics.expectedLife -> 1, + ) + } yield () + } + + check(s"modify adds entry when absent: $name") { (cache, metrics) => + val modify: Option[Int] => (Int, Directive[IO, Int]) = { + case Some(i) => i -> Directive.Ignore + case None => 1 -> Directive.Put(1, None) + } + for { + (a, release1) <- cache.modify(0)(modify) + _ <- IO { a shouldEqual 1 } + (a, release2) <- cache.modify(0)(modify) + _ <- IO { a shouldEqual 1 } + value <- cache.get(0) + _ <- IO { value shouldBe Some(1) } + _ <- List(release1, release2).flatten.sequence_ + _ <- metrics.expect( + metrics.expectedModify(entryExisted = false, CacheMetrics.Directive.Put) -> 1, + metrics.expectedModify(entryExisted = true, CacheMetrics.Directive.Ignore) -> 1, + metrics.expectedGet(true) -> 1, + ) } yield () } + + check(s"modify guarantees updated value write concurrently accessing single key: $name") { + (cache, metrics) => + def modify(releaseCounter: Ref[IO, Int]): Option[Int] => (Int, Directive[IO, Int]) = { + case Some(i) => i -> Directive.Put(i + 1, releaseCounter.update(_ + i + 1).some) + case None => 0 -> Directive.Put(1, releaseCounter.update(_ + 1).some) + } + for { + releaseCounter <- Ref[IO].of(0) + n = 100000 + range = (1 to n).toList + + f1 <- range.parTraverse(_ => cache.modify(0)(modify(releaseCounter))).start + + expectedResult = range.sum + + results <- f1.join + _ <- IO { results.map(_._1).sum shouldEqual (expectedResult - n) } + + // Waiting for releases + _ <- results.flatMap(_._2).sequence_ + + lastWrittenValue <- cache.get(0) + (lastValueRemoved, lastRelease) <- cache.modify(0)(lastValue => (lastValue, Directive.Remove)) + _ <- lastRelease.sequence_ + releasedValuesSum <- releaseCounter.get + + _ <- IO { releasedValuesSum shouldEqual expectedResult } + _ <- IO { lastWrittenValue shouldBe n.some } + _ <- IO { lastValueRemoved shouldBe n.some } + + _ <- metrics.expect( + metrics.expectedModify(entryExisted = false, CacheMetrics.Directive.Put) -> 1, + metrics.expectedModify(entryExisted = true, CacheMetrics.Directive.Put) -> (n - 1), + metrics.expectedModify(entryExisted = true, CacheMetrics.Directive.Remove) -> 1, + metrics.expectedGet(true) -> 1, + metrics.expectedLife -> n, + ) + } yield () + } + + check(s"modify guarantees updated value write concurrently accessing multiple keys: $name") { + (cache, metrics) => + def modify(releaseCounter: Ref[IO, Int]): Option[Int] => (Int, Directive[IO, Int]) = { + case Some(i) => i -> Directive.Put(i + 1, releaseCounter.update(_ + i + 1).some) + case None => 0 -> Directive.Put(1, releaseCounter.update(_ + 1).some) + } + for { + releaseCounter <- Ref[IO].of(0) + n = 100000 + range = (1 to n).toList + + f0 <- range.parTraverse(_ => cache.modify(0)(modify(releaseCounter))).start + f1 <- range.parTraverse(_ => cache.modify(1)(modify(releaseCounter))).start + f2 <- range.parTraverse(_ => cache.modify(2)(modify(releaseCounter))).start + f3 <- range.parTraverse(_ => cache.modify(3)(modify(releaseCounter))).start + + expectedResult = range.sum + + results <- List(f0, f1, f2, f3).flatTraverse(_.join) + _ <- IO { results.map(_._1).sum shouldEqual (expectedResult - n) * 4 } + + // Waiting for releases + _ <- results.flatMap(_._2).sequence_ + + lastWrittenValue0 <- cache.get(0) + lastWrittenValue1 <- cache.get(1) + lastWrittenValue2 <- cache.get(2) + lastWrittenValue3 <- cache.get(3) + + (lastValueRemoved0, lastRelease0) <- cache.modify(0)(lastValue => (lastValue, Directive.Remove)) + (lastValueRemoved1, lastRelease1) <- cache.modify(1)(lastValue => (lastValue, Directive.Remove)) + (lastValueRemoved2, lastRelease2) <- cache.modify(2)(lastValue => (lastValue, Directive.Remove)) + (lastValueRemoved3, lastRelease3) <- cache.modify(3)(lastValue => (lastValue, Directive.Remove)) + _ <- List(lastRelease0, lastRelease1, lastRelease2, lastRelease2).flatten.sequence_ + + releasedValuesSum <- releaseCounter.get + + _ <- IO { releasedValuesSum shouldEqual expectedResult * 4 } + + _ <- IO { lastWrittenValue0 shouldBe n.some } + _ <- IO { lastWrittenValue1 shouldBe n.some } + _ <- IO { lastWrittenValue2 shouldBe n.some } + _ <- IO { lastWrittenValue3 shouldBe n.some } + + _ <- IO { lastValueRemoved0 shouldBe n.some } + _ <- IO { lastValueRemoved1 shouldBe n.some } + _ <- IO { lastValueRemoved2 shouldBe n.some } + _ <- IO { lastValueRemoved3 shouldBe n.some } + + _ <- metrics.expect( + metrics.expectedModify(entryExisted = false, CacheMetrics.Directive.Put) -> 4, + metrics.expectedModify(entryExisted = true, CacheMetrics.Directive.Put) -> (n - 1) * 4, + metrics.expectedModify(entryExisted = true, CacheMetrics.Directive.Remove) -> 4, + metrics.expectedGet(true) -> 4, + metrics.expectedLife -> n * 4, + ) + } yield () + } } } @@ -1306,6 +1524,8 @@ object CacheSpec { def expectedLoad(success: Boolean): String = s"load(time=..., success=$success)" val expectedLife: String = "life(time=...)" val expectedPut: String = "put" + def expectedModify(entryExisted: Boolean, directive: CacheMetrics.Directive): String = + s"modify(existed=$entryExisted, directive=$directive" def expectedSize(size: Int): String = s"size(size=$size)" val expectedSize: String = "size(latency=...)" val expectedValues: String = "values(latency=...)" @@ -1317,6 +1537,8 @@ object CacheSpec { def load(time: FiniteDuration, success: Boolean): IO[Unit] = inc(expectedLoad(success)) def life(time: FiniteDuration): IO[Unit] = inc(expectedLife) def put: IO[Unit] = inc(expectedPut) + def modify(entryExisted: Boolean, directive: CacheMetrics.Directive): IO[Unit] = + inc(expectedModify(entryExisted, directive)) def size(size: Int): IO[Unit] = inc(expectedSize(size)) def size(latency: FiniteDuration): IO[Unit] = inc(expectedSize) def values(latency: FiniteDuration): IO[Unit] = inc(expectedValues)