Connection leaks (not released) when concurrent query is canceled #198
@agorbachenko Hi Alexei - I ran your repro application but was not able to reproduce the issue. It ran for about 10 minutes on my machine locally, built with Java 18. We have an issue which seems similar (#206) and I was hoping to observe the repro to understand the problem.

UPDATE: I actually was able to reproduce the issue; it ran for about 25 minutes before running into the problem. I will investigate and keep you posted here.
btw if you change the delaySubscription in the

```java
package com.example.r2dbc;

import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryOptions;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.List;

import static io.r2dbc.pool.PoolingConnectionFactoryProvider.INITIAL_SIZE;
import static io.r2dbc.pool.PoolingConnectionFactoryProvider.MAX_SIZE;
import static io.r2dbc.spi.ConnectionFactoryOptions.*;

@Component
public class R2dbcConnectionLeakDemo {

    private final ConnectionFactory pooledConnectionFactory = ConnectionFactories.get(ConnectionFactoryOptions.builder()
            .option(DRIVER, "pool")
            .option(PROTOCOL, "postgresql") // driver identifier, PROTOCOL is delegated as DRIVER by the pool.
            .option(HOST, "127.0.0.1")
            .option(PORT, 5432)
            .option(USER, "postgres")
            .option(PASSWORD, "demo")
            .option(DATABASE, "postgres")
            .option(MAX_SIZE, 1)
            .option(INITIAL_SIZE, 1)
            .build());

    @EventListener(ApplicationReadyEvent.class)
    private void runGoodQueryPeriodically() {
        Mono.usingWhen(pooledConnectionFactory.create(),
                        connection -> Mono.from(connection.createStatement("SELECT demo.* FROM demo WHERE demo.id = 1 LIMIT 2")
                                .execute()),
                        Connection::close)
                .delaySubscription(Duration.ofMillis(1000), Schedulers.single())
                .repeat()
                .subscribe();
    }

    @EventListener(ApplicationReadyEvent.class)
    private void runSomethingBadPeriodically() {
        getDemoEntitiesAsList()
                .zipWith(runSomethingFailing())
                .delaySubscription(Duration.ofMillis(10), Schedulers.single())
                // resume on error to repeat
                .onErrorResume(e -> Mono.empty())
                .repeat()
                .subscribe();
    }

    private Mono<List<Integer>> getDemoEntitiesAsList() {
        return Mono.usingWhen(pooledConnectionFactory.create(),
                        connection -> Mono.from(connection.createStatement("SELECT demo.id FROM demo")
                                .execute()),
                        Connection::close)
                .map(result -> result.map(readable -> readable.get(0, Integer.class)))
                .flatMapMany(Flux::from)
                .collectList();
    }

    private static Mono<?> runSomethingFailing() {
        return Mono.error(RuntimeException::new)
                .delaySubscription(Duration.ofMillis(10), Schedulers.single());
    }
}
```

Of course I confirm that the issue is still present:
Thanks! I haven't had the chance to investigate; hopefully I can get back to this at the beginning of March.
You can add a bit more debugging logging via
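For context, one conventional way to get extra pool and driver logging in a Spring Boot setup is to raise the log levels in `application.properties`. The package names below are assumptions based on the r2dbc-pool, r2dbc-postgresql, and reactor-pool projects; verify them against your versions:

```properties
# Illustrative application.properties fragment - raises logging for the
# connection pool, the Postgres driver, and the underlying reactor-pool.
logging.level.io.r2dbc.pool=DEBUG
logging.level.io.r2dbc.postgresql=DEBUG
logging.level.reactor.pool=DEBUG
```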
It seems to be propagated? I updated the code, also using different threads; it actually makes the log clearer:

```java
package com.example.r2dbc;

import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryOptions;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.List;

import static io.r2dbc.pool.PoolingConnectionFactoryProvider.INITIAL_SIZE;
import static io.r2dbc.pool.PoolingConnectionFactoryProvider.MAX_SIZE;
import static io.r2dbc.spi.ConnectionFactoryOptions.*;

@Component
public class R2dbcConnectionLeakDemo {

    private final ConnectionFactory pooledConnectionFactory = ConnectionFactories.get(ConnectionFactoryOptions.builder()
            .option(DRIVER, "pool")
            .option(PROTOCOL, "postgresql") // driver identifier, PROTOCOL is delegated as DRIVER by the pool.
            .option(HOST, "127.0.0.1")
            .option(PORT, 5432)
            .option(USER, "postgres")
            .option(PASSWORD, "demo")
            .option(DATABASE, "postgres")
            .option(MAX_SIZE, 1)
            .option(INITIAL_SIZE, 1)
            .build());

    @EventListener(ApplicationReadyEvent.class)
    private void runGoodQueryPeriodically() {
        Mono.usingWhen(pooledConnectionFactory.create(),
                        connection -> Mono.from(connection.createStatement("SELECT demo.* FROM demo WHERE demo.id = 1 LIMIT 2")
                                .execute()),
                        Connection::close)
                .log()
                .delaySubscription(Duration.ofMillis(1000), Schedulers.newSingle("GoodThread"))
                .repeat()
                .subscribe();
    }

    @EventListener(ApplicationReadyEvent.class)
    private void runSomethingBadPeriodically() {
        getDemoEntitiesAsList()
                .zipWith(runSomethingFailing())
                .delaySubscription(Duration.ofMillis(10), Schedulers.newSingle("LeakingThread"))
                // resume on error to repeat
                .onErrorResume(e -> Mono.empty())
                .repeat()
                .subscribe();
    }

    private Mono<List<Integer>> getDemoEntitiesAsList() {
        return Mono.usingWhen(pooledConnectionFactory.create(),
                        connection -> Mono.from(connection.createStatement("SELECT demo.id FROM demo")
                                .execute()),
                        Connection::close)
                .log()
                .map(result -> result.map(readable -> readable.get(0, Integer.class)))
                .flatMapMany(Flux::from)
                .collectList();
    }

    private static Mono<?> runSomethingFailing() {
        return Mono.error(RuntimeException::new)
                .delaySubscription(Duration.ofMillis(10), Schedulers.newSingle("BadThread"));
    }
}
```
It seems that the cancellation signal is happening on the
The odd thing is that the
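As a side note, the cancellation interplay being discussed can be reduced to a minimal Reactor sketch. This is not the author's repro - it assumes only reactor-core on the classpath and no database - and it shows that when one source of `zipWith` fails, the still-running sibling receives a cancel signal:

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;

import reactor.core.publisher.Mono;

public class ZipCancelDemo {

    // Returns true if the slow "query" saw a cancel signal after its
    // zip sibling errored. Illustrative only - no database involved.
    static boolean siblingCanceledOnError() {
        AtomicBoolean canceled = new AtomicBoolean();
        Mono<String> slowQuery = Mono.delay(Duration.ofSeconds(10))
                .map(tick -> "row")
                .doOnCancel(() -> canceled.set(true)); // fires when zip gives up
        Mono<String> failing = Mono.error(new RuntimeException("boom"));
        slowQuery.zipWith(failing)
                .onErrorResume(e -> Mono.empty()) // swallow the error, like the repro
                .block();
        return canceled.get();
    }

    public static void main(String[] args) {
        System.out.println("slow query canceled: " + siblingCanceledOnError());
    }
}
```

In the repro the cancelled side is a pooled database query, so the cancel arrives while the driver may still have unconsumed backend responses, which is where the leak appears to originate.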
I changed this line:
and the log slightly changed:
Now the
Finally I decided to increase the delay to 1250ms (less frequent than the good query, which runs every 1000ms):
At least at the beginning everything seems to be working, and the log looks completely different:
@mp911de Hey :) I was summoned here from reactor/reactor-core#3695. Please note that the original reporter is not using the
@chemicL, hmm... As I understand, my example uses
@agorbachenko thanks for the quick response. I was not aware of the internals in Spring; I just had a look at the r2dbc sources. I'll have a look then!
Btw @chemicL I revised the author's example, keeping the test logic and its goal of showing the issue. However, I removed the "Spring" layer, hoping it makes it easier to understand. I don't think you have to look at the Spring internals if you just look at the revised example here.
Thanks @SimoneGiusso. We experimented together with @mp911de today with both examples. Thanks for providing your version of it! What I can say is that
Thanks to @chemicL's investigation it seems that the underlying Reactor Pool causes the issue. The related issue is reactor/reactor-pool#124. I think the problem is going to be addressed there, so I am closing the ticket here.
@chemicL and I ran some investigations. For now, it seems that the issue only happens when there is an actual driver involved. So far, we tested only against Postgres. Using R2DBC mocks doesn't reproduce the problem, so we need to widen our scope again. Anyone willing to help looking into the cause is welcome.
What I have found is that the connection acquisition is working properly and the connection is not stuck in the pool, but is being held by an actor that fails to consume the responses for an ongoing query. The scenario is as follows:
So with that scenario I replicated what was observed using the Postgres driver to check just the integration, and the code is in https://github.com/chemicL/r2dbc-lab. However, all the tests pass in those mocked scenarios. Further investigation is needed. It looks as if the reactor-pool issue mentioned above is not the root cause here.
@chemicL Hello Dariusz, greetings to Krakow! Yesterday I checked your r2dbc-lab in order to reproduce the problem. But your current example is using H2, not Postgres. I had to rewrite it for Postgres. Anyway, I cannot reproduce the problem with the bad actor. Can we help each other?
Hey, @cyberluke 👋 I'm afraid I don't understand - r2dbc-lab has no actual dependency on H2 or Postgresql. It uses only mocks with r2dbc-pool, r2dbc-spi, and r2dbc-spi-test. Can you elaborate? Also, r2dbc-lab shows in fact that using the primitives from the SPI there is no issue - the problem is not reproducible in isolation, only using the examples provided by @SimoneGiusso and @agorbachenko.
@chemicL You're right, too much work. That H2 database was used in an R2DBC test case to reproduce the bug from another issue here.
Is there a status update on this issue?
@blangenberg @agorbachenko @mp911de If you increase the size of the "reactor.bufferSize.small" property, the issue will be gone. The source of the problem is in the r2dbc-postgresql ReactorNettyClient class. The internal ReactorNettyClient.Conversation structure gets stuck if the data consumer does not consume its data. And this is what happens when the bad thread cancels its request.
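For anyone trying this workaround: to my understanding, Reactor reads `reactor.bufferSize.small` once, when `reactor.util.concurrent.Queues` is first loaded, so it has to be set before Reactor initializes (e.g. `-Dreactor.bufferSize.small=1024` on the JVM command line). The sketch below mirrors the default/floor logic as I understand it from the reactor-core sources - verify against your reactor-core version:

```java
public class BufferSizeDemo {

    // Mirrors how reactor-core appears to derive Queues.SMALL_BUFFER_SIZE:
    // the system property "reactor.bufferSize.small", defaulting to 256,
    // with a floor of 16. (Assumption based on reactor-core sources.)
    static int smallBufferSize() {
        return Math.max(16, Integer.parseInt(
                System.getProperty("reactor.bufferSize.small", "256")));
    }

    public static void main(String[] args) {
        System.out.println(smallBufferSize()); // 256 when the property is unset
        System.setProperty("reactor.bufferSize.small", "1024");
        System.out.println(smallBufferSize()); // 1024 - but note that the real
        // Queues.SMALL_BUFFER_SIZE is a static final, so setting the property
        // at runtime does NOT affect an already-initialized Reactor.
    }
}
```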
@alexeykurshakov great finding! Thanks also for opening pgjdbc/r2dbc-postgresql#661 - linking it here for reference.
Bug Report
Versions
Current Behavior
When you have a database query zipped with another action that fails for some reason, your query is canceled and its database connection is released, BUT it seems that this also affects another concurrently running query, leading to its connection not being released.
See the reproducer project for the details: https://github.com/agorbachenko/r2dbc-connection-leak-demo
Output sample:
This connection was not released:
pg_stat_activity shows that the connection has not changed its state since then.
Stack trace
Table schema