Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Random IllegalReferenceCountException: refCnt: 0 #245

Closed
dstepanov opened this issue Jan 21, 2022 · 11 comments
Closed

Random IllegalReferenceCountException: refCnt: 0 #245

dstepanov opened this issue Jan 21, 2022 · 11 comments
Assignees
Labels
type: bug Something isn't working
Milestone

Comments

@dstepanov
Copy link

We have a randomly failing test on Github Actions, unfortunately, no steps to reproduce.

Versions

  • Driver: Arabba-SR12 - io.r2dbc:r2dbc-mssql:0.8.8.RELEASE
  • Java: 1.8
  • OS: Linux - Github Actions
io.micronaut.data.r2dbc.sqlserver.SqlServerRepositoryPoolSpec test custom delete FAILED

  java.lang.RuntimeException: Async resource cleanup failed after onComplete
      at reactor.core.publisher.FluxUsingWhen$CommitInner.onError(FluxUsingWhen.java:533)
      at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onError(FluxDoFinally.java:136)
      at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
      at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84)
      at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84)
      at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onError(FluxContextWrite.java:121)
      at io.r2dbc.mssql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onError(FluxDiscardOnCancel.java:97)
      at reactor.core.publisher.FluxHandle$HandleSubscriber.onError(FluxHandle.java:203)
      at reactor.core.publisher.FluxHandle$HandleConditionalSubscriber.onError(FluxHandle.java:414)
      at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onError(FluxPeekFuseable.java:903)
      at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onError(FluxPeekFuseable.java:903)
      at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onError(FluxPeekFuseable.java:903)
      at reactor.core.publisher.FluxHandle$HandleConditionalSubscriber.onError(FluxHandle.java:414)
      at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onError(MonoFlatMapMany.java:255)
      at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
      at reactor.core.publisher.EmitterProcessor.checkTerminated(EmitterProcessor.java:548)
      at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:415)
      at reactor.core.publisher.EmitterProcessor.subscribe(EmitterProcessor.java:209)
      at reactor.core.publisher.Flux.subscribe(Flux.java:8468)
      at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:195)
      at reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:165)
      at io.r2dbc.mssql.client.ReactorNettyClient$ExchangeRequest$1.onSuccess(ReactorNettyClient.java:753)
      at io.r2dbc.mssql.client.ReactorNettyClient$RequestQueue.submit(ReactorNettyClient.java:689)
      at io.r2dbc.mssql.client.ReactorNettyClient$ExchangeRequest.submit(ReactorNettyClient.java:749)
      at io.r2dbc.mssql.client.ReactorNettyClient.lambda$exchange$14(ReactorNettyClient.java:586)
      at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
      at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62)
      at io.r2dbc.mssql.util.FluxDiscardOnCancel.subscribe(FluxDiscardOnCancel.java:52)
      at reactor.core.publisher.FluxFromMonoOperator.subscribe(FluxFromMonoOperator.java:83)
      at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54)
      at reactor.core.publisher.Mono.subscribe(Mono.java:4399)
      at reactor.core.publisher.FluxUsingWhen.subscribe(FluxUsingWhen.java:103)
      at reactor.core.publisher.Flux.subscribe(Flux.java:8468)
      at reactor.core.publisher.FluxUsingWhen$ResourceSubscriber.onNext(FluxUsingWhen.java:202)
      at reactor.core.publisher.FluxRetry$RetrySubscriber.onNext(FluxRetry.java:87)
      at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
      at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
      at io.r2dbc.pool.MonoDiscardOnCancel$MonoDiscardOnCancelSubscriber.onNext(MonoDiscardOnCancel.java:92)
      at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
      at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)
      at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
      at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:232)
      at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
      at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89)
      at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onComplete(FluxHandleFuseable.java:229)
      at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1817)
      at reactor.core.publisher.MonoSupplier.subscribe(MonoSupplier.java:62)
      at reactor.core.publisher.Mono.subscribe(Mono.java:4399)
      at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:255)
      at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
      at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
      at io.r2dbc.pool.MonoDiscardOnCancel.subscribe(MonoDiscardOnCancel.java:50)
      at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
      at reactor.pool.AbstractPool$Borrower.deliver(AbstractPool.java:464)
      at reactor.pool.SimpleDequePool.lambda$drainLoop$8(SimpleDequePool.java:340)
      at reactor.core.scheduler.ImmediateScheduler.schedule(ImmediateScheduler.java:52)
      at reactor.pool.SimpleDequePool.drainLoop(SimpleDequePool.java:340)
      at reactor.pool.SimpleDequePool.pendingOffer(SimpleDequePool.java:558)
      at reactor.pool.SimpleDequePool.doAcquire(SimpleDequePool.java:268)
      at reactor.pool.AbstractPool$Borrower.request(AbstractPool.java:427)
      at reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:110)
      at reactor.pool.SimpleDequePool$QueueBorrowerMono.subscribe(SimpleDequePool.java:676)
      at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
      at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
      at reactor.core.publisher.FluxRetry$RetrySubscriber.resubscribe(FluxRetry.java:117)
      at reactor.core.publisher.MonoRetry.subscribeOrReturn(MonoRetry.java:50)
      at reactor.core.publisher.Mono.subscribe(Mono.java:4384)
      at reactor.core.publisher.FluxUsingWhen.subscribe(FluxUsingWhen.java:103)
      at reactor.core.publisher.FluxDeferContextual.subscribe(FluxDeferContextual.java:57)
      at reactor.core.publisher.Mono.subscribe(Mono.java:4399)
      at reactor.core.publisher.Mono.blockOptional(Mono.java:1750)
      at io.micronaut.data.operations.reactive.BlockingReactorRepositoryOperations.executeUpdate(BlockingReactorRepositoryOperations.java:92)
      at io.micronaut.data.operations.RepositoryOperations.executeDelete(RepositoryOperations.java:195)
      at io.micronaut.data.runtime.intercept.DefaultDeleteAllInterceptor.intercept(DefaultDeleteAllInterceptor.java:53)
      at io.micronaut.data.runtime.intercept.DefaultDeleteAllInterceptor.intercept(DefaultDeleteAllInterceptor.java:35)
      at io.micronaut.data.intercept.DataIntroductionAdvice.intercept(DataIntroductionAdvice.java:114)
      at io.micronaut.data.intercept.DataIntroductionAdvice.intercept(DataIntroductionAdvice.java:75)
      at io.micronaut.aop.chain.MethodInterceptorChain.proceed(MethodInterceptorChain.java:137)
      at io.micronaut.validation.ValidatingInterceptor.intercept(ValidatingInterceptor.java:138)
      at io.micronaut.aop.chain.MethodInterceptorChain.proceed(MethodInterceptorChain.java:137)
      at io.micronaut.data.tck.tests.AbstractRepositorySpec.cleanupData(AbstractRepositorySpec.groovy:129)
      at io.micronaut.data.tck.tests.AbstractRepositorySpec.cleanup(AbstractRepositorySpec.groovy:141)
  Caused by: io.r2dbc.mssql.client.ReactorNettyClient$MssqlConnectionException
      at io.r2dbc.mssql.client.ReactorNettyClient.lambda$handleConnectionError$17(ReactorNettyClient.java:619)
      at io.r2dbc.mssql.client.ReactorNettyClient.drainError(ReactorNettyClient.java:629)
      at io.r2dbc.mssql.client.ReactorNettyClient.handleConnectionError(ReactorNettyClient.java:619)
      at io.r2dbc.mssql.client.ReactorNettyClient.resumeError(ReactorNettyClient.java:317)
      at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
      at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:321)
      at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:686)
      at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.request(FluxFlattenIterable.java:274)
      at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2158)
      at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:503)
      ... 65 more
  Caused by: io.netty.util.IllegalReferenceCountException: refCnt: 0
      at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1454)
      at io.netty.buffer.PooledUnsafeDirectByteBuf.memoryAddress(PooledUnsafeDirectByteBuf.java:241)
      at io.netty.buffer.UnsafeByteBufUtil.setBytes(UnsafeByteBufUtil.java:524)
      at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:193)
      at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1104)
      at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1096)
      at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1087)
      at io.r2dbc.mssql.client.StreamDecoder$DecoderState.initial(StreamDecoder.java:180)
      at io.r2dbc.mssql.client.StreamDecoder.decode(StreamDecoder.java:85)
      at io.r2dbc.mssql.client.StreamDecoder.decode(StreamDecoder.java:64)
      at io.r2dbc.mssql.client.ReactorNettyClient.lambda$new$6(ReactorNettyClient.java:255)
      at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:351)
      ... 69 more

@mp911de mp911de added the type: bug Something isn't working label Jan 21, 2022
@mp911de mp911de self-assigned this Jan 21, 2022
@mp911de
Copy link
Member

mp911de commented Jan 21, 2022

This looks as if the connection was disconnected earlier on and still some activity happens later on. The refCnt: 0 comes from the buffer that was read by Netty for decoding.
Do you observe any preceding error messages in the log?

@dstepanov
Copy link
Author

I haven't seen anything, waiting for another fail to check again

@donard-commedagh
Copy link

I'm getting a similar but not predictably repeatable error, which might be the same problem:

15:40:00.842 [reactor-tcp-nio-2] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
	io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:402)
	io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
	io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
	io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:116)
	io.r2dbc.mssql.message.token.RowToken.doDecodePlp(RowToken.java:259)
	io.r2dbc.mssql.message.token.RowToken.decodeColumnData(RowToken.java:199)
	io.r2dbc.mssql.message.token.NbcRowToken.doDecode(NbcRowToken.java:123)
	io.r2dbc.mssql.message.token.NbcRowToken.decode(NbcRowToken.java:61)
	io.r2dbc.mssql.message.token.Tabular.lambda$decodeFunction$0(Tabular.java:203)
	io.r2dbc.mssql.message.token.Tabular$TabularDecoder.decode(Tabular.java:413)
	io.r2dbc.mssql.client.ConnectionState$4$1.decode(ConnectionState.java:206)
	io.r2dbc.mssql.client.StreamDecoder.withState(StreamDecoder.java:116)
	io.r2dbc.mssql.client.StreamDecoder.decode(StreamDecoder.java:88)
	io.r2dbc.mssql.client.StreamDecoder.decode(StreamDecoder.java:64)
	io.r2dbc.mssql.client.ReactorNettyClient.lambda$new$6(ReactorNettyClient.java:297)
	reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:351)
	reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:686)
	reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onNext(FluxFlattenIterable.java:250)
	reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:279)
	reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:388)
	reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:404)
	reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)
	io.r2dbc.mssql.client.ssl.TdsSslHandler.channelRead(TdsSslHandler.java:380)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:832)

My stack is:
My app > komapper > r2dbc-mssql > SQL Server

The stack trace doesn't touch my code so I can't prove what function is triggering it.

However, I think it is due to reading from a Blob object as when I comment out the Blob reading code it never triggers.

val buffer = ByteBuffer.allocate(65536)
blobObject.stream().collect<ByteBuffer>(buffer::put)

Note: I am caching the result of the above code as Blobs can only be read once. I have no reason to believe that a second thread/co-routine might be trying to read before the result is cached.

From io/r2dbc/spi/Blob.java

    /**
     * Returns the content stream as a {@link Publisher} emitting {@link ByteBuffer} chunks.
     * <p>
     * The content stream can be consumed ("subscribed to") only once.  Subsequent consumptions result in a {@link IllegalStateException}.
     * <p>
     * Once {@link Publisher#subscribe(Subscriber) subscribed}, {@link Subscription#cancel() canceling} the subscription releases resources associated with this {@link Blob}.
     *
     * @return a {@link Publisher} emitting {@link ByteBuffer} chunks.
     */
    Publisher<ByteBuffer> stream();

I suspect that cancel() may not be getting called properly once the Blob is fully read, but I can't figure out how to test this.

@dstepanov
Copy link
Author

The issue is still there:

2022-11-14T03:07:02.6207388Z       at io.micronaut.data.r2dbc.operations.DefaultR2dbcRepositoryOperations.blockOptional(DefaultR2dbcRepositoryOperations.java:213)
2022-11-14T03:07:02.6209345Z       at io.micronaut.data.operations.reactive.BlockingExecutorReactorRepositoryOperations.executeDelete(BlockingExecutorReactorRepositoryOperations.java:105)
2022-11-14T03:07:02.6210444Z       at io.micronaut.data.runtime.intercept.DefaultDeleteAllInterceptor.intercept(DefaultDeleteAllInterceptor.java:53)
2022-11-14T03:07:02.6248347Z       at io.micronaut.data.runtime.intercept.DefaultDeleteAllInterceptor.intercept(DefaultDeleteAllInterceptor.java:35)
2022-11-14T03:07:02.6249490Z       at io.micronaut.data.intercept.DataIntroductionAdvice.intercept(DataIntroductionAdvice.java:81)
2022-11-14T03:07:02.6251260Z       at io.micronaut.aop.chain.MethodInterceptorChain.proceed(MethodInterceptorChain.java:137)
2022-11-14T03:07:02.6252087Z       at io.micronaut.validation.ValidatingInterceptor.intercept(ValidatingInterceptor.java:143)
2022-11-14T03:07:02.6261824Z       at io.micronaut.aop.chain.MethodInterceptorChain.proceed(MethodInterceptorChain.java:137)
2022-11-14T03:07:02.6262700Z       at io.micronaut.data.tck.tests.AbstractRepositorySpec.cleanupData(AbstractRepositorySpec.groovy:164)
2022-11-14T03:07:02.6264233Z       at io.micronaut.data.tck.tests.AbstractRepositorySpec.cleanup(AbstractRepositorySpec.groovy:176)
2022-11-14T03:07:02.6265126Z   Caused by: io.r2dbc.mssql.client.ReactorNettyClient$MssqlConnectionException
2022-11-14T03:07:02.6267089Z       at io.r2dbc.mssql.client.ReactorNettyClient.lambda$handleConnectionError$23(ReactorNettyClient.java:688)
2022-11-14T03:07:02.6267714Z       at io.r2dbc.mssql.client.ReactorNettyClient.drainError(ReactorNettyClient.java:698)
2022-11-14T03:07:02.6268345Z       at io.r2dbc.mssql.client.ReactorNettyClient.handleConnectionError(ReactorNettyClient.java:688)
2022-11-14T03:07:02.6268952Z       at io.r2dbc.mssql.client.ReactorNettyClient.resumeError(ReactorNettyClient.java:366)
2022-11-14T03:07:02.6269536Z       at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
2022-11-14T03:07:02.6270209Z       at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:321)
2022-11-14T03:07:02.6270891Z       at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:686)
2022-11-14T03:07:02.6271552Z       at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.request(FluxFlattenIterable.java:274)
2022-11-14T03:07:02.6272189Z       at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2158)
2022-11-14T03:07:02.6272761Z       at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:549)
2022-11-14T03:07:02.6273151Z       ... 69 more
2022-11-14T03:07:02.6283901Z   Caused by: io.netty.util.IllegalReferenceCountException: refCnt: 0
2022-11-14T03:07:02.6284470Z       at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1454)
2022-11-14T03:07:02.6285086Z       at io.netty.buffer.PooledUnsafeDirectByteBuf.memoryAddress(PooledUnsafeDirectByteBuf.java:241)
2022-11-14T03:07:02.6285657Z       at io.netty.buffer.UnsafeByteBufUtil.setBytes(UnsafeByteBufUtil.java:524)
2022-11-14T03:07:02.6286219Z       at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:193)
2022-11-14T03:07:02.6286761Z       at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1104)
2022-11-14T03:07:02.6287242Z       at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1096)
2022-11-14T03:07:02.6287712Z       at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1087)
2022-11-14T03:07:02.6288225Z       at io.r2dbc.mssql.client.StreamDecoder$DecoderState.initial(StreamDecoder.java:180)
2022-11-14T03:07:02.6288736Z       at io.r2dbc.mssql.client.StreamDecoder.decode(StreamDecoder.java:85)
2022-11-14T03:07:02.6289228Z       at io.r2dbc.mssql.client.StreamDecoder.decode(StreamDecoder.java:64)
2022-11-14T03:07:02.6289746Z       at io.r2dbc.mssql.client.ReactorNettyClient.lambda$new$6(ReactorNettyClient.java:297)
2022-11-14T03:07:02.6290366Z       at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:351)
2022-11-14T03:07:02.6290818Z       ... 73 more

@aironi
Copy link

aironi commented Apr 26, 2023

Hi everybody!

Reporting that we are facing possibly the same problem and also the memory leak notification from Netty leak detector.

In our use case we are using r2dbc, r2dbc-pool, Reactor, Spring and we are connecting to MS SQL Server.

Leak nag:

[2023-04-25T10:57:48.525Z] huhtik. 25, 2023 1:57:48 IP. io.netty.util.ResourceLeakDetector reportTracedLeak
[2023-04-25T10:57:48.534Z] SEVERE: LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.

Stack trace regarding refCnt:

Exception: IllegalReferenceCountException: refCnt: 0
Stack: java.lang.reflect.InvocationTargetException
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at com.microsoft.azure.functions.worker.broker.JavaMethodInvokeInfo.invoke(JavaMethodInvokeInfo.java:22)
	at com.microsoft.azure.functions.worker.broker.EnhancedJavaMethodExecutorImpl.execute(EnhancedJavaMethodExecutorImpl.java:22)
	at com.microsoft.azure.functions.worker.chain.FunctionExecutionMiddleware.invoke(FunctionExecutionMiddleware.java:19)
	at com.microsoft.azure.functions.worker.chain.InvocationChain.doNext(InvocationChain.java:21)
	at com.microsoft.azure.functions.worker.broker.JavaFunctionBroker.invokeMethod(JavaFunctionBroker.java:125)
	at com.microsoft.azure.functions.worker.handler.InvocationRequestHandler.execute(InvocationRequestHandler.java:34)
	at com.microsoft.azure.functions.worker.handler.InvocationRequestHandler.execute(InvocationRequestHandler.java:10)
	at com.microsoft.azure.functions.worker.handler.MessageHandler.handle(MessageHandler.java:44)
	at com.microsoft.azure.functions.worker.JavaWorkerClient$StreamingMessagePeer.lambda$onNext$0(JavaWorkerClient.java:94)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.springframework.dao.DataAccessResourceFailureException: executeMany; SQL [< secrets >]; null
	at org.springframework.r2dbc.connection.ConnectionFactoryUtils.convertR2dbcException(ConnectionFactoryUtils.java:231)
	Suppressed: The stacktrace has been enhanced by Reactor, refer to additional information below:
Assembly trace from producer [reactor.core.publisher.FluxOnErrorResume] :
	reactor.core.publisher.Flux.onErrorMap
	org.springframework.r2dbc.core.DefaultDatabaseClient.inConnectionMany(DefaultDatabaseClient.java:149)
Error has been observed at the following site(s):
	*_____Flux.onErrorMap ⇢ at org.springframework.r2dbc.core.DefaultDatabaseClient.inConnectionMany(DefaultDatabaseClient.java:149)
	|_                    ⇢ at org.springframework.r2dbc.core.DefaultFetchSpec.all(DefaultFetchSpec.java:85)
	|_          Flux.name ⇢ at < secrets >
	|_           Flux.tag ⇢ at
	|_       Flux.metrics ⇢ at
	|_           Flux.map ⇢ at
	*_________Flux.concat ⇢ at
	|_          Flux.name ⇢ at
	|_           Flux.tag ⇢ at
	|_       Flux.metrics ⇢ at
	|_       Flux.flatMap ⇢ at
	|_       Flux.flatMap ⇢ at
	|_     Flux.publishOn ⇢ at
	|_       Flux.flatMap ⇢ at
	|_       Flux.flatMap ⇢ at
	|_           Flux.map ⇢ at
	|_ Flux.switchIfEmpty ⇢ at
	*________Flux.flatMap ⇢ at
	|_ Flux.switchIfEmpty ⇢ at
	*________Flux.flatMap ⇢ at
	|_        Flux.filter ⇢ at
	|_          Flux.sort ⇢ at
	|_          Flux.skip ⇢ at
	|_          Flux.take ⇢ at
	|_                    ⇢ at
	|_     Flux.doOnError ⇢ at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeFunction(SimpleFunctionRegistry.java:834)
	|_          Flux.from ⇢ at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertOutputPublisherIfNecessary(SimpleFunctionRegistry.java:1415)
	|_           Flux.map ⇢ at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertOutputPublisherIfNecessary(SimpleFunctionRegistry.java:1415)
Original Stack Trace:
		at org.springframework.r2dbc.connection.ConnectionFactoryUtils.convertR2dbcException(ConnectionFactoryUtils.java:231)
		at org.springframework.r2dbc.core.DefaultDatabaseClient.lambda$inConnectionMany$8(DefaultDatabaseClient.java:150)
		at reactor.core.publisher.Flux.lambda$onErrorMap$28(Flux.java:7123)
		at reactor.core.publisher.Flux.lambda$onErrorResume$29(Flux.java:7176)
		at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
		at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.deferredError(FluxUsingWhen.java:398)
		at reactor.core.publisher.FluxUsingWhen$RollbackInner.onComplete(FluxUsingWhen.java:475)
		at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2205)
		at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onComplete(FluxPeekFuseable.java:277)
		at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2205)
		at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:128)
		at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:128)
		at reactor.core.publisher.Operators.complete(Operators.java:137)
		at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:46)
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
		at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
		at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
		at reactor.core.publisher.Mono.subscribe(Mono.java:4444)
		at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:278)
		at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84)
		at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:200)
		at reactor.core.publisher.MonoSupplier$MonoSupplierSubscription.request(MonoSupplier.java:145)
		at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.request(FluxHandleFuseable.java:259)
		at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onSubscribe(MonoIgnoreElements.java:72)
		at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onSubscribe(FluxHandleFuseable.java:163)
		at reactor.core.publisher.MonoSupplier.subscribe(MonoSupplier.java:48)
		at reactor.core.publisher.Mono.subscribe(Mono.java:4444)
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263)
		at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
		at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
		at reactor.core.publisher.Mono.subscribe(Mono.java:4444)
		at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
		at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84)
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:142)
		at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onError(FluxFilterFuseable.java:162)
		at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onError(FluxFilterFuseable.java:382)
		at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onError(FluxMapFuseable.java:340)
		at reactor.core.publisher.Operators.error(Operators.java:198)
		at reactor.core.publisher.MonoError.subscribe(MonoError.java:53)
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
		at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55)
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
		at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
		at reactor.core.publisher.Mono.subscribe(Mono.java:4444)
		at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onError(FluxUsingWhen.java:364)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:842)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:608)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.request(FluxFlatMap.java:347)
		at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.request(FluxUsingWhen.java:319)
		at reactor.core.publisher.Operators$DeferredSubscription.request(Operators.java:1717)
		at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2305)
		at reactor.core.publisher.FluxMetrics$MetricsSubscriber.request(FluxMetrics.java:204)
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
		at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.request(FluxConcatArray.java:276)
		at reactor.core.publisher.FluxMetrics$MetricsSubscriber.request(FluxMetrics.java:204)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:790)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.request(FluxFlatMap.java:347)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:790)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.innerComplete(FluxFlatMap.java:894)
		at reactor.core.publisher.FluxFlatMap$FlatMapInner.onComplete(FluxFlatMap.java:997)
		at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:85)
		at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:159)
		at reactor.core.publisher.MonoMetricsFuseable$MetricsFuseableSubscriber.onNext(MonoMetricsFuseable.java:130)
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
		at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:488)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:421)
		at reactor.core.publisher.FluxBuffer$BufferExactSubscriber.onComplete(FluxBuffer.java:185)
		at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2205)
		at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.deferredComplete(FluxUsingWhen.java:392)
		at reactor.core.publisher.FluxUsingWhen$CommitInner.onComplete(FluxUsingWhen.java:527)
		at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2205)
		at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onComplete(FluxPeekFuseable.java:277)
		at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2205)
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:209)
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:209)
		at reactor.pool.SimpleDequePool.maybeRecycleAndDrain(SimpleDequePool.java:531)
		at reactor.pool.SimpleDequePool$QueuePoolRecyclerInner.onComplete(SimpleDequePool.java:761)
		at reactor.core.publisher.Operators.complete(Operators.java:137)
		at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:46)
		at reactor.core.publisher.Mono.subscribe(Mono.java:4444)
		at reactor.pool.SimpleDequePool$QueuePoolRecyclerMono.subscribe(SimpleDequePool.java:873)
		at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:240)
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
		at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
		at reactor.core.publisher.Operators.complete(Operators.java:137)
		at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:46)
		at reactor.core.publisher.Mono.subscribe(Mono.java:4444)
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263)
		at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
		at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:240)
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
		at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89)
		at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onComplete(FluxHandleFuseable.java:238)
		at reactor.core.publisher.MonoSupplier$MonoSupplierSubscription.request(MonoSupplier.java:148)
		at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.request(FluxHandleFuseable.java:259)
		at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onSubscribe(MonoIgnoreElements.java:72)
		at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onSubscribe(FluxHandleFuseable.java:163)
		at reactor.core.publisher.MonoSupplier.subscribe(MonoSupplier.java:48)
		at reactor.core.publisher.Mono.subscribe(Mono.java:4444)
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263)
		at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
		at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
		at reactor.core.publisher.Mono.subscribe(Mono.java:4444)
		at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
		at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84)
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:142)
		at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onError(FluxFilterFuseable.java:162)
		at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onError(FluxFilterFuseable.java:382)
		at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onError(FluxMapFuseable.java:340)
		at reactor.core.publisher.Operators.error(Operators.java:198)
		at reactor.core.publisher.MonoError.subscribe(MonoError.java:53)
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
		at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55)
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
		at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
		at reactor.core.publisher.Mono.subscribe(Mono.java:4444)
		at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onComplete(FluxUsingWhen.java:384)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:846)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:608)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.onComplete(FluxFlatMap.java:465)
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:152)
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:152)
		at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.checkTerminated(FluxWindowPredicate.java:540)
		at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.drainLoop(FluxWindowPredicate.java:488)
		at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.drain(FluxWindowPredicate.java:432)
		at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.onComplete(FluxWindowPredicate.java:312)
		at reactor.core.publisher.SerializedSubscriber.onComplete(SerializedSubscriber.java:146)
		at reactor.core.publisher.FluxTakeUntilOther$TakeUntilMainSubscriber.cancelMainAndComplete(FluxTakeUntilOther.java:200)
		at reactor.core.publisher.FluxTakeUntilOther$TakeUntilOtherSubscriber.onComplete(FluxTakeUntilOther.java:128)
		at reactor.core.publisher.SinkEmptyMulticast$VoidInner.complete(SinkEmptyMulticast.java:272)
		at reactor.core.publisher.SinkEmptyMulticast.tryEmitEmpty(SinkEmptyMulticast.java:86)
		at reactor.core.publisher.SinkEmptySerialized.tryEmitEmpty(SinkEmptySerialized.java:46)
		at io.r2dbc.mssql.RpcQueryMessageFlow$OnCursorComplete.run(RpcQueryMessageFlow.java:686)
		at io.r2dbc.mssql.RpcQueryMessageFlow.onDone(RpcQueryMessageFlow.java:387)
		at io.r2dbc.mssql.RpcQueryMessageFlow.handleMessage(RpcQueryMessageFlow.java:377)
		at io.r2dbc.mssql.RpcQueryMessageFlow.handleMessage(RpcQueryMessageFlow.java:315)
		at io.r2dbc.mssql.RpcQueryMessageFlow.lambda$exchange$1(RpcQueryMessageFlow.java:141)
		at reactor.core.publisher.FluxHandleFuseable$HandleFuseableConditionalSubscriber.onNext(FluxHandleFuseable.java:488)
		at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.onNext(FluxPeekFuseable.java:503)
		at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:113)
		at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.onNext(FluxPeekFuseable.java:503)
		at reactor.core.publisher.FluxHandleFuseable$HandleFuseableConditionalSubscriber.onNext(FluxHandleFuseable.java:504)
		at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:250)
		at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
		at reactor.core.publisher.SinkManyEmitterProcessor.drain(SinkManyEmitterProcessor.java:471)
		at reactor.core.publisher.SinkManyEmitterProcessor.tryEmitNext(SinkManyEmitterProcessor.java:269)
		at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
		at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)
		at io.r2dbc.mssql.client.ReactorNettyClient$1.next(ReactorNettyClient.java:295)
		at io.r2dbc.mssql.client.ReactorNettyClient$1.next(ReactorNettyClient.java:214)
		at io.r2dbc.mssql.client.ReactorNettyClient$2.onNext(ReactorNettyClient.java:326)
		at io.r2dbc.mssql.client.ReactorNettyClient$2.onNext(ReactorNettyClient.java:315)
		at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
		at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:453)
		at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:724)
		at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onNext(FluxFlattenIterable.java:256)
		at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:292)
		at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:401)
		at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:411)
		at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:113)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
		at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)
		at io.r2dbc.mssql.client.ssl.TdsSslHandler.channelRead(TdsSslHandler.java:380)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
		at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
		at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
		at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
		at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
		at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
		at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
		at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
		at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
		at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
		at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
		at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: io.r2dbc.mssql.client.ReactorNettyClient$MssqlConnectionException
	at io.r2dbc.mssql.client.ReactorNettyClient.lambda$handleConnectionError$24(ReactorNettyClient.java:692)
	Suppressed: The stacktrace has been enhanced by Reactor, refer to additional information below:
Assembly trace from producer [reactor.core.publisher.FluxPeek] :
	reactor.core.publisher.Flux.doOnSubscribe
	io.r2dbc.mssql.client.ReactorNettyClient.lambda$exchange$18(ReactorNettyClient.java:628)
Error has been observed at the following site(s):
	*_____Flux.doOnSubscribe ⇢ at io.r2dbc.mssql.client.ReactorNettyClient.lambda$exchange$18(ReactorNettyClient.java:628)
	*_______Mono.flatMapMany ⇢ at io.r2dbc.mssql.client.ReactorNettyClient.exchange(ReactorNettyClient.java:652)
	|_           Flux.handle ⇢ at io.r2dbc.mssql.client.ReactorNettyClient.exchange(ReactorNettyClient.java:652)
	|_ Flux.doAfterTerminate ⇢ at io.r2dbc.mssql.client.ReactorNettyClient.exchange(ReactorNettyClient.java:662)
	|_        Flux.doFinally ⇢ at io.r2dbc.mssql.client.ReactorNettyClient.exchange(ReactorNettyClient.java:662)
	|_       Flux.doOnCancel ⇢ at io.r2dbc.mssql.client.ReactorNettyClient.exchange(ReactorNettyClient.java:662)
	|_           Flux.handle ⇢ at io.r2dbc.mssql.RpcQueryMessageFlow.exchange(RpcQueryMessageFlow.java:137)
	|_           Flux.filter ⇢ at io.r2dbc.mssql.RpcQueryMessageFlow.exchange(RpcQueryMessageFlow.java:143)
	|_       Flux.doOnCancel ⇢ at io.r2dbc.mssql.RpcQueryMessageFlow.exchange(RpcQueryMessageFlow.java:144)
	|_    Flux.doOnSubscribe ⇢ at io.r2dbc.mssql.RpcQueryMessageFlow.exchange(RpcQueryMessageFlow.java:146)
	|_                       ⇢ at io.r2dbc.mssql.util.Operators.discardOnCancel(Operators.java:60)
	|_      Flux.doOnDiscard ⇢ at io.r2dbc.mssql.RpcQueryMessageFlow.lambda$exchange$3(RpcQueryMessageFlow.java:149)
	|_        Flux.transform ⇢ at io.r2dbc.mssql.RpcQueryMessageFlow.exchange(RpcQueryMessageFlow.java:149)
	|_   Flux.takeUntilOther ⇢ at io.r2dbc.mssql.RpcQueryMessageFlow.exchange(RpcQueryMessageFlow.java:149)
	|_                       ⇢ at io.r2dbc.mssql.MssqlStatementSupport.potentiallyAttachTimeout(MssqlStatementSupport.java:100)
	|_      Flux.windowUntil ⇢ at io.r2dbc.mssql.ParametrizedMssqlStatement.lambda$execute$8(ParametrizedMssqlStatement.java:139)
	|_              Flux.map ⇢ at io.r2dbc.mssql.ParametrizedMssqlStatement.lambda$execute$8(ParametrizedMssqlStatement.java:139)
	*_____________Flux.defer ⇢ at io.r2dbc.mssql.ParametrizedMssqlStatement.execute(ParametrizedMssqlStatement.java:119)
	|_             Flux.from ⇢ at org.springframework.r2dbc.core.DefaultDatabaseClient$DefaultGenericExecuteSpec.lambda$getResultFunction$6(DefaultDatabaseClient.java:398)
	|_             Flux.cast ⇢ at org.springframework.r2dbc.core.DefaultDatabaseClient$DefaultGenericExecuteSpec.lambda$getResultFunction$6(DefaultDatabaseClient.java:399)
	|_            checkpoint ⇢ SQL  < secrets > [DatabaseClient]
	|_          Flux.flatMap ⇢ at org.springframework.r2dbc.core.DefaultFetchSpec.lambda$all$1(DefaultFetchSpec.java:87)
	*_________Flux.usingWhen ⇢ at org.springframework.r2dbc.core.DefaultDatabaseClient.inConnectionMany(DefaultDatabaseClient.java:137)
Original Stack Trace:
		at io.r2dbc.mssql.client.ReactorNettyClient.lambda$handleConnectionError$24(ReactorNettyClient.java:692)
		at io.r2dbc.mssql.client.ReactorNettyClient.drainError(ReactorNettyClient.java:702)
		at io.r2dbc.mssql.client.ReactorNettyClient.handleConnectionError(ReactorNettyClient.java:692)
		at io.r2dbc.mssql.client.ReactorNettyClient.resumeError(ReactorNettyClient.java:370)
		at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
		at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:351)
		at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:724)
		at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.request(FluxFlattenIterable.java:303)
		at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2305)
		at reactor.core.publisher.SinkManyEmitterProcessor.drain(SinkManyEmitterProcessor.java:483)
		at reactor.core.publisher.SinkManyEmitterProcessor$EmitterInner.drainParent(SinkManyEmitterProcessor.java:615)
		at reactor.core.publisher.FluxPublish$PubSubInner.request(FluxPublish.java:602)
		at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138)
		at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.request(MonoFlatMapMany.java:112)
		at reactor.core.publisher.FluxHandleFuseable$HandleFuseableConditionalSubscriber.request(FluxHandleFuseable.java:653)
		at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.request(FluxPeekFuseable.java:437)
		at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.request(FluxDoFinally.java:140)
		at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.request(FluxPeekFuseable.java:437)
		at reactor.core.publisher.FluxHandleFuseable$HandleFuseableConditionalSubscriber.request(FluxHandleFuseable.java:653)
		at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.request(FluxFilterFuseable.java:191)
		at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144)
		at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144)
		at io.r2dbc.mssql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.request(FluxDiscardOnCancel.java:115)
		at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request(FluxContextWrite.java:136)
		at reactor.core.publisher.FluxTakeUntilOther$TakeUntilMainSubscriber.request(FluxTakeUntilOther.java:182)
		at reactor.core.publisher.SerializedSubscriber.request(SerializedSubscriber.java:151)
		at reactor.core.publisher.FluxWindowPredicate$WindowFlux.drainRegular(FluxWindowPredicate.java:684)
		at reactor.core.publisher.FluxWindowPredicate$WindowFlux.drain(FluxWindowPredicate.java:748)
		at reactor.core.publisher.FluxWindowPredicate$WindowFlux.request(FluxWindowPredicate.java:835)
		at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.request(FluxHandleFuseable.java:259)
		at reactor.core.publisher.FluxFlatMap$FlatMapInner.request(FluxFlatMap.java:1008)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:729)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.request(FluxFlatMap.java:347)
		at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.request(FluxUsingWhen.java:319)
		at reactor.core.publisher.Operators$DeferredSubscription.request(Operators.java:1717)
		at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2305)
		at reactor.core.publisher.FluxMetrics$MetricsSubscriber.request(FluxMetrics.java:204)
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
		at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.request(FluxConcatArray.java:276)
		at reactor.core.publisher.FluxMetrics$MetricsSubscriber.request(FluxMetrics.java:204)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:790)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.request(FluxFlatMap.java:347)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:790)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.innerComplete(FluxFlatMap.java:894)
		at reactor.core.publisher.FluxFlatMap$FlatMapInner.onComplete(FluxFlatMap.java:997)
		at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:85)
		at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:159)
		at reactor.core.publisher.MonoMetricsFuseable$MetricsFuseableSubscriber.onNext(MonoMetricsFuseable.java:130)
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
		at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
		at reactor.core.publisher < clipped by Azure ... >

@aironi
Copy link

aironi commented Nov 20, 2023

Hi all!

I believe this problem would be corrected by moving handleConnectionError(throwable) from linre 370 to 381:

https://github.com/r2dbc/r2dbc-mssql/blob/main/src/main/java/io/r2dbc/mssql/client/ReactorNettyClient.java#L370

Please confirm.

mp911de added a commit that referenced this issue Nov 20, 2023
…response handlers.

Terminating the request queue will ensure that no new requests will be accepted and thus that we could miss a response handler to be terminated.

[#245]

Signed-off-by: Mark Paluch <[email protected]>
mp911de added a commit that referenced this issue Nov 20, 2023
…response handlers.

Terminating the request queue will ensure that no new requests will be accepted and thus that we could miss a response handler to be terminated.

[#245]

Signed-off-by: Mark Paluch <[email protected]>
mp911de added a commit that referenced this issue Nov 20, 2023
…response handlers.

Terminating the request queue will ensure that no new requests will be accepted and thus that we could miss a response handler to be terminated.

[#245]

Signed-off-by: Mark Paluch <[email protected]>
@mp911de
Copy link
Member

mp911de commented Nov 20, 2023

Changing order will complete the request queue first and then discard the pending response handlers. It will guarantee that no new request will be accepted and I think, that is a useful change. I'm not sure whether it will fix the issue though.

Snapshot builds are available now, please retest.

@aironi
Copy link

aironi commented Nov 22, 2023

Changing order will complete the request queue first and then discard the pending response handlers. It will guarantee that no new request will be accepted and I think, that is a useful change. I'm not sure whether it will fix the issue though.

Snapshot builds are available now, please retest.

Thanks, I did test this with my custom snapshot that I built locally and the refCnt disappeared. Instead I got the "connection closed" stack trace. We are not still sure why connection gets closed. We do perform a lot of queries to SQL Server and have only one troublesome use case where a lot of data is fetched with multiple complex queries. But that seems to be a different problem and this bug only hid the actual stack trace. Anyways, since I am not familiar with this package, I was not completely sure if this was a proper fix or not, hence the comment instead of PR 🙂

I will test with your snapshot as soon as possible! Thanks again!

@mp911de
Copy link
Member

mp911de commented Nov 22, 2023

Instead I got the "connection closed" stack trace.

I think this is actually an improvement. Connection closed can happen when there is too much pressure on the connection (i.e. server TCP recv buffers exhausted). Can you check how much activity is going on, especially parallel activity? Switching to concatMap instead of using flatMap or large prefetch sizes should get you there.

Once I get the confirmation from your side that the fix is appropriate, I'll close the ticket here.

@aironi
Copy link

aironi commented Nov 22, 2023

Bad news: I attempted my use case with 1.1.0-BUILD-SNAPSHOT but it seems that I am hitting #276 and to my old eyes it seems that the queries get mixed up between connections like mentioned in #273 . I am using a pool.

Good news: I could not see the refCnt problem with this version.

For me, the error is the following (real parameter names renamed):

[2023-11-22T17:54:19.789Z] Exception: ExceptionFactory.MssqlNonTransientException: The parameterized query '(@P0_actualId nvarchar(4000),@P1_anotherId nvarchar(4000))SELECT ' expects the parameter '@P1_anotherId', which was not supplied.
...

[2023-11-22T17:54:19.790Z] Caused by: org.springframework.r2dbc.UncategorizedR2dbcException: executeMany; SQL [SELECT TOP(1) * FROM TABLE WITH(NOLOCK) WHERE ActualId = (:actualId)]; The parameterized query '(@P0_actualId nvarchar(4000),@P1_anotherId nvarchar(4000))SELECT ' expects the parameter '@P1_anotherId', which was not supplied.
[2023-11-22T17:54:19.790Z] 	at org.springframework.r2dbc.connection.ConnectionFactoryUtils.convertR2dbcException(ConnectionFactoryUtils.java:245)
[2023-11-22T17:54:19.790Z] 	Suppressed: The stacktrace has been enhanced by Reactor, refer to additional information below: 
[2023-11-22T17:54:19.791Z] Assembly trace from producer [reactor.core.publisher.FluxOnErrorResume] :
[2023-11-22T17:54:19.791Z] 	reactor.core.publisher.Flux.onErrorMap
[2023-11-22T17:54:19.791Z] 	org.springframework.r2dbc.core.DefaultDatabaseClient.inConnectionMany(DefaultDatabaseClient.java:151)
[2023-11-22T17:54:19.791Z] Error has been observed at the following site(s):
[2023-11-22T17:54:19.791Z] 	*_____Flux.onErrorMap ⇢ at org.springframework.r2dbc.core.DefaultDatabaseClient.inConnectionMany(DefaultDatabaseClient.java:151)
[2023-11-22T17:54:19.791Z] 	|_                    ⇢ at org.springframework.r2dbc.core.DefaultFetchSpec.all(DefaultFetchSpec.java:83)
[2023-11-22T17:54:19.791Z] 	|_        Flux.buffer ⇢ at org.springframework.r2dbc.core.DefaultFetchSpec.one(DefaultFetchSpec.java:62)
[2023-11-22T17:54:19.791Z] 	|_       Flux.flatMap ⇢ at org.springframework.r2dbc.core.DefaultFetchSpec.one(DefaultFetchSpec.java:63)
[2023-11-22T17:54:19.792Z] 	|_          Flux.next ⇢ at org.springframework.r2dbc.core.DefaultFetchSpec.one(DefaultFetchSpec.java:73)
[2023-11-22T17:54:19.792Z] 	|_          Mono.name ⇢ at com.mycode.MyRepositoryImpl.findXyzSql(MyRepositoryImpl.java:123)

BUT, the query in question does not even specify the anotherId in SQL.

I will comment this also to #276.

We are using 1.0.0.RELEASE in our released solution at the moment.

@mp911de
Copy link
Member

mp911de commented Nov 23, 2023

Thanks a lot for confirming. Closing this one as done.

The other issues are different ones that we need to sort out.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type: bug Something isn't working
Projects
None yet
Development

No branches or pull requests

4 participants