From 5ded04dcde74528b94a0c8f938a67fd36d38382e Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Mon, 14 Oct 2024 17:04:20 +0100 Subject: [PATCH] Added transaction closing on stream cancelation --- .../java/tech/ydb/query/impl/SessionImpl.java | 8 ++++++++ .../ydb/query/impl/QueryIntegrationTest.java | 20 +++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java index ae0ef980..25a027df 100644 --- a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java @@ -375,6 +375,14 @@ void handleCompletion(Status status, Throwable th) { + status, Issue.Severity.ERROR))); } } + + @Override + public void cancel() { + super.cancel(); + if (txId.compareAndSet(currentId, null)) { + logger.warn("{} transaction with id {} was cancelled", SessionImpl.this, currentId); + } + } }; } diff --git a/query/src/test/java/tech/ydb/query/impl/QueryIntegrationTest.java b/query/src/test/java/tech/ydb/query/impl/QueryIntegrationTest.java index 0894d921..24b05402 100644 --- a/query/src/test/java/tech/ydb/query/impl/QueryIntegrationTest.java +++ b/query/src/test/java/tech/ydb/query/impl/QueryIntegrationTest.java @@ -217,6 +217,26 @@ public void testCancelStream() { try (QuerySession s4 = client.createSession(Duration.ofSeconds(5)).join().getValue()) { Assert.assertNotEquals(id, s4.getId()); + id = s4.getId(); + + QueryTransaction tx = s4.beginTransaction(TxMode.SERIALIZABLE_RW).join().getValue(); + Assert.assertTrue(tx.isActive()); + + final QueryStream query = tx.createQuery("SELECT 2 + 2;"); + final CompletableFuture stop = new CompletableFuture<>(); + CompletableFuture> future = query.execute(part -> { + stop.join(); + printQuerySetPart(part); + }); + query.cancel(); + stop.complete(null); + Result result = future.join(); + Assert.assertEquals(StatusCode.CLIENT_CANCELLED, result.getStatus().getCode()); + Assert.assertFalse(tx.isActive()); + } + + try (QuerySession s5 = client.createSession(Duration.ofSeconds(5)).join().getValue()) { + Assert.assertNotEquals(id, s5.getId()); } } }