Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add WebSocketMockServer and tests for WebSocketEngine #5187

Merged
merged 18 commits into from
Aug 18, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions gradle/libraries.toml
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ ktor-client-okhttp = { group = "io.ktor", name = "ktor-client-okhttp", version.r
ktor-client-darwin = { group = "io.ktor", name = "ktor-client-darwin", version.ref = "ktor" }
ktor-client-js = { group = "io.ktor", name = "ktor-client-js", version.ref = "ktor" }
ktor-client-websockets = { group = "io.ktor", name = "ktor-client-websockets", version.ref = "ktor" }
ktor-server-core = { group = "io.ktor", name = "ktor-server-core", version.ref = "ktor" }
ktor-server-cio = { group = "io.ktor", name = "ktor-server-cio", version.ref = "ktor" }
ktor-server-websockets = { group = "io.ktor", name = "ktor-server-websockets", version.ref = "ktor" }
okhttp = { group = "com.squareup.okhttp3", name = "okhttp", version.ref = "okhttp" }
okhttp-logging = { group = "com.squareup.okhttp3", name = "logging-interceptor", version.ref = "okhttp" }
okhttp-mockwebserver = { group = "com.squareup.okhttp3", name = "mockwebserver", version.ref = "okhttp" }
Expand Down
14 changes: 0 additions & 14 deletions libraries/apollo-engine-ktor/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,6 @@ kotlin {
}
}

findByName("commonTest")?.apply {
dependencies {
implementation(project(":apollo-mockserver"))
implementation(project(":apollo-testing-support")) {
because("runTest")
// We have a circular dependency here that creates a warning in JS
// w: duplicate library name: com.apollographql.apollo3:apollo-mockserver
// See https://youtrack.jetbrains.com/issue/KT-51110
// We should probably remove this circular dependency but for the time being, just use excludes
exclude(group = "com.apollographql.apollo3", module = "apollo-runtime")
}
}
}

findByName("jvmMain")?.apply {
dependencies {
api(libs.ktor.client.okhttp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.apollographql.apollo3.network.ws

import com.apollographql.apollo3.annotations.ApolloExperimental
import com.apollographql.apollo3.api.http.HttpHeader
import com.apollographql.apollo3.exception.ApolloNetworkException
import com.apollographql.apollo3.exception.ApolloWebSocketClosedException
import io.ktor.client.HttpClient
import io.ktor.client.plugins.websocket.WebSockets
Expand All @@ -11,14 +12,16 @@ import io.ktor.client.request.url
import io.ktor.http.URLBuilder
import io.ktor.http.URLProtocol
import io.ktor.http.Url
import io.ktor.websocket.CloseReason
import io.ktor.websocket.Frame
import io.ktor.websocket.close
import io.ktor.websocket.readText
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.launch
import okio.ByteString

Expand Down Expand Up @@ -68,23 +71,24 @@ class KtorWebSocketEngine(
val frame = sendFrameChannel.receive()
try {
send(frame)

// Also close the connection if the sent frame is a close frame
if (frame is Frame.Close) {
receiveMessageChannel.close()
sendFrameChannel.close()
break
}
} catch (e: Exception) {
val closeReason = try {closeReason.await()} catch (e: Exception) {null}
receiveMessageChannel.close(ApolloWebSocketClosedException(code = closeReason?.code?.toInt()
?: -1, reason = closeReason?.message, cause = e))
sendFrameChannel.close(e)
handleNetworkException(e, closeReason, receiveMessageChannel, sendFrameChannel)
break
}
}
}
while (true) {
when (val frame = try {
incoming.receive()
} catch (e: ClosedReceiveChannelException) {
val closeReason = try {closeReason.await()} catch (e: Exception) {null}
receiveMessageChannel.close(ApolloWebSocketClosedException(code = closeReason?.code?.toInt()
?: -1, reason = closeReason?.message, cause = e))
sendFrameChannel.close(e)
} catch (e: Exception) {
handleNetworkException(e, closeReason, receiveMessageChannel, sendFrameChannel)
break
}) {
is Frame.Text -> {
Expand All @@ -110,7 +114,7 @@ class KtorWebSocketEngine(
}
}
} catch (e: Exception) {
receiveMessageChannel.close(e)
receiveMessageChannel.close(ApolloNetworkException(message = "Web socket communication error", platformCause = e))
sendFrameChannel.close(e)
}
}
Expand All @@ -128,10 +132,36 @@ class KtorWebSocketEngine(
}

override fun close() {
sendFrameChannel.trySend(Frame.Close())
sendFrameChannel.close()
sendFrameChannel.trySend(Frame.Close(CloseReason(CLOSE_NORMAL.toShort(), "")))
}
}
}

private suspend fun handleNetworkException(
e: Exception,
deferredCloseReason: Deferred<CloseReason?>,
receiveMessageChannel: Channel<String>,
sendFrameChannel: Channel<Frame>,
) {
if (e is CancellationException) throw e
val closeReason = try {
deferredCloseReason.await()
} catch (e: Exception) {
null
}
val apolloException = if (closeReason != null) {
ApolloWebSocketClosedException(
code = closeReason.code.toInt(),
reason = closeReason.message,
cause = e
)
} else {
ApolloNetworkException(
message = "Web socket communication error",
platformCause = e
)
}
receiveMessageChannel.close(apolloException)
sendFrameChannel.close(apolloException)
}
}

This file was deleted.

5 changes: 5 additions & 0 deletions libraries/apollo-mockserver/api/apollo-mockserver.api
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,8 @@ public abstract interface class com/apollographql/apollo3/mockserver/MockServerI
public abstract fun url (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class com/apollographql/apollo3/mockserver/WebSocketMockServer_jvmKt {
public static final fun WebSocketMockServer (I)Lcom/apollographql/apollo3/mockserver/WebSocketMockServer;
public static synthetic fun WebSocketMockServer$default (IILjava/lang/Object;)Lcom/apollographql/apollo3/mockserver/WebSocketMockServer;
}

18 changes: 17 additions & 1 deletion libraries/apollo-mockserver/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,29 @@ kotlin {
// w: duplicate library name: com.apollographql.apollo3:apollo-mockserver
// See https://youtrack.jetbrains.com/issue/KT-51110
// We should probably remove this circular dependency but for the time being, just use excludes
exclude(group = "com.apollographql.apollo3", module = "apollo-mockserver")
exclude(group = "com.apollographql.apollo3", module = "apollo-mockserver")
}
implementation(project(":apollo-runtime")) {
because("We need HttpEngine for SocketTest")
}
}
}
}

val commonAppleJvmMain = sourceSets.create("commonAppleJvmMain").apply {
martinbonnin marked this conversation as resolved.
Show resolved Hide resolved
dependsOn(sourceSets.getByName("commonMain"))
dependencies {
implementation(libs.ktor.server.core)
implementation(libs.ktor.server.cio)
implementation(libs.ktor.server.websockets)
}
}
val commonAppleJvmTest = sourceSets.create("commonAppleJvmTest").apply {
dependsOn(sourceSets.getByName("commonTest"))
}
sourceSets.getByName("jvmMain").dependsOn(commonAppleJvmMain)
sourceSets.getByName("jvmTest").dependsOn(commonAppleJvmTest)
sourceSets.getByName("appleMain").dependsOn(commonAppleJvmMain)
sourceSets.getByName("appleTest").dependsOn(commonAppleJvmTest)
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.apollographql.apollo3.mockserver

import com.apollographql.apollo3.mockserver.internal.CommonWebSocketMockServer

actual fun WebSocketMockServer(port: Int): WebSocketMockServer = CommonWebSocketMockServer(port)
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package com.apollographql.apollo3.mockserver.internal

import com.apollographql.apollo3.mockserver.WebSocketMockServer
import com.apollographql.apollo3.mockserver.WebSocketMockServer.WebSocketEvent
import com.apollographql.apollo3.mockserver.WebSocketMockServer.WebSocketEvent.BinaryFrame
import com.apollographql.apollo3.mockserver.WebSocketMockServer.WebSocketEvent.Close
import com.apollographql.apollo3.mockserver.WebSocketMockServer.WebSocketEvent.Connect
import com.apollographql.apollo3.mockserver.WebSocketMockServer.WebSocketEvent.Error
import com.apollographql.apollo3.mockserver.WebSocketMockServer.WebSocketEvent.TextFrame
import io.ktor.server.application.Application
import io.ktor.server.application.install
import io.ktor.server.cio.CIO
import io.ktor.server.engine.ApplicationEngine
import io.ktor.server.engine.embeddedServer
import io.ktor.server.routing.routing
import io.ktor.server.websocket.WebSockets
import io.ktor.server.websocket.webSocket
import io.ktor.util.toMap
import io.ktor.websocket.CloseReason
import io.ktor.websocket.DefaultWebSocketSession
import io.ktor.websocket.Frame
import io.ktor.websocket.close
import io.ktor.websocket.readBytes
import io.ktor.websocket.readText
import io.ktor.websocket.send
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlin.random.Random

internal class CommonWebSocketMockServer(private val port: Int) : WebSocketMockServer {
private var server: ApplicationEngine? = null

private val _events = MutableSharedFlow<WebSocketEvent>()
override val events: Flow<WebSocketEvent> = _events

override fun start() {
check(server == null) { "Server already started" }
server = embeddedServer(CIO, port) { webSocketServer() }.start(wait = false)
}

override fun stop() {
server?.stop(100, 100)
}

override fun url(): String {
return "ws://127.0.0.1:$port"
}

private class Session(val id: String, val session: DefaultWebSocketSession)

private val sessions = mutableMapOf<String, Session>()

override suspend fun sendText(sessionId: String, text: String) {
sessions[sessionId]?.session?.send(text)
}

override suspend fun sendBinary(sessionId: String, binary: ByteArray) {
sessions[sessionId]?.session?.send(binary)
}

override suspend fun sendClose(sessionId: String, reasonCode: Short?, reasonMessage: String?) {
sessions[sessionId]?.session?.close(CloseReason(reasonCode ?: CloseReason.Codes.NORMAL.code, reasonMessage ?: ""))
}


private fun Application.webSocketServer() {
install(WebSockets)
routing {
webSocket("/") {
val sessionId = Random.nextInt().toString()
sessions[sessionId] = Session(sessionId, this)
try {
_events.emit(Connect(sessionId = sessionId, headers = call.request.headers.toMap().mapValues { it.value.first() }))
for (frame in incoming) {
when (frame) {
is Frame.Text -> _events.emit(TextFrame(sessionId, frame.readText()))
is Frame.Binary -> _events.emit(BinaryFrame(sessionId, frame.readBytes()))
else -> {}
}
}
val closeReason = closeReason.await()
_events.emit(Close(sessionId, closeReason?.code, closeReason?.message))
} catch (e: ClosedReceiveChannelException) {
val closeReason = closeReason.await()
_events.emit(Close(sessionId, closeReason?.code, closeReason?.message))
} catch (e: Throwable) {
_events.emit(Error(sessionId, e))
} finally {
sessions.remove(sessionId)
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.apollographql.apollo3.mockserver

import com.apollographql.apollo3.annotations.ApolloExperimental
import kotlinx.coroutines.flow.Flow
import kotlin.random.Random

@ApolloExperimental
interface WebSocketMockServer {
@ApolloExperimental
sealed class WebSocketEvent {
@ApolloExperimental
class Connect(val sessionId: String, val headers: Map<String, String>) : WebSocketEvent()

@ApolloExperimental
class TextFrame(val sessionId: String, val text: String) : WebSocketEvent()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Naming bikeshedding: I thought a Text message was split accross multiple frames and therefore this would need to be called TextMessage (or just Text)? Similarly, WebSocketEvent could be named WebSocketMessage?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In events however you can receive "frames" (which I agree it could make sense to rename "messages"), but also other things that are not really messages (connect, close, error) IMO.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha 👍

I was expecting an API similar to MockWebserver where we don't have anything like Connect or Error. All we can get is the HTTP request.

Similarly for WebSockets, I would expect to be able to record the initial HTTP handshake request as well as subsequent Close/Ping/Pong control frames (or messages because these messages are always a single frame) and data messages (not individual frames).

Just my 2 cents, feel free to discard.


@ApolloExperimental
class BinaryFrame(val sessionId: String, val bytes: ByteArray) : WebSocketEvent()

@ApolloExperimental
class Close(val sessionId: String, val reasonCode: Short?, val reasonMessage: String?) : WebSocketEvent()

@ApolloExperimental
class Error(val sessionId: String, val cause: Throwable) : WebSocketEvent()
}

fun start()
fun url(): String

val events: Flow<WebSocketEvent>

suspend fun sendText(sessionId: String, text: String)
suspend fun sendBinary(sessionId: String, binary: ByteArray)
suspend fun sendClose(sessionId: String, reasonCode: Short? = null, reasonMessage: String? = null)
fun stop()
martinbonnin marked this conversation as resolved.
Show resolved Hide resolved
}

@ApolloExperimental
expect fun WebSocketMockServer(port: Int = Random.nextInt(10000, 20000)): WebSocketMockServer
martinbonnin marked this conversation as resolved.
Show resolved Hide resolved
martinbonnin marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.apollographql.apollo3.mockserver

actual fun WebSocketMockServer(port: Int): WebSocketMockServer {
TODO("Not yet implemented")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.apollographql.apollo3.mockserver

import com.apollographql.apollo3.mockserver.internal.CommonWebSocketMockServer

actual fun WebSocketMockServer(port: Int): WebSocketMockServer = CommonWebSocketMockServer(port)
Loading
Loading