Skip to content

Commit

Permalink
WIP.
Browse files Browse the repository at this point in the history
  • Loading branch information
dvgica committed Oct 21, 2023
1 parent 3bca313 commit be95e1e
Show file tree
Hide file tree
Showing 6 changed files with 283 additions and 266 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ import org.slf4j.Logger
trait AutoUpdater[U[_], R[_], T] extends AutoCloseable {
def start(
log: Logger,
updateVar: => U[T],
updateVar: () => U[T],
updateInterval: UpdateInterval[T],
updateAttemptStrategy: UpdateAttemptStrategy,
handleInitializationError: PartialFunction[Throwable, U[T]] = PartialFunction.empty
handleInitializationError: PartialFunction[Throwable, U[T]]
): R[Unit]

def latest: T
def latest: Option[T]
}
29 changes: 16 additions & 13 deletions periodic-api/src/main/scala/ca/dvgi/periodic/AutoUpdatingVar.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import org.slf4j.LoggerFactory
* guaranteed to get the latest var.
*
* An AutoUpdatingVar attempts to get the variable immediately upon class instantiation. If this
* fails, there are no further attempts (unless specified via `handleInitializationError1), and the
* fails, there are no further attempts (unless specified via `handleInitializationError`), and the
* effect returned by the `ready` method will complete unsuccesfully. If it succeeds, the effect
* completes successfully and `latest` can be safely called.
*
Expand All @@ -30,8 +30,7 @@ import org.slf4j.LoggerFactory
* A name for this variable, used in logging. If unspecified, the simple class name of T will be
* used.
*/
class AutoUpdatingVar[U[_], R[_], T](
autoUpdater: AutoUpdater[U, R, T],
class AutoUpdatingVar[U[_], R[_], T](autoUpdater: AutoUpdater[U, R, T])(
updateVar: => U[T],
updateInterval: UpdateInterval[T],
updateAttemptStrategy: UpdateAttemptStrategy,
Expand All @@ -47,31 +46,35 @@ class AutoUpdatingVar[U[_], R[_], T](

private val log = LoggerFactory.getLogger(s"AutoUpdatingVar[$varName]")

log.info(s"$this: Starting. ${updateAttemptStrategy.description}")
log.info(s"Starting. ${updateAttemptStrategy.description}")

/** @return
* An effect which, once successfully completed, signifies that the AutoUpdatingVar has a
* value, i.e. `latest` can be called and no exception will be thrown.
*/
def ready: R[Unit] = autoUpdater.start(
private val _ready = autoUpdater.start(
log,
updateVar,
() => updateVar,
updateInterval,
updateAttemptStrategy,
handleInitializationError
)

/** Wait for `ready` to be completed before calling this method.
/** @return
* An effect which, once successfully completed, signifies that the AutoUpdatingVar has a
* value, i.e. `latest` can be called and no exception will be thrown.
*/
def ready: R[Unit] = _ready

/** Get the latest variable value from memory. Does not attempt to update the var.
*
* Wait for `ready` to be completed before calling this method.
*
* @return
* The latest value of the variable. Calling this method is thread-safe.
* @throws UnreadyAutoUpdatingVarException
* if there is not yet a value to return
*/
def latest: T = autoUpdater.latest
def latest: T = autoUpdater.latest.getOrElse(throw UnreadyAutoUpdatingVarException)

override def close(): Unit = {
autoUpdater.close()
log.info(s"$this: Shutting down")
log.info(s"Shut down sucessfully")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class JdkAutoUpdater[T](

override def start(
log: Logger,
updateVar: => T,
updateVar: () => T,
updateInterval: UpdateInterval[T],
updateAttemptStrategy: UpdateAttemptStrategy,
handleInitializationError: PartialFunction[Throwable, T]
Expand All @@ -56,7 +56,7 @@ class JdkAutoUpdater[T](
val tryV =
Try(try {
try {
updateVar
updateVar()
} catch {
case NonFatal(e) =>
log.error("Failed to initialize var", e)
Expand All @@ -71,7 +71,7 @@ class JdkAutoUpdater[T](
log.info("Successfully initialized")
scheduleUpdate(updateInterval.duration(value))(
log,
() => updateVar,
updateVar,
updateInterval,
updateAttemptStrategy
)
Expand All @@ -84,14 +84,17 @@ class JdkAutoUpdater[T](
TimeUnit.NANOSECONDS
)

blockUntilReadyTimeout.foreach { timeout =>
Await.result(_ready.future, timeout)
blockUntilReadyTimeout match {
case Some(timeout) =>
Try(Await.result(_ready.future, timeout)) match {
case Success(_) => Future.successful(())
case Failure(exception) => throw exception
}
case None => _ready.future
}

_ready.future
}

override def latest: T = variable.getOrElse(throw UnreadyAutoUpdatingVarException)
override def latest: Option[T] = variable

override def close(): Unit = {
CloseLock.synchronized {
Expand Down Expand Up @@ -131,6 +134,7 @@ class JdkAutoUpdater[T](
updateAttemptStrategy: UpdateAttemptStrategy
) extends Runnable {
def run(): Unit = {
log.info("Attempting var update...")
try {
val newV = updateVar()
variable = Some(newV)
Expand Down
1 change: 1 addition & 0 deletions periodic-jdk/src/test/resources/simplelogger.properties
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
org.slf4j.simpleLogger.logFile=./periodic-jdk/log/test.log
org.slf4j.simpleLogger.showDateTime=true
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
package ca.dvgi.periodic.jdk

import ca.dvgi.periodic._
import scala.concurrent.duration._
import scala.util.Success
import org.slf4j.LoggerFactory
import scala.concurrent.Await
// import java.util.concurrent.Executors

class JdkAutoUpdaterTest extends munit.FunSuite {

private val log = LoggerFactory.getLogger(getClass)

case object TestException extends RuntimeException

class VarHolder {
private var v = 1
def get: Int = {
log.info("getting")
val r = v
v = v + 1
r
}
}

class VarErrorHolder {
var attempts = 0
def get: Int = {
attempts = attempts + 1
sys.error("test exception")
}
}

FunFixture(
_ => {
val holder = new VarHolder
val v = new AutoUpdatingVar(new JdkAutoUpdater[Int](Some(5.seconds)))(
holder.get,
UpdateInterval.Static(1.seconds),
UpdateAttemptStrategy.Infinite(1.second)
)
(v, holder)
},
(f: (AutoCloseable, VarHolder)) => f._1.close()
)
.test("periodically updates the var, blockng on start, and closes") { case (v, holder) =>
assert(v.ready.isCompleted)
assertEquals(v.ready.value, Some(Success(())))

assertEquals(v.latest, 1)
assertEquals(v.latest, 1) // value should still be cached

Thread.sleep(1100)

assertEquals(v.latest, 2)
assertEquals(v.latest, 2)

Thread.sleep(1000)

assertEquals(v.latest, 3)
assertEquals(v.latest, 3)

v.close()

Thread.sleep(1000)
assertEquals(holder.get, 4)
}

FunFixture(
_ => {
val holder = new VarHolder
val v = new AutoUpdatingVar(new JdkAutoUpdater[Int](Some(1.second)))(
holder.get,
UpdateInterval.Dynamic((i: Int) => i * 1.second),
UpdateAttemptStrategy.Infinite(1.second)
)
(v, holder)
},
(f: (AutoCloseable, VarHolder)) => f._1.close()
)
.test("adjusts the update interval based on the returned value") { case (v, _) =>
assert(v.ready.isCompleted)
assertEquals(v.ready.value, Some(Success(())))

assertEquals(v.latest, 1)
assertEquals(v.latest, 1) // value should still be cached

Thread.sleep(1100)

assertEquals(v.latest, 2)
assertEquals(v.latest, 2)

Thread.sleep(1000)

assertEquals(v.latest, 2) // still 2 since update shouldn't have happened yet

Thread.sleep(1000)

assertEquals(v.latest, 3)
}

FunFixture(
_ => {
new AutoUpdatingVar(new JdkAutoUpdater[Int]())(
throw TestException,
UpdateInterval.Static(1.seconds),
UpdateAttemptStrategy.Infinite(1.second)
)
},
(f: AutoCloseable) => f.close()
).test("returns a failed future from ready if the first update fails") { v =>
intercept[TestException.type] { Await.result(v.ready, 1.second) }
v.close
}

FunFixture(
_ => {
new AutoUpdatingVar(new JdkAutoUpdater[Int]())(
{
Thread.sleep(1000)
1
},
UpdateInterval.Static(1.seconds),
UpdateAttemptStrategy.Infinite(1.second)
)
},
(f: AutoCloseable) => f.close()
).test("throws an exception if latest called before var is initialized") { v =>
intercept[UnreadyAutoUpdatingVarException.type] { v.latest }
}

test(
"returns a failed future from constructor if the first update fails and instructed to block"
) {
intercept[TestException.type] {
new AutoUpdatingVar(new JdkAutoUpdater[Int](Some(1.second)))(
throw TestException,
UpdateInterval.Static(1.seconds),
UpdateAttemptStrategy.Infinite(1.second)
)
}
}

// test(
// "handles initialization errors"
// ) {
// case object TestException extends RuntimeException

// val v =
// new JdkAutoUpdatingVar(
// throw TestException,
// UpdateInterval.Static(1.seconds),
// UpdateAttemptStrategy.Infinite(1.second),
// Some(1.second),
// { case _ =>
// 42
// }
// )

// assertEquals(v.latest, 42)
// v.close()
// }

// test("does infinite reattempts") {
// val holder = new VarErrorHolder
// val v =
// new JdkAutoUpdatingVar(
// holder.get,
// UpdateInterval.Static(1.second),
// UpdateAttemptStrategy.Infinite(1.second),
// Some(1.second),
// { case _ =>
// 42
// }
// )

// assertEquals(v.latest, 42)
// assertEquals(holder.attempts, 1)

// Thread.sleep(1100)

// assertEquals(v.latest, 42)
// assertEquals(holder.attempts, 2)

// Thread.sleep(1000)

// assertEquals(v.latest, 42)
// assertEquals(holder.attempts, 3)

// v.close()
// }

// test("does finite reattempts") {
// val holder = new VarErrorHolder
// var terminated = false
// val v =
// new JdkAutoUpdatingVar(
// holder.get,
// UpdateInterval.Static(1.second),
// UpdateAttemptStrategy
// .Finite(1.second, 2, UpdateAttemptExhaustionBehavior.Custom(_ => terminated = true)),
// Some(1.second),
// { case _ =>
// 42
// }
// )

// assertEquals(v.latest, 42)
// assertEquals(holder.attempts, 1)
// assertEquals(terminated, false)

// Thread.sleep(1100)

// assertEquals(v.latest, 42)
// assertEquals(holder.attempts, 2)
// assertEquals(terminated, false)

// Thread.sleep(1000)

// assertEquals(v.latest, 42)
// assertEquals(holder.attempts, 3)
// assertEquals(terminated, true)

// Thread.sleep(1000)

// assertEquals(holder.attempts, 3)

// v.close()
// }

// test("can use an external SchedulerExecutorService") {
// val holder = new VarHolder
// val ses = Executors.newScheduledThreadPool(1)

// val v = new JdkAutoUpdatingVar(
// holder.get,
// UpdateInterval.Static(2.seconds),
// UpdateAttemptStrategy.Infinite(1.second),
// Some(1.second),
// executorOverride = Some(ses)
// )

// assertEquals(v.latest, 1)

// v.close()
// assert(!ses.isShutdown())

// Thread.sleep(5000)
// assertEquals(holder.get, 2)
// }
}
Loading

0 comments on commit be95e1e

Please sign in to comment.