Skip to content

Commit

Permalink
Add check for non-positive timeout to RpcClient, AbstractStage
Browse files Browse the repository at this point in the history
  • Loading branch information
eduard-vasinskyi committed Feb 12, 2019
1 parent 4bcd0b7 commit 22969a3
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import java.util.function.Function;

import static io.datakernel.eventloop.Eventloop.getCurrentEventloop;
import static io.datakernel.util.Preconditions.checkArgument;

abstract class AbstractStage<T> implements Stage<T> {

Expand Down Expand Up @@ -429,7 +428,11 @@ public Stage<T> timeout(@Nullable Duration timeout) {
if (timeout == null) {
return this;
}
checkArgument(timeout.toMillis() >= 0, "Timeout cannot be less than zero");

if (timeout.toMillis() <= 0) {
return Stage.ofException(TIMEOUT_EXCEPTION);
}

ScheduledRunnable schedule = getCurrentEventloop().delay(timeout, () -> tryCompleteExceptionally(TIMEOUT_EXCEPTION));
return then(new NextStage<T, T>() {

Expand Down
37 changes: 23 additions & 14 deletions rpc/src/main/java/io/datakernel/rpc/client/RpcClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,11 @@ void removeConnection(InetSocketAddress address) {
*/
@Override
public <I, O> void sendRequest(I request, int timeout, Callback<O> callback) {
requestSender.sendRequest(request, timeout, callback);
if (timeout > 0) {
requestSender.sendRequest(request, timeout, callback);
} else {
callback.setException(RPC_TIMEOUT_EXCEPTION);
}
}

public IRpcClient adaptToAnotherEventloop(Eventloop anotherEventloop) {
Expand All @@ -456,19 +460,24 @@ public IRpcClient adaptToAnotherEventloop(Eventloop anotherEventloop) {
return new IRpcClient() {
@Override
public <I, O> void sendRequest(I request, int timeout, Callback<O> cb) {
RpcClient.this.eventloop.execute(() ->
RpcClient.this.requestSender.sendRequest(request, timeout,
new Callback<O>() {
@Override
public void set(O result) {
anotherEventloop.execute(() -> cb.set(result));
}

@Override
public void setException(Throwable throwable) {
anotherEventloop.execute(() -> cb.setException(throwable));
}
}));
RpcClient.this.eventloop.execute(() -> {
if (timeout > 0) {
RpcClient.this.requestSender.sendRequest(request, timeout,
new Callback<O>() {
@Override
public void set(O result) {
anotherEventloop.execute(() -> cb.set(result));
}

@Override
public void setException(Throwable throwable) {
anotherEventloop.execute(() -> cb.setException(throwable));
}
});
} else {
cb.setException(RPC_TIMEOUT_EXCEPTION);
}
});
}

};
Expand Down

0 comments on commit 22969a3

Please sign in to comment.