Skip to content

Commit

Permalink
Implement atomic modify function (#204)
Browse files Browse the repository at this point in the history
* Initial modify implementation

* Implement tests and allow use in fenced

* Remove outdated comments

* Modify returns old value release handle, add modification sequencing tests

* Renamed things as per PR review

* Add helper extension methods and comments.

* Further refactoring

---------

Co-authored-by: Ilya Surkov <[email protected]>

(cherry picked from commit f43cf2a)

Co-authored-by: Ilya Surkov <[email protected]>
  • Loading branch information
i-surkov and Ilya Surkov authored Jun 1, 2023
1 parent 423ac9b commit 6b34a41
Show file tree
Hide file tree
Showing 8 changed files with 567 additions and 9 deletions.
103 changes: 102 additions & 1 deletion src/main/scala/com/evolution/scache/Cache.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package com.evolution.scache
import cats.effect.{BracketThrow, Concurrent, Resource, Timer}
import cats.syntax.all._
import cats.kernel.{CommutativeMonoid, Hash, Monoid}
import cats.{Functor, Monad, MonadThrow, Parallel, ~>}
import cats.{Applicative, Functor, Monad, MonadThrow, Parallel, ~>}
import com.evolution.scache.Cache.Directive
import com.evolutiongaming.catshelper.CatsHelper._
import com.evolutiongaming.catshelper.{MeasureDuration, Runtime}

Expand Down Expand Up @@ -218,6 +219,30 @@ trait Cache[F[_], K, V] {
*/
def put(key: K, value: V, release: Option[Release]): F[F[Option[V]]]

/** Atomically modify a value under specific key.
*
* Allows to make a decision regarding value update based on the present value (or its absence),
* and express it as either `Put`, `Ignore`, or `Remove` directive.
*
* It will try to calculate `f` and apply resulting directive until it succeeds.
*
* In case of `Put` directive, it is guaranteed that the value is written in cache,
* and that it replaced exactly the value passed to `f`.
*
* In case of `Remove` directive, it is guaranteed that the key was removed
* when it contained exactly the value passed to `f`.
*
* @param key
* The key to modify value for.
* @param f
* Function that accepts current value found in cache (or None, if it's absent), and returns
* a directive expressing a desired operation on the value, as well as an arbitrary output value of type `A`
* @return
* Output value returned by `f`, and an optional effect representing an ongoing release of the value
* that was removed from cache as a result of the modification (e.g.: in case of `Put` or `Remove` directives).
*/
def modify[A](key: K)(f: Option[V] => (A, Directive[F, V])): F[(A, Option[F[Unit]])]

/** Checks if the value for the key is present in the cache.
*
* @return
Expand Down Expand Up @@ -346,6 +371,13 @@ trait Cache[F[_], K, V] {

object Cache {

sealed trait Directive[+F[_], +V]
object Directive {
final case class Put[F[_], V](value: V, release: Option[F[Unit]]) extends Directive[F, V]
final case object Remove extends Directive[Nothing, Nothing]
final case object Ignore extends Directive[Nothing, Nothing]
}

/** Creates an always-empty implementation of cache.
*
* The implementation *almost* always returns [[scala.None]] regardess the
Expand Down Expand Up @@ -377,6 +409,9 @@ object Cache {

def put(key: K, value: V, release: Option[F[Unit]]) = none[V].pure[F].pure[F]

def modify[A](key: K)(f: Option[V] => (A, Directive[F, V])): F[(A, Option[F[Unit]])] =
(f(None)._1, none[F[Unit]]).pure[F]

def contains(key: K) = false.pure[F]

def size = 0.pure[F]
Expand Down Expand Up @@ -681,6 +716,19 @@ object Cache {
}
}

def modify[A](key: K)(f: Option[V] => (A, Directive[G, V])): G[(A, Option[G[Unit]])] = {
val adaptedF: Option[V] => (A, Directive[F, V]) = f(_) match {
case (a, put: Directive.Put[G, V]) => (a, Directive.Put(put.value, put.release.map(gf(_))))
case (a, Directive.Ignore) => (a, Directive.Ignore)
case (a, Directive.Remove) => (a, Directive.Remove)
}
fg {
self
.modify(key)(adaptedF)
.map { case (a, release) => (a, release.map(fg(_)))}
}
}

def put(key: K, value: V, release: Release) = {
fg {
self
Expand Down Expand Up @@ -918,5 +966,58 @@ object Cache {
case None => none[V].pure[F]
}
}

/** Like `modify`, but doesn't pass through any return value.
*
* @return
* If this `update` replaced an existing value,
* will return `Some` containing an effect representing release of that value.
*/
def update(key: K)(f: Option[V] => Directive[F, V])(implicit F: Functor[F]): F[Option[F[Unit]]] =
self.modify(key)(() -> f(_)).map(_._2)

/** Like `modify`, but `f` is only applied if there is a value present in cache,
* and the result is always replacing the old value.
*
* @return
* `true` if value was present, and was subsequently replaced.
* `false` if there was no value present.
*/
def updatePresent(key: K)(f: V => V)(implicit F: Functor[F]): F[Boolean] =
self.modify[Boolean](key) {
case Some(value) => (true, Directive.Put(f(value), None))
case None => (false, Directive.Ignore)
} map(_._1)

/** Like `update`, but `f` has an option to return `None`, in which case value will not be changed.
*
* @return
* `true` if value was present and was subsequently replaced.
* `false` if there was no value present, or it was not replaced.
*/
def updatePresentOpt(key: K)(f: V => Option[V])(implicit F: Functor[F]): F[Boolean] =
self.modify[Boolean](key) {
case Some(value) => f(value).fold[(Boolean, Directive[F, V])](false -> Directive.Ignore)(v => true -> Directive.Put(v, None))
case None => (false, Directive.Ignore)
} map(_._1)

/** Like `put`, but based on `modify`, and guarantees that as a result of the operation the value was in fact
* written in cache. Will be slower than a regular `put` in situations of high contention.
*
* @return
* If this `putStrict` replaced an existing value, will return `Some` containing the old value
* and an effect representing release of that value.
*/
def putStrict(key: K, value: V)(implicit F: Applicative[F]): F[Option[(V, F[Unit])]] =
self.modify[Option[V]](key)((_, Directive.Put(value, None))).map(_.tupled)

/** Like `putStrict`, but with `release` part of the new value.
*
* @return
* If this `putStrict` replaced an existing value, will return `Some` containing the old value
* and an effect representing release of that value.
*/
def putStrict(key: K, value: V, release: self.type#Release)(implicit F: Applicative[F]): F[Option[(V, F[Unit])]] =
self.modify[Option[V]](key)((_, Directive.Put(value, release.some))).map(_.tupled)
}
}
3 changes: 3 additions & 0 deletions src/main/scala/com/evolution/scache/CacheFenced.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ object CacheFenced {
.productR { cache.put(key, value, release) }
}

def modify[A](key: K)(f: Option[V] => (A, Cache.Directive[F, V])): F[(A, Option[F[Unit]])] =
fence.flatMap(_ => cache.modify(key)(f))

def contains(key: K) = cache.contains(key)

def size = cache.size
Expand Down
19 changes: 19 additions & 0 deletions src/main/scala/com/evolution/scache/CacheMetered.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.evolution.scache
import cats.effect.{Concurrent, Resource, Timer}
import cats.kernel.CommutativeMonoid
import cats.syntax.all._
import com.evolution.scache.Cache.Directive
import com.evolutiongaming.catshelper.{MeasureDuration, Schedule}

import scala.concurrent.duration._
Expand Down Expand Up @@ -92,6 +93,24 @@ object CacheMetered {
} yield value
}

def modify[A](key: K)(f: Option[V] => (A, Directive[F, V])): F[(A, Option[F[Unit]])] =
for {
duration <- MeasureDuration[F].start
((a, entryExisted, directive), release) <- cache.modify(key) { entry =>
f(entry) match {
case (a, put: Directive.Put[F, V]) =>
((a, entry.nonEmpty, CacheMetrics.Directive.Put),
Directive.Put(put.value, releaseMetered(duration, put.release.getOrElse(().pure[F])).some))
case (a, Directive.Ignore) =>
((a, entry.nonEmpty, CacheMetrics.Directive.Ignore), Directive.Ignore)
case (a, Directive.Remove) =>
((a, entry.nonEmpty, CacheMetrics.Directive.Remove), Directive.Remove)
}
}
_ <- metrics.modify(entryExisted, directive)
} yield (a, release)


def contains(key: K) = cache.contains(key)

def size = {
Expand Down
30 changes: 29 additions & 1 deletion src/main/scala/com/evolution/scache/CacheMetrics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.evolution.scache
import cats.{Applicative, Monad}
import cats.effect.Resource
import cats.syntax.all._
import com.evolution.scache.CacheMetrics.Directive
import com.evolutiongaming.smetrics.MetricsHelper._
import com.evolutiongaming.smetrics.{CollectorRegistry, LabelNames, Quantile, Quantiles}

Expand All @@ -18,6 +19,8 @@ trait CacheMetrics[F[_]] {

def put: F[Unit]

def modify(entryExisted: Boolean, directive: Directive): F[Unit]

def size(size: Int): F[Unit]

def size(latency: FiniteDuration): F[Unit]
Expand Down Expand Up @@ -46,6 +49,8 @@ object CacheMetrics {

val put = unit

def modify(entryExisted: Boolean, directive: Directive): F[Unit] = unit

def size(size: Int) = unit

def size(latency: FiniteDuration) = unit
Expand All @@ -59,6 +64,18 @@ object CacheMetrics {
def foldMap(latency: FiniteDuration) = unit
}

sealed trait Directive {
override def toString: Prefix = this match {
case Directive.Put => "put"
case Directive.Ignore => "ignore"
case Directive.Remove => "remove"
}
}
object Directive {
final case object Put extends Directive
final case object Ignore extends Directive
final case object Remove extends Directive
}

type Name = String

Expand All @@ -84,6 +101,12 @@ object CacheMetrics {
help = "Put",
labels = LabelNames("name"))

val modifyCounter = collectorRegistry.counter(
name = s"${ prefix }_modify",
help = "Modify, labeled by modification input (entry was present or not), and output (put, keep, or remove)",
labels = LabelNames("existing_entry", "result")
)

val loadResultCounter = collectorRegistry.counter(
name = s"${ prefix }_load_result",
help = "Load result: success or failure",
Expand Down Expand Up @@ -119,6 +142,7 @@ object CacheMetrics {
for {
getsCounter <- getCounter
putCounter <- putCounter
modifyCounter <- modifyCounter
loadResultCounter <- loadResultCounter
loadTimeSummary <- loadTimeSummary
lifeTimeSummary <- lifeTimeSummary
Expand Down Expand Up @@ -158,7 +182,7 @@ object CacheMetrics {
def get(hit: Boolean) = {
val counter = if (hit) hitCounter else missCounter
counter.inc()
}
}

def load(time: FiniteDuration, success: Boolean) = {
val resultCounter = if (success) successCounter else failureCounter
Expand All @@ -175,6 +199,10 @@ object CacheMetrics {

val put = putCounter1.inc()

def modify(entryExisted: Boolean, directive: Directive): F[Unit] = {
modifyCounter.labels(entryExisted.toString, directive.toString).inc()
}

def size(size: Int) = {
sizeGauge.labels(name).set(size.toDouble)
}
Expand Down
17 changes: 16 additions & 1 deletion src/main/scala/com/evolution/scache/ExpiringCache.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import cats.effect.{Clock, Concurrent, Resource, Timer}
import cats.kernel.CommutativeMonoid
import cats.syntax.all._
import cats.{Applicative, Monad, MonadThrow, Monoid, Parallel}
import com.evolution.scache.Cache.Directive
import com.evolution.scache.LoadingCache.EntryState
import com.evolutiongaming.catshelper.CatsHelper._
import com.evolutiongaming.catshelper.ClockHelper._
Expand All @@ -19,7 +20,7 @@ object ExpiringCache {
private[scache] def of[F[_]: Concurrent: Timer: Parallel, K, V](
config: Config[F, K, V]
): Resource[F, Cache[F, K, V]] = {

type E = Entry[V]

val cooldown = config.expireAfterRead.toMillis / 5
Expand Down Expand Up @@ -270,6 +271,20 @@ object ExpiringCache {
}
}

// Modifying existing entry creates a new one, since the old one will be released.
def modify[A](key: K)(f: Option[V] => (A, Directive[F, V])): F[(A, Option[F[Unit]])] =
Clock[F]
.millis
.flatMap { timestamp =>
val adaptedF: Option[Entry[V]] => (A, Directive[F, Entry[V]]) = entry => f(entry.map(_.value)) match {
case (a, put: Directive.Put[F, V]) =>
(a, Directive.Put(Entry(put.value, timestamp, none), put.release))
case (a, Directive.Ignore) => (a, Directive.Ignore)
case (a, Directive.Remove) => (a, Directive.Remove)
}
cache.modify(key)(adaptedF)
}

def contains(key: K) = cache.contains(key)

def size = cache.size
Expand Down
Loading

0 comments on commit 6b34a41

Please sign in to comment.