Skip to content

Commit

Permalink
Merge pull request gemini-hlsw#2489 from gemini-hlsw/update/cats-effe…
Browse files Browse the repository at this point in the history
…ct-3.5

CE 3.5 updates
  • Loading branch information
armanbilge authored Oct 4, 2023
2 parents 81cfd46 + 62e7bc6 commit 254c246
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 84 deletions.
160 changes: 87 additions & 73 deletions modules/server/src/main/scala/seqexec/server/EpicsUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,22 @@ abstract class EpicsCommandBase[F[_]: Async](sysName: String) extends EpicsComma

override def post(timeout: FiniteDuration): F[ApplyCommandResult] = setTimeout(timeout) *>
Async[F]
.async_[ApplyCommandResult] { (f: Either[Throwable, ApplyCommandResult] => Unit) =>
cs.map { ccs =>
ccs.postCallback {
new CaCommandListener {
override def onSuccess(): Unit = f(ApplyCommandResult.Completed.asRight)
override def onPause(): Unit = f(ApplyCommandResult.Paused.asRight)
override def onFailure(cause: Exception): Unit = f(cause.asLeft)
}
.async[ApplyCommandResult] { (f: Either[Throwable, ApplyCommandResult] => Unit) =>
Async[F]
.delay {
cs.map { ccs =>
ccs.postCallback {
new CaCommandListener {
override def onSuccess(): Unit = f(ApplyCommandResult.Completed.asRight)
override def onPause(): Unit = f(ApplyCommandResult.Paused.asRight)
override def onFailure(cause: Exception): Unit = f(cause.asLeft)
}
}
// It should call f on all execution paths, thanks @tpolecat
}.void
.getOrElse(f(SeqexecFailure.Unexpected("Unable to trigger command.").asLeft))
}
// It should call f on all execution paths, thanks @tpolecat
}.void
.getOrElse(f(SeqexecFailure.Unexpected("Unable to trigger command.").asLeft))
.as(Some(Async[F].unit))
}
.addSystemNameToCmdError(sysName)

Expand Down Expand Up @@ -143,21 +147,25 @@ abstract class ObserveCommandBase[F[_]: Async](sysName: String) extends ObserveC

override def post(timeout: FiniteDuration): F[ObserveCommandResult] = setTimeout(timeout) *>
Async[F]
.async_[ObserveCommandResult] { (f: Either[Throwable, ObserveCommandResult] => Unit) =>
os.map { oos =>
oos.postCallback {
new CaCommandListener {
override def onSuccess(): Unit = f(ObserveCommandResult.Success.asRight)
override def onPause(): Unit = f(ObserveCommandResult.Paused.asRight)
override def onFailure(cause: Exception): Unit = cause match {
case _: CaObserveStopped => f(ObserveCommandResult.Stopped.asRight)
case _: CaObserveAborted => f(ObserveCommandResult.Aborted.asRight)
case _ => f(cause.asLeft)
.async[ObserveCommandResult] { (f: Either[Throwable, ObserveCommandResult] => Unit) =>
Async[F]
.delay {
os.map { oos =>
oos.postCallback {
new CaCommandListener {
override def onSuccess(): Unit = f(ObserveCommandResult.Success.asRight)
override def onPause(): Unit = f(ObserveCommandResult.Paused.asRight)
override def onFailure(cause: Exception): Unit = cause match {
case _: CaObserveStopped => f(ObserveCommandResult.Stopped.asRight)
case _: CaObserveAborted => f(ObserveCommandResult.Aborted.asRight)
case _ => f(cause.asLeft)
}
}
}
}
}.void
.getOrElse(f(SeqexecFailure.Unexpected("Unable to trigger command.").asLeft))
}
}.void
.getOrElse(f(SeqexecFailure.Unexpected("Unable to trigger command.").asLeft))
.as(Some(Async[F].unit))
}
.addSystemNameToCmdError(sysName)

Expand Down Expand Up @@ -211,60 +219,66 @@ object EpicsUtil {
timeout: FiniteDuration,
name: String
): F[T] =
Async[F].async_[T] { (f: Either[Throwable, T] => Unit) =>
// The task is created with async. So we do whatever we need to do,
// and then call `f` to signal the completion of the task.

// `resultGuard` and `lock` are used for synchronization.
val resultGuard = new AtomicInteger(1)
val lock = new ReentrantLock()

// First we verify that the attribute doesn't already have the required value.
// NOTE: It was possible to lose a change to the right value if it happened between here and the line that
// subscribes to the attribute, and the wait would end in timeout. That is not the case anymore, because the
// CaAttribute will call the callback once on subscription.
if (!attr.values().isEmpty && vv.contains(attr.value)) {
f(attr.value.asRight)
} else {
// If not, we set a timer for the timeout, and a listener for the EPICS
// channel. The timer and the listener can both complete the IO. The
// first one to do it cancels the other.The use of `resultGuard`
// guarantees that only one of them will complete the IO.
val timer = new JTimer
val statusListener = new CaAttributeListener[T] {
override def onValueChange(newVals: util.List[T]): Unit =
if (
!newVals.isEmpty && vv.contains(newVals.get(0)) && resultGuard.getAndDecrement() === 1
) {
locked(lock) {
attr.removeListener(this)
timer.cancel()
}
// This `right` looks a bit confusing because is not related to
// the `TrySeq`, but to the result of `IO`.
f(newVals.get(0).asRight)
}

override def onValidityChange(newValidity: Boolean): Unit = {}
}

locked(lock) {
if (timeout.toMillis > 0) {
timer.schedule(
new TimerTask {
override def run(): Unit = if (resultGuard.getAndDecrement() === 1) {
Async[F].async[T] { (f: Either[Throwable, T] => Unit) =>
Async[F]
.delay {
// The task is created with async. So we do whatever we need to do,
// and then call `f` to signal the completion of the task.

// `resultGuard` and `lock` are used for synchronization.
val resultGuard = new AtomicInteger(1)
val lock = new ReentrantLock()

// First we verify that the attribute doesn't already have the required value.
// NOTE: It was possible to lose a change to the right value if it happened between here and the line that
// subscribes to the attribute, and the wait would end in timeout. That is not the case anymore, because the
// CaAttribute will call the callback once on subscription.
if (!attr.values().isEmpty && vv.contains(attr.value)) {
f(attr.value.asRight)
} else {
// If not, we set a timer for the timeout, and a listener for the EPICS
// channel. The timer and the listener can both complete the IO. The
// first one to do it cancels the other.The use of `resultGuard`
// guarantees that only one of them will complete the IO.
val timer = new JTimer
val statusListener = new CaAttributeListener[T] {
override def onValueChange(newVals: util.List[T]): Unit =
if (
!newVals.isEmpty &&
vv.contains(newVals.get(0)) &&
resultGuard.getAndDecrement() === 1
) {
locked(lock) {
attr.removeListener(statusListener)
attr.removeListener(this)
timer.cancel()
}
f(SeqexecFailure.Timeout(name).asLeft)
// This `right` looks a bit confusing because is not related to
// the `TrySeq`, but to the result of `IO`.
f(newVals.get(0).asRight)
}
},
timeout.toMillis
)

override def onValidityChange(newValidity: Boolean): Unit = {}
}

locked(lock) {
if (timeout.toMillis > 0) {
timer.schedule(
new TimerTask {
override def run(): Unit = if (resultGuard.getAndDecrement() === 1) {
locked(lock) {
attr.removeListener(statusListener)
}
f(SeqexecFailure.Timeout(name).asLeft)
}
},
timeout.toMillis
)
}
attr.addListener(statusListener)
}
}
attr.addListener(statusListener)
}
}
.as(Some(Async[F].unit))
}

def waitForValueF[T, F[_]: Async](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import cats.data.OptionT
import cats.effect.Sync
import cats.instances.string._
import cats.syntax.eq._
import fs2.compression.Compression
import org.http4s.CacheDirective._
import org.http4s.HttpRoutes
import org.http4s.Request
Expand All @@ -17,7 +18,7 @@ import org.http4s.StaticFile
import org.http4s.headers.`Cache-Control`
import org.http4s.server.middleware.GZip

class StaticRoutes[F[_]: Sync](
class StaticRoutes[F[_]: Sync: Compression](
devMode: Boolean,
builtAtMillis: Long
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package seqexec.web.server.http4s

import cats.effect.Async
import cats.syntax.all._
import fs2.compression.Compression
import org.typelevel.log4cats.Logger
import org.http4s.EntityDecoder
import org.http4s.HttpRoutes
Expand All @@ -15,7 +16,8 @@ import seqexec.server.tcs.GuideConfig
import seqexec.server.tcs.GuideConfigDb
import seqexec.server.tcs.GuideConfigDb._

class GuideConfigDbRoutes[F[_]: Async: Logger](db: GuideConfigDb[F]) extends Http4sDsl[F] {
class GuideConfigDbRoutes[F[_]: Async: Logger: Compression](db: GuideConfigDb[F])
extends Http4sDsl[F] {

implicit val decoder: EntityDecoder[F, GuideConfig] = jsonOf

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package seqexec.web.server.http4s

import cats.effect.Async
import cats.syntax.all._
import fs2.compression.Compression
import org.http4s._
import org.http4s.dsl._
import org.http4s.server.middleware.GZip
Expand All @@ -22,7 +23,7 @@ import seqexec.web.server.security.TokenRefresher
/**
* Rest Endpoints under the /api route
*/
class SeqexecCommandRoutes[F[_]: Async](
class SeqexecCommandRoutes[F[_]: Async: Compression](
auth: AuthenticationService[F],
inputQueue: server.EventQueue[F],
se: SeqexecEngine[F]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import cats.data.NonEmptyList
import cats.effect.Async
import cats.effect.Sync
import cats.syntax.all._
import com.comcast.ip4s.Dns
import fs2.Pipe
import fs2.Stream
import fs2.compression.Compression
import fs2.concurrent.Topic
import giapi.client.GiapiStatusDb
import giapi.client.StatusValue
Expand Down Expand Up @@ -49,7 +51,7 @@ import seqexec.web.server.security.TokenRefresher
/**
* Rest Endpoints under the /api route
*/
class SeqexecUIApiRoutes[F[_]: Async](
class SeqexecUIApiRoutes[F[_]: Async: Dns: Compression](
site: Site,
mode: Mode,
auth: AuthenticationService[F],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ import web.server.common.StaticRoutes
import cats.effect.{ Ref, Resource, Temporal }
import org.http4s.jdkhttpclient.JdkHttpClient
import org.http4s.blaze.server.BlazeServerBuilder
import com.comcast.ip4s.Dns
import fs2.compression.Compression

object WebServerLauncher extends IOApp with LogInitialization {
private implicit def L: Logger[IO] = Slf4jLogger.getLoggerFromName[IO]("seqexec")
Expand Down Expand Up @@ -112,7 +114,7 @@ object WebServerLauncher extends IOApp with LogInitialization {
}

/** Resource that yields the running web server */
def webServer[F[_]: Logger: Async](
def webServer[F[_]: Logger: Async: Dns: Compression](
conf: SeqexecConfiguration,
cal: SmartGcal,
as: AuthenticationService[F],
Expand Down Expand Up @@ -168,7 +170,7 @@ object WebServerLauncher extends IOApp with LogInitialization {

}

def redirectWebServer[F[_]: Logger: Async](
def redirectWebServer[F[_]: Logger: Async: Compression](
gcdb: GuideConfigDb[F],
cal: SmartGcal
)(conf: WebServerConfiguration): Resource[F, Server] = {
Expand Down
10 changes: 5 additions & 5 deletions project/Settings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,18 @@ object Settings {
val scalaJSReactSortable = "0.5.2"

// Scala libraries
val catsEffectVersion = "3.4.11"
val catsEffectVersion = "3.5.2"
val catsVersion = "2.10.0"
val mouseVersion = "1.2.1"
val fs2Version = "3.6.1"
val fs2Version = "3.9.2"
val shapelessVersion = "2.3.9"
val scalaParsersVersion = "1.1.2"
val scalaXmlVersion = "1.2.0"
val catsTime = "0.4.0"

val http4sVersion = "0.23.18"
val http4sBlazeVersion = "0.23.14"
val http4sJdkHttpClientVersion = "0.9.0"
val http4sVersion = "0.23.23"
val http4sBlazeVersion = "0.23.15"
val http4sJdkHttpClientVersion = "0.9.1"
val http4sBoopickleVersion = "0.23.11"
val http4sPrometheusMetricsVersion = "0.24.3"
val http4sScalaXmlVersion = "0.23.13"
Expand Down

0 comments on commit 254c246

Please sign in to comment.