Skip to content

Commit 6161fb8

Browse files
authored
Merge pull request #331 from alex268/master
Added transaction closing on stream cancelation
2 parents 7722d88 + 5ded04d commit 6161fb8

File tree

2 files changed

+28
-0
lines changed

2 files changed

+28
-0
lines changed

query/src/main/java/tech/ydb/query/impl/SessionImpl.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,14 @@ void handleCompletion(Status status, Throwable th) {
375375
+ status, Issue.Severity.ERROR)));
376376
}
377377
}
378+
379+
@Override
380+
public void cancel() {
381+
super.cancel();
382+
if (txId.compareAndSet(currentId, null)) {
383+
logger.warn("{} transaction with id {} was cancelled", SessionImpl.this, currentId);
384+
}
385+
}
378386
};
379387
}
380388

query/src/test/java/tech/ydb/query/impl/QueryIntegrationTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,26 @@ public void testCancelStream() {
217217

218218
try (QuerySession s4 = client.createSession(Duration.ofSeconds(5)).join().getValue()) {
219219
Assert.assertNotEquals(id, s4.getId());
220+
id = s4.getId();
221+
222+
QueryTransaction tx = s4.beginTransaction(TxMode.SERIALIZABLE_RW).join().getValue();
223+
Assert.assertTrue(tx.isActive());
224+
225+
final QueryStream query = tx.createQuery("SELECT 2 + 2;");
226+
final CompletableFuture<Void> stop = new CompletableFuture<>();
227+
CompletableFuture<Result<QueryInfo>> future = query.execute(part -> {
228+
stop.join();
229+
printQuerySetPart(part);
230+
});
231+
query.cancel();
232+
stop.complete(null);
233+
Result<QueryInfo> result = future.join();
234+
Assert.assertEquals(StatusCode.CLIENT_CANCELLED, result.getStatus().getCode());
235+
Assert.assertFalse(tx.isActive());
236+
}
237+
238+
try (QuerySession s5 = client.createSession(Duration.ofSeconds(5)).join().getValue()) {
239+
Assert.assertNotEquals(id, s5.getId());
220240
}
221241
}
222242
}

0 commit comments

Comments
 (0)