Skip to content

Commit

Permalink
Issue 5589 trigger a timeout (#7469)
Browse files Browse the repository at this point in the history
* Make timeout feature use vertx timer.
* Handle all errors resulting from timeout.
* Ensure that after the timeout occurs, we properly handle the response and don't allow the execution to continue.


---------

Signed-off-by: Ade Lucas <[email protected]>
Signed-off-by: cloudspores <[email protected]>
Co-authored-by: Ade Lucas <[email protected]>
Co-authored-by: Ade Lucas <[email protected]>
Co-authored-by: Justin Florentine <[email protected]>
  • Loading branch information
4 people authored Oct 30, 2024
1 parent ba86ce1 commit f9f721c
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ protected static void handleJsonRpcError(
private static HttpResponseStatus statusCodeFromError(final RpcErrorType error) {
return switch (error) {
case INVALID_REQUEST, PARSE_ERROR -> HttpResponseStatus.BAD_REQUEST;
case TIMEOUT_ERROR -> HttpResponseStatus.REQUEST_TIMEOUT;
default -> HttpResponseStatus.OK;
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
public class JsonRpcExecutorHandler {
private static final Logger LOG = LoggerFactory.getLogger(JsonRpcExecutorHandler.class);

// Default timeout for RPC calls in seconds
private static final long DEFAULT_TIMEOUT_MILLISECONDS = 30_000L;

private JsonRpcExecutorHandler() {}

public static Handler<RoutingContext> handler(
Expand All @@ -49,6 +52,19 @@ public static Handler<RoutingContext> handler(
final Tracer tracer,
final JsonRpcConfiguration jsonRpcConfiguration) {
return ctx -> {
final long timerId =
ctx.vertx()
.setTimer(
DEFAULT_TIMEOUT_MILLISECONDS,
id -> {
final String method =
ctx.get(ContextKey.REQUEST_BODY_AS_JSON_OBJECT.name()).toString();
LOG.error("Timeout occurred in JSON-RPC executor for method {}", method);
handleErrorAndEndResponse(ctx, null, RpcErrorType.TIMEOUT_ERROR);
});

ctx.put("timerId", timerId);

try {
createExecutor(jsonRpcExecutor, tracer, ctx, jsonRpcConfiguration)
.ifPresentOrElse(
Expand All @@ -58,18 +74,38 @@ public static Handler<RoutingContext> handler(
} catch (IOException e) {
final String method = executor.getRpcMethodName(ctx);
LOG.error("{} - Error streaming JSON-RPC response", method, e);
handleJsonRpcError(ctx, null, RpcErrorType.INTERNAL_ERROR);
handleErrorAndEndResponse(ctx, null, RpcErrorType.INTERNAL_ERROR);
} finally {
cancelTimer(ctx);
}
},
() -> handleJsonRpcError(ctx, null, RpcErrorType.PARSE_ERROR));
() -> {
handleErrorAndEndResponse(ctx, null, RpcErrorType.PARSE_ERROR);
cancelTimer(ctx);
});
} catch (final RuntimeException e) {
final String method = ctx.get(ContextKey.REQUEST_BODY_AS_JSON_OBJECT.name());
final String method = ctx.get(ContextKey.REQUEST_BODY_AS_JSON_OBJECT.name()).toString();
LOG.error("Unhandled exception in JSON-RPC executor for method {}", method, e);
handleJsonRpcError(ctx, null, RpcErrorType.INTERNAL_ERROR);
handleErrorAndEndResponse(ctx, null, RpcErrorType.INTERNAL_ERROR);
cancelTimer(ctx);
}
};
}

private static void cancelTimer(final RoutingContext ctx) {
Long timerId = ctx.get("timerId");
if (timerId != null) {
ctx.vertx().cancelTimer(timerId);
}
}

private static void handleErrorAndEndResponse(
final RoutingContext ctx, final Object id, final RpcErrorType errorType) {
if (!ctx.response().ended()) {
handleJsonRpcError(ctx, id, errorType);
}
}

private static Optional<AbstractJsonRpcExecutor> createExecutor(
final JsonRpcExecutor jsonRpcExecutor,
final Tracer tracer,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.api.handlers;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.contains;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import org.hyperledger.besu.ethereum.api.jsonrpc.JsonRpcConfiguration;
import org.hyperledger.besu.ethereum.api.jsonrpc.context.ContextKey;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.JsonRpcExecutor;

import io.netty.handler.codec.http.HttpResponseStatus;
import io.opentelemetry.api.trace.Tracer;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.ext.web.RoutingContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;

class JsonRpcExecutorHandlerTest {

private JsonRpcExecutor mockExecutor;
private Tracer mockTracer;
private JsonRpcConfiguration mockConfig;
private RoutingContext mockContext;
private Vertx mockVertx;
private HttpServerResponse mockResponse;

@BeforeEach
void setUp() {
mockExecutor = mock(JsonRpcExecutor.class);
mockTracer = mock(Tracer.class);
mockConfig = mock(JsonRpcConfiguration.class);
mockContext = mock(RoutingContext.class);
mockVertx = mock(Vertx.class);
mockResponse = mock(HttpServerResponse.class);

when(mockContext.vertx()).thenReturn(mockVertx);
when(mockContext.response()).thenReturn(mockResponse);
when(mockResponse.ended()).thenReturn(false);
when(mockResponse.setStatusCode(anyInt())).thenReturn(mockResponse);
}

@Test
void testTimeoutHandling() {
// Arrange
Handler<RoutingContext> handler =
JsonRpcExecutorHandler.handler(mockExecutor, mockTracer, mockConfig);
ArgumentCaptor<Long> delayCaptor = ArgumentCaptor.forClass(Long.class);
@SuppressWarnings("unchecked")
ArgumentCaptor<Handler<Long>> timerHandlerCaptor = ArgumentCaptor.forClass(Handler.class);

when(mockContext.get(eq(ContextKey.REQUEST_BODY_AS_JSON_OBJECT.name()))).thenReturn("{}");
when(mockVertx.setTimer(delayCaptor.capture(), timerHandlerCaptor.capture())).thenReturn(1L);
when(mockContext.get("timerId")).thenReturn(1L);

// Act
handler.handle(mockContext);

// Assert
verify(mockVertx).setTimer(eq(30000L), any());

// Simulate timeout
timerHandlerCaptor.getValue().handle(1L);

// Verify timeout handling
verify(mockResponse, times(1))
.setStatusCode(eq(HttpResponseStatus.REQUEST_TIMEOUT.code())); // Expect 408 Request Timeout
verify(mockResponse, times(1)).end(contains("Timeout expired"));
verify(mockVertx, times(1)).cancelTimer(1L);
}

@Test
void testCancelTimerOnSuccessfulExecution() {
// Arrange
Handler<RoutingContext> handler =
JsonRpcExecutorHandler.handler(mockExecutor, mockTracer, mockConfig);
when(mockContext.get(eq(ContextKey.REQUEST_BODY_AS_JSON_OBJECT.name()))).thenReturn("{}");
when(mockVertx.setTimer(anyLong(), any())).thenReturn(1L);
when(mockContext.get("timerId")).thenReturn(1L);

// Act
handler.handle(mockContext);

// Assert
verify(mockVertx).setTimer(anyLong(), any());
verify(mockVertx).cancelTimer(1L);
}
}

0 comments on commit f9f721c

Please sign in to comment.