Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error policies, huge refactors, and a lot of other breaking changes #480

Merged
merged 14 commits into from
Mar 7, 2023
6 changes: 3 additions & 3 deletions MACROS.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ object BasicQuery extends GraphQLOperation[StarWars] {
implicit val showVariables: cats.Show[Variables] = cats.Show.fromToString

// Circe typeclasses
implicit val jsonEncoderVariables: io.circe.Encoder[Variables] = io.circe.generic.semiauto.deriveEncoder[Variables].mapJson(_.foldWith(clue.data.Input.dropIgnoreFolder))
implicit val jsonEncoderVariables: io.circe.Encoder.AsObject[Variables] = io.circe.generic.semiauto.deriveEncoder[Variables].mapJsonObject(clue.data.Input.dropIgnores)
}

// Operation result.
Expand Down Expand Up @@ -249,11 +249,11 @@ object BasicQuery extends GraphQLOperation[StarWars] {
implicit val jsonDecoderData: io.circe.Decoder[Data] = io.circe.generic.semiauto.deriveDecoder[Data]
}

override val varEncoder: io.circe.Encoder[Variables] = Variables.jsonEncoderVariables
override val varEncoder: io.circe.Encoder.AsObject[Variables] = Variables.jsonEncoderVariables
override val dataDecoder: io.circe.Decoder[Data] = Data.jsonDecoderData

// A convenience parametrized method is generated.
def query[F[_]](episode: Episode)(implicit client: clue.TransactionalClient[F, StarWars]) = client.request(this)(Variables(episode))
def query_[F[_]](episode: Episode)(implicit client: clue.TransactionalClient[F, StarWars]) = client.request_(this)(Variables(episode))
}
```

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ They must extend `GraphQLOperation[S]`, defining the following members:
type Variables
type Data

val varEncoder: io.circe.Encoder[Variables]
val varEncoder: io.circe.Encoder.AsObject[Variables]
val dataDecoder: io.circe.Decoder[Data]
```

Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ lazy val scala3Version = "3.2.2"
lazy val rulesCrossVersions = Seq(V.scala213)
lazy val allVersions = rulesCrossVersions :+ scala3Version

ThisBuild / tlBaseVersion := "0.25"
ThisBuild / tlBaseVersion := "0.26"
ThisBuild / tlCiReleaseBranches := Seq("master")
ThisBuild / tlJdkRelease := Some(8)
ThisBuild / githubWorkflowJavaVersions := Seq("11", "17").map(JavaSpec.temurin(_))
Expand Down
113 changes: 58 additions & 55 deletions core/src/main/scala/clue/ApolloClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,19 @@

package clue

import cats.data.Ior
import cats.effect.Ref
import cats.effect.Temporal
import cats.effect._
import cats.effect.implicits._
import cats.effect.std.Queue
import cats.effect.std.UUIDGen
import cats.syntax.all._
import clue.ErrorPolicyProcessor
import clue.GraphQLSubscription
import clue.model.GraphQLCombinedResponse
import clue.model.GraphQLDataResponse
import clue.model.GraphQLErrors
import clue.model.GraphQLRequest
import clue.model.StreamingMessage
import clue.model.json._
Expand All @@ -23,13 +28,12 @@ import org.typelevel.log4cats.Logger

import java.util.UUID
import scala.concurrent.duration.FiniteDuration

// Interface for internally handling a subscription queue.
protected[clue] trait Emitter[F[_]] {
val request: GraphQLRequest
val request: GraphQLRequest[JsonObject]

def emitData(json: Json): F[Unit]
def emitError(json: Json): F[Unit]
def emitData(dataJson: Json, errors: Option[GraphQLErrors]): F[Unit]
def emitErrors(errors: GraphQLErrors): F[Unit]
val halt: F[Unit]
}

Expand Down Expand Up @@ -222,20 +226,22 @@ class ApolloClient[F[_], S, CP, CE](
}

// <StreamingClient>
override protected def subscribeInternal[D: Decoder](
override protected def subscribeInternal[D: Decoder, R](
subscription: String,
operationName: Option[String],
variables: Option[Json]
): Resource[F, fs2.Stream[F, D]] =
subscriptionResource(subscription, operationName, variables)(implicitly[Decoder[D]])
variables: Option[JsonObject],
errorPolicy: ErrorPolicyProcessor[D, R]
): Resource[F, fs2.Stream[F, R]] =
subscriptionResource(subscription, operationName, variables, errorPolicy)

// <TransactionalClient>
override protected def requestInternal[D: Decoder](
override protected def requestInternal[D: Decoder, R](
document: String,
operationName: Option[String],
variables: Option[Json]
): F[D] = F.async[D] { cb =>
startSubscription[D](document, operationName, variables).flatMap(
variables: Option[JsonObject],
errorPolicy: ErrorPolicyProcessor[D, R]
): F[R] = F.async[R] { cb =>
startSubscription[D, R](document, operationName, variables, errorPolicy).flatMap(
_.stream.attempt
.evalMap(result => F.delay(cb(result)))
.compile
Expand All @@ -250,39 +256,41 @@ class ApolloClient[F[_], S, CP, CE](
// <WebSocketHandler>
override def onMessage(connectionId: ConnectionId, msg: String): F[Unit] =
decode[StreamingMessage.FromServer](msg) match {
case Left(e) =>
case Left(e) =>
e.raiseF(s"Exception decoding message received from server: [$msg]")
case Right(StreamingMessage.FromServer.ConnectionAck) =>
case Right(StreamingMessage.FromServer.ConnectionAck) =>
stateModify {
case Initializing(stateConnectionId, connection, subscriptions, initPayload, latch)
if connectionId === stateConnectionId =>
Initialized(connectionId, connection, subscriptions, initPayload) ->
(startSubscriptions(connection, subscriptions) >> latch.complete(().asRight).void)
case s => s -> s"Unexpected connection_ack received from server.".warnF
}
case Right(StreamingMessage.FromServer.ConnectionError(payload)) =>
case Right(StreamingMessage.FromServer.ConnectionError(payload)) =>
stateModify {
case Initializing(stateConnectionId, connection, _, _, latch)
if connectionId === stateConnectionId =>
Connected(connectionId, connection) ->
latch.complete(s"Initialization rejected by server: [$payload].".error).void
case s => s -> s"Unexpected connection_error received from server.".warnF
}
case Right(StreamingMessage.FromServer.DataJson(subscriptionId, data, errors)) =>
case Right(
StreamingMessage.FromServer.Data(subscriptionId, GraphQLDataResponse(data, errors, _))
) =>
state.get.flatMap {
case Initialized(stateConnectionId, _, subscriptions, _)
if connectionId === stateConnectionId =>
subscriptions.get(subscriptionId) match {
case None =>
s"Received data for non existant subscription id [$subscriptionId]: $data".warnF
case Some(emitter) =>
errors.fold(emitter.emitData(data))(emitter.emitError)
emitter.emitData(data, errors)
}
case s @ _ =>
s"UNEXPECTED Data RECEIVED for subscription [$subscriptionId]. State is: [$s]".raiseError.void
}
// TODO Contemplate different states.
case Right(StreamingMessage.FromServer.Error(subscriptionId, payload)) =>
case Right(StreamingMessage.FromServer.Error(subscriptionId, payload)) =>
state.get.flatMap {
case Initialized(stateConnectionId, _, subscriptions, _)
if connectionId === stateConnectionId =>
Expand All @@ -291,12 +299,12 @@ class ApolloClient[F[_], S, CP, CE](
s"Received error for non existant subscription id [$subscriptionId]: $payload".warnF
case Some(emitter) =>
s"Error message received for subscription id [$subscriptionId]:\n$payload".debugF >>
emitter.emitError(payload)
emitter.emitErrors(payload)
}
case s @ _ =>
s"UNEXPECTED Error RECEIVED for subscription [$subscriptionId]. State is: [$s]".raiseError.void
}
case Right(StreamingMessage.FromServer.Complete(subscriptionId)) =>
case Right(StreamingMessage.FromServer.Complete(subscriptionId)) =>
state.get.flatMap {
case Initialized(stateConnectionId, _, subscriptions, _)
if connectionId === stateConnectionId =>
Expand All @@ -317,13 +325,13 @@ class ApolloClient[F[_], S, CP, CE](
case s @ _ =>
s"UNEXPECTED Complete RECEIVED for subscription [$subscriptionId]. State is: [$s]".warnF
}
case Right(StreamingMessage.FromServer.ConnectionKeepAlive) => F.unit
case _ => s"Unexpected message received from server: [$msg]".warnF
case Right(StreamingMessage.FromServer.ConnectionKeepAlive) => F.unit
case _ => s"Unexpected message received from server: [$msg]".warnF
}

// TODO Handle interruptions? Can callbacks be canceled?
override def onClose(connectionId: ConnectionId, event: CE): F[Unit] = {
val error = (new DisconnectedException()).asLeft
val error = DisconnectedException.asLeft
val debug = s"onClose() called with mismatching connectionId.".debugF

reconnectionStrategy(0, event.asRight) match {
Expand Down Expand Up @@ -563,61 +571,55 @@ class ApolloClient[F[_], S, CP, CE](
} yield ()
}

private type DataQueue[D] = Queue[F, Either[Throwable, Option[D]]]
private type DataQueueType[D] = Option[GraphQLCombinedResponse[D]]

private case class QueueEmitter[D: Decoder](
val queue: DataQueue[D],
val request: GraphQLRequest
val queue: Queue[F, DataQueueType[D]],
val request: GraphQLRequest[JsonObject]
) extends Emitter[F] {

def emitData(json: Json): F[Unit] = {
val data = json.as[D]
queue.offer(data.map(_.some))
}
def emitData(dataJson: Json, errors: Option[GraphQLErrors]): F[Unit] =
for {
data <- F.delay(dataJson.as[D]).rethrow
_ <- queue.offer(Ior.fromOptions(errors, data.some))
} yield ()

def emitError(json: Json): F[Unit] = {
// Should `json` be the full response with an `errors` field or just the actual errors?
// Above in `onMessage` we strip them out
// case Right(StreamingMessage.FromServer.DataJson(subscriptionId, data, errors)) =>
// and pass just the errors to `emitError`
// val error = new ResponseException(json.hcursor.get[List[Json]]("errors").getOrElse(Nil))
val error = new ResponseException(json.asArray.fold(List(json))(_.toList))
// TODO When an Error message is received, we terminate the stream and halt the subscription. Do we want that?
s"Emitting error $error".traceF >> queue.offer(error.asLeft)
}
def emitErrors(errors: GraphQLErrors): F[Unit] =
s"Emitting error $errors".traceF >> queue.offer(Ior.left(errors).some)

val halt: F[Unit] =
queue.offer(none.asRight)
val halt: F[Unit] = queue.offer(none)
}

private def buildQueue[D: Decoder](
request: GraphQLRequest
request: GraphQLRequest[JsonObject]
): F[(String, QueueEmitter[D])] =
for {
queue <- Queue.unbounded[F, Either[Throwable, Option[D]]]
queue <- Queue.unbounded[F, DataQueueType[D]]
id <- UUIDGen.randomString[F]
emitter = QueueEmitter(queue, request)
} yield (id, emitter)

// TODO Handle interruptions in subscription and query.

private def subscriptionResource[D: Decoder](
private def subscriptionResource[D: Decoder, R](
subscription: String,
operationName: Option[String],
variables: Option[Json]
): Resource[F, fs2.Stream[F, D]] =
variables: Option[JsonObject],
errorPolicy: ErrorPolicyProcessor[D, R]
): Resource[F, fs2.Stream[F, R]] =
Resource
.make(startSubscription[D](subscription, operationName, variables))(
.make(startSubscription[D, R](subscription, operationName, variables, errorPolicy))(
_.stop()
.handleErrorWith(_.logF("Error stopping subscription"))
)
.map(_.stream)

private def startSubscription[D: Decoder](
private def startSubscription[D: Decoder, R](
subscription: String,
operationName: Option[String],
variables: Option[Json]
): F[GraphQLSubscription[F, D]] =
variables: Option[JsonObject],
errorPolicy: ErrorPolicyProcessor[D, R]
): F[GraphQLSubscription[F, R]] =
state.get.flatMap {
case Initialized(_, _, _, _) =>
val request = GraphQLRequest(subscription, operationName, variables)
Expand Down Expand Up @@ -679,8 +681,8 @@ class ApolloClient[F[_], S, CP, CE](
Stream
.fromQueueUnterminated(emitter.queue)
.evalTap(v => s"Dequeuing for subscription [$id]: [$v]".debugF)
.rethrow // TODO We actually have to manually stop if we receive an error
.unNoneTerminate)
.unNoneTerminate
.evalMap(errorPolicy.process(_)))
.onFinalizeCase(c =>
s"Stream for subscription [$id] finalized with ExitCase [$c]".debugF >>
(c match { // If canceled, we don't want to clean up. Other fibers may be evaluating the stream. Clients can explicitly call `stop()`.
Expand All @@ -692,9 +694,10 @@ class ApolloClient[F[_], S, CP, CE](
createSubscription(stream, id)
}
case Initializing(_, _, _, _, latch) =>
latch.get.rethrow >> startSubscription(subscription, operationName, variables)
latch.get.rethrow >> startSubscription(subscription, operationName, variables, errorPolicy)
case Reestablishing(_, _, _, _, initLatch) =>
initLatch.get.rethrow >> startSubscription(subscription, operationName, variables)
initLatch.get.rethrow >>
startSubscription(subscription, operationName, variables, errorPolicy)
case _ =>
"NOT INITIALIZED".raiseError
}
Expand Down
84 changes: 84 additions & 0 deletions core/src/main/scala/clue/ErrorPolicy.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright (c) 2016-2023 Association of Universities for Research in Astronomy, Inc. (AURA)
// For license information see LICENSE or https://opensource.org/licenses/BSD-3-Clause

package clue

import cats.MonadThrow
import cats.data.Ior
import cats.effect.Sync
import cats.syntax.all._
import clue.model.GraphQLCombinedResponse
import clue.model.GraphQLDataResponse
import clue.model.GraphQLErrors

sealed trait ErrorPolicy {
type ReturnType[D]

def processor[D]: ErrorPolicyProcessor[D, ReturnType[D]]
}

sealed trait ErrorPolicyProcessor[D, R] {
def process[F[_]: Sync](result: GraphQLCombinedResponse[D]): F[R]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this need Sync? Can the delays below just be pure, in which case ApplicativeThrow is all that is needed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds right, good catch too.

}

object ErrorPolicy {
protected sealed trait Distinct[D] extends ErrorPolicyProcessor[D, D] {
protected def processData[F[_]: Sync, D](data: D): F[D] = Sync[F].delay(data)
protected def processErrors[F[_]: MonadThrow, D](errors: GraphQLErrors): F[D] =
MonadThrow[F].raiseError(ResponseException(errors))
}

object IgnoreOnData extends ErrorPolicy {
type ReturnType[D] = D

def processor[D]: ErrorPolicyProcessor[D, D] = new Distinct[D] {
def process[F[_]: Sync](result: GraphQLCombinedResponse[D]): F[D] =
result match {
case Ior.Left(errors) => processErrors(errors)
case Ior.Right(data) => processData(data)
case Ior.Both(_, data) => processData(data)
}
}
}

object RaiseAlways extends ErrorPolicy {
type ReturnType[D] = D

def processor[D]: ErrorPolicyProcessor[D, D] = new Distinct[D] {
def process[F[_]: Sync](result: GraphQLCombinedResponse[D]): F[ReturnType[D]] =
result match {
case Ior.Left(errors) => processErrors(errors)
case Ior.Right(data) => processData(data)
case Ior.Both(errors, _) => processErrors(errors)
}
}
}

object ReturnAlways extends ErrorPolicy {
type ReturnType[D] = GraphQLCombinedResponse[D]

def processor[D]: ErrorPolicyProcessor[D, GraphQLCombinedResponse[D]] =
new ErrorPolicyProcessor[D, GraphQLCombinedResponse[D]] {

def process[F[_]: Sync](result: GraphQLCombinedResponse[D]): F[ReturnType[D]] =
Sync[F].delay(result)
}
}

object RaiseOnNoData extends ErrorPolicy {
type ReturnType[D] = GraphQLDataResponse[D]

def processor[D]: ErrorPolicyProcessor[D, GraphQLDataResponse[D]] =
new ErrorPolicyProcessor[D, GraphQLDataResponse[D]] {

def process[F[_]: Sync](result: GraphQLCombinedResponse[D]): F[ReturnType[D]] =
result match {
case Ior.Left(errors) => MonadThrow[F].raiseError(ResponseException(errors))
case Ior.Right(data) => Sync[F].delay(GraphQLDataResponse(data, none, none))
case Ior.Both(errors, data) =>
Sync[F].delay(GraphQLDataResponse(data, errors.some, none))
}

}
}
}
Loading