diff --git a/eventloop/src/main/java/io/datakernel/async/AbstractStage.java b/eventloop/src/main/java/io/datakernel/async/AbstractStage.java index 8534c28ee..b16b2607d 100644 --- a/eventloop/src/main/java/io/datakernel/async/AbstractStage.java +++ b/eventloop/src/main/java/io/datakernel/async/AbstractStage.java @@ -13,7 +13,6 @@ import java.util.function.Function; import static io.datakernel.eventloop.Eventloop.getCurrentEventloop; -import static io.datakernel.util.Preconditions.checkArgument; abstract class AbstractStage<T> implements Stage<T> { @@ -429,7 +428,11 @@ public Stage<T> timeout(@Nullable Duration timeout) { if (timeout == null) { return this; } - checkArgument(timeout.toMillis() >= 0, "Timeout cannot be less than zero"); + + if (timeout.toMillis() <= 0) { + return Stage.ofException(TIMEOUT_EXCEPTION); + } + ScheduledRunnable schedule = getCurrentEventloop().delay(timeout, () -> tryCompleteExceptionally(TIMEOUT_EXCEPTION)); return then(new NextStage<T, T>() { diff --git a/rpc/src/main/java/io/datakernel/rpc/client/RpcClient.java b/rpc/src/main/java/io/datakernel/rpc/client/RpcClient.java index 683378b54..2e09de1e8 100644 --- a/rpc/src/main/java/io/datakernel/rpc/client/RpcClient.java +++ b/rpc/src/main/java/io/datakernel/rpc/client/RpcClient.java @@ -445,7 +445,11 @@ void removeConnection(InetSocketAddress address) { */ @Override public <I, O> void sendRequest(I request, int timeout, Callback<O> callback) { - requestSender.sendRequest(request, timeout, callback); + if (timeout > 0) { + requestSender.sendRequest(request, timeout, callback); + } else { + callback.setException(RPC_TIMEOUT_EXCEPTION); + } } public IRpcClient adaptToAnotherEventloop(Eventloop anotherEventloop) { @@ -456,19 +460,24 @@ public IRpcClient adaptToAnotherEventloop(Eventloop anotherEventloop) { return new IRpcClient() { @Override public <I, O> void sendRequest(I request, int timeout, Callback<O> cb) { - RpcClient.this.eventloop.execute(() -> - RpcClient.this.requestSender.sendRequest(request, timeout, - new Callback<O>() { - @Override - public void set(O result) { - anotherEventloop.execute(() -> cb.set(result)); - } - - @Override - public void setException(Throwable throwable) { - anotherEventloop.execute(() -> cb.setException(throwable)); - } - })); + RpcClient.this.eventloop.execute(() -> { + if (timeout > 0) { + RpcClient.this.requestSender.sendRequest(request, timeout, + new Callback<O>() { + @Override + public void set(O result) { + anotherEventloop.execute(() -> cb.set(result)); + } + + @Override + public void setException(Throwable throwable) { + anotherEventloop.execute(() -> cb.setException(throwable)); + } + }); + } else { + cb.setException(RPC_TIMEOUT_EXCEPTION); + } + }); } };