Example to handle failures in Transaction: Getting connection-timeout for subsequent requests #72

Open
ssumit opened this issue Feb 1, 2017 · 9 comments

Comments

@ssumit

ssumit commented Feb 1, 2017

Transaction 'firstTransaction' has two insert queries, Q1 and Q2. In the following example, Q2 fails due to a primary key constraint violation.

    @Test
    public void test() {
        // Q1: first insert, runs once the transaction has begun.
        Observable<Integer> firstTransactionFirstPart = database.update(queryThatInsertsinTableWithPrimaryKeyA)
                .dependsOn(database.beginTransaction())
                .parameters(new Object[0])
                .count();
        // Q2: the same insert again, so it fails with a primary key violation.
        Observable<Integer> firstTransactionSecondPart = database.update(queryThatInsertsinTableWithPrimaryKeyA)
                .dependsOn(firstTransactionFirstPart)
                .parameters(new Object[0])
                .count();
        // Commit only after Q2 has emitted its count.
        Observable<Boolean> firstTransaction = database.commit(firstTransactionSecondPart);
        firstTransaction.subscribe(...); // logging
    }

If we then attempt to run another transaction, we get an exception:

    com.github.davidmoten.rx.jdbc.exceptions.SQLRuntimeException: java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30000ms.
        at com.github.davidmoten.rx.jdbc.ConnectionProviderPooled.get(ConnectionProviderPooled.java:92)
        at com.github.davidmoten.rx.jdbc.ConnectionProviderSingletonManualCommit.get(ConnectionProviderSingletonManualCommit.java:43)
        at com.github.davidmoten.rx.jdbc.QueryUpdateOnSubscribe.getConnection(QueryUpdateOnSubscribe.java:127)
        at com.github.davidmoten.rx.jdbc.QueryUpdateOnSubscribe.call(QueryUpdateOnSubscribe.java:80)
        at com.github.davidmoten.rx.jdbc.QueryUpdateOnSubscribe.call(QueryUpdateOnSubscribe.java:22)
        at rx.Observable.unsafeSubscribe(Observable.java:8460)

Max pool size is 1 (it is also reproducible with other values). Full method impl
Versions:
rxjava-jdbc: 0.7.2, 0.7.3
Hikari: 2.5.1
postgresql: 9.4.1212
Most likely I'm doing something wrong, but I'm not able to figure out what :(

@ssumit
Author

ssumit commented Feb 2, 2017

If I set up the Hikari data source explicitly with a leakDetectionThreshold of 2000 ms, I get the following exception, and then SQLTransientConnectionException for the next transaction as before:

[jdbc:postgresql://localhost:5432/kronos?user=postgres housekeeper] WARN  [c.z.h.p.ProxyLeakTask] {{appResourceId,sumit@dev}} - Connection leak detection triggered for org.postgresql.jdbc.PgConnection@7bc9e6ab, stack trace follows java.lang.Exception: Apparent connection leak detected
	at com.github.davidmoten.rx.jdbc.ConnectionProviderFromDataSource.get(ConnectionProviderFromDataSource.java:30)
	at com.github.davidmoten.rx.jdbc.ConnectionProviderSingletonManualCommit.get(ConnectionProviderSingletonManualCommit.java:43)
	at com.github.davidmoten.rx.jdbc.QueryUpdateOnSubscribe.getConnection(QueryUpdateOnSubscribe.java:127)
	at com.github.davidmoten.rx.jdbc.QueryUpdateOnSubscribe.call(QueryUpdateOnSubscribe.java:80)
	at com.github.davidmoten.rx.jdbc.QueryUpdateOnSubscribe.call(QueryUpdateOnSubscribe.java:22)
	at rx.Observable.unsafeSubscribe(Observable.java:8460)
	at rx.internal.operators.OperatorSubscribeOn$1.call(OperatorSubscribeOn.java:94)
	at rx.internal.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.enqueue(TrampolineScheduler.java:76)
	at rx.internal.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.schedule(TrampolineScheduler.java:55)
	at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:45)
	at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:30)

Also, how can I recover from this situation, given that the connection cannot be used anymore?

@ssumit ssumit closed this as completed Feb 2, 2017
@ssumit
Author

ssumit commented Feb 3, 2017

Reopening, as I can't find a community to help me debug this.
I have tried two more implementations:

        Observable<Boolean> firstTransaction =
                Observable
                        .just(1)
                        .compose(database.beginTransactionOnNext_())
                        .compose(database.update(query).dependsOnTransformer())
                        .compose(database.update(query).dependsOnTransformer())
                        .compose(database.commitOnNext_());

and

        Observable<Boolean> firstTransaction =
                Observable
                        .just(1)
                        .compose(database.beginTransactionOnNext_())
                        .map(toEmpty())
                        .compose(database.update(query).parameterListTransformer())
                        .compose(RxUtil.<Integer> flatten())
                        .map(toEmpty())
                        .compose(database.update(query).parameterListTransformer())
                        .compose(RxUtil.<Integer> flatten())
                        .compose(database.commitOnNext_());

Even with the styles above I get the same result.

@ssumit ssumit reopened this Feb 3, 2017
@ssumit ssumit changed the title Failures in Transaction results in connection-timeout for subsequent requests Example to handle failures in Transaction: Getting connection-timeout for subsequent requests Feb 5, 2017
@ssumit
Author

ssumit commented Feb 5, 2017

As expected, the issue is that the connection is only closed when the SQL query in the QueryUpdateOnSubscribe object is a commit or a rollback, in which case it calls Util::closeQuietly(connection) directly. However, in the case of failure, neither of them is issued, as I am not able to handle the failure (I tried onErrorResumeNext()/onExceptionResumeNext).

Internal details:
Issue: ProxyConnection::close is not getting called.
Debugger-based observation for two update queries in a transaction where the second update fails:

Current flow (important steps):
QueryUpdateTransformerFromObservable::call
QueryUpdateOnSubscribe::call
QueryUpdateOnSubscribe::performUpdate
QueryUpdateTransformerFromObservable::call
QueryUpdateOnSubscribe::performUpdate
This results in an SQLException, which is caught, and we go back to the catch block in QueryUpdateOnSubscribe::call.
There we end up calling Util::closeQuietly(PreparedStatement) and then, "if not commit and not rollback", closeQuietlyIfAutoCommit (which is called, but the connection is not in auto-commit mode in this case).
Hence Util::closeQuietly(connection) is not getting called.
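
To show the effect of that missing close in isolation, here is a minimal sketch that needs neither rxjava-jdbc nor HikariCP. `TinyPool` is a hypothetical stand-in for a pool with a maximum size of 1, and the thrown exception stands in for the primary key violation; none of these names come from the library.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

final class LeakDemo {

    // Hypothetical stand-in for a pool with a maximum size of 1; this is not HikariCP.
    static final class TinyPool {
        private final Semaphore permits = new Semaphore(1);

        // Returns true if a "connection" was obtained within the timeout.
        boolean borrow(long timeoutMillis) {
            try {
                return permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }

        // Returns the "connection" to the pool, as Connection.close() would.
        void giveBack() {
            permits.release();
        }
    }

    // Runs a "transaction" whose statement fails. When closeOnError is false,
    // the connection is never returned, mirroring the flow described above.
    static void failingTransaction(TinyPool pool, boolean closeOnError) {
        if (!pool.borrow(100)) {
            throw new IllegalStateException("no connection available");
        }
        try {
            throw new IllegalStateException("simulated primary key violation");
        } catch (IllegalStateException e) {
            if (closeOnError) {
                pool.giveBack(); // the step that is skipped in manual-commit mode
            }
        }
    }

    // Reports whether a follow-up request can still obtain a connection.
    static boolean nextRequestSucceeds(boolean closeOnError) {
        TinyPool pool = new TinyPool();
        failingTransaction(pool, closeOnError);
        return pool.borrow(100);
    }

    public static void main(String[] args) {
        System.out.println("without close on error: " + nextRequestSucceeds(false)); // false
        System.out.println("with close on error: " + nextRequestSucceeds(true));     // true
    }
}
```

With the connection left open, the follow-up request times out, just as the next transaction does against the real pool.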

@davidmoten
Owner

Thanks for the report. I'm a bit limited in time at the moment but I'd like to have a look at it soon.

@ssumit
Author

ssumit commented Feb 6, 2017

So I have applied a patch locally that works for my use case now. Basically, on getting an exception, Database::endTransactionObserve and Database::endTransactionSubscribe are invoked and the current connection provider is lost. So any subsequent rollback is treated differently and will block on getting a connection (assuming there is just 1 connection).
Thus I made changes in QueryUpdateOnSubscribe and QuerySelectOnSubscribe: they now close the connection if auto-commit is false (additionally Util.rollback is called, but the HikariCP ProxyConnection seems to handle these cases anyway). Therefore I do not have to call rollback explicitly on error.

@zsiegel
Contributor

zsiegel commented Mar 1, 2017

I think I am running into the same issue as well. I have the following...

        Observable<Boolean> firstTransaction =
            Observable
                .just(1)
                .compose(db.beginTransactionOnNext_())
                .compose(db.update("insert into table1").dependsOnTransformer())
                .compose(db.update("insert into table2 query exception").dependsOnTransformer())
                .compose(db.commitOnNext_());


        firstTransaction.count().subscribe(next -> {}, err -> {}, () -> {});

        System.out.println("DONE");
        db.update("delete from table1").execute();
        db.update("delete from table2").execute();
        System.out.println("CLEARED");

When I run this, query 1 completes, but I deliberately cause a query exception on the table2 operation. When I do this, the thread hangs at the first delete statement and never completes.

Looking into the MySQL transactions table I see an outstanding tx (which I believe is the operation that did not close/rollback properly) and I see a transaction for the "delete from table" that is waiting.

I used SELECT * FROM information_schema.INNODB_TRX; to look into the current transactions although that may not be 100% correct.

@zsiegel
Contributor

zsiegel commented Mar 1, 2017

Upon further debugging, it seems that for me this is related to autocommit.

Since the first query completes successfully and autocommit is on by default, the connection and transaction are closed, and then a new connection is made but without a transaction :(. Not sure how to go about dealing with this. It feels like the nature of compose does not play well with autocommit.

@ssumit
Author

ssumit commented Mar 6, 2017

Your observations are in sync with mine. As per my understanding, auto-commit is off for transactions and on for normal queries. We only need to handle the case when auto-commit is off.
When an exception occurs inside a transaction, the catch block in QueryUpdateOnSubscribe::call catches it. We need to make sure that the db connection is then closed. Once the connection is closed, subsequent queries won't wait or time out.

You can try this:
Edit the call method in QueryUpdateOnSubscribe.java.
Inside the catch block, add the following before the existing code:

                if (!state.con.getAutoCommit()) {
                    performRollbackForcefully(state);
                }

and then define a private method like the following:

    private void performRollbackForcefully(State state) {
        debug("rolling back forcefully");
        query.context().endTransactionObserve();
        Conditions.checkTrue(!Util.isAutoCommit(state.con));
        Util.rollback(state.con);
        if (state.closed.compareAndSet(false, true)) {
            Util.closeQuietly(state.ps);
            Util.closeQuietly(state.con);
        }
        debug("rolled back");
    }
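
For anyone who wants to try the same idea outside the library, here is a rough sketch of the pattern with plain JDBC: roll back if the connection is in manual-commit mode, then always close it. The `Work` interface, `inTransaction`, and the recording fake `Connection` are hypothetical helpers for illustration and are not part of rxjava-jdbc; the fake only exists so the example runs without a database.

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

final class RollbackOnFailureSketch {

    // Hypothetical unit of work; stands in for the two inserts.
    interface Work {
        void run(Connection con) throws SQLException;
    }

    // Runs the work in a manual-commit transaction. On failure it rolls back,
    // and in every case it closes the connection so that a pool can reuse it.
    static boolean inTransaction(Connection con, Work work) {
        try {
            con.setAutoCommit(false);
            work.run(con);
            con.commit();
            return true;
        } catch (SQLException e) {
            try {
                if (!con.getAutoCommit()) {
                    con.rollback();
                }
            } catch (SQLException rollbackFailure) {
                e.addSuppressed(rollbackFailure);
            }
            return false;
        } finally {
            try {
                con.close();
            } catch (SQLException ignored) {
                // Nothing sensible left to do if close itself fails.
            }
        }
    }

    // Fake Connection that only records method names, so no database is needed.
    static Connection recordingConnection(List<String> calls) {
        return (Connection) Proxy.newProxyInstance(
                RollbackOnFailureSketch.class.getClassLoader(),
                new Class<?>[] {Connection.class},
                (proxy, method, methodArgs) -> {
                    calls.add(method.getName());
                    // getAutoCommit returns a primitive boolean; report manual-commit mode.
                    return method.getName().equals("getAutoCommit") ? Boolean.FALSE : null;
                });
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        boolean committed = inTransaction(recordingConnection(calls), con -> {
            throw new SQLException("simulated primary key violation");
        });
        // prints: false [setAutoCommit, getAutoCommit, rollback, close]
        System.out.println(committed + " " + calls);
    }
}
```

The point of the recorded call order is that close is reached even though neither commit nor an explicit rollback query was issued by the caller.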

@jacinpoz

I think I am running into the same issue. Does the catch block look like this after your changes?

        catch (Throwable e) {
            try {
                if (!state.con.getAutoCommit()) {
                    performRollbackForcefully(state);
                }
            } catch (SQLException e1) {
                e1.printStackTrace();
            }
            query.context().endTransactionObserve();
            query.context().endTransactionSubscribe();
            try {
                close(state);
            } finally {
                handleException(e, subscriber);
            }
        }

My tests seem to behave much better after this change.
