From 0b41c1c224e1667ca62e921dd87b82145f3db4ca Mon Sep 17 00:00:00 2001 From: Mirro Mutth Date: Thu, 7 Nov 2019 18:26:51 +0800 Subject: [PATCH] Propagete errors and discards for queue --- .../mysql/ParametrizedMySqlStatement.java | 4 +- .../java/dev/miku/r2dbc/mysql/QueryFlow.java | 4 +- .../mysql/client/ReactorNettyClient.java | 80 ++++++-------- .../miku/r2dbc/mysql/client/RequestQueue.java | 76 +++++++------ .../miku/r2dbc/mysql/client/RequestTask.java | 73 ++++++++++++ .../PrepareQueryIntegrationTestSupport.java | 8 +- .../r2dbc/mysql/client/RequestQueueTest.java | 104 ++++++++++++++++-- .../mysql/util/FluxDiscardOnCancelTest.java | 4 +- 8 files changed, 254 insertions(+), 99 deletions(-) create mode 100644 src/main/java/dev/miku/r2dbc/mysql/client/RequestTask.java diff --git a/src/main/java/dev/miku/r2dbc/mysql/ParametrizedMySqlStatement.java b/src/main/java/dev/miku/r2dbc/mysql/ParametrizedMySqlStatement.java index 7a87780e..f42d3258 100644 --- a/src/main/java/dev/miku/r2dbc/mysql/ParametrizedMySqlStatement.java +++ b/src/main/java/dev/miku/r2dbc/mysql/ParametrizedMySqlStatement.java @@ -18,8 +18,8 @@ import dev.miku.r2dbc.mysql.client.Client; import dev.miku.r2dbc.mysql.codec.Codecs; -import dev.miku.r2dbc.mysql.util.ConnectionContext; import dev.miku.r2dbc.mysql.message.ParameterValue; +import dev.miku.r2dbc.mysql.util.ConnectionContext; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -127,7 +127,7 @@ public Flux execute() { return Flux.defer(() -> { if (!executed.compareAndSet(false, true)) { - throw new IllegalStateException("Statement was already executed"); + return Flux.error(new IllegalStateException("Statement was already executed")); } String sql = query.getSql(); diff --git a/src/main/java/dev/miku/r2dbc/mysql/QueryFlow.java b/src/main/java/dev/miku/r2dbc/mysql/QueryFlow.java index 7a2b0aad..9f77f9d6 100644 --- a/src/main/java/dev/miku/r2dbc/mysql/QueryFlow.java +++ b/src/main/java/dev/miku/r2dbc/mysql/QueryFlow.java @@ -104,8 +104,8 @@ static Flux execute(Client client, String sql, int statementId, L return OperatorUtils.discardOnCancel(Flux.fromIterable(bindings)) .doOnDiscard(Binding.class, CLEAR) - .concatMap(binding -> OperatorUtils.discardOnCancel(client.exchange(binding.toMessage(statementId), EXECUTE_DONE) - .doOnDiscard(ReferenceCounted.class, RELEASE)) + .concatMap(binding -> OperatorUtils.discardOnCancel(client.exchange(binding.toMessage(statementId), EXECUTE_DONE)) + .doOnDiscard(ReferenceCounted.class, RELEASE) .handle(handler)); } diff --git a/src/main/java/dev/miku/r2dbc/mysql/client/ReactorNettyClient.java b/src/main/java/dev/miku/r2dbc/mysql/client/ReactorNettyClient.java index 8fa62dba..5f37f94d 100644 --- a/src/main/java/dev/miku/r2dbc/mysql/client/ReactorNettyClient.java +++ b/src/main/java/dev/miku/r2dbc/mysql/client/ReactorNettyClient.java @@ -53,10 +53,6 @@ final class ReactorNettyClient implements Client { private static final Logger logger = LoggerFactory.getLogger(ReactorNettyClient.class); - private static final Function, Flux> IDENTITY = Function.identity(); - - private static final Function, Mono> NEXT = Flux::next; - private static final Consumer INFO_LOGGING = ReactorNettyClient::infoLogging; private static final Consumer DEBUG_LOGGING = message -> { @@ -124,14 +120,17 @@ public Flux exchange(ExchangeableMessage request, Predicate>create(sink -> { if (!isConnected()) { + if (request instanceof Disposable) { + ((Disposable) request).dispose(); + } sink.error(new IllegalStateException("Cannot send messages because the connection is closed")); return; } - requestQueue.submit(DisposableExchange.wrap(request, () -> { + requestQueue.submit(RequestTask.wrap(request, sink, () -> { boolean[] completed = new boolean[]{false}; - sink.success(send(request) + return send(request) .thenMany(responseProcessor) .handle((message, response) -> { response.next(message); @@ -142,23 +141,26 @@ public Flux exchange(ExchangeableMessage request, Predicate sendOnly(SendOnlyMessage message) { requireNonNull(message, "message must not be null"); - return Mono.create(sink -> { + return Mono.>create(sink -> { if (!isConnected()) { + if (message instanceof Disposable) { + ((Disposable) message).dispose(); + } sink.error(new IllegalStateException("Cannot send messages because the connection is closed")); return; } - requestQueue.submit(() -> send(message).doOnTerminate(requestQueue).subscribe(null, sink::error, sink::success)); - }); + requestQueue.submit(RequestTask.wrap(message, sink, () -> send(message).doOnTerminate(requestQueue))); + }).flatMap(identity()); } @Override @@ -169,34 +171,33 @@ public Mono receiveOnly() { return; } - requestQueue.submit(() -> { + requestQueue.submit(RequestTask.wrap(sink, () -> { boolean[] completed = new boolean[]{false}; - sink.success(responseProcessor.next() - .doOnNext(ignored -> completed[0] = true) + return responseProcessor.next() + .doOnSuccess(ignored -> completed[0] = true) .doOnTerminate(requestQueue) - .doOnCancel(exchangeCancel(completed))); - }); - }).flatMap(Function.identity()); + .doOnCancel(exchangeCancel(completed)); + })); + }).flatMap(identity()); } @Override public Mono close() { - return Mono.create(sink -> { + return Mono.>create(sink -> { if (!closing.compareAndSet(false, true)) { // client is closing or closed sink.success(); return; } - requestQueue.submit(() -> send(ExitMessage.getInstance()) + requestQueue.submit(RequestTask.wrap(sink, () -> send(ExitMessage.getInstance()) .onErrorResume(e -> { logger.error("Exit message sending failed, force closing", e); return Mono.empty(); }) - .concatWith(forceClose()) - .subscribe(null, sink::error, sink::success)); - }); + .then(forceClose()))); + }).flatMap(identity()); } @Override @@ -258,37 +259,18 @@ private static void infoLogging(ServerMessage message) { } } - private static final class DisposableExchange implements Runnable, Disposable { - - private final Runnable runnable; + @SuppressWarnings("unchecked") + private static Function identity() { + return (Function) Identity.INSTANCE; + } - private final Disposable disposable; + private static final class Identity implements Function { - private DisposableExchange(Runnable runnable, Disposable disposable) { - this.runnable = runnable; - this.disposable = disposable; - } + private static final Identity INSTANCE = new Identity(); @Override - public void dispose() { - disposable.dispose(); - } - - @Override - public boolean isDisposed() { - return disposable.isDisposed(); - } - - @Override - public void run() { - runnable.run(); - } - - private static Runnable wrap(ClientMessage request, Runnable runnable) { - if (request instanceof Disposable) { - return new DisposableExchange(runnable, (Disposable) request); - } - return runnable; + public Object apply(Object o) { + return o; } } } diff --git a/src/main/java/dev/miku/r2dbc/mysql/client/RequestQueue.java b/src/main/java/dev/miku/r2dbc/mysql/client/RequestQueue.java index 7282b9e7..55b71a15 100644 --- a/src/main/java/dev/miku/r2dbc/mysql/client/RequestQueue.java +++ b/src/main/java/dev/miku/r2dbc/mysql/client/RequestQueue.java @@ -16,7 +16,7 @@ package dev.miku.r2dbc.mysql.client; -import reactor.core.Disposable; +import reactor.util.annotation.Nullable; import reactor.util.concurrent.Queues; import java.util.Queue; @@ -49,32 +49,34 @@ abstract class ActiveStatus extends LeftPadding { *

* Submission conditionally queues requests if an ongoing exchange was active by the time of subscription. * Drains queued commands on exchange completion if there are queued commands or disable active flag. + *

+ * It should discard all tasks when it is discarded by connection. */ final class RequestQueue extends ActiveStatus implements Runnable { - private final Queue queue = Queues.small().get(); + private final Queue> queue = Queues.>small().get(); + + @Nullable + private volatile RuntimeException disposed; /** * Current exchange completed, refresh to next exchange or set to inactive. */ @Override public void run() { - Runnable exchange = queue.poll(); + RequestTask task = queue.poll(); - if (exchange == null) { + if (task == null) { // Queue was empty, set it to idle if it is not disposed. STATUS_UPDATER.compareAndSet(this, ACTIVE, IDLE); } else { int status = this.status; if (status == DISPOSE) { - // Disposed, release polled and should clear queue because don't sure it is empty. - if (exchange instanceof Disposable) { - ((Disposable) exchange).dispose(); - } - freeAll(); + // Cancel and no need clear queue because it should be cleared by other one. + task.cancel(requireDisposed()); } else { - exchange.run(); + task.run(); } } } @@ -83,12 +85,12 @@ public void run() { * Submit an exchange task. If the queue is inactive, it will execute directly rather than queuing. * Otherwise it will be queuing. * - * @param exchange the exchange task includes request messages sending and response messages processor. + * @param task the exchange task includes request messages sending and response messages processor. */ - void submit(Runnable exchange) { + void submit(RequestTask task) { if (STATUS_UPDATER.compareAndSet(this, IDLE, ACTIVE)) { // Fast path for general way. - exchange.run(); + task.run(); return; } @@ -96,20 +98,15 @@ void submit(Runnable exchange) { int status = this.status; if (status == DISPOSE) { - // Disposed and no need clear queue because it should be cleared by other one. - if (exchange instanceof Disposable) { - ((Disposable) exchange).dispose(); - } - throw new IllegalStateException("Request queue was disposed"); + // Cancel and no need clear queue because it should be cleared by other one. + task.cancel(requireDisposed()); + return; } // Prev task may be completing before queue offer, so queue may be idle now. - if (!queue.offer(exchange)) { - if (exchange instanceof Disposable) { - ((Disposable) exchange).dispose(); - } - // Even disposed now, no need clear queue because it should be cleared by other one. - throw new IllegalStateException("Request queue is full"); + if (!queue.offer(task)) { + task.cancel(new IllegalStateException("Request queue is full")); + return; } if (STATUS_UPDATER.compareAndSet(this, IDLE, ACTIVE)) { @@ -121,8 +118,7 @@ void submit(Runnable exchange) { if (status == DISPOSE) { // Disposed, should clear queue because an element has just been offered to the queue. - freeAll(); - throw new IllegalStateException("Request queue was disposed"); + cancelAll(requireDisposed()); } } } @@ -140,16 +136,32 @@ long keeping(int v) { void dispose() { STATUS_UPDATER.set(this, DISPOSE); - freeAll(); + cancelAll(requireDisposed()); } - private void freeAll() { - Runnable runnable; + private RuntimeException requireDisposed() { + RuntimeException disposed = this.disposed; + + if (disposed == null) { + synchronized (this) { + disposed = this.disposed; - while ((runnable = queue.poll()) != null) { - if (runnable instanceof Disposable) { - ((Disposable) runnable).dispose(); + if (disposed == null) { + this.disposed = disposed = new IllegalStateException("Request queue was disposed"); + } + + return disposed; } } + + return disposed; + } + + private void cancelAll(RuntimeException e) { + RequestTask task; + + while ((task = queue.poll()) != null) { + task.cancel(e); + } } } diff --git a/src/main/java/dev/miku/r2dbc/mysql/client/RequestTask.java b/src/main/java/dev/miku/r2dbc/mysql/client/RequestTask.java new file mode 100644 index 00000000..f59999ad --- /dev/null +++ b/src/main/java/dev/miku/r2dbc/mysql/client/RequestTask.java @@ -0,0 +1,73 @@ +/* + * Copyright 2018-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.miku.r2dbc.mysql.client; + +import dev.miku.r2dbc.mysql.message.client.ClientMessage; +import reactor.core.Disposable; +import reactor.core.publisher.MonoSink; +import reactor.util.annotation.Nullable; + +import java.util.function.Supplier; + +/** + * A task for execute, propagate errors and release resources. + *

+ * If task executed, resources should been released by {@code supplier} instead of task self. + */ +final class RequestTask { + + @Nullable + private final Disposable disposable; + + private final MonoSink sink; + + private final Supplier supplier; + + private RequestTask(@Nullable Disposable disposable, MonoSink sink, Supplier supplier) { + this.disposable = disposable; + this.sink = sink; + this.supplier = supplier; + } + + void run() { + sink.success(supplier.get()); + } + + /** + * Cancel task and release resources. + * + * @param e canceled by which error + */ + void cancel(Throwable e) { + if (disposable != null) { + disposable.dispose(); + } + sink.error(e); + } + + static RequestTask wrap(ClientMessage message, MonoSink sink, Supplier supplier) { + if (message instanceof Disposable) { + return new RequestTask<>((Disposable) message, sink, supplier); + } + + return new RequestTask<>(null, sink, supplier); + } + + static RequestTask wrap(MonoSink sink, Supplier supplier) { + return new RequestTask<>(null, sink, supplier); + } +} diff --git a/src/test/java/dev/miku/r2dbc/mysql/PrepareQueryIntegrationTestSupport.java b/src/test/java/dev/miku/r2dbc/mysql/PrepareQueryIntegrationTestSupport.java index 2e4eb68f..aa45d5c2 100644 --- a/src/test/java/dev/miku/r2dbc/mysql/PrepareQueryIntegrationTestSupport.java +++ b/src/test/java/dev/miku/r2dbc/mysql/PrepareQueryIntegrationTestSupport.java @@ -188,12 +188,12 @@ void ignoreResult() { .flatMap(IntegrationTestSupport::extractRowsUpdated) .then(Mono.from(connection.createStatement("INSERT INTO test(`value`) VALUES (1),(2),(3),(4),(5)").execute())) .flatMap(IntegrationTestSupport::extractRowsUpdated) - .then(Mono.from(connection.createStatement("SELECT value FROM test WHERE id > ?").bind(0, 0).execute())) - .then(Mono.from(connection.createStatement("SELECT value FROM test ORDER BY id DESC LIMIT ?,?") + .thenMany(connection.createStatement("SELECT value FROM test WHERE id > ?").bind(0, 0).execute()) + .thenMany(connection.createStatement("SELECT value FROM test ORDER BY id DESC LIMIT ?,?") .bind(0, 2) .bind(1, 5) - .execute())) - .flatMapMany(r -> r.map((row, metadata) -> row.get(0, Integer.TYPE))) + .execute()) + .flatMap(r -> r.map((row, metadata) -> row.get(0, Integer.TYPE))) .concatWith(close(connection)) ) .as(StepVerifier::create) diff --git a/src/test/java/dev/miku/r2dbc/mysql/client/RequestQueueTest.java b/src/test/java/dev/miku/r2dbc/mysql/client/RequestQueueTest.java index 3da6c243..dd0d8899 100644 --- a/src/test/java/dev/miku/r2dbc/mysql/client/RequestQueueTest.java +++ b/src/test/java/dev/miku/r2dbc/mysql/client/RequestQueueTest.java @@ -16,12 +16,24 @@ package dev.miku.r2dbc.mysql.client; +import dev.miku.r2dbc.mysql.message.client.ClientMessage; +import dev.miku.r2dbc.mysql.util.ConnectionContext; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.util.IllegalReferenceCountException; import org.junit.jupiter.api.Test; +import org.reactivestreams.Publisher; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -35,9 +47,14 @@ void submit() { RequestQueue queue = new RequestQueue(); List arr = new AddEventList(queue); - queue.submit(() -> arr.add(1)); - queue.submit(() -> arr.add(2)); - queue.submit(() -> arr.add(3)); + Mono third = Mono.create(sink -> queue.submit(RequestTask.wrap(sink, () -> arr.add(3)))); + Mono second = Mono.create(sink -> queue.submit(RequestTask.wrap(sink, () -> arr.add(2)))); + Mono first = Mono.create(sink -> queue.submit(RequestTask.wrap(sink, () -> arr.add(1)))); + + Flux.concat(first, second, third) + .as(StepVerifier::create) + .expectNext(true, true, true) + .verifyComplete(); assertEquals(arr, Arrays.asList(1, 2, 3)); } @@ -47,13 +64,53 @@ void dispose() { RequestQueue queue = new RequestQueue(); List arr = new AddEventList(queue); - queue.submit(() -> arr.add(1)); - queue.submit(() -> arr.add(2)); + Mono.create(sink -> queue.submit(RequestTask.wrap(sink, () -> arr.add(5)))) + .as(StepVerifier::create) + .expectNext(true) + .verifyComplete(); + Mono.create(sink -> queue.submit(RequestTask.wrap(sink, () -> arr.add(4)))) + .as(StepVerifier::create) + .expectNext(true) + .verifyComplete(); + queue.dispose(); + Mono.create(sink -> queue.submit(RequestTask.wrap(sink, () -> arr.add(3)))) + .as(StepVerifier::create) + .verifyError(IllegalStateException.class); + Mono.create(sink -> queue.submit(RequestTask.wrap(sink, () -> arr.add(2)))) + .as(StepVerifier::create) + .verifyError(IllegalStateException.class); + + assertEquals(arr, Arrays.asList(5, 4)); + } + + @Test + void disposeWithRelease() { + RequestQueue queue = new RequestQueue(); + IntegerData[] sources = new IntegerData[]{new IntegerData(1), new IntegerData(2), new IntegerData(3), new IntegerData(4)}; + List arr = new AddEventList(queue); + + Mono.create(sink -> queue.submit(RequestTask.wrap(sources[0], sink, () -> arr.add(sources[0].consumeData())))) + .as(StepVerifier::create) + .expectNext(true) + .verifyComplete(); + Mono.create(sink -> queue.submit(RequestTask.wrap(sources[1], sink, () -> arr.add(sources[1].consumeData())))) + .as(StepVerifier::create) + .expectNext(true) + .verifyComplete(); queue.dispose(); - assertThrows(IllegalStateException.class, () -> queue.submit(() -> arr.add(3))); - assertThrows(IllegalStateException.class, () -> queue.submit(() -> arr.add(4))); + Mono.create(sink -> queue.submit(RequestTask.wrap(sources[2], sink, () -> { + throw new Error(); + }))) + .as(StepVerifier::create) + .verifyError(IllegalStateException.class); + Mono.create(sink -> queue.submit(RequestTask.wrap(sources[3], sink, () -> { + throw new Error(); + }))) + .as(StepVerifier::create) + .verifyError(IllegalStateException.class); assertEquals(arr, Arrays.asList(1, 2)); + assertThat(sources).extracting(Disposable::isDisposed).containsOnly(true); } @Test @@ -63,6 +120,39 @@ void keeping() { assertEquals(queue.keeping(-1), -1L); } + private static final class IntegerData extends AtomicInteger implements ClientMessage, Disposable { + + private final int data; + + IntegerData(int data) { + super(1); + this.data = data; + } + + @Override + public Publisher encode(ByteBufAllocator allocator, ConnectionContext context) { + return Mono.empty(); + } + + int consumeData() { + dispose(); + return data; + } + + @Override + public void dispose() { + int last = getAndDecrement(); + if (last <= 0) { + throw new IllegalReferenceCountException(last); + } + } + + @Override + public boolean isDisposed() { + return get() <= 0; + } + } + private static final class AddEventList extends ArrayList { private final RequestQueue queue; diff --git a/src/test/java/dev/miku/r2dbc/mysql/util/FluxDiscardOnCancelTest.java b/src/test/java/dev/miku/r2dbc/mysql/util/FluxDiscardOnCancelTest.java index 50f09fff..4c3c72d2 100644 --- a/src/test/java/dev/miku/r2dbc/mysql/util/FluxDiscardOnCancelTest.java +++ b/src/test/java/dev/miku/r2dbc/mysql/util/FluxDiscardOnCancelTest.java @@ -24,7 +24,6 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Hooks; import reactor.test.StepVerifier; -import reactor.test.StepVerifierOptions; import java.util.ArrayList; import java.util.Iterator; @@ -80,7 +79,6 @@ void allRelease() { .collect(Collectors.toList()); Flux.fromIterable(rows) - .handle((it, sink) -> sink.next(it)) .as(OperatorUtils::discardOnCancel) .doOnDiscard(ReferenceCounted.class, ReferenceCounted::release) .handle((it, sink) -> { @@ -90,7 +88,7 @@ void allRelease() { it.release(); } }) - .as(it -> StepVerifier.create(it, StepVerifierOptions.create().initialRequest(0))) + .as(it -> StepVerifier.create(it, 0)) .thenRequest(2) .expectNext(0, 1) .thenCancel()