diff --git a/.gitignore b/.gitignore index d93105b..86d022a 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ build/ !**/src/main/**/build/ !**/src/test/**/build/ kotlin-js-store +.kotlin ### STS ### .apt_generated diff --git a/e2e/src/jvmTest/kotlin/dev/d1s/ktor/events/EventDeliveryTest.kt b/e2e/src/jvmTest/kotlin/dev/d1s/ktor/events/EventDeliveryTest.kt index cc3c4e2..5e8685c 100644 --- a/e2e/src/jvmTest/kotlin/dev/d1s/ktor/events/EventDeliveryTest.kt +++ b/e2e/src/jvmTest/kotlin/dev/d1s/ktor/events/EventDeliveryTest.kt @@ -19,10 +19,10 @@ package dev.d1s.ktor.events import dev.d1s.ktor.events.client.ClientWebSocketEvent import dev.d1s.ktor.events.client.receiveWebSocketEvent import dev.d1s.ktor.events.client.webSocketEvents -import dev.d1s.ktor.events.configuration.eventChannel +import dev.d1s.ktor.events.configuration.pool import dev.d1s.ktor.events.configuration.runTestServer import dev.d1s.ktor.events.configuration.webSocketClient -import dev.d1s.ktor.events.server.event +import dev.d1s.ktor.events.server.entity.event import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking @@ -71,6 +71,10 @@ class EventDeliveryTest { private suspend fun sendTestEvent() { val event = event(testServerEventReference) { parameters -> + log.i { + "Got parameters: $parameters" + } + val testParameterData = parameters[TEST_CLIENT_PARAMETER_KEY] assertNotNull(testParameterData) @@ -79,6 +83,6 @@ class EventDeliveryTest { testEventData } - eventChannel.send(event) + pool.push(event) } } \ No newline at end of file diff --git a/e2e/src/jvmTest/kotlin/dev/d1s/ktor/events/configuration/TestClient.kt b/e2e/src/jvmTest/kotlin/dev/d1s/ktor/events/configuration/TestClient.kt index df6ad86..6ce9e18 100644 --- a/e2e/src/jvmTest/kotlin/dev/d1s/ktor/events/configuration/TestClient.kt +++ b/e2e/src/jvmTest/kotlin/dev/d1s/ktor/events/configuration/TestClient.kt @@ -29,5 +29,6 @@ val webSocketClient = HttpClient(CIO) { install(WebSocketEvents) { url = "ws://localhost:$TEST_SERVER_PORT" + clientId = "test-client" } } diff --git a/e2e/src/jvmTest/kotlin/dev/d1s/ktor/events/configuration/TestServer.kt b/e2e/src/jvmTest/kotlin/dev/d1s/ktor/events/configuration/TestServer.kt index 15b919d..c27752b 100644 --- a/e2e/src/jvmTest/kotlin/dev/d1s/ktor/events/configuration/TestServer.kt +++ b/e2e/src/jvmTest/kotlin/dev/d1s/ktor/events/configuration/TestServer.kt @@ -16,24 +16,24 @@ package dev.d1s.ktor.events.configuration -import dev.d1s.ktor.events.server.WebSocketEventChannel import dev.d1s.ktor.events.server.WebSocketEvents -import dev.d1s.ktor.events.server.webSocketEvents +import dev.d1s.ktor.events.server.pool.InMemoryEventPool +import dev.d1s.ktor.events.server.route.webSocketEvents import io.ktor.server.application.* import io.ktor.server.engine.* import io.ktor.server.netty.* import io.ktor.server.routing.* -const val TEST_SERVER_PORT = 9639 +const val TEST_SERVER_PORT = 20324 -val eventChannel = WebSocketEventChannel() +val pool = InMemoryEventPool() fun runTestServer() = embeddedServer(Netty, environment).start() private val environment = applicationEngineEnvironment { module { install(WebSocketEvents) { - channel = eventChannel + eventPool = pool } routing { diff --git a/gradle.properties b/gradle.properties index e9548bc..8ed28c2 100644 --- a/gradle.properties +++ b/gradle.properties @@ -7,15 +7,15 @@ projectVersion=1.1.1-dev moduleDocsPath=./docs/module.md -kotlinVersion=1.9.22 +kotlinVersion=2.0.0 -dokkaVersion=1.9.10 +dokkaVersion=1.9.20 versionsPluginVersion=0.51.0 -kmLogVersion=1.3.0 -logbackVersion=1.4.14 +kmLogVersion=1.5.0 +logbackVersion=1.5.8 ktorVersion=2.3.5 -kotlinxSerializationVersion=1.6.0 \ No newline at end of file +kotlinxSerializationVersion=1.7.3 \ No newline at end of file diff --git a/ktor-ws-events-client/build.gradle.kts b/ktor-ws-events-client/build.gradle.kts index 762dced..bd4fdf9 100644 --- a/ktor-ws-events-client/build.gradle.kts +++ b/ktor-ws-events-client/build.gradle.kts @@ -40,12 +40,16 @@ kotlin { sourceSets { val commonMain by getting { dependencies { + val kmLogVersion: String by project + val ktorVersion: String by project val kotlinxSerializationVersion: String by project api(project(":ktor-ws-events-commons")) + implementation("org.lighthousegames:logging:$kmLogVersion") + implementation("io.ktor:ktor-client-core:$ktorVersion") implementation("io.ktor:ktor-client-websockets:$ktorVersion") diff --git a/ktor-ws-events-client/src/commonMain/kotlin/dev/d1s/ktor/events/client/ClientWebSocketEvent.kt b/ktor-ws-events-client/src/commonMain/kotlin/dev/d1s/ktor/events/client/ClientWebSocketEvent.kt index 138572a..37277cb 100644 --- a/ktor-ws-events-client/src/commonMain/kotlin/dev/d1s/ktor/events/client/ClientWebSocketEvent.kt +++ b/ktor-ws-events-client/src/commonMain/kotlin/dev/d1s/ktor/events/client/ClientWebSocketEvent.kt @@ -1,10 +1,15 @@ package dev.d1s.ktor.events.client +import dev.d1s.ktor.events.commons.AbstractEvent import dev.d1s.ktor.events.commons.EventReference +import dev.d1s.ktor.events.commons.Identifier +import dev.d1s.ktor.events.commons.UnixTime import kotlinx.serialization.Serializable @Serializable public data class ClientWebSocketEvent<T>( - val reference: EventReference, + override val id: Identifier, + override val reference: EventReference, + override val initiated: UnixTime, val data: T -) +) : AbstractEvent \ No newline at end of file diff --git a/ktor-ws-events-client/src/commonMain/kotlin/dev/d1s/ktor/events/client/Retries.kt b/ktor-ws-events-client/src/commonMain/kotlin/dev/d1s/ktor/events/client/Retries.kt new file mode 100644 index 0000000..bbe602b --- /dev/null +++ b/ktor-ws-events-client/src/commonMain/kotlin/dev/d1s/ktor/events/client/Retries.kt @@ -0,0 +1,35 @@ +package dev.d1s.ktor.events.client + +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import org.lighthousegames.logging.logging + +private const val DEFAULT_DELAY = 5_000L + +private val logger = logging() + +public suspend fun <R> withRetries( + continuous: Boolean = false, + onError: suspend (Throwable) -> Unit = {}, + block: suspend () -> R +) { + coroutineScope { + while (true) { + try { + block() + + if (!continuous) { + break + } + } catch (e: Throwable) { + logger.d { + "Error while executing; ${e::class.simpleName}: ${e.message}" + } + + onError(e) + + delay(DEFAULT_DELAY) + } + } + } +} \ No newline at end of file diff --git a/ktor-ws-events-client/src/commonMain/kotlin/dev/d1s/ktor/events/client/WebSocketEventHandlerBuilder.kt b/ktor-ws-events-client/src/commonMain/kotlin/dev/d1s/ktor/events/client/WebSocketEventHandlerBuilder.kt index 4e14e60..c994f93 100644 --- a/ktor-ws-events-client/src/commonMain/kotlin/dev/d1s/ktor/events/client/WebSocketEventHandlerBuilder.kt +++ b/ktor-ws-events-client/src/commonMain/kotlin/dev/d1s/ktor/events/client/WebSocketEventHandlerBuilder.kt @@ -23,6 +23,9 @@ import io.ktor.client.plugins.* import io.ktor.client.plugins.websocket.* import io.ktor.client.request.* import io.ktor.http.* +import org.lighthousegames.logging.logging + +private val logger = logging() /** * Opens a [block] with [DefaultClientWebSocketSession] associated with the given [event reference][reference] and optional [path]. @@ -59,6 +62,8 @@ public suspend fun HttpClient.webSocketEvents( } url.parameters.appendMissing(parameters) + + header(Routes.CLIENT_ID_HEADER, webSocketEventsConfiguration.clientId) } val url = URLBuilder(webSocketEventsConfiguration.requiredBaseUrl).apply { @@ -66,11 +71,21 @@ public suspend fun HttpClient.webSocketEvents( path(configuredPath) }.buildString() - webSocket( - urlString = url, - request = requestConfiguration, - block = block - ) + logger.d { + "Will start WS session at $url" + } + + withRetries(onError = { + logger.w { + "Error opening WS session: ${it.message}" + } + }) { + webSocket( + urlString = url, + request = requestConfiguration, + block = block + ) + } } private fun HttpClient.checkPluginInstalled() { diff --git a/ktor-ws-events-client/src/commonMain/kotlin/dev/d1s/ktor/events/client/WebSocketEventReceiver.kt b/ktor-ws-events-client/src/commonMain/kotlin/dev/d1s/ktor/events/client/WebSocketEventReceiver.kt index 8b26f7b..f2f6873 100644 --- a/ktor-ws-events-client/src/commonMain/kotlin/dev/d1s/ktor/events/client/WebSocketEventReceiver.kt +++ b/ktor-ws-events-client/src/commonMain/kotlin/dev/d1s/ktor/events/client/WebSocketEventReceiver.kt @@ -17,6 +17,16 @@ package dev.d1s.ktor.events.client import io.ktor.client.plugins.websocket.* +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.launch +import org.lighthousegames.logging.KmLog +import org.lighthousegames.logging.logging + +public val EventReceivingScope: CoroutineScope = CoroutineScope(Dispatchers.Default) + +public val EventReceiverLog: KmLog = logging() /** * Dequeues a frame containing [ClientWebSocketEvent] and tries to deserialize it. @@ -24,4 +34,21 @@ import io.ktor.client.plugins.websocket.* * @see webSocketEvents */ public suspend inline fun <reified T> DefaultClientWebSocketSession.receiveWebSocketEvent(): ClientWebSocketEvent<T> = - receiveDeserialized() \ No newline at end of file + receiveDeserialized<ClientWebSocketEvent<T>>() + +/** + * Dequeues frames containing [ClientWebSocketEvent] and tries to deserialize it. Will retry if something went wrong while receiving a frame. + * + * @see webSocketEvents + */ +public suspend inline fun <reified T> DefaultClientWebSocketSession.receiveWebSocketEvents(crossinline receiver: suspend (ClientWebSocketEvent<T>) -> Unit): Job = + EventReceivingScope.launch { + withRetries(continuous = true, onError = { + EventReceiverLog.w { + "Error receiving web socket events: ${it.message}" + } + }) { + val event = receiveWebSocketEvent<T>() + receiver(event) + } + } \ No newline at end of file diff --git a/ktor-ws-events-client/src/commonMain/kotlin/dev/d1s/ktor/events/client/WebSocketEventsPlugin.kt b/ktor-ws-events-client/src/commonMain/kotlin/dev/d1s/ktor/events/client/WebSocketEventsPlugin.kt index 9d62ef6..08acab7 100644 --- a/ktor-ws-events-client/src/commonMain/kotlin/dev/d1s/ktor/events/client/WebSocketEventsPlugin.kt +++ b/ktor-ws-events-client/src/commonMain/kotlin/dev/d1s/ktor/events/client/WebSocketEventsPlugin.kt @@ -16,6 +16,7 @@ package dev.d1s.ktor.events.client +import dev.d1s.ktor.events.commons.randomId import io.ktor.client.* import io.ktor.client.plugins.* import io.ktor.client.plugins.api.* @@ -32,8 +33,12 @@ public val WebSocketEvents: ClientPlugin<WebSocketEventsConfiguration> = public class WebSocketEventsConfiguration { + @Suppress("MemberVisibilityCanBePrivate") public var url: String? = null + @Suppress("MemberVisibilityCanBePrivate") + public var clientId: String = randomId + internal val requiredBaseUrl get() = requireNotNull(url) { "URL is not configured." diff --git a/ktor-ws-events-commons/src/commonMain/kotlin/dev/d1s/ktor/events/commons/AbstractEvent.kt b/ktor-ws-events-commons/src/commonMain/kotlin/dev/d1s/ktor/events/commons/AbstractEvent.kt new file mode 100644 index 0000000..8304f63 --- /dev/null +++ b/ktor-ws-events-commons/src/commonMain/kotlin/dev/d1s/ktor/events/commons/AbstractEvent.kt @@ -0,0 +1,10 @@ +package dev.d1s.ktor.events.commons + +public interface AbstractEvent { + + public val id: Identifier + + public val reference: EventReference + + public val initiated: UnixTime +} \ No newline at end of file diff --git a/ktor-ws-events-commons/src/commonMain/kotlin/dev/d1s/ktor/events/commons/EventReference.kt b/ktor-ws-events-commons/src/commonMain/kotlin/dev/d1s/ktor/events/commons/EventReference.kt index 56e6bc6..11735be 100644 --- a/ktor-ws-events-commons/src/commonMain/kotlin/dev/d1s/ktor/events/commons/EventReference.kt +++ b/ktor-ws-events-commons/src/commonMain/kotlin/dev/d1s/ktor/events/commons/EventReference.kt @@ -75,30 +75,15 @@ public data class EventReference( other as EventReference - if (group != other.group) return false - - val otherPrincipal = other.principal - - if (principal != null) { - return if (otherPrincipal != null) { - principal == otherPrincipal - } else { - false - } - } - - return true + return group == other.group } - override fun hashCode(): Int { - var result = group.hashCode() - result = 31 * result + (principal?.hashCode() ?: 0) - return result - } + override fun hashCode(): Int = + group.hashCode() } /** - * A shortcut. Returns `EventReference(group, principal, parameters)` + * A shortcut. * * @see EventReference * @see ClientParameters @@ -107,4 +92,4 @@ public fun ref( group: EventGroup, principal: EventPrincipal = null, clientParameters: ClientParameters = mapOf() -): EventReference = EventReference(group, principal, clientParameters) \ No newline at end of file +): EventReference = EventReference(group = group, principal = principal, parameters = clientParameters) \ No newline at end of file diff --git a/ktor-ws-events-commons/src/commonMain/kotlin/dev/d1s/ktor/events/commons/Identifier.kt b/ktor-ws-events-commons/src/commonMain/kotlin/dev/d1s/ktor/events/commons/Identifier.kt new file mode 100644 index 0000000..1337a91 --- /dev/null +++ b/ktor-ws-events-commons/src/commonMain/kotlin/dev/d1s/ktor/events/commons/Identifier.kt @@ -0,0 +1,8 @@ +package dev.d1s.ktor.events.commons + +public typealias Identifier = String + +private val chars = ('0'..'9') + ('a'..'z') + +public val randomId: Identifier + get() = chars.shuffled().take(16).joinToString("") \ No newline at end of file diff --git a/ktor-ws-events-commons/src/commonMain/kotlin/dev/d1s/ktor/events/commons/Time.kt b/ktor-ws-events-commons/src/commonMain/kotlin/dev/d1s/ktor/events/commons/Time.kt new file mode 100644 index 0000000..15ebddd --- /dev/null +++ b/ktor-ws-events-commons/src/commonMain/kotlin/dev/d1s/ktor/events/commons/Time.kt @@ -0,0 +1,3 @@ +package dev.d1s.ktor.events.commons + +public typealias UnixTime = Long \ No newline at end of file diff --git a/ktor-ws-events-commons/src/commonMain/kotlin/dev/d1s/ktor/events/commons/util/Routes.kt b/ktor-ws-events-commons/src/commonMain/kotlin/dev/d1s/ktor/events/commons/util/Routes.kt index 346e8fd..11e9de3 100644 --- a/ktor-ws-events-commons/src/commonMain/kotlin/dev/d1s/ktor/events/commons/util/Routes.kt +++ b/ktor-ws-events-commons/src/commonMain/kotlin/dev/d1s/ktor/events/commons/util/Routes.kt @@ -18,10 +18,13 @@ package dev.d1s.ktor.events.commons.util public object Routes { + public const val CLIENT_ID_HEADER: String = "X-Client-Id" + public const val GROUP_PATH_PARAMETER: String = "group" + + public const val PRINCIPAL_QUERY_PARAMETER: String = "principal" + public const val GROUP_SEGMENT_PLACEHOLDER: String = "{${GROUP_PATH_PARAMETER}}" public const val DEFAULT_EVENTS_ROUTE: String = "/events/$GROUP_SEGMENT_PLACEHOLDER" - - public const val PRINCIPAL_QUERY_PARAMETER: String = "principal" } \ No newline at end of file diff --git a/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/EventProcessor.kt b/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/EventProcessor.kt new file mode 100644 index 0000000..b4859de --- /dev/null +++ b/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/EventProcessor.kt @@ -0,0 +1,128 @@ +/* + * Copyright 2022-2024 Mikhail Titov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.d1s.ktor.events.server + +import dev.d1s.ktor.events.commons.Identifier +import dev.d1s.ktor.events.server.dto.WebSocketEventDto +import dev.d1s.ktor.events.server.entity.EventSendingConnection +import dev.d1s.ktor.events.server.entity.ServerWebSocketEvent +import dev.d1s.ktor.events.server.pool.EventPool +import dev.d1s.ktor.events.server.util.clientId +import dev.d1s.ktor.events.server.util.eventPool +import io.ktor.server.websocket.* +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.DelicateCoroutinesApi +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import org.lighthousegames.logging.logging + +internal interface EventProcessor { + + fun process(connection: EventSendingConnection) +} + +internal class DefaultEventProcessor : EventProcessor { + + private val log = logging() + + private val eventProcessorScope = CoroutineScope(Dispatchers.Default) + + override fun process(connection: EventSendingConnection) { + eventProcessorScope.launch { + val reference = connection.reference + val call = connection.call + val pool = call.application.attributes.eventPool + val client = call.clientId + + log.d { + "Processing events for reference: $reference" + } + + val unreceived = pool.get(reference, client) + + log.d { + "Sending previously unreceived events: $unreceived" + } + + connection.sendAll(pool, client, unreceived) + + pool.onEvent(reference, client) { + log.d { + "Got event from event channel: $it" + } + + connection.sendEvent(pool, client, it) + } + } + } + + private suspend fun EventSendingConnection.sendAll( + pool: EventPool, + client: Identifier, + events: List<ServerWebSocketEvent> + ) { + events.forEach { + sendEvent(pool, client, it) + } + } + + private suspend fun EventSendingConnection.sendEvent( + pool: EventPool, + client: Identifier, + event: ServerWebSocketEvent + ) { + log.d { + "Sending event... Connection: $this" + } + + (session as? WebSocketServerSession)?.let { session -> + processSession(pool, client, session, event) + } + } + + @OptIn(DelicateCoroutinesApi::class) + private suspend fun EventSendingConnection.processSession( + pool: EventPool, + client: Identifier, + session: WebSocketServerSession, + event: ServerWebSocketEvent + ) { + if (!session.outgoing.isClosedForSend) { + sendEventDto(session, event) + + pool.confirm(event.id, client) + } else { + log.d { + "Couldn't send event. Connection is closed." + } + } + } + + private suspend fun EventSendingConnection.sendEventDto( + session: WebSocketServerSession, + event: ServerWebSocketEvent + ) { + val dto = WebSocketEventDto( + id = event.id, + reference = reference, + initiated = event.initiated, + data = event.dataSupplier(reference.parameters) + ) + + session.sendSerialized(dto) + } +} \ No newline at end of file diff --git a/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/InternalComponents.kt b/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/InternalComponents.kt deleted file mode 100644 index a479158..0000000 --- a/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/InternalComponents.kt +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright 2022-2024 Mikhail Titov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package dev.d1s.ktor.events.server - -import io.ktor.util.* - -internal fun webSocketEventConsumer() = - DefaultWebSocketEventConsumer() - -internal fun webSocketEventSendingConnectionPool() = - DefaultWebSocketEventSendingConnectionPool() - -internal var Attributes.webSocketEventConsumer: WebSocketEventConsumer - get() = this[Key.WebSocketEventConsumer] - set(value) = this.put(Key.WebSocketEventConsumer, value) - -private object Key { - - val WebSocketEventConsumer = - AttributeKey<WebSocketEventConsumer>("${WEBSOCKET_EVENTS_PLUGIN_NAME}_websocket-event-consumer") -} \ No newline at end of file diff --git a/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/WebSocketEventChannel.kt b/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/WebSocketEventChannel.kt deleted file mode 100644 index 3de1ef3..0000000 --- a/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/WebSocketEventChannel.kt +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright 2022-2024 Mikhail Titov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package dev.d1s.ktor.events.server - -import kotlinx.coroutines.channels.Channel - -/** - * @see WebSocketEventSender - */ -public typealias WebSocketEventChannel = Channel<ServerWebSocketEvent> - -public fun WebSocketEventChannel(): WebSocketEventChannel = Channel() \ No newline at end of file diff --git a/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/WebSocketEventConsumer.kt b/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/WebSocketEventConsumer.kt deleted file mode 100644 index 2b5d514..0000000 --- a/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/WebSocketEventConsumer.kt +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright 2022-2024 Mikhail Titov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package dev.d1s.ktor.events.server - -import io.ktor.server.websocket.* -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.DelicateCoroutinesApi -import kotlinx.coroutines.channels.ReceiveChannel -import kotlinx.coroutines.launch -import org.lighthousegames.logging.logging - -internal typealias WebSocketEventReceiver = ReceiveChannel<ServerWebSocketEvent> - -internal interface WebSocketEventConsumer { - - fun launch(eventReceivingScope: CoroutineScope, channel: WebSocketEventReceiver) - - fun addConnection(connection: WebSocketEventSendingConnection) - - fun removeConnection(connection: WebSocketEventSendingConnection) -} - -internal class DefaultWebSocketEventConsumer : WebSocketEventConsumer { - - private val connectionPool = webSocketEventSendingConnectionPool() - - private val log = logging() - - override fun launch(eventReceivingScope: CoroutineScope, channel: WebSocketEventReceiver) { - log.d { - "Launching WebSocket event consumer..." - } - - eventReceivingScope.launch { - handleEvents(eventReceivingScope, channel) - } - } - - override fun addConnection(connection: WebSocketEventSendingConnection) { - connectionPool += connection - } - - override fun removeConnection(connection: WebSocketEventSendingConnection) { - connectionPool -= connection - } - - private suspend fun handleEvents(eventReceivingScope: CoroutineScope, channel: WebSocketEventReceiver) { - for (event in channel) { - log.d { - "Consumed event $event" - } - - processEvent(eventReceivingScope, event) - } - } - - private fun processEvent(eventReceivingScope: CoroutineScope, event: ServerWebSocketEvent) { - val connections = connectionPool[event.reference] - - connections.parallelStream().forEach { connection -> - eventReceivingScope.launch { - connection.sendEvent(event) - } - } - } - - private suspend fun WebSocketEventSendingConnection.sendEvent(event: ServerWebSocketEvent) { - log.d { - "Sending event... Connection: $this" - } - - (session as? WebSocketServerSession)?.let { session -> - processSession(session, event) - } - } - - @OptIn(DelicateCoroutinesApi::class) - private suspend fun WebSocketEventSendingConnection.processSession( - session: WebSocketServerSession, - event: ServerWebSocketEvent - ) { - if (!session.outgoing.isClosedForSend) { - sendEventDto(session, event) - } else { - log.d { - "Couldn't send event. Connection is closed." - } - - connectionPool -= reference - } - } - - private suspend fun WebSocketEventSendingConnection.sendEventDto( - session: WebSocketServerSession, - event: ServerWebSocketEvent - ) { - val dto = WebSocketEventDto( - reference = reference, - data = event.dataSupplier(reference.parameters) - ) - - session.sendSerialized(dto) - } -} \ No newline at end of file diff --git a/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/WebSocketEventDto.kt b/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/WebSocketEventDto.kt deleted file mode 100644 index 4ad343a..0000000 --- a/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/WebSocketEventDto.kt +++ /dev/null @@ -1,8 +0,0 @@ -package dev.d1s.ktor.events.server - -import dev.d1s.ktor.events.commons.EventReference - -internal data class WebSocketEventDto( - val reference: EventReference, - val data: Any -) diff --git a/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/WebSocketEventSender.kt b/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/WebSocketEventSender.kt deleted file mode 100644 index 84a92bf..0000000 --- a/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/WebSocketEventSender.kt +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Copyright 2022-2024 Mikhail Titov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package dev.d1s.ktor.events.server - -import kotlinx.coroutines.channels.SendChannel - -public typealias WebSocketEventSender = SendChannel<ServerWebSocketEvent> \ No newline at end of file diff --git a/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/WebSocketEventSendingConnectionPool.kt b/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/WebSocketEventSendingConnectionPool.kt deleted file mode 100644 index 9fd04a8..0000000 --- a/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/WebSocketEventSendingConnectionPool.kt +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright 2022-2024 Mikhail Titov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package dev.d1s.ktor.events.server - -import dev.d1s.ktor.events.commons.EventReference -import org.lighthousegames.logging.logging -import java.util.concurrent.CopyOnWriteArrayList - -internal interface WebSocketEventSendingConnectionPool { - - operator fun plusAssign(connection: WebSocketEventSendingConnection) - - operator fun minusAssign(connection: WebSocketEventSendingConnection) - - operator fun get(reference: EventReference): List<WebSocketEventSendingConnection> - - operator fun minusAssign(reference: EventReference) -} - -internal class DefaultWebSocketEventSendingConnectionPool : WebSocketEventSendingConnectionPool { - - private val connections = CopyOnWriteArrayList<WebSocketEventSendingConnection>() - - private val log = logging() - - override fun plusAssign(connection: WebSocketEventSendingConnection) { - connections += connection - - log.d { - "Added connection with reference: ${connection.reference}. Connections: $connections" - } - } - - override fun minusAssign(connection: WebSocketEventSendingConnection) { - connections -= connection - - log.d { - "Removed connection with reference: ${connection.reference}. Connections: $connections" - } - } - - override fun minusAssign(reference: EventReference) { - val connections = this[reference] - - connections.forEach { connection -> - this -= connection - } - } - - override fun get(reference: EventReference): List<WebSocketEventSendingConnection> { - log.d { - "Filtering connections $connections by reference $reference" - } - - val connections = connections.filter { - it.reference == reference - } - - log.d { - "Found connections $connections. Wanted connections with reference $reference" - } - - return connections - } -} \ No newline at end of file diff --git a/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/WebSocketEventsPlugin.kt b/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/WebSocketEventsPlugin.kt index ee6a1e6..7459651 100644 --- a/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/WebSocketEventsPlugin.kt +++ b/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/WebSocketEventsPlugin.kt @@ -16,11 +16,12 @@ package dev.d1s.ktor.events.server +import dev.d1s.ktor.events.server.pool.EventPool +import dev.d1s.ktor.events.server.util.eventPool +import dev.d1s.ktor.events.server.util.eventProcessor import io.ktor.serialization.jackson.* import io.ktor.server.application.* import io.ktor.server.websocket.* -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers import org.lighthousegames.logging.logging internal const val WEBSOCKET_EVENTS_PLUGIN_NAME = "websocket-events" @@ -29,31 +30,28 @@ private val log = logging() /** * Enables support for event streaming over WebSockets. - * Supposed to be used in pair with [webSocketEvents] route builder. - * Once the plugin is installed, it will fire a job listening for events - * in the provided [WebSocketEventsConfiguration.channel]. + * Supposed to be used in pair with [dev.d1s.ktor.events.server.route.webSocketEvents] route builder. * * Example usage: * ```kotlin - * val eventChannel = WebSocketEventChannel() + * val pool = InMemoryEventPool() * * install(WebSocketEvents) { - * channel = eventChannel + * eventPool = pool * } * * routing { - * webSockets() + * webSocketEvents() * } * * val createdBook = createBook() * val reference = ref("book_created") * val event = event(reference, createdBook) * - * eventChannel.send(event) + * pool.push(event) * ``` - * @see webSocketEvents - * @see WebSocketEventChannel * @see WebSocketEventsConfiguration + * @see EventPool */ public val WebSocketEvents: ApplicationPlugin<WebSocketEventsConfiguration> = createApplicationPlugin(WEBSOCKET_EVENTS_PLUGIN_NAME, ::WebSocketEventsConfiguration) { @@ -61,34 +59,19 @@ public val WebSocketEvents: ApplicationPlugin<WebSocketEventsConfiguration> = "Installing WebSocketEvents plugin" } - val eventReceivingScope = pluginConfig.eventReceivingScope - val channel = pluginConfig.requiredChannel - if (!application.hasWebSocketsPlugin()) { application.installWebSockets() } - val webSocketEventConsumer = webSocketEventConsumer().apply { - application.attributes.webSocketEventConsumer = this - } - - webSocketEventConsumer.launch(eventReceivingScope, channel) + application.attributes.eventProcessor = DefaultEventProcessor() + application.attributes.eventPool = pluginConfig.eventPool ?: error("Event pool must be specified") } public class WebSocketEventsConfiguration { - public var channel: WebSocketEventChannel? = null - - public var eventReceivingScope: CoroutineScope = defaultEventReceivingScope() - - internal val requiredChannel - get() = requireNotNull(channel) { - "channel is not configured." - } + public var eventPool: EventPool? = null } -private fun defaultEventReceivingScope() = CoroutineScope(Dispatchers.IO) - private fun Application.hasWebSocketsPlugin() = pluginOrNull(WebSockets) != null private fun Application.installWebSockets() { diff --git a/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/dto/WebSocketEventDto.kt b/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/dto/WebSocketEventDto.kt new file mode 100644 index 0000000..a991dac --- /dev/null +++ b/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/dto/WebSocketEventDto.kt @@ -0,0 +1,13 @@ +package dev.d1s.ktor.events.server.dto + +import dev.d1s.ktor.events.commons.AbstractEvent +import dev.d1s.ktor.events.commons.EventReference +import dev.d1s.ktor.events.commons.Identifier +import dev.d1s.ktor.events.commons.UnixTime + +internal data class WebSocketEventDto( + override val id: Identifier, + override val reference: EventReference, + override val initiated: UnixTime, + val data: Any +) : AbstractEvent diff --git a/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/WebSocketEventSendingConnection.kt b/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/entity/EventSendingConnection.kt similarity index 78% rename from ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/WebSocketEventSendingConnection.kt rename to ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/entity/EventSendingConnection.kt index e741479..d6507c6 100644 --- a/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/WebSocketEventSendingConnection.kt +++ b/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/entity/EventSendingConnection.kt @@ -14,12 +14,14 @@ * limitations under the License. */ -package dev.d1s.ktor.events.server +package dev.d1s.ktor.events.server.entity import dev.d1s.ktor.events.commons.EventReference +import io.ktor.server.application.* import io.ktor.websocket.* -internal data class WebSocketEventSendingConnection( +public data class EventSendingConnection( val reference: EventReference, - val session: DefaultWebSocketSession + val session: DefaultWebSocketSession, + val call: ApplicationCall ) diff --git a/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/ServerWebSocketEvent.kt b/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/entity/ServerWebSocketEvent.kt similarity index 66% rename from ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/ServerWebSocketEvent.kt rename to ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/entity/ServerWebSocketEvent.kt index d4d6e83..8100975 100644 --- a/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/ServerWebSocketEvent.kt +++ b/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/entity/ServerWebSocketEvent.kt @@ -14,10 +14,10 @@ * limitations under the License. */ -package dev.d1s.ktor.events.server +package dev.d1s.ktor.events.server.entity -import dev.d1s.ktor.events.commons.ClientParameters -import dev.d1s.ktor.events.commons.EventReference +import dev.d1s.ktor.events.commons.* +import java.time.Instant public typealias EventDataSupplier = suspend (ClientParameters) -> Any @@ -36,14 +36,17 @@ public typealias EventDataSupplier = suspend (ClientParameters) -> Any * @see dev.d1s.ktor.events.commons.ref */ public data class ServerWebSocketEvent( - val reference: EventReference, - val dataSupplier: EventDataSupplier -) + override val id: Identifier = randomId, + override val reference: EventReference, + override val initiated: UnixTime = Instant.now().toEpochMilli(), + internal var acceptedByClients: List<Identifier> = listOf(), + internal val dataSupplier: EventDataSupplier +) : AbstractEvent /** - * A shortcut. Returns `ServerWebSocketEvent(reference, data)` + * A shortcut. Returns `ServerWebSocketEvent(reference, data)` currently initiated. * * @see ServerWebSocketEvent */ public fun event(reference: EventReference, dataSupplier: EventDataSupplier): ServerWebSocketEvent = - ServerWebSocketEvent(reference, dataSupplier) + ServerWebSocketEvent(reference = reference, dataSupplier = dataSupplier) diff --git a/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/pool/EventPool.kt b/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/pool/EventPool.kt new file mode 100644 index 0000000..b8ee5e9 --- /dev/null +++ b/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/pool/EventPool.kt @@ -0,0 +1,133 @@ +package dev.d1s.ktor.events.server.pool + +import dev.d1s.ktor.events.commons.EventReference +import dev.d1s.ktor.events.commons.Identifier +import dev.d1s.ktor.events.server.entity.ServerWebSocketEvent +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.launch +import org.lighthousegames.logging.logging +import java.time.Instant +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.CopyOnWriteArrayList + +private const val DEFAULT_LIFETIME = 43_200_000L // 12 h + +public interface EventPool { + + public val lifetimeMillis: Long + + public suspend fun push(event: ServerWebSocketEvent) + + public fun get(reference: EventReference, client: Identifier): List<ServerWebSocketEvent> + + public fun onEvent( + reference: EventReference, + client: Identifier, + handler: suspend (ServerWebSocketEvent) -> Unit + ) + + public fun confirm(event: Identifier, client: Identifier) +} + +public class InMemoryEventPool( + override val lifetimeMillis: Long = DEFAULT_LIFETIME +) : EventPool { + + private val eventMap = ConcurrentHashMap<EventReference, CopyOnWriteArrayList<ServerWebSocketEvent>>() + private val handlerMap = ConcurrentHashMap<EventReference, MutableList<suspend (ServerWebSocketEvent) -> Unit>>() + + private val gcScope = CoroutineScope(Dispatchers.Default) + + private val log = logging() + + init { + gcScope.launch { + while (true) { + cleanupExpiredEvents() + } + } + } + + override suspend fun push(event: ServerWebSocketEvent) { + eventMap.compute(event.reference) { _, events -> + val updatedEvents = events ?: CopyOnWriteArrayList() + updatedEvents.add(event) + updatedEvents + } + + log.d { + "Pushing event $event;\ncurrent event map: $eventMap;\nhandler map: $handlerMap" + } + + val handlersForReference = handlerMap.entries.find { + if (it.key.group != event.reference.group) return@find false + + val otherPrincipal = event.reference.principal + + if (it.key.principal != null) { + return@find if (otherPrincipal != null) { + it.key.principal == otherPrincipal + } else { + false + } + } + + true + }?.value + + log.d { + "Got handlers for reference: $handlersForReference" + } + + if (handlersForReference != null) { + coroutineScope { + handlersForReference.forEach { handler -> + launch { + handler(event) + } + } + } + } + } + + override fun get(reference: EventReference, client: Identifier): List<ServerWebSocketEvent> = + eventMap[reference]?.filter { event -> + !event.acceptedByClients.contains(client) + } ?: emptyList() + + override fun onEvent( + reference: EventReference, + client: Identifier, + handler: suspend (ServerWebSocketEvent) -> Unit + ) { + handlerMap.compute(reference) { _, handlers -> + val updatedHandlers = handlers ?: mutableListOf() + updatedHandlers.add(handler) + updatedHandlers + } + } + + override fun confirm(event: Identifier, client: Identifier) { + for (events in eventMap.values) { + events.find { it.id == event }?.apply { + if (!acceptedByClients.contains(client)) { + acceptedByClients += client + } + } + } + } + + private fun cleanupExpiredEvents() { + val now = Instant.now().toEpochMilli() + + eventMap.entries.removeIf { (_, events) -> + events.removeIf { event -> + event.initiated + lifetimeMillis < now + } + + events.isEmpty() + } + } +} \ No newline at end of file diff --git a/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/WebSocketEventsRoute.kt b/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/route/WebSocketEventsRoute.kt similarity index 75% rename from ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/WebSocketEventsRoute.kt rename to ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/route/WebSocketEventsRoute.kt index 8d13f1b..54ceb8a 100644 --- a/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/WebSocketEventsRoute.kt +++ b/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/route/WebSocketEventsRoute.kt @@ -14,14 +14,16 @@ * limitations under the License. */ -package dev.d1s.ktor.events.server +package dev.d1s.ktor.events.server.route import dev.d1s.ktor.events.commons.EventReference import dev.d1s.ktor.events.commons.util.Routes +import dev.d1s.ktor.events.server.WebSocketEvents +import dev.d1s.ktor.events.server.entity.EventSendingConnection +import dev.d1s.ktor.events.server.util.eventProcessor import io.ktor.server.application.* import io.ktor.server.routing.* import io.ktor.server.websocket.* -import kotlinx.coroutines.channels.getOrElse import org.lighthousegames.logging.logging private val log = logging() @@ -35,7 +37,10 @@ private val log = logging() * @throws IllegalStateException if the application does not have [WebSocketEvents] plugin installed. * @see WebSocketEvents */ -public fun Route.webSocketEvents(route: String = Routes.DEFAULT_EVENTS_ROUTE, preprocess: suspend DefaultWebSocketServerSession.(EventReference) -> Unit = {}) { +public fun Route.webSocketEvents( + route: String = Routes.DEFAULT_EVENTS_ROUTE, + preprocess: suspend DefaultWebSocketServerSession.(EventReference) -> Unit = {} +) { log.d { "Exposing route $route" } @@ -46,7 +51,7 @@ public fun Route.webSocketEvents(route: String = Routes.DEFAULT_EVENTS_ROUTE, pr application.checkPluginInstalled() - val consumer = application.attributes.webSocketEventConsumer + val processor = application.attributes.eventProcessor webSocket(route) { log.d { @@ -79,28 +84,23 @@ public fun Route.webSocketEvents(route: String = Routes.DEFAULT_EVENTS_ROUTE, pr preprocess(eventReference) - val connection = WebSocketEventSendingConnection(eventReference, this) + val connection = EventSendingConnection(eventReference, this, call) + processor.process(connection) - consumer.addConnection(connection) - - var receiving = true - - while (receiving) { - incoming.receiveCatching().getOrElse { - log.w { - "Failed to receive" - } - - it?.printStackTrace() - - consumer.removeConnection(connection) - - receiving = false - } - } + receive() } } private fun Application.checkPluginInstalled() { pluginOrNull(WebSocketEvents) ?: error("WebSocketEvents plugin is not installed on this application.") +} + +private suspend fun DefaultWebSocketServerSession.receive() { + while (true) { + val data = incoming.receiveCatching().getOrNull() + data ?: log.d { + "Client connection lost" + } + data ?: break + } } \ No newline at end of file diff --git a/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/util/ClientHeader.kt b/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/util/ClientHeader.kt new file mode 100644 index 0000000..0062731 --- /dev/null +++ b/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/util/ClientHeader.kt @@ -0,0 +1,10 @@ +package dev.d1s.ktor.events.server.util + +import dev.d1s.ktor.events.commons.util.Routes +import io.ktor.server.application.* +import io.ktor.server.plugins.* +import io.ktor.server.request.* + +internal val ApplicationCall.clientId + get() = request.header(Routes.CLIENT_ID_HEADER) + ?: throw BadRequestException("${Routes.CLIENT_ID_HEADER} not specified") \ No newline at end of file diff --git a/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/util/InternalComponents.kt b/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/util/InternalComponents.kt new file mode 100644 index 0000000..b9f6224 --- /dev/null +++ b/ktor-ws-events-server/src/jvmMain/kotlin/dev/d1s/ktor/events/server/util/InternalComponents.kt @@ -0,0 +1,38 @@ +/* + * Copyright 2022-2024 Mikhail Titov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.d1s.ktor.events.server.util + +import dev.d1s.ktor.events.server.EventProcessor +import dev.d1s.ktor.events.server.WEBSOCKET_EVENTS_PLUGIN_NAME +import dev.d1s.ktor.events.server.pool.EventPool +import io.ktor.util.* + +internal var Attributes.eventProcessor: EventProcessor + get() = this[Key.EventProcessor] + set(value) = this.put(Key.EventProcessor, value) + +internal var Attributes.eventPool: EventPool + get() = this[Key.EventPool] + set(value) = this.put(Key.EventPool, value) + +private object Key { + + val EventProcessor = + AttributeKey<EventProcessor>("${WEBSOCKET_EVENTS_PLUGIN_NAME}_event-processor") + + val EventPool = AttributeKey<EventPool>("${WEBSOCKET_EVENTS_PLUGIN_NAME}_event-pool") +} \ No newline at end of file