diff --git a/periodic-api/src/main/scala/ca/dvgi/periodic/AutoUpdater.scala b/periodic-api/src/main/scala/ca/dvgi/periodic/AutoUpdater.scala index b819fb7..8bc5fd4 100644 --- a/periodic-api/src/main/scala/ca/dvgi/periodic/AutoUpdater.scala +++ b/periodic-api/src/main/scala/ca/dvgi/periodic/AutoUpdater.scala @@ -5,11 +5,11 @@ import org.slf4j.Logger trait AutoUpdater[U[_], R[_], T] extends AutoCloseable { def start( log: Logger, - updateVar: => U[T], + updateVar: () => U[T], updateInterval: UpdateInterval[T], updateAttemptStrategy: UpdateAttemptStrategy, - handleInitializationError: PartialFunction[Throwable, U[T]] = PartialFunction.empty + handleInitializationError: PartialFunction[Throwable, U[T]] ): R[Unit] - def latest: T + def latest: Option[T] } diff --git a/periodic-api/src/main/scala/ca/dvgi/periodic/AutoUpdatingVar.scala b/periodic-api/src/main/scala/ca/dvgi/periodic/AutoUpdatingVar.scala index f2ffb79..7815a37 100644 --- a/periodic-api/src/main/scala/ca/dvgi/periodic/AutoUpdatingVar.scala +++ b/periodic-api/src/main/scala/ca/dvgi/periodic/AutoUpdatingVar.scala @@ -7,7 +7,7 @@ import org.slf4j.LoggerFactory * guaranteed to get the latest var. * * An AutoUpdatingVar attempts to get the variable immediately upon class instantiation. If this - * fails, there are no further attempts (unless specified via `handleInitializationError1), and the + * fails, there are no further attempts (unless specified via `handleInitializationError`), and the * effect returned by the `ready` method will complete unsuccesfully. If it succeeds, the effect * completes successfully and `latest` can be safely called. * @@ -30,8 +30,7 @@ import org.slf4j.LoggerFactory * A name for this variable, used in logging. If unspecified, the simple class name of T will be * used. */ -class AutoUpdatingVar[U[_], R[_], T]( - autoUpdater: AutoUpdater[U, R, T], +class AutoUpdatingVar[U[_], R[_], T](autoUpdater: AutoUpdater[U, R, T])( updateVar: => U[T], updateInterval: UpdateInterval[T], updateAttemptStrategy: UpdateAttemptStrategy, @@ -47,31 +46,35 @@ class AutoUpdatingVar[U[_], R[_], T]( private val log = LoggerFactory.getLogger(s"AutoUpdatingVar[$varName]") - log.info(s"$this: Starting. ${updateAttemptStrategy.description}") + log.info(s"Starting. ${updateAttemptStrategy.description}") - /** @return - * An effect which, once successfully completed, signifies that the AutoUpdatingVar has a - * value, i.e. `latest` can be called and no exception will be thrown. - */ - def ready: R[Unit] = autoUpdater.start( + private val _ready = autoUpdater.start( log, - updateVar, + () => updateVar, updateInterval, updateAttemptStrategy, handleInitializationError ) - /** Wait for `ready` to be completed before calling this method. + /** @return + * An effect which, once successfully completed, signifies that the AutoUpdatingVar has a + * value, i.e. `latest` can be called and no exception will be thrown. + */ + def ready: R[Unit] = _ready + + /** Get the latest variable value from memory. Does not attempt to update the var. + * + * Wait for `ready` to be completed before calling this method. * * @return * The latest value of the variable. Calling this method is thread-safe. * @throws UnreadyAutoUpdatingVarException * if there is not yet a value to return */ - def latest: T = autoUpdater.latest + def latest: T = autoUpdater.latest.getOrElse(throw UnreadyAutoUpdatingVarException) override def close(): Unit = { autoUpdater.close() - log.info(s"$this: Shutting down") + log.info(s"Shut down sucessfully") } } diff --git a/periodic-jdk/src/main/scala/ca/dvgi/periodic/jdk/JdkAutoUpdater.scala b/periodic-jdk/src/main/scala/ca/dvgi/periodic/jdk/JdkAutoUpdater.scala index 7923576..9e3c649 100644 --- a/periodic-jdk/src/main/scala/ca/dvgi/periodic/jdk/JdkAutoUpdater.scala +++ b/periodic-jdk/src/main/scala/ca/dvgi/periodic/jdk/JdkAutoUpdater.scala @@ -45,7 +45,7 @@ class JdkAutoUpdater[T]( override def start( log: Logger, - updateVar: => T, + updateVar: () => T, updateInterval: UpdateInterval[T], updateAttemptStrategy: UpdateAttemptStrategy, handleInitializationError: PartialFunction[Throwable, T] @@ -56,7 +56,7 @@ class JdkAutoUpdater[T]( val tryV = Try(try { try { - updateVar + updateVar() } catch { case NonFatal(e) => log.error("Failed to initialize var", e) @@ -71,7 +71,7 @@ class JdkAutoUpdater[T]( log.info("Successfully initialized") scheduleUpdate(updateInterval.duration(value))( log, - () => updateVar, + updateVar, updateInterval, updateAttemptStrategy ) @@ -84,14 +84,17 @@ class JdkAutoUpdater[T]( TimeUnit.NANOSECONDS ) - blockUntilReadyTimeout.foreach { timeout => - Await.result(_ready.future, timeout) + blockUntilReadyTimeout match { + case Some(timeout) => + Try(Await.result(_ready.future, timeout)) match { + case Success(_) => Future.successful(()) + case Failure(exception) => throw exception + } + case None => _ready.future } - - _ready.future } - override def latest: T = variable.getOrElse(throw UnreadyAutoUpdatingVarException) + override def latest: Option[T] = variable override def close(): Unit = { CloseLock.synchronized { @@ -131,6 +134,7 @@ class JdkAutoUpdater[T]( updateAttemptStrategy: UpdateAttemptStrategy ) extends Runnable { def run(): Unit = { + log.info("Attempting var update...") try { val newV = updateVar() variable = Some(newV) diff --git a/periodic-jdk/src/test/resources/simplelogger.properties b/periodic-jdk/src/test/resources/simplelogger.properties index 57ee9b4..127f3d0 100644 --- a/periodic-jdk/src/test/resources/simplelogger.properties +++ b/periodic-jdk/src/test/resources/simplelogger.properties @@ -1 +1,2 @@ org.slf4j.simpleLogger.logFile=./periodic-jdk/log/test.log +org.slf4j.simpleLogger.showDateTime=true diff --git a/periodic-jdk/src/test/scala/ca/dvgi/periodic/jdk/JdkAutoUpdaterTest.scala b/periodic-jdk/src/test/scala/ca/dvgi/periodic/jdk/JdkAutoUpdaterTest.scala new file mode 100644 index 0000000..9c7aa95 --- /dev/null +++ b/periodic-jdk/src/test/scala/ca/dvgi/periodic/jdk/JdkAutoUpdaterTest.scala @@ -0,0 +1,251 @@ +package ca.dvgi.periodic.jdk + +import ca.dvgi.periodic._ +import scala.concurrent.duration._ +import scala.util.Success +import org.slf4j.LoggerFactory +import scala.concurrent.Await +// import java.util.concurrent.Executors + +class JdkAutoUpdaterTest extends munit.FunSuite { + + private val log = LoggerFactory.getLogger(getClass) + + case object TestException extends RuntimeException + + class VarHolder { + private var v = 1 + def get: Int = { + log.info("getting") + val r = v + v = v + 1 + r + } + } + + class VarErrorHolder { + var attempts = 0 + def get: Int = { + attempts = attempts + 1 + sys.error("test exception") + } + } + + FunFixture( + _ => { + val holder = new VarHolder + val v = new AutoUpdatingVar(new JdkAutoUpdater[Int](Some(5.seconds)))( + holder.get, + UpdateInterval.Static(1.seconds), + UpdateAttemptStrategy.Infinite(1.second) + ) + (v, holder) + }, + (f: (AutoCloseable, VarHolder)) => f._1.close() + ) + .test("periodically updates the var, blockng on start, and closes") { case (v, holder) => + assert(v.ready.isCompleted) + assertEquals(v.ready.value, Some(Success(()))) + + assertEquals(v.latest, 1) + assertEquals(v.latest, 1) // value should still be cached + + Thread.sleep(1100) + + assertEquals(v.latest, 2) + assertEquals(v.latest, 2) + + Thread.sleep(1000) + + assertEquals(v.latest, 3) + assertEquals(v.latest, 3) + + v.close() + + Thread.sleep(1000) + assertEquals(holder.get, 4) + } + + FunFixture( + _ => { + val holder = new VarHolder + val v = new AutoUpdatingVar(new JdkAutoUpdater[Int](Some(1.second)))( + holder.get, + UpdateInterval.Dynamic((i: Int) => i * 1.second), + UpdateAttemptStrategy.Infinite(1.second) + ) + (v, holder) + }, + (f: (AutoCloseable, VarHolder)) => f._1.close() + ) + .test("adjusts the update interval based on the returned value") { case (v, _) => + assert(v.ready.isCompleted) + assertEquals(v.ready.value, Some(Success(()))) + + assertEquals(v.latest, 1) + assertEquals(v.latest, 1) // value should still be cached + + Thread.sleep(1100) + + assertEquals(v.latest, 2) + assertEquals(v.latest, 2) + + Thread.sleep(1000) + + assertEquals(v.latest, 2) // still 2 since update shouldn't have happened yet + + Thread.sleep(1000) + + assertEquals(v.latest, 3) + } + + FunFixture( + _ => { + new AutoUpdatingVar(new JdkAutoUpdater[Int]())( + throw TestException, + UpdateInterval.Static(1.seconds), + UpdateAttemptStrategy.Infinite(1.second) + ) + }, + (f: AutoCloseable) => f.close() + ).test("returns a failed future from ready if the first update fails") { v => + intercept[TestException.type] { Await.result(v.ready, 1.second) } + v.close + } + + FunFixture( + _ => { + new AutoUpdatingVar(new JdkAutoUpdater[Int]())( + { + Thread.sleep(1000) + 1 + }, + UpdateInterval.Static(1.seconds), + UpdateAttemptStrategy.Infinite(1.second) + ) + }, + (f: AutoCloseable) => f.close() + ).test("throws an exception if latest called before var is initialized") { v => + intercept[UnreadyAutoUpdatingVarException.type] { v.latest } + } + + test( + "returns a failed future from constructor if the first update fails and instructed to block" + ) { + intercept[TestException.type] { + new AutoUpdatingVar(new JdkAutoUpdater[Int](Some(1.second)))( + throw TestException, + UpdateInterval.Static(1.seconds), + UpdateAttemptStrategy.Infinite(1.second) + ) + } + } + + // test( + // "handles initialization errors" + // ) { + // case object TestException extends RuntimeException + + // val v = + // new JdkAutoUpdatingVar( + // throw TestException, + // UpdateInterval.Static(1.seconds), + // UpdateAttemptStrategy.Infinite(1.second), + // Some(1.second), + // { case _ => + // 42 + // } + // ) + + // assertEquals(v.latest, 42) + // v.close() + // } + + // test("does infinite reattempts") { + // val holder = new VarErrorHolder + // val v = + // new JdkAutoUpdatingVar( + // holder.get, + // UpdateInterval.Static(1.second), + // UpdateAttemptStrategy.Infinite(1.second), + // Some(1.second), + // { case _ => + // 42 + // } + // ) + + // assertEquals(v.latest, 42) + // assertEquals(holder.attempts, 1) + + // Thread.sleep(1100) + + // assertEquals(v.latest, 42) + // assertEquals(holder.attempts, 2) + + // Thread.sleep(1000) + + // assertEquals(v.latest, 42) + // assertEquals(holder.attempts, 3) + + // v.close() + // } + + // test("does finite reattempts") { + // val holder = new VarErrorHolder + // var terminated = false + // val v = + // new JdkAutoUpdatingVar( + // holder.get, + // UpdateInterval.Static(1.second), + // UpdateAttemptStrategy + // .Finite(1.second, 2, UpdateAttemptExhaustionBehavior.Custom(_ => terminated = true)), + // Some(1.second), + // { case _ => + // 42 + // } + // ) + + // assertEquals(v.latest, 42) + // assertEquals(holder.attempts, 1) + // assertEquals(terminated, false) + + // Thread.sleep(1100) + + // assertEquals(v.latest, 42) + // assertEquals(holder.attempts, 2) + // assertEquals(terminated, false) + + // Thread.sleep(1000) + + // assertEquals(v.latest, 42) + // assertEquals(holder.attempts, 3) + // assertEquals(terminated, true) + + // Thread.sleep(1000) + + // assertEquals(holder.attempts, 3) + + // v.close() + // } + + // test("can use an external SchedulerExecutorService") { + // val holder = new VarHolder + // val ses = Executors.newScheduledThreadPool(1) + + // val v = new JdkAutoUpdatingVar( + // holder.get, + // UpdateInterval.Static(2.seconds), + // UpdateAttemptStrategy.Infinite(1.second), + // Some(1.second), + // executorOverride = Some(ses) + // ) + + // assertEquals(v.latest, 1) + + // v.close() + // assert(!ses.isShutdown()) + + // Thread.sleep(5000) + // assertEquals(holder.get, 2) + // } +} diff --git a/periodic-jdk/src/test/scala/ca/dvgi/periodic/jdk/JdkAutoUpdatingVarTest.scala b/periodic-jdk/src/test/scala/ca/dvgi/periodic/jdk/JdkAutoUpdatingVarTest.scala deleted file mode 100644 index a5a6979..0000000 --- a/periodic-jdk/src/test/scala/ca/dvgi/periodic/jdk/JdkAutoUpdatingVarTest.scala +++ /dev/null @@ -1,242 +0,0 @@ -package ca.dvgi.periodic.jdk - -import ca.dvgi.periodic._ -import scala.concurrent.duration._ -import scala.util.Success -import scala.concurrent.Await -import java.util.concurrent.Executors - -class JdkAutoUpdatingVarTest extends munit.FunSuite { - - class VarHolder { - private var v = 1 - def get: Int = { - val r = v - v = v + 1 - r - } - } - - class VarErrorHolder { - var attempts = 0 - def get: Int = { - attempts = attempts + 1 - sys.error("test exception") - } - } - - test("periodically updates the var, blockng on start, and closes") { - val holder = new VarHolder - - val v = new JdkAutoUpdatingVar( - holder.get, - UpdateInterval.Static(1.seconds), - UpdateAttemptStrategy.Infinite(1.second), - Some(1.second) - ) - - assert(v.ready.isCompleted) - assertEquals(v.ready.value, Some(Success(()))) - - assertEquals(v.latest, 1) - assertEquals(v.latest, 1) // value should still be cached - - Thread.sleep(1100) - - assertEquals(v.latest, 2) - assertEquals(v.latest, 2) - - Thread.sleep(1000) - - assertEquals(v.latest, 3) - assertEquals(v.latest, 3) - - v.close() - - Thread.sleep(1000) - assertEquals(holder.get, 4) - } - - test("adjusts the update interval based on the returned value") { - val holder = new VarHolder - - val v = new JdkAutoUpdatingVar( - holder.get, - UpdateInterval.Dynamic((i: Int) => i * 1.second), - UpdateAttemptStrategy.Infinite(1.second), - Some(1.second) - ) - - assert(v.ready.isCompleted) - assertEquals(v.ready.value, Some(Success(()))) - - assertEquals(v.latest, 1) - assertEquals(v.latest, 1) // value should still be cached - - Thread.sleep(1100) - - assertEquals(v.latest, 2) - assertEquals(v.latest, 2) - - Thread.sleep(1000) - - assertEquals(v.latest, 2) // still 2 since update shouldn't have happened yet - - Thread.sleep(1000) - - assertEquals(v.latest, 3) - - v.close() - } - - test("returns a failed future from ready if the first update fails") { - case object TestException extends RuntimeException - - val v = new JdkAutoUpdatingVar[Int]( - throw TestException, - UpdateInterval.Static(1.seconds), - UpdateAttemptStrategy.Infinite(1.second) - ) - - intercept[TestException.type] { Await.result(v.ready, 1.second) } - v.close - } - - test("throws an exception if latest called before var is initialized") { - - val v = new JdkAutoUpdatingVar( - { - Thread.sleep(1000) - 1 - }, - UpdateInterval.Static(1.seconds), - UpdateAttemptStrategy.Infinite(1.second) - ) - - intercept[UnreadyAutoUpdatingVarException.type] { v.latest } - v.close - } - - test( - "returns a failed future from constructor if the first update fails and instructed to block" - ) { - case object TestException extends RuntimeException - - intercept[TestException.type] { - new JdkAutoUpdatingVar[Int]( - throw TestException, - UpdateInterval.Static(1.seconds), - UpdateAttemptStrategy.Infinite(1.second), - Some(1.second) - ) - } - } - - test( - "handles initialization errors" - ) { - case object TestException extends RuntimeException - - val v = - new JdkAutoUpdatingVar( - throw TestException, - UpdateInterval.Static(1.seconds), - UpdateAttemptStrategy.Infinite(1.second), - Some(1.second), - { case _ => - 42 - } - ) - - assertEquals(v.latest, 42) - v.close() - } - - test("does infinite reattempts") { - val holder = new VarErrorHolder - val v = - new JdkAutoUpdatingVar( - holder.get, - UpdateInterval.Static(1.second), - UpdateAttemptStrategy.Infinite(1.second), - Some(1.second), - { case _ => - 42 - } - ) - - assertEquals(v.latest, 42) - assertEquals(holder.attempts, 1) - - Thread.sleep(1100) - - assertEquals(v.latest, 42) - assertEquals(holder.attempts, 2) - - Thread.sleep(1000) - - assertEquals(v.latest, 42) - assertEquals(holder.attempts, 3) - - v.close() - } - - test("does finite reattempts") { - val holder = new VarErrorHolder - var terminated = false - val v = - new JdkAutoUpdatingVar( - holder.get, - UpdateInterval.Static(1.second), - UpdateAttemptStrategy - .Finite(1.second, 2, UpdateAttemptExhaustionBehavior.Custom(_ => terminated = true)), - Some(1.second), - { case _ => - 42 - } - ) - - assertEquals(v.latest, 42) - assertEquals(holder.attempts, 1) - assertEquals(terminated, false) - - Thread.sleep(1100) - - assertEquals(v.latest, 42) - assertEquals(holder.attempts, 2) - assertEquals(terminated, false) - - Thread.sleep(1000) - - assertEquals(v.latest, 42) - assertEquals(holder.attempts, 3) - assertEquals(terminated, true) - - Thread.sleep(1000) - - assertEquals(holder.attempts, 3) - - v.close() - } - - test("can use an external SchedulerExecutorService") { - val holder = new VarHolder - val ses = Executors.newScheduledThreadPool(1) - - val v = new JdkAutoUpdatingVar( - holder.get, - UpdateInterval.Static(2.seconds), - UpdateAttemptStrategy.Infinite(1.second), - Some(1.second), - executorOverride = Some(ses) - ) - - assertEquals(v.latest, 1) - - v.close() - assert(!ses.isShutdown()) - - Thread.sleep(5000) - assertEquals(holder.get, 2) - } -}