Skip to content

Commit

Permalink
unify connect+initialize
Browse files Browse the repository at this point in the history
  • Loading branch information
rpiaggio committed Jul 22, 2024
1 parent 319ad4d commit ff7b44d
Show file tree
Hide file tree
Showing 9 changed files with 242 additions and 359 deletions.
7 changes: 4 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ lazy val scala3Version = "3.4.2"
lazy val rulesCrossVersions = Seq(V.scala213)
lazy val allVersions = rulesCrossVersions :+ scala3Version

ThisBuild / tlBaseVersion := "0.39"
ThisBuild / tlBaseVersion := "0.40"
ThisBuild / tlCiReleaseBranches := Seq("master")
ThisBuild / tlJdkRelease := Some(8)
ThisBuild / githubWorkflowJavaVersions := Seq("11", "17").map(JavaSpec.temurin(_))
Expand Down Expand Up @@ -92,8 +92,9 @@ lazy val http4sJDKDemo = project
.in(file("http4s-jdk-demo"))
.enablePlugins(NoPublishPlugin)
.settings(
moduleName := "clue-http4s-jdk-client-demo",
tlJdkRelease := Some(11),
moduleName := "clue-http4s-jdk-client-demo",
tlJdkRelease := Some(11),
Compile / run / fork := true,
libraryDependencies ++= Seq(
"org.typelevel" %% "log4cats-slf4j" % Settings.LibraryVersions.log4Cats,
"org.slf4j" % "slf4j-simple" % "2.0.13"
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/clue/PersistentClientStatus.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ sealed trait PersistentClientStatus
object PersistentClientStatus {
case object Connecting extends PersistentClientStatus
case object Connected extends PersistentClientStatus
case object Initializing extends PersistentClientStatus
case object Initialized extends PersistentClientStatus
// case object Initializing extends PersistentClientStatus
// case object Initialized extends PersistentClientStatus
case object Disconnected extends PersistentClientStatus

implicit val eqStreamingClientStatus: Eq[PersistentClientStatus] = Eq.fromUniversalEquals
Expand Down
10 changes: 2 additions & 8 deletions core/src/main/scala/clue/clients.scala
Original file line number Diff line number Diff line change
Expand Up @@ -118,21 +118,15 @@ trait StreamingClient[F[_], S] extends FetchClientWithPars[F, Unit, S] {
* A client that keeps a connection open and initializable protocol with the server.
*/
trait PersistentClient[F[_], CP, CE] {
// protected val backend: PersistentBackend[F, CP, CE]
// protected val reconnectionStrategy: ReconnectionStrategy[CE]

def status: F[PersistentClientStatus]
def statusStream: fs2.Stream[F, PersistentClientStatus]

def connect(): F[Unit]
// Initialization may repeat upon reconnection, that's why the payload is effectful since it may change over time (eg: auth tokens).
def initialize(payload: F[Map[String, Json]]): F[Unit]
def connect(payload: F[Map[String, Json]]): F[Unit]
def connect(): F[Unit]

def terminate(): F[Unit]
def disconnect(closeParameters: CP): F[Unit]
def disconnect(): F[Unit]

def reestablish(): F[Unit]
}

/**
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/scala/clue/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ package object clue {
def logAndRaiseF_[F[_], A](implicit F: MonadError[F, Throwable], logger: Logger[F]): F[A] =
logger.error(t)("") >> F.raiseError[A](t)

def raiseF[F[_]](implicit F: MonadError[F, Throwable]): F[Unit] =
F.raiseError(t)

def logF[F[_]](
msg: String
)(implicit logger: Logger[F]): F[Unit] =
Expand Down
512 changes: 169 additions & 343 deletions core/src/main/scala/clue/websocket/ApolloClient.scala

Large diffs are not rendered by default.

16 changes: 16 additions & 0 deletions core/src/main/scala/clue/websocket/Emitter.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright (c) 2016-2023 Association of Universities for Research in Astronomy, Inc. (AURA)
// For license information see LICENSE or https://opensource.org/licenses/BSD-3-Clause

package clue.websocket

import clue.model.*
import io.circe.*

// Internal structure to emit data and errors to the client.
protected[clue] trait Emitter[F[_]] {
val request: GraphQLRequest[JsonObject]

def emitData(response: GraphQLResponse[Json]): F[Unit]
def emitErrors(errors: GraphQLErrors): F[Unit]
val halt: F[Unit]
}
34 changes: 34 additions & 0 deletions core/src/main/scala/clue/websocket/State.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (c) 2016-2023 Association of Universities for Research in Astronomy, Inc. (AURA)
// For license information see LICENSE or https://opensource.org/licenses/BSD-3-Clause

package clue.websocket

import clue.*
import io.circe.*

// Client internal state for the FSM.
// We keep a connectionId throughout all states to ensure that callback events (onClose, onMessage)
// correpond to the current connection iteration. This is important in case of reconnections.
protected sealed abstract class State[F[_]](val status: PersistentClientStatus) {
val connectionId: ConnectionId
}

protected object State {
final case class Disconnected[F[_]](connectionId: ConnectionId)
extends State[F](PersistentClientStatus.Disconnected)

final case class Connecting[F[_]](
connectionId: ConnectionId,
connection: Option[WebSocketConnection[F]],
initPayload: F[Map[String, Json]],
subscriptions: Map[String, Emitter[F]],
latch: Latch[F]
) extends State[F](PersistentClientStatus.Connecting)

final case class Connected[F[_]](
connectionId: ConnectionId,
connection: WebSocketConnection[F],
initPayload: F[Map[String, Json]],
subscriptions: Map[String, Emitter[F]]
) extends State[F](PersistentClientStatus.Connected)
}
1 change: 1 addition & 0 deletions http4s-jdk-demo/src/main/resources/simplelogger.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.slf4j.simpleLogger.defaultLogLevel=trace
14 changes: 11 additions & 3 deletions http4s-jdk-demo/src/main/scala/clue/http4sjdkDemo/Demo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ object Demo extends IOApp.Simple {
object Query extends GraphQLOperation.Typed.NoInput[DemoDB, Json] {
override val document: String = """
|query {
| observations(WHERE: {programId: {EQ: "p-2"}}) {
| observations(WHERE: {program: {id: {EQ: "p-2"}}}) {
| matches {
| id
| title
Expand Down Expand Up @@ -75,14 +75,22 @@ object Demo extends IOApp.Simple {
def withLogger[F[_]: Sync]: Resource[F, Logger[F]] =
Resource.make(Slf4jLogger.create[F])(_ => Applicative[F].unit)

def initPayload[F[_]: Sync]: F[Map[String, Json]] =
Sync[F]
.delay(sys.env.get("ODB_SERVICE_JWT"))
.map(
_.map(token => Map("Authorization" -> Json.fromString(s"Bearer $token")))
.getOrElse(Map.empty)
)

def withStreamingClient[F[_]: Async: Logger]: Resource[F, WebSocketClient[F, DemoDB]] =
for {
client <- Resource.eval(JdkWSClient.simple)
backend = Http4sWebSocketBackend(client)
uri = uri"wss://lucuma-odb-development.herokuapp.com/ws"
uri = uri"wss://lucuma-postgres-odb-dev.herokuapp.com/ws"
sc <-
Resource.eval(Http4sWebSocketClient.of[F, DemoDB](uri)(using Async[F], Logger[F], backend))
_ <- Resource.make(sc.connect() >> sc.initialize())(_ => sc.terminate() >> sc.disconnect())
_ <- Resource.make(sc.connect(initPayload))(_ => sc.disconnect())
} yield sc

val allStatus =
Expand Down

0 comments on commit ff7b44d

Please sign in to comment.