Description
Hello, dear team. I use Spring Boot starters of recent 3.4.5 version to investigate Spring RSocket Interfaces. Here is my sample server.
@MessageMapping("uuid")
suspend fun uuid(): String = UUID.randomUUID().toString()
And I want to consume it using RSocket Interface. And while for HTTP Interface I can easily use both Mono<String> and suspend fun: String and it is working perfectly, with RSocket interfaces if I use it this way
interface ServerRockerClient {
@RSocketExchange("uuid")
fun uuid(): Mono<String>
}
and
val result = serverRocketClient.uuid().awaitSingle()
it works, but if I have it in coroutine way
interface ServerRockerClient {
@RSocketExchange("uuid")
suspend fun uuid(): String
}
and
val result = serverRocketClient.uuid()
I got exception
org.springframework.core.codec.EncodingException: JSON encoding error: kotlinx.atomicfu.AtomicRef
at org.springframework.http.codec.json.AbstractJackson2Encoder.encodeValue(AbstractJackson2Encoder.java:260)
at org.springframework.messaging.rsocket.DefaultRSocketRequester$DefaultRequestSpec.encodeData(DefaultRSocketRequester.java:254)
at org.springframework.messaging.rsocket.DefaultRSocketRequester$DefaultRequestSpec.lambda$createPayload$2(DefaultRSocketRequester.java:208)
at reactor.core.publisher.MonoCallable$MonoCallableSubscription.request(MonoCallable.java:137)
at reactor.core.publisher.MonoZip$ZipInner.onSubscribe(MonoZip.java:470)
at reactor.core.publisher.MonoCallable.subscribe(MonoCallable.java:48)
at reactor.core.publisher.MonoZip$ZipCoordinator.request(MonoZip.java:220)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.request(FluxMap.java:295)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request(FluxContextWrite.java:136)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request(FluxContextWrite.java:136)
at io.rsocket.core.DefaultRSocketClient$FlatMapMain.request(DefaultRSocketClient.java:327)
at io.rsocket.core.DefaultRSocketClient$FlattingInner.request(DefaultRSocketClient.java:454)
at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164)
at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164)
at reactor.core.publisher.BlockingSingleSubscriber.onSubscribe(BlockingSingleSubscriber.java:54)
at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92)
at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92)
at io.rsocket.core.DefaultRSocketClient$FlatMapMain.onSubscribe(DefaultRSocketClient.java:256)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onSubscribe(FluxContextWrite.java:101)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onSubscribe(FluxContextWrite.java:101)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onSubscribe(FluxMap.java:194)
at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:129)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
at io.rsocket.core.DefaultRSocketClient$RSocketClientMonoOperator.subscribe(DefaultRSocketClient.java:530)
at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
at reactor.core.publisher.Mono.block(Mono.java:1778)
at org.springframework.messaging.rsocket.service.RSocketServiceMethod.lambda$initResponseFunction$3(RSocketServiceMethod.java:173)
at java.base/java.util.function.Function.lambda$andThen$1(Function.java:88)
at org.springframework.messaging.rsocket.service.RSocketServiceMethod.invoke(RSocketServiceMethod.java:224)
at org.springframework.messaging.rsocket.service.RSocketServiceProxyFactory$ServiceMethodInterceptor.invoke(RSocketServiceProxyFactory.java:255)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:223)
at jdk.proxy2/jdk.proxy2.$Proxy51.uuid(Unknown Source)
at com.german.clientrocket.verticles.ServerVerticle$start$httpServerFuture$1$1.invokeSuspend(ServerVerticle.kt:42)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.internal.DispatchedContinuationKt.resumeCancellableWith(DispatchedContinuation.kt:363)
at kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable(Cancellable.kt:26)
at kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable$default(Cancellable.kt:21)
at kotlinx.coroutines.CoroutineStart.invoke(CoroutineStart.kt:88)
at kotlinx.coroutines.AbstractCoroutine.start(AbstractCoroutine.kt:123)
at kotlinx.coroutines.BuildersKt__Builders_commonKt.launch(Builders.common.kt:52)
at kotlinx.coroutines.BuildersKt.launch(Unknown Source)
at kotlinx.coroutines.BuildersKt__Builders_commonKt.launch$default(Builders.common.kt:43)
at kotlinx.coroutines.BuildersKt.launch$default(Unknown Source)
at com.german.clientrocket.verticles.ServerVerticle.start$lambda$0(ServerVerticle.kt:24)
at io.vertx.core.http.impl.Http1xServerRequestHandler.handle(Http1xServerRequestHandler.java:150)
at io.vertx.core.http.impl.Http1xServerRequestHandler.handle(Http1xServerRequestHandler.java:42)
at io.vertx.core.impl.ContextImpl.emit(ContextImpl.java:342)
at io.vertx.core.impl.DuplicatedContext.emit(DuplicatedContext.java:163)
at io.vertx.core.http.impl.Http1xServerConnection.handleMessage(Http1xServerConnection.java:174)
at io.vertx.core.net.impl.ConnectionBase.read(ConnectionBase.java:159)
at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:153)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)
at io.netty.handler.codec.http.websocketx.extensions.WebSocketServerExtensionHandler.onHttpRequestChannelRead(WebSocketServerExtensionHandler.java:158)
at io.netty.handler.codec.http.websocketx.extensions.WebSocketServerExtensionHandler.channelRead(WebSocketServerExtensionHandler.java:82)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.vertx.core.http.impl.Http1xUpgradeToH2CHandler.channelRead(Http1xUpgradeToH2CHandler.java:124)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.vertx.core.http.impl.Http1xOrH2CHandler.end(Http1xOrH2CHandler.java:61)
at io.vertx.core.http.impl.Http1xOrH2CHandler.channelRead(Http1xOrH2CHandler.java:38)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:796)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:732)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:658)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:998)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:1583)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:104)
at reactor.core.publisher.Mono.block(Mono.java:1779)
... 62 more
Caused by: com.fasterxml.jackson.databind.JsonMappingException: kotlinx.atomicfu.AtomicRef (through reference chain: com.german.clientrocket.verticles.ServerVerticle$start$httpServerFuture$1$1["completion"])
at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:401)
at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:360)
at com.fasterxml.jackson.databind.ser.std.StdSerializer.wrapAndThrow(StdSerializer.java:323)
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:778)
at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:184)
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:502)
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:341)
at com.fasterxml.jackson.databind.ObjectWriter$Prefetch.serialize(ObjectWriter.java:1587)
at com.fasterxml.jackson.databind.ObjectWriter.writeValue(ObjectWriter.java:1061)
at org.springframework.http.codec.json.AbstractJackson2Encoder.encodeValue(AbstractJackson2Encoder.java:253)
... 87 more
Caused by: java.lang.ClassNotFoundException: kotlinx.atomicfu.AtomicRef
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:526)
at kotlin.reflect.jvm.internal.KDeclarationContainerImpl.parseType(KDeclarationContainerImpl.kt:291)
at kotlin.reflect.jvm.internal.KDeclarationContainerImpl.loadReturnType(KDeclarationContainerImpl.kt:306)
at kotlin.reflect.jvm.internal.KDeclarationContainerImpl.findMethodBySignature(KDeclarationContainerImpl.kt:216)
at kotlin.reflect.jvm.internal.KPropertyImplKt.computeCallerForAccessor(KPropertyImpl.kt:313)
at kotlin.reflect.jvm.internal.KPropertyImplKt.access$computeCallerForAccessor(KPropertyImpl.kt:1)
at kotlin.reflect.jvm.internal.KPropertyImpl$Getter$caller$2.invoke(KPropertyImpl.kt:180)
at kotlin.reflect.jvm.internal.KPropertyImpl$Getter$caller$2.invoke(KPropertyImpl.kt:179)
at kotlin.SafePublicationLazyImpl.getValue(LazyJVM.kt:107)
at kotlin.reflect.jvm.internal.KPropertyImpl$Getter.getCaller(KPropertyImpl.kt:179)
at kotlin.reflect.jvm.ReflectJvmMapping.getJavaMethod(ReflectJvmMapping.kt:64)
at kotlin.reflect.jvm.ReflectJvmMapping.getJavaGetter(ReflectJvmMapping.kt:49)
at com.fasterxml.jackson.module.kotlin.KotlinAnnotationIntrospector.getRequiredMarkerFromCorrespondingAccessor(KotlinAnnotationIntrospector.kt:138)
at com.fasterxml.jackson.module.kotlin.KotlinAnnotationIntrospector.hasRequiredMarker(KotlinAnnotationIntrospector.kt:133)
at com.fasterxml.jackson.module.kotlin.KotlinAnnotationIntrospector.access$hasRequiredMarker(KotlinAnnotationIntrospector.kt:27)
at com.fasterxml.jackson.module.kotlin.KotlinAnnotationIntrospector$hasRequiredMarker$hasRequired$1.invoke(KotlinAnnotationIntrospector.kt:47)
at com.fasterxml.jackson.module.kotlin.KotlinAnnotationIntrospector$hasRequiredMarker$hasRequired$1.invoke(KotlinAnnotationIntrospector.kt:40)
at com.fasterxml.jackson.module.kotlin.ReflectionCache.javaMemberIsRequired(ReflectionCache.kt:106)
at com.fasterxml.jackson.module.kotlin.KotlinAnnotationIntrospector.hasRequiredMarker(KotlinAnnotationIntrospector.kt:40)
at com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair.hasRequiredMarker(AnnotationIntrospectorPair.java:310)
at com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair.hasRequiredMarker(AnnotationIntrospectorPair.java:310)
at com.fasterxml.jackson.databind.introspect.POJOPropertyBuilder.getMetadata(POJOPropertyBuilder.java:225)
at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector._anyIndexed(POJOPropertiesCollector.java:1671)
at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector._sortProperties(POJOPropertiesCollector.java:1563)
at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector.collectAll(POJOPropertiesCollector.java:503)
at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector.getJsonValueAccessor(POJOPropertiesCollector.java:269)
at com.fasterxml.jackson.databind.introspect.BasicBeanDescription.findJsonValueAccessor(BasicBeanDescription.java:248)
at com.fasterxml.jackson.databind.ser.BasicSerializerFactory.findSerializerByAnnotations(BasicSerializerFactory.java:393)
at com.fasterxml.jackson.databind.ser.BeanSerializerFactory._createSerializer2(BeanSerializerFactory.java:225)
at com.fasterxml.jackson.databind.ser.BeanSerializerFactory.createSerializer(BeanSerializerFactory.java:174)
at com.fasterxml.jackson.databind.SerializerProvider._createUntypedSerializer(SerializerProvider.java:1535)
at com.fasterxml.jackson.databind.SerializerProvider._createAndCacheUntypedSerializer(SerializerProvider.java:1503)
at com.fasterxml.jackson.databind.SerializerProvider.findPrimaryPropertySerializer(SerializerProvider.java:721)
at com.fasterxml.jackson.databind.ser.impl.PropertySerializerMap.findAndAddPrimarySerializer(PropertySerializerMap.java:72)
at com.fasterxml.jackson.databind.ser.BeanPropertyWriter._findAndAddDynamic(BeanPropertyWriter.java:899)
at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:710)
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:770)
... 93 more
My config for proxy client is
@Configuration
class RSocketConfig {
@Bean
fun rSocketRequester(rSocketBuilder: RSocketRequester.Builder): RSocketRequester =
rSocketBuilder.rsocketConnector {
it.reconnect(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1)))
}.transport(WebsocketClientTransport.create(URI.create("ws://127.0.0.1:8090")))
@Bean
fun rSocketServiceProxyFactory(@Qualifier("rSocketRequester") rSocketRequester: RSocketRequester): RSocketServiceProxyFactory =
RSocketServiceProxyFactory.builder(rSocketRequester).build()
@Bean
fun serverRocketClient(@Qualifier("rSocketServiceProxyFactory") rSocketServiceProxyFactory: RSocketServiceProxyFactory): ServerRockerClient =
rSocketServiceProxyFactory.createClient(ServerRockerClient::class.java)
}
I don't exactly know if it is bug or not implemented feature, but wanted to ask is it planned to implement RSocket Interfaces for coroutines, considering that all other RSocket components both on client and server side are very coroutine friendly.
I would be very grateful for your assist and thank you very much for all your input to community.