Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KTOR-7435 Add serialization for SSE #4363

Open
wants to merge 14 commits into
base: main
Choose a base branch
from

Conversation

marychatte
Copy link
Member

Subsystem
Client SSE, Server SSE

Motivation
KTOR-7435

@marychatte marychatte self-assigned this Oct 2, 2024
@marychatte marychatte requested review from bjhham and e5l October 2, 2024 14:50
Copy link
Contributor

@bjhham bjhham left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please check the comments about including TypeInfo in the default config so that we can use different types in our sessions.

Comment on lines 34 to 40
public suspend fun <T : Any> HttpClient.serverSentEventsSession(
deserialize: ((String) -> T)? = null,
reconnectionTime: Duration? = null,
showCommentEvents: Boolean? = null,
showRetryEvents: Boolean? = null,
block: HttpRequestBuilder.() -> Unit
): ClientSSESession {
): ClientSSESession<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you consider using overloads instead of the default deserialize argument here to allow the original API sans type argument when using strings?

By that, I mean something like...

public suspend fun HttpClient.sseSession(
    reconnectionTime: Duration? = null,
    showCommentEvents: Boolean? = null,
    showRetryEvents: Boolean? = null,
    block: HttpRequestBuilder.() -> Unit
) = sseSession({ it }, reconnectionTime, showCommentEvents, showRetryEvents, block)

public suspend fun <T : Any> HttpClient.sseSession(
    deserialize: (String) -> T,
    reconnectionTime: Duration? = null,
    showCommentEvents: Boolean? = null,
    showRetryEvents: Boolean? = null,
    block: HttpRequestBuilder.() -> Unit
)

This would make it so the default deserialization can't be used, however.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think maybe ignore this suggestion and just include typeInfo in the default lambda signature. We'll probably want some logic to ensure that when String is supplied as the type argument that we avoid any serialization, otherwise you'd end up with some weirdness with it expecting strings to be wrapped in quotes.

@@ -13,6 +13,7 @@ import kotlin.time.Duration.Companion.milliseconds
public class SSEConfig {
internal var showCommentEvents = false
internal var showRetryEvents = false
internal var deserialize: (String) -> Any = { s -> s }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we're missing the typeInfo here required to solve the general use-case for deserializing different types. I think it may need to be (TypeInfo) -> (String) -> Any so when using a StringFormat like Json, we can supply it like deserialize = { typeInfo -> Json.serializersModule.serializer(typeInfo).let { serializer -> {{ Json.decodeFromString(serializer, it) }} } }. Rather ugly, but we could provide some helpers somewhere.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the API side, this will need to use reified T and typeOf<T>() to pass to the session.

Comment on lines 552 to 561
client.sse<Customer>({
url("$TEST_SERVER/sse/json")
}) {
incoming.single().apply {
assertEquals(1, data?.id)
assertEquals("Jet", data?.firstName)
assertEquals("Brains", data?.lastName)
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would like to see this work with different types on the same server.

Comment on lines 18 to 24
public class ServerSentEvent<T>(
public val data: T? = null,
public val event: String? = null,
public val id: String? = null,
public val retry: Long? = null,
public val comments: String? = null
) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice if this was a data class, so instead of having:

incoming.collect { person ->
   println(person.data)
}

We could have

incoming.collect { (person) ->
    println(person)
}

@@ -12,16 +12,16 @@ import kotlinx.coroutines.flow.*
/**
* A Server-sent events session.
*/
public interface SSESession : CoroutineScope {
public interface SSESession<T> : CoroutineScope {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would keep SSESession without generic types and introduce a new type for the session with deserialization. It will be more explicit and simplify maintenance in the long run

@marychatte
Copy link
Member Author

@e5l @bjhham Thanks for the comments, please take a look at the new commits

Right now, for the client-side:

  1. Two types of sessions: SSESession and SSESessionWithDeserialization:
public interface SSESession : CoroutineScope {
    public val incoming: Flow<ServerSentEvent<String>>
}

public interface SSESessionWithDeserialization: SSESession {
    public val deserializer: (TypeInfo) -> (String) -> Any
}
  1. The difference in builder functions is in the parameter deserialize. This parameter is required for all builder functions for SSESessionWithDeserialization: deserialize: (TypeInfo) -> (String) -> Any
  2. Example of SSESessionWithDeserialization usage:
@Serializable
data class Customer(val id: Int, val firstName: String, val lastName: String)
@Serializable
data class Product(val name: String, val price: Int)

client.sse({
    url("$TEST_SERVER/sse/json")
}, deserialize = { typeInfo: TypeInfo ->
    { jsonString: String ->
         val serializer = Json.serializersModule.serializer(typeInfo.kotlinType!!)
         Json.decodeFromString(serializer, jsonString) ?: Exception()
     }
}) {
    incoming.collect { event ->
        if (customer) {
            val customer = deserialize<Customer>(event.data)
            assertEquals("Jet", customer?.firstName)
        } else {
           val product = deserialize<Product>(event.data)
           assertEquals(100, product?.price)
        }
   }
}

Example of SSESession usage:

val session = client.serverSentEventsSession("$TEST_SERVER/sse/hello")
val event: ServerSentEvent<String> = session.incoming.single()
assertEquals("0", event.id)

For the server-side:

  1. Also two types of sessions, SSESession and SSESessionWithSerialization:
public interface SSESession : CoroutineScope {
    public val call: ApplicationCall

    public suspend fun send(event: ServerSentEvent<String>)
}

public interface SSESessionWithSerialization : SSESession {
    public val serializer: (TypeInfo) -> (Any) -> String
}
public suspend inline fun <reified T : Any> SSESessionWithSerialization.sendSerialized(event: ServerSentEvent<T>) {
    send(
        ServerSentEvent(
            event.data?.let {
                serializer(typeInfo<T>()).invoke(it)
            },
            event.event,
            event.id,
            event.retry,
            event.comments
        )
    )
}
  1. Parameter serialize is required for all builder functions for SSESessionWithSerialization: serialize: (TypeInfo) -> (Any) -> String
  2. Example of SSESession usage:
sse("/hello") {
      send(ServerSentEvent("world", event = "send", id = "100", retry = 1000, comments = "comment"))
}

Example of SSESessionWithSerialization usage:

class Person1(val age: Int)
class Person2(val number: Int)
sse(serialize = {
    typeInfo ->
    { data ->
        when (typeInfo.type) {
            Person1::class -> {
                "Age ${(data as Person1).age}"
            }

            Person2::class -> {
                "Number ${(data as Person2).number}"
            }

            else -> {
                data.toString()
            }
        }
    }
}) {
    sendSerialized(Person1(22))
    sendSerialized(Person2(123456))
}

Questions:

  1. Is configuring the serialization/deserialization functions only through function parameters and not SSE config okay?
  2. Are we agreed on the types for serialization/deserialization functions deserialize: (TypeInfo) -> (String) -> Any and serialize: (TypeInfo) -> (Any) -> String?

@marychatte marychatte requested review from e5l and bjhham October 7, 2024 12:37
public class ServerSentEvent(
public val data: String? = null,
public data class ServerSentEvent<T>(
public val data: T? = null,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also split this class in 2 so String would be default?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I split into ServerSentEvent and ParameterizedServerSentEvent<T>

/**
* Deserializer for transforming field `data` of `ServerSentEvent` into desired data object.
*/
public val deserializer: (TypeInfo) -> (String) -> Any
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you tell me why it's not just fun deserialize(TypeInfo, String) -> Any?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added Deserializer and Serializer interface:

public interface Deserializer {
    public fun deserialize(typeInfo: TypeInfo, data: String): Any
}

Do we want to make them functional because now it's used like:

client.sse(
    { url("$TEST_SERVER/sse/person") },
    object : Deserializer {
        override fun deserialize(typeInfo: TypeInfo, data: String): Any {
            return Person1(data)
    }
}) {
    incoming.single().apply {
        assertEquals("Name 0", deserialize<Person1>(data)?.name)
    }
}

@@ -31,7 +32,7 @@ public data object SSECapability : HttpClientEngineCapability<Unit>
* val client = HttpClient {
* install(SSE)
* }
* client.sse {
* client.sse<String> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would avoid having generics on the sse method

@marychatte marychatte requested a review from e5l October 8, 2024 17:12
Comment on lines 90 to 98
public suspend inline fun <reified T : Any> SSESessionWithSerialization.sendSerialized(
data: T? = null,
event: String? = null,
id: String? = null,
retry: Long? = null,
comments: String? = null
) {
sendSerialized(ParameterizedServerSentEvent(data, event, id, retry, comments))
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can just overload send here to avoid the wordiness of sendSerialized.

Comment on lines 104 to 100
/**
* Serializer interface for transforming data object into field `data` of `ServerSentEvent`.
*/
public interface Serializer {

/**
* Transforms data object into field `data` of `ServerSentEvent`.
*/
public fun serialize(typeInfo: TypeInfo, data: Any): String
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can just use the lambda expression (typeInfo, data) -> String for the serializer field, since the argument provides context and this interface clutters up the API.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@e5l suggested using an interface in case we would need to change the API or add something. But (typeInfo, data) -> String seems like final thing

Comment on lines 41 to 81
public data class ParameterizedServerSentEvent<T>(
public val data: T? = null,
public val event: String? = null,
public val id: String? = null,
public val retry: Long? = null,
public val comments: String? = null
) {
@InternalAPI
public fun toString(serializer: (T) -> String): String =
eventToString(data?.let { serializer(it) }, event, id, retry, comments)
}

private fun eventToString(data: String?, event: String?, id: String?, retry: Long?, comments: String?): String {
return buildString {
appendField("data", data)
appendField("event", event)
appendField("id", id)
appendField("retry", retry)
appendField("", comments)
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd go with something like:

sealed interface ServerSentEventMetadata<T> {
    val data: T?
    val event: String?
    val id: String?
    val retry: Long?
    val comments: String?
}

data class ServerSentEvent(...): ServerSentEventMetadata<String>
data class ServerSentEventParsed(...): ServerSentEventMetadata<T>

Comment on lines 449 to 462
client.sse(
{
url("$TEST_SERVER/sse/person")
parameter("times", count)
},
deserialize = object : Deserializer {
override fun deserialize(typeInfo: TypeInfo, data: String): Any {
return Person(data)
}
}
) {
incoming.collectIndexed { i, event ->
val person = deserialize<Person>(event)
assertEquals("Name $i", person?.name)
assertEquals("$i", event.id)
size++
}
}
assertEquals(count, size)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the removal of the default deserializer in the client config, I think it would simplify things to just go back to (String) -> E as the argument for the session and have the incoming return the parsed events from incoming.

Copy link
Member Author

@marychatte marychatte Oct 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean have incoming: Flow<ServerSentEventParsed<*>>?

Comment on lines 236 to 252
@Test
fun testDifferentSerializers() = testApplication {
install(SSE)
routing {
sse(object : Serializer {
override fun serialize(typeInfo: TypeInfo, data: Any): String {
return when (typeInfo.type) {
Person1::class -> {
"Age ${(data as Person1).age}"
}

Person2::class -> {
"Number ${(data as Person2).number}"
}

else -> {
data.toString()
}
}
}
}) {
sendSerialized(Person1(22))
sendSerialized(Person2(123456))
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd be interested in seeing an example with kotlinx-serialization-json, since this is probably the most common use-case for this feature. We ought to shape the API so that this is easy to implement.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is used kotlinx-serialization-json

@marychatte marychatte force-pushed the marychatte/KTOR-7435-Add-serialization-for-SSE branch from 9ce8ec5 to 19f01fe Compare October 10, 2024 17:15
/**
* An incoming server-sent events flow.
*/
public val incoming: Flow<ServerSentEventParsed<String>>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's think about naming. ServerSentEventParsed looks inconsistent.

/**
* Deserializer for transforming field `data` of `ServerSentEvent` into desired data object.
*/
public val deserializer: (TypeInfo, String) -> Any
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we avoid functional fields?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants