Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Propagete errors and discards for queue
Browse files Browse the repository at this point in the history
  • Loading branch information
mirromutth committed Nov 7, 2019
1 parent 51792d2 commit 8ee8c44
Show file tree
Hide file tree
Showing 8 changed files with 254 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

import dev.miku.r2dbc.mysql.client.Client;
import dev.miku.r2dbc.mysql.codec.Codecs;
import dev.miku.r2dbc.mysql.util.ConnectionContext;
import dev.miku.r2dbc.mysql.message.ParameterValue;
import dev.miku.r2dbc.mysql.util.ConnectionContext;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -127,7 +127,7 @@ public Flux<MySqlResult> execute() {

return Flux.defer(() -> {
if (!executed.compareAndSet(false, true)) {
throw new IllegalStateException("Statement was already executed");
return Flux.error(new IllegalStateException("Statement was already executed"));
}

String sql = query.getSql();
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/dev/miku/r2dbc/mysql/QueryFlow.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ static Flux<ServerMessage> execute(Client client, String sql, int statementId, L

return OperatorUtils.discardOnCancel(Flux.fromIterable(bindings))
.doOnDiscard(Binding.class, CLEAR)
.concatMap(binding -> OperatorUtils.discardOnCancel(client.exchange(binding.toMessage(statementId), EXECUTE_DONE)
.doOnDiscard(ReferenceCounted.class, RELEASE))
.concatMap(binding -> OperatorUtils.discardOnCancel(client.exchange(binding.toMessage(statementId), EXECUTE_DONE))
.doOnDiscard(ReferenceCounted.class, RELEASE)
.handle(handler));
}

Expand Down
80 changes: 31 additions & 49 deletions src/main/java/dev/miku/r2dbc/mysql/client/ReactorNettyClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,6 @@ final class ReactorNettyClient implements Client {

private static final Logger logger = LoggerFactory.getLogger(ReactorNettyClient.class);

private static final Function<Flux<ServerMessage>, Flux<ServerMessage>> IDENTITY = Function.identity();

private static final Function<Flux<ServerMessage>, Mono<ServerMessage>> NEXT = Flux::next;

private static final Consumer<ServerMessage> INFO_LOGGING = ReactorNettyClient::infoLogging;

private static final Consumer<ServerMessage> DEBUG_LOGGING = message -> {
Expand Down Expand Up @@ -124,14 +120,17 @@ public Flux<ServerMessage> exchange(ExchangeableMessage request, Predicate<Serve

return Mono.<Flux<ServerMessage>>create(sink -> {
if (!isConnected()) {
if (request instanceof Disposable) {
((Disposable) request).dispose();
}
sink.error(new IllegalStateException("Cannot send messages because the connection is closed"));
return;
}

requestQueue.submit(DisposableExchange.wrap(request, () -> {
requestQueue.submit(RequestTask.wrap(request, sink, () -> {
boolean[] completed = new boolean[]{false};

sink.success(send(request)
return send(request)
.thenMany(responseProcessor)
.<ServerMessage>handle((message, response) -> {
response.next(message);
Expand All @@ -142,23 +141,26 @@ public Flux<ServerMessage> exchange(ExchangeableMessage request, Predicate<Serve
}
})
.doOnTerminate(requestQueue)
.doOnCancel(exchangeCancel(completed)));
.doOnCancel(exchangeCancel(completed));
}));
}).flatMapMany(IDENTITY);
}).flatMapMany(identity());
}

@Override
public Mono<Void> sendOnly(SendOnlyMessage message) {
requireNonNull(message, "message must not be null");

return Mono.create(sink -> {
return Mono.<Mono<Void>>create(sink -> {
if (!isConnected()) {
if (message instanceof Disposable) {
((Disposable) message).dispose();
}
sink.error(new IllegalStateException("Cannot send messages because the connection is closed"));
return;
}

requestQueue.submit(() -> send(message).doOnTerminate(requestQueue).subscribe(null, sink::error, sink::success));
});
requestQueue.submit(RequestTask.wrap(message, sink, () -> send(message).doOnTerminate(requestQueue)));
}).flatMap(identity());
}

@Override
Expand All @@ -169,34 +171,33 @@ public Mono<ServerMessage> receiveOnly() {
return;
}

requestQueue.submit(() -> {
requestQueue.submit(RequestTask.wrap(sink, () -> {
boolean[] completed = new boolean[]{false};

sink.success(responseProcessor.next()
.doOnNext(ignored -> completed[0] = true)
return responseProcessor.next()
.doOnSuccess(ignored -> completed[0] = true)
.doOnTerminate(requestQueue)
.doOnCancel(exchangeCancel(completed)));
});
}).flatMap(Function.identity());
.doOnCancel(exchangeCancel(completed));
}));
}).flatMap(identity());
}

@Override
public Mono<Void> close() {
return Mono.create(sink -> {
return Mono.<Mono<Void>>create(sink -> {
if (!closing.compareAndSet(false, true)) {
// client is closing or closed
sink.success();
return;
}

requestQueue.submit(() -> send(ExitMessage.getInstance())
requestQueue.submit(RequestTask.wrap(sink, () -> send(ExitMessage.getInstance())
.onErrorResume(e -> {
logger.error("Exit message sending failed, force closing", e);
return Mono.empty();
})
.concatWith(forceClose())
.subscribe(null, sink::error, sink::success));
});
.then(forceClose())));
}).flatMap(identity());
}

@Override
Expand Down Expand Up @@ -258,37 +259,18 @@ private static void infoLogging(ServerMessage message) {
}
}

private static final class DisposableExchange implements Runnable, Disposable {

private final Runnable runnable;
@SuppressWarnings("unchecked")
private static <T> Function<T, T> identity() {
return (Function<T, T>) Identity.INSTANCE;
}

private final Disposable disposable;
private static final class Identity implements Function<Object, Object> {

private DisposableExchange(Runnable runnable, Disposable disposable) {
this.runnable = runnable;
this.disposable = disposable;
}
private static final Identity INSTANCE = new Identity();

@Override
public void dispose() {
disposable.dispose();
}

@Override
public boolean isDisposed() {
return disposable.isDisposed();
}

@Override
public void run() {
runnable.run();
}

private static Runnable wrap(ClientMessage request, Runnable runnable) {
if (request instanceof Disposable) {
return new DisposableExchange(runnable, (Disposable) request);
}
return runnable;
public Object apply(Object o) {
return o;
}
}
}
76 changes: 44 additions & 32 deletions src/main/java/dev/miku/r2dbc/mysql/client/RequestQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package dev.miku.r2dbc.mysql.client;

import reactor.core.Disposable;
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;

import java.util.Queue;
Expand Down Expand Up @@ -49,32 +49,34 @@ abstract class ActiveStatus extends LeftPadding {
* <p>
* Submission conditionally queues requests if an ongoing exchange was active by the time of subscription.
* Drains queued commands on exchange completion if there are queued commands or disable active flag.
* <p>
* It should discard all tasks when it is discarded by connection.
*/
final class RequestQueue extends ActiveStatus implements Runnable {

private final Queue<Runnable> queue = Queues.<Runnable>small().get();
private final Queue<RequestTask<?>> queue = Queues.<RequestTask<?>>small().get();

@Nullable
private volatile RuntimeException disposed;

/**
* Current exchange completed, refresh to next exchange or set to inactive.
*/
@Override
public void run() {
Runnable exchange = queue.poll();
RequestTask<?> task = queue.poll();

if (exchange == null) {
if (task == null) {
// Queue was empty, set it to idle if it is not disposed.
STATUS_UPDATER.compareAndSet(this, ACTIVE, IDLE);
} else {
int status = this.status;

if (status == DISPOSE) {
// Disposed, release polled and should clear queue because don't sure it is empty.
if (exchange instanceof Disposable) {
((Disposable) exchange).dispose();
}
freeAll();
// Cancel and no need clear queue because it should be cleared by other one.
task.cancel(requireDisposed());
} else {
exchange.run();
task.run();
}
}
}
Expand All @@ -83,33 +85,28 @@ public void run() {
* Submit an exchange task. If the queue is inactive, it will execute directly rather than queuing.
* Otherwise it will be queuing.
*
* @param exchange the exchange task includes request messages sending and response messages processor.
* @param task the exchange task includes request messages sending and response messages processor.
*/
void submit(Runnable exchange) {
<T> void submit(RequestTask<T> task) {
if (STATUS_UPDATER.compareAndSet(this, IDLE, ACTIVE)) {
// Fast path for general way.
exchange.run();
task.run();
return;
}

// Check dispose after fast path failed.
int status = this.status;

if (status == DISPOSE) {
// Disposed and no need clear queue because it should be cleared by other one.
if (exchange instanceof Disposable) {
((Disposable) exchange).dispose();
}
throw new IllegalStateException("Request queue was disposed");
// Cancel and no need clear queue because it should be cleared by other one.
task.cancel(requireDisposed());
return;
}

// Prev task may be completing before queue offer, so queue may be idle now.
if (!queue.offer(exchange)) {
if (exchange instanceof Disposable) {
((Disposable) exchange).dispose();
}
// Even disposed now, no need clear queue because it should be cleared by other one.
throw new IllegalStateException("Request queue is full");
if (!queue.offer(task)) {
task.cancel(new IllegalStateException("Request queue is full"));
return;
}

if (STATUS_UPDATER.compareAndSet(this, IDLE, ACTIVE)) {
Expand All @@ -121,8 +118,7 @@ void submit(Runnable exchange) {

if (status == DISPOSE) {
// Disposed, should clear queue because an element has just been offered to the queue.
freeAll();
throw new IllegalStateException("Request queue was disposed");
cancelAll(requireDisposed());
}
}
}
Expand All @@ -140,16 +136,32 @@ long keeping(int v) {

void dispose() {
STATUS_UPDATER.set(this, DISPOSE);
freeAll();
cancelAll(requireDisposed());
}

private void freeAll() {
Runnable runnable;
private RuntimeException requireDisposed() {
RuntimeException disposed = this.disposed;

if (disposed == null) {
synchronized (this) {
disposed = this.disposed;

while ((runnable = queue.poll()) != null) {
if (runnable instanceof Disposable) {
((Disposable) runnable).dispose();
if (disposed == null) {
this.disposed = disposed = new IllegalStateException("Request queue was disposed");
}

return disposed;
}
}

return disposed;
}

private void cancelAll(RuntimeException e) {
RequestTask<?> task;

while ((task = queue.poll()) != null) {
task.cancel(e);
}
}
}
Loading

0 comments on commit 8ee8c44

Please sign in to comment.