Skip to content

Commit

Permalink
feat: rewrite event system
Browse files Browse the repository at this point in the history
  • Loading branch information
d1snin committed Sep 21, 2024
1 parent 13b12e9 commit 12807a8
Show file tree
Hide file tree
Showing 31 changed files with 521 additions and 392 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ build/
!**/src/main/**/build/
!**/src/test/**/build/
kotlin-js-store
.kotlin

### STS ###
.apt_generated
Expand Down
10 changes: 7 additions & 3 deletions e2e/src/jvmTest/kotlin/dev/d1s/ktor/events/EventDeliveryTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ package dev.d1s.ktor.events
import dev.d1s.ktor.events.client.ClientWebSocketEvent
import dev.d1s.ktor.events.client.receiveWebSocketEvent
import dev.d1s.ktor.events.client.webSocketEvents
import dev.d1s.ktor.events.configuration.eventChannel
import dev.d1s.ktor.events.configuration.pool
import dev.d1s.ktor.events.configuration.runTestServer
import dev.d1s.ktor.events.configuration.webSocketClient
import dev.d1s.ktor.events.server.event
import dev.d1s.ktor.events.server.entity.event
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
Expand Down Expand Up @@ -71,6 +71,10 @@ class EventDeliveryTest {

private suspend fun sendTestEvent() {
val event = event(testServerEventReference) { parameters ->
log.i {
"Got parameters: $parameters"
}

val testParameterData = parameters[TEST_CLIENT_PARAMETER_KEY]

assertNotNull(testParameterData)
Expand All @@ -79,6 +83,6 @@ class EventDeliveryTest {
testEventData
}

eventChannel.send(event)
pool.push(event)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,6 @@ val webSocketClient = HttpClient(CIO) {

install(WebSocketEvents) {
url = "ws://localhost:$TEST_SERVER_PORT"
clientId = "test-client"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,24 @@

package dev.d1s.ktor.events.configuration

import dev.d1s.ktor.events.server.WebSocketEventChannel
import dev.d1s.ktor.events.server.WebSocketEvents
import dev.d1s.ktor.events.server.webSocketEvents
import dev.d1s.ktor.events.server.pool.InMemoryEventPool
import dev.d1s.ktor.events.server.route.webSocketEvents
import io.ktor.server.application.*
import io.ktor.server.engine.*
import io.ktor.server.netty.*
import io.ktor.server.routing.*

const val TEST_SERVER_PORT = 9639
const val TEST_SERVER_PORT = 20324

val eventChannel = WebSocketEventChannel()
val pool = InMemoryEventPool()

fun runTestServer() = embeddedServer(Netty, environment).start()

private val environment = applicationEngineEnvironment {
module {
install(WebSocketEvents) {
channel = eventChannel
eventPool = pool
}

routing {
Expand Down
10 changes: 5 additions & 5 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ projectVersion=1.1.1-dev

moduleDocsPath=./docs/module.md

kotlinVersion=1.9.22
kotlinVersion=2.0.0

dokkaVersion=1.9.10
dokkaVersion=1.9.20

versionsPluginVersion=0.51.0

kmLogVersion=1.3.0
logbackVersion=1.4.14
kmLogVersion=1.5.0
logbackVersion=1.5.8

ktorVersion=2.3.5

kotlinxSerializationVersion=1.6.0
kotlinxSerializationVersion=1.7.3
4 changes: 4 additions & 0 deletions ktor-ws-events-client/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,16 @@ kotlin {
sourceSets {
val commonMain by getting {
dependencies {
val kmLogVersion: String by project

val ktorVersion: String by project

val kotlinxSerializationVersion: String by project

api(project(":ktor-ws-events-commons"))

implementation("org.lighthousegames:logging:$kmLogVersion")

implementation("io.ktor:ktor-client-core:$ktorVersion")
implementation("io.ktor:ktor-client-websockets:$ktorVersion")

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package dev.d1s.ktor.events.client

import dev.d1s.ktor.events.commons.AbstractEvent
import dev.d1s.ktor.events.commons.EventReference
import dev.d1s.ktor.events.commons.Identifier
import dev.d1s.ktor.events.commons.UnixTime
import kotlinx.serialization.Serializable

@Serializable
public data class ClientWebSocketEvent<T>(
val reference: EventReference,
override val id: Identifier,
override val reference: EventReference,
override val initiated: UnixTime,
val data: T
)
) : AbstractEvent
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package dev.d1s.ktor.events.client

import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import org.lighthousegames.logging.logging

private const val DEFAULT_DELAY = 5_000L

private val logger = logging()

public suspend fun <R> withRetries(
continuous: Boolean = false,
onError: suspend (Throwable) -> Unit = {},
block: suspend () -> R
) {
coroutineScope {
while (true) {
try {
block()

if (!continuous) {
break
}
} catch (e: Throwable) {
logger.d {
"Error while executing; ${e::class.simpleName}: ${e.message}"
}

onError(e)

delay(DEFAULT_DELAY)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import io.ktor.client.plugins.*
import io.ktor.client.plugins.websocket.*
import io.ktor.client.request.*
import io.ktor.http.*
import org.lighthousegames.logging.logging

private val logger = logging()

/**
* Opens a [block] with [DefaultClientWebSocketSession] associated with the given [event reference][reference] and optional [path].
Expand Down Expand Up @@ -59,18 +62,30 @@ public suspend fun HttpClient.webSocketEvents(
}

url.parameters.appendMissing(parameters)

header(Routes.CLIENT_ID_HEADER, webSocketEventsConfiguration.clientId)
}

val url = URLBuilder(webSocketEventsConfiguration.requiredBaseUrl).apply {
val configuredPath = path.replace(Routes.GROUP_SEGMENT_PLACEHOLDER, reference.group)
path(configuredPath)
}.buildString()

webSocket(
urlString = url,
request = requestConfiguration,
block = block
)
logger.d {
"Will start WS session at $url"
}

withRetries(onError = {
logger.w {
"Error opening WS session: ${it.message}"
}
}) {
webSocket(
urlString = url,
request = requestConfiguration,
block = block
)
}
}

private fun HttpClient.checkPluginInstalled() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,38 @@
package dev.d1s.ktor.events.client

import io.ktor.client.plugins.websocket.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import org.lighthousegames.logging.KmLog
import org.lighthousegames.logging.logging

public val EventReceivingScope: CoroutineScope = CoroutineScope(Dispatchers.Default)

public val EventReceiverLog: KmLog = logging()

/**
* Dequeues a frame containing [ClientWebSocketEvent] and tries to deserialize it.
*
* @see webSocketEvents
*/
public suspend inline fun <reified T> DefaultClientWebSocketSession.receiveWebSocketEvent(): ClientWebSocketEvent<T> =
receiveDeserialized()
receiveDeserialized<ClientWebSocketEvent<T>>()

/**
* Dequeues frames containing [ClientWebSocketEvent] and tries to deserialize it. Will retry if something went wrong while receiving a frame.
*
* @see webSocketEvents
*/
public suspend inline fun <reified T> DefaultClientWebSocketSession.receiveWebSocketEvents(crossinline receiver: suspend (ClientWebSocketEvent<T>) -> Unit): Job =
EventReceivingScope.launch {
withRetries(continuous = true, onError = {
EventReceiverLog.w {
"Error receiving web socket events: ${it.message}"
}
}) {
val event = receiveWebSocketEvent<T>()
receiver(event)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package dev.d1s.ktor.events.client

import dev.d1s.ktor.events.commons.randomId
import io.ktor.client.*
import io.ktor.client.plugins.*
import io.ktor.client.plugins.api.*
Expand All @@ -32,8 +33,12 @@ public val WebSocketEvents: ClientPlugin<WebSocketEventsConfiguration> =

public class WebSocketEventsConfiguration {

@Suppress("MemberVisibilityCanBePrivate")
public var url: String? = null

@Suppress("MemberVisibilityCanBePrivate")
public var clientId: String = randomId

internal val requiredBaseUrl
get() = requireNotNull(url) {
"URL is not configured."
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package dev.d1s.ktor.events.commons

public interface AbstractEvent {

public val id: Identifier

public val reference: EventReference

public val initiated: UnixTime
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,30 +75,15 @@ public data class EventReference(

other as EventReference

if (group != other.group) return false

val otherPrincipal = other.principal

if (principal != null) {
return if (otherPrincipal != null) {
principal == otherPrincipal
} else {
false
}
}

return true
return group == other.group
}

override fun hashCode(): Int {
var result = group.hashCode()
result = 31 * result + (principal?.hashCode() ?: 0)
return result
}
override fun hashCode(): Int =
group.hashCode()
}

/**
* A shortcut. Returns `EventReference(group, principal, parameters)`
* A shortcut.
*
* @see EventReference
* @see ClientParameters
Expand All @@ -107,4 +92,4 @@ public fun ref(
group: EventGroup,
principal: EventPrincipal = null,
clientParameters: ClientParameters = mapOf()
): EventReference = EventReference(group, principal, clientParameters)
): EventReference = EventReference(group = group, principal = principal, parameters = clientParameters)
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package dev.d1s.ktor.events.commons

public typealias Identifier = String

private val chars = ('0'..'9') + ('a'..'z')

public val randomId: Identifier
get() = chars.shuffled().take(16).joinToString("")
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package dev.d1s.ktor.events.commons

public typealias UnixTime = Long
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ package dev.d1s.ktor.events.commons.util

public object Routes {

public const val CLIENT_ID_HEADER: String = "X-Client-Id"

public const val GROUP_PATH_PARAMETER: String = "group"

public const val PRINCIPAL_QUERY_PARAMETER: String = "principal"

public const val GROUP_SEGMENT_PLACEHOLDER: String = "{${GROUP_PATH_PARAMETER}}"

public const val DEFAULT_EVENTS_ROUTE: String = "/events/$GROUP_SEGMENT_PLACEHOLDER"

public const val PRINCIPAL_QUERY_PARAMETER: String = "principal"
}
Loading

0 comments on commit 12807a8

Please sign in to comment.