Skip to content

RSocket Interfaces doesn't work with coroutines. #34868

Open
@doxlik

Description

@doxlik

Hello, dear team. I use Spring Boot starters of recent 3.4.5 version to investigate Spring RSocket Interfaces. Here is my sample server.

@MessageMapping("uuid")
 suspend fun uuid(): String = UUID.randomUUID().toString()

And I want to consume it using RSocket Interface. And while for HTTP Interface I can easily use both Mono<String> and suspend fun: String and it is working perfectly, with RSocket interfaces if I use it this way

interface ServerRockerClient {
    @RSocketExchange("uuid")
    fun uuid(): Mono<String>
} 

and

val result = serverRocketClient.uuid().awaitSingle()

it works, but if I have it in coroutine way

interface ServerRockerClient {
    @RSocketExchange("uuid")
    suspend fun uuid(): String
}

and

val result = serverRocketClient.uuid()

I got exception

org.springframework.core.codec.EncodingException: JSON encoding error: kotlinx.atomicfu.AtomicRef
	at org.springframework.http.codec.json.AbstractJackson2Encoder.encodeValue(AbstractJackson2Encoder.java:260)
	at org.springframework.messaging.rsocket.DefaultRSocketRequester$DefaultRequestSpec.encodeData(DefaultRSocketRequester.java:254)
	at org.springframework.messaging.rsocket.DefaultRSocketRequester$DefaultRequestSpec.lambda$createPayload$2(DefaultRSocketRequester.java:208)
	at reactor.core.publisher.MonoCallable$MonoCallableSubscription.request(MonoCallable.java:137)
	at reactor.core.publisher.MonoZip$ZipInner.onSubscribe(MonoZip.java:470)
	at reactor.core.publisher.MonoCallable.subscribe(MonoCallable.java:48)
	at reactor.core.publisher.MonoZip$ZipCoordinator.request(MonoZip.java:220)
	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.request(FluxMap.java:295)
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request(FluxContextWrite.java:136)
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request(FluxContextWrite.java:136)
	at io.rsocket.core.DefaultRSocketClient$FlatMapMain.request(DefaultRSocketClient.java:327)
	at io.rsocket.core.DefaultRSocketClient$FlattingInner.request(DefaultRSocketClient.java:454)
	at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164)
	at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164)
	at reactor.core.publisher.BlockingSingleSubscriber.onSubscribe(BlockingSingleSubscriber.java:54)
	at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92)
	at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92)
	at io.rsocket.core.DefaultRSocketClient$FlatMapMain.onSubscribe(DefaultRSocketClient.java:256)
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onSubscribe(FluxContextWrite.java:101)
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onSubscribe(FluxContextWrite.java:101)
	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onSubscribe(FluxMap.java:194)
	at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:129)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
	at io.rsocket.core.DefaultRSocketClient$RSocketClientMonoOperator.subscribe(DefaultRSocketClient.java:530)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
	at reactor.core.publisher.Mono.block(Mono.java:1778)
	at org.springframework.messaging.rsocket.service.RSocketServiceMethod.lambda$initResponseFunction$3(RSocketServiceMethod.java:173)
	at java.base/java.util.function.Function.lambda$andThen$1(Function.java:88)
	at org.springframework.messaging.rsocket.service.RSocketServiceMethod.invoke(RSocketServiceMethod.java:224)
	at org.springframework.messaging.rsocket.service.RSocketServiceProxyFactory$ServiceMethodInterceptor.invoke(RSocketServiceProxyFactory.java:255)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184)
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:223)
	at jdk.proxy2/jdk.proxy2.$Proxy51.uuid(Unknown Source)
	at com.german.clientrocket.verticles.ServerVerticle$start$httpServerFuture$1$1.invokeSuspend(ServerVerticle.kt:42)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.internal.DispatchedContinuationKt.resumeCancellableWith(DispatchedContinuation.kt:363)
	at kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable(Cancellable.kt:26)
	at kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable$default(Cancellable.kt:21)
	at kotlinx.coroutines.CoroutineStart.invoke(CoroutineStart.kt:88)
	at kotlinx.coroutines.AbstractCoroutine.start(AbstractCoroutine.kt:123)
	at kotlinx.coroutines.BuildersKt__Builders_commonKt.launch(Builders.common.kt:52)
	at kotlinx.coroutines.BuildersKt.launch(Unknown Source)
	at kotlinx.coroutines.BuildersKt__Builders_commonKt.launch$default(Builders.common.kt:43)
	at kotlinx.coroutines.BuildersKt.launch$default(Unknown Source)
	at com.german.clientrocket.verticles.ServerVerticle.start$lambda$0(ServerVerticle.kt:24)
	at io.vertx.core.http.impl.Http1xServerRequestHandler.handle(Http1xServerRequestHandler.java:150)
	at io.vertx.core.http.impl.Http1xServerRequestHandler.handle(Http1xServerRequestHandler.java:42)
	at io.vertx.core.impl.ContextImpl.emit(ContextImpl.java:342)
	at io.vertx.core.impl.DuplicatedContext.emit(DuplicatedContext.java:163)
	at io.vertx.core.http.impl.Http1xServerConnection.handleMessage(Http1xServerConnection.java:174)
	at io.vertx.core.net.impl.ConnectionBase.read(ConnectionBase.java:159)
	at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:153)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)
	at io.netty.handler.codec.http.websocketx.extensions.WebSocketServerExtensionHandler.onHttpRequestChannelRead(WebSocketServerExtensionHandler.java:158)
	at io.netty.handler.codec.http.websocketx.extensions.WebSocketServerExtensionHandler.channelRead(WebSocketServerExtensionHandler.java:82)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.vertx.core.http.impl.Http1xUpgradeToH2CHandler.channelRead(Http1xUpgradeToH2CHandler.java:124)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.vertx.core.http.impl.Http1xOrH2CHandler.end(Http1xOrH2CHandler.java:61)
	at io.vertx.core.http.impl.Http1xOrH2CHandler.channelRead(Http1xOrH2CHandler.java:38)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:796)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:732)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:658)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:998)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:1583)
	Suppressed: java.lang.Exception: #block terminated with an error
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:104)
		at reactor.core.publisher.Mono.block(Mono.java:1779)
		... 62 more
Caused by: com.fasterxml.jackson.databind.JsonMappingException: kotlinx.atomicfu.AtomicRef (through reference chain: com.german.clientrocket.verticles.ServerVerticle$start$httpServerFuture$1$1["completion"])
	at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:401)
	at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:360)
	at com.fasterxml.jackson.databind.ser.std.StdSerializer.wrapAndThrow(StdSerializer.java:323)
	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:778)
	at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:184)
	at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:502)
	at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:341)
	at com.fasterxml.jackson.databind.ObjectWriter$Prefetch.serialize(ObjectWriter.java:1587)
	at com.fasterxml.jackson.databind.ObjectWriter.writeValue(ObjectWriter.java:1061)
	at org.springframework.http.codec.json.AbstractJackson2Encoder.encodeValue(AbstractJackson2Encoder.java:253)
	... 87 more
Caused by: java.lang.ClassNotFoundException: kotlinx.atomicfu.AtomicRef
	at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
	at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:526)
	at kotlin.reflect.jvm.internal.KDeclarationContainerImpl.parseType(KDeclarationContainerImpl.kt:291)
	at kotlin.reflect.jvm.internal.KDeclarationContainerImpl.loadReturnType(KDeclarationContainerImpl.kt:306)
	at kotlin.reflect.jvm.internal.KDeclarationContainerImpl.findMethodBySignature(KDeclarationContainerImpl.kt:216)
	at kotlin.reflect.jvm.internal.KPropertyImplKt.computeCallerForAccessor(KPropertyImpl.kt:313)
	at kotlin.reflect.jvm.internal.KPropertyImplKt.access$computeCallerForAccessor(KPropertyImpl.kt:1)
	at kotlin.reflect.jvm.internal.KPropertyImpl$Getter$caller$2.invoke(KPropertyImpl.kt:180)
	at kotlin.reflect.jvm.internal.KPropertyImpl$Getter$caller$2.invoke(KPropertyImpl.kt:179)
	at kotlin.SafePublicationLazyImpl.getValue(LazyJVM.kt:107)
	at kotlin.reflect.jvm.internal.KPropertyImpl$Getter.getCaller(KPropertyImpl.kt:179)
	at kotlin.reflect.jvm.ReflectJvmMapping.getJavaMethod(ReflectJvmMapping.kt:64)
	at kotlin.reflect.jvm.ReflectJvmMapping.getJavaGetter(ReflectJvmMapping.kt:49)
	at com.fasterxml.jackson.module.kotlin.KotlinAnnotationIntrospector.getRequiredMarkerFromCorrespondingAccessor(KotlinAnnotationIntrospector.kt:138)
	at com.fasterxml.jackson.module.kotlin.KotlinAnnotationIntrospector.hasRequiredMarker(KotlinAnnotationIntrospector.kt:133)
	at com.fasterxml.jackson.module.kotlin.KotlinAnnotationIntrospector.access$hasRequiredMarker(KotlinAnnotationIntrospector.kt:27)
	at com.fasterxml.jackson.module.kotlin.KotlinAnnotationIntrospector$hasRequiredMarker$hasRequired$1.invoke(KotlinAnnotationIntrospector.kt:47)
	at com.fasterxml.jackson.module.kotlin.KotlinAnnotationIntrospector$hasRequiredMarker$hasRequired$1.invoke(KotlinAnnotationIntrospector.kt:40)
	at com.fasterxml.jackson.module.kotlin.ReflectionCache.javaMemberIsRequired(ReflectionCache.kt:106)
	at com.fasterxml.jackson.module.kotlin.KotlinAnnotationIntrospector.hasRequiredMarker(KotlinAnnotationIntrospector.kt:40)
	at com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair.hasRequiredMarker(AnnotationIntrospectorPair.java:310)
	at com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair.hasRequiredMarker(AnnotationIntrospectorPair.java:310)
	at com.fasterxml.jackson.databind.introspect.POJOPropertyBuilder.getMetadata(POJOPropertyBuilder.java:225)
	at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector._anyIndexed(POJOPropertiesCollector.java:1671)
	at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector._sortProperties(POJOPropertiesCollector.java:1563)
	at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector.collectAll(POJOPropertiesCollector.java:503)
	at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector.getJsonValueAccessor(POJOPropertiesCollector.java:269)
	at com.fasterxml.jackson.databind.introspect.BasicBeanDescription.findJsonValueAccessor(BasicBeanDescription.java:248)
	at com.fasterxml.jackson.databind.ser.BasicSerializerFactory.findSerializerByAnnotations(BasicSerializerFactory.java:393)
	at com.fasterxml.jackson.databind.ser.BeanSerializerFactory._createSerializer2(BeanSerializerFactory.java:225)
	at com.fasterxml.jackson.databind.ser.BeanSerializerFactory.createSerializer(BeanSerializerFactory.java:174)
	at com.fasterxml.jackson.databind.SerializerProvider._createUntypedSerializer(SerializerProvider.java:1535)
	at com.fasterxml.jackson.databind.SerializerProvider._createAndCacheUntypedSerializer(SerializerProvider.java:1503)
	at com.fasterxml.jackson.databind.SerializerProvider.findPrimaryPropertySerializer(SerializerProvider.java:721)
	at com.fasterxml.jackson.databind.ser.impl.PropertySerializerMap.findAndAddPrimarySerializer(PropertySerializerMap.java:72)
	at com.fasterxml.jackson.databind.ser.BeanPropertyWriter._findAndAddDynamic(BeanPropertyWriter.java:899)
	at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:710)
	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:770)
	... 93 more

My config for proxy client is

@Configuration
class RSocketConfig {

    @Bean
    fun rSocketRequester(rSocketBuilder: RSocketRequester.Builder): RSocketRequester =
        rSocketBuilder.rsocketConnector {
            it.reconnect(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1)))
        }.transport(WebsocketClientTransport.create(URI.create("ws://127.0.0.1:8090")))

    @Bean
    fun rSocketServiceProxyFactory(@Qualifier("rSocketRequester") rSocketRequester: RSocketRequester): RSocketServiceProxyFactory =
        RSocketServiceProxyFactory.builder(rSocketRequester).build()

    @Bean
    fun serverRocketClient(@Qualifier("rSocketServiceProxyFactory") rSocketServiceProxyFactory: RSocketServiceProxyFactory): ServerRockerClient =
        rSocketServiceProxyFactory.createClient(ServerRockerClient::class.java)
}

I don't exactly know if it is bug or not implemented feature, but wanted to ask is it planned to implement RSocket Interfaces for coroutines, considering that all other RSocket components both on client and server side are very coroutine friendly.

I would be very grateful for your assist and thank you very much for all your input to community.

Metadata

Metadata

Assignees

Labels

in: messagingIssues in messaging modules (jms, messaging)theme: kotlinAn issue related to Kotlin supporttype: enhancementA general enhancement

Type

No type

Projects

No projects

Relationships

None yet

Development

No branches or pull requests

Issue actions