Skip to content

Commit

Permalink
Use handleUnaryRequest utility method for most server handlers (#768)
Browse files Browse the repository at this point in the history
  • Loading branch information
aprudhomme authored Oct 18, 2024
1 parent c96f916 commit 6849a4e
Show file tree
Hide file tree
Showing 34 changed files with 309 additions and 874 deletions.
76 changes: 46 additions & 30 deletions src/main/java/com/yelp/nrtsearch/server/grpc/NrtsearchServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -457,56 +457,61 @@ public GlobalState getGlobalState() {
@Override
public void createIndex(
CreateIndexRequest req, StreamObserver<CreateIndexResponse> responseObserver) {
createIndexHandler.handle(req, responseObserver);
Handler.handleUnaryRequest("createIndex", req, responseObserver, createIndexHandler);
}

@Override
public void liveSettings(
LiveSettingsRequest req, StreamObserver<LiveSettingsResponse> responseObserver) {
liveSettingsHandler.handle(req, responseObserver);
Handler.handleUnaryRequest("liveSettings", req, responseObserver, liveSettingsHandler);
}

@Override
public void liveSettingsV2(
LiveSettingsV2Request req, StreamObserver<LiveSettingsV2Response> responseObserver) {
liveSettingsV2Handler.handle(req, responseObserver);
Handler.handleUnaryRequest("liveSettingsV2", req, responseObserver, liveSettingsV2Handler);
}

@Override
public void registerFields(
FieldDefRequest fieldDefRequest, StreamObserver<FieldDefResponse> responseObserver) {
registerFieldsHandler.handle(fieldDefRequest, responseObserver);
Handler.handleUnaryRequest(
"registerFields", fieldDefRequest, responseObserver, registerFieldsHandler);
}

@Override
public void updateFields(
FieldDefRequest fieldDefRequest, StreamObserver<FieldDefResponse> responseObserver) {
updateFieldsHandler.handle(fieldDefRequest, responseObserver);
Handler.handleUnaryRequest(
"updateFields", fieldDefRequest, responseObserver, updateFieldsHandler);
}

@Override
public void settings(
SettingsRequest settingsRequest, StreamObserver<SettingsResponse> responseObserver) {
settingsHandler.handle(settingsRequest, responseObserver);
Handler.handleUnaryRequest("settings", settingsRequest, responseObserver, settingsHandler);
}

@Override
public void settingsV2(
SettingsV2Request settingsRequest, StreamObserver<SettingsV2Response> responseObserver) {
settingsV2Handler.handle(settingsRequest, responseObserver);
Handler.handleUnaryRequest(
"settingsV2", settingsRequest, responseObserver, settingsV2Handler);
}

@Override
public void startIndex(
StartIndexRequest startIndexRequest, StreamObserver<StartIndexResponse> responseObserver) {
startIndexHandler.handle(startIndexRequest, responseObserver);
Handler.handleUnaryRequest(
"startIndex", startIndexRequest, responseObserver, startIndexHandler);
}

@Override
public void startIndexV2(
StartIndexV2Request startIndexRequest,
StreamObserver<StartIndexResponse> responseObserver) {
startIndexV2Handler.handle(startIndexRequest, responseObserver);
Handler.handleUnaryRequest(
"startIndexV2", startIndexRequest, responseObserver, startIndexV2Handler);
}

@Override
Expand All @@ -519,7 +524,8 @@ public StreamObserver<AddDocumentRequest> addDocuments(
public void refresh(
RefreshRequest refreshRequest,
StreamObserver<RefreshResponse> refreshResponseStreamObserver) {
refreshHandler.handle(refreshRequest, refreshResponseStreamObserver);
Handler.handleUnaryRequest(
"refresh", refreshRequest, refreshResponseStreamObserver, refreshHandler);
}

@Override
Expand All @@ -531,7 +537,7 @@ public void commit(
@Override
public void stats(
StatsRequest statsRequest, StreamObserver<StatsResponse> statsResponseStreamObserver) {
statsHandler.handle(statsRequest, statsResponseStreamObserver);
Handler.handleUnaryRequest("stats", statsRequest, statsResponseStreamObserver, statsHandler);
}

@Override
Expand All @@ -550,46 +556,50 @@ public void searchV2(
public void delete(
AddDocumentRequest addDocumentRequest,
StreamObserver<AddDocumentResponse> responseObserver) {
deleteDocumentsHandler.handle(addDocumentRequest, responseObserver);
Handler.handleUnaryRequest(
"delete", addDocumentRequest, responseObserver, deleteDocumentsHandler);
}

@Override
public void deleteByQuery(
DeleteByQueryRequest deleteByQueryRequest,
StreamObserver<AddDocumentResponse> responseObserver) {
deleteByQueryHandler.handle(deleteByQueryRequest, responseObserver);
Handler.handleUnaryRequest(
"deleteByQuery", deleteByQueryRequest, responseObserver, deleteByQueryHandler);
}

@Override
public void deleteAll(
DeleteAllDocumentsRequest deleteAllDocumentsRequest,
StreamObserver<DeleteAllDocumentsResponse> responseObserver) {
deleteAllDocumentsHandler.handle(deleteAllDocumentsRequest, responseObserver);
Handler.handleUnaryRequest(
"deleteAll", deleteAllDocumentsRequest, responseObserver, deleteAllDocumentsHandler);
}

@Override
public void deleteIndex(
DeleteIndexRequest deleteIndexRequest,
StreamObserver<DeleteIndexResponse> responseObserver) {
deleteIndexHandler.handle(deleteIndexRequest, responseObserver);
Handler.handleUnaryRequest(
"deleteIndex", deleteIndexRequest, responseObserver, deleteIndexHandler);
}

@Override
public void stopIndex(
StopIndexRequest stopIndexRequest, StreamObserver<DummyResponse> responseObserver) {
stopIndexHandler.handle(stopIndexRequest, responseObserver);
Handler.handleUnaryRequest("stopIndex", stopIndexRequest, responseObserver, stopIndexHandler);
}

@Override
public void reloadState(
ReloadStateRequest request, StreamObserver<ReloadStateResponse> responseObserver) {
reloadStateHandler.handle(request, responseObserver);
Handler.handleUnaryRequest("reloadState", request, responseObserver, reloadStateHandler);
}

@Override
public void status(
HealthCheckRequest request, StreamObserver<HealthCheckResponse> responseObserver) {
statusHandler.handle(request, responseObserver);
Handler.handleUnaryRequest("status", request, responseObserver, statusHandler);
}

/**
Expand All @@ -600,45 +610,49 @@ public void status(
@Override
public void ready(
ReadyCheckRequest request, StreamObserver<HealthCheckResponse> responseObserver) {
readyHandler.handle(request, responseObserver);
Handler.handleUnaryRequest("ready", request, responseObserver, readyHandler);
}

@Override
public void createSnapshot(
CreateSnapshotRequest createSnapshotRequest,
StreamObserver<CreateSnapshotResponse> responseObserver) {
createSnapshotHandler.handle(createSnapshotRequest, responseObserver);
Handler.handleUnaryRequest(
"createSnapshot", createSnapshotRequest, responseObserver, createSnapshotHandler);
}

@Override
public void releaseSnapshot(
ReleaseSnapshotRequest releaseSnapshotRequest,
StreamObserver<ReleaseSnapshotResponse> responseObserver) {
releaseSnapshotHandler.handle(releaseSnapshotRequest, responseObserver);
Handler.handleUnaryRequest(
"releaseSnapshot", releaseSnapshotRequest, responseObserver, releaseSnapshotHandler);
}

@Override
public void getAllSnapshotIndexGen(
GetAllSnapshotGenRequest request,
StreamObserver<GetAllSnapshotGenResponse> responseObserver) {
getAllSnapshotIndexGenHandler.handle(request, responseObserver);
Handler.handleUnaryRequest(
"getAllSnapshotIndexGen", request, responseObserver, getAllSnapshotIndexGenHandler);
}

@Override
public void backupWarmingQueries(
BackupWarmingQueriesRequest request,
StreamObserver<BackupWarmingQueriesResponse> responseObserver) {
backupWarmingQueriesHandler.handle(request, responseObserver);
Handler.handleUnaryRequest(
"backupWarmingQueries", request, responseObserver, backupWarmingQueriesHandler);
}

@Override
public void metrics(Empty request, StreamObserver<HttpBody> responseObserver) {
metricsHandler.handle(request, responseObserver);
Handler.handleUnaryRequest("metrics", request, responseObserver, metricsHandler);
}

@Override
public void indices(IndicesRequest request, StreamObserver<IndicesResponse> responseObserver) {
indicesHandler.handle(request, responseObserver);
Handler.handleUnaryRequest("indices", request, responseObserver, indicesHandler);
}

@Override
Expand All @@ -650,30 +664,32 @@ public void nodeInfo(
@Override
public void globalState(
GlobalStateRequest request, StreamObserver<GlobalStateResponse> responseObserver) {
globalStateHandler.handle(request, responseObserver);
Handler.handleUnaryRequest("globalState", request, responseObserver, globalStateHandler);
}

@Override
public void state(StateRequest request, StreamObserver<StateResponse> responseObserver) {
getStateHandler.handle(request, responseObserver);
Handler.handleUnaryRequest("state", request, responseObserver, getStateHandler);
}

@Override
public void forceMerge(
ForceMergeRequest forceMergeRequest, StreamObserver<ForceMergeResponse> responseObserver) {
forceMergeHandler.handle(forceMergeRequest, responseObserver);
Handler.handleUnaryRequest(
"forceMerge", forceMergeRequest, responseObserver, forceMergeHandler);
}

@Override
public void forceMergeDeletes(
ForceMergeDeletesRequest forceMergeRequest,
StreamObserver<ForceMergeDeletesResponse> responseObserver) {
forceMergeDeletesHandler.handle(forceMergeRequest, responseObserver);
Handler.handleUnaryRequest(
"forceMergeDeletes", forceMergeRequest, responseObserver, forceMergeDeletesHandler);
}

@Override
public void custom(CustomRequest request, StreamObserver<CustomResponse> responseObserver) {
customHandler.handle(request, responseObserver);
Handler.handleUnaryRequest("custom", request, responseObserver, customHandler);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import com.yelp.nrtsearch.server.state.GlobalState;
import com.yelp.nrtsearch.server.warming.Warmer;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -36,74 +34,47 @@ public BackupWarmingQueriesHandler(GlobalState globalState) {
}

@Override
public void handle(
BackupWarmingQueriesRequest request,
StreamObserver<BackupWarmingQueriesResponse> responseObserver) {
public BackupWarmingQueriesResponse handle(BackupWarmingQueriesRequest request) throws Exception {
logger.info("Received backup warming queries request: {}", request);
String index = request.getIndex();
try {
IndexState indexState = getGlobalState().getIndexOrThrow(index);
Warmer warmer = indexState.getWarmer();
if (warmer == null) {
logger.warn("Unable to backup warming queries as warmer not found for index: {}", index);
responseObserver.onError(
Status.UNKNOWN
.withDescription(
"Unable to backup warming queries as warmer not found for index: " + index)
.asRuntimeException());
return;
}
int numQueriesThreshold = request.getNumQueriesThreshold();
int numWarmingRequests = warmer.getNumWarmingRequests();
if (numQueriesThreshold > 0 && numWarmingRequests < numQueriesThreshold) {
logger.warn(
"Unable to backup warming queries since warmer has {} requests, which is less than threshold {}",
numWarmingRequests,
numQueriesThreshold);
responseObserver.onError(
Status.UNKNOWN
.withDescription(
String.format(
"Unable to backup warming queries since warmer has %s requests, which is less than threshold %s",
numWarmingRequests, numQueriesThreshold))
.asRuntimeException());
return;
}
int uptimeMinutesThreshold = request.getUptimeMinutesThreshold();
int currUptimeMinutes =
(int) (ManagementFactory.getRuntimeMXBean().getUptime() / 1000L / 60L);
if (uptimeMinutesThreshold > 0 && currUptimeMinutes < uptimeMinutesThreshold) {
logger.warn(
"Unable to backup warming queries since uptime is {} minutes, which is less than threshold {}",
currUptimeMinutes,
uptimeMinutesThreshold);
responseObserver.onError(
Status.UNKNOWN
.withDescription(
String.format(
"Unable to backup warming queries since uptime is %s minutes, which is less than threshold %s",
currUptimeMinutes, uptimeMinutesThreshold))
.asRuntimeException());
return;
}
warmer.backupWarmingQueriesToS3(request.getServiceName());
responseObserver.onNext(BackupWarmingQueriesResponse.newBuilder().build());
responseObserver.onCompleted();
} catch (IOException e) {
logger.error(
"Unable to backup warming queries for index: {}, service: {}",
index,
request.getServiceName(),
e);
responseObserver.onError(
Status.UNKNOWN
.withCause(e)
.withDescription(
String.format(
"Unable to backup warming queries for index: %s, service: %s",
index, request.getServiceName()))
.augmentDescription(e.getMessage())
.asRuntimeException());
IndexState indexState = getIndexState(index);
Warmer warmer = indexState.getWarmer();
if (warmer == null) {
logger.warn("Unable to backup warming queries as warmer not found for index: {}", index);
throw Status.UNKNOWN
.withDescription(
"Unable to backup warming queries as warmer not found for index: " + index)
.asRuntimeException();
}
int numQueriesThreshold = request.getNumQueriesThreshold();
int numWarmingRequests = warmer.getNumWarmingRequests();
if (numQueriesThreshold > 0 && numWarmingRequests < numQueriesThreshold) {
logger.warn(
"Unable to backup warming queries since warmer has {} requests, which is less than threshold {}",
numWarmingRequests,
numQueriesThreshold);
throw Status.UNKNOWN
.withDescription(
String.format(
"Unable to backup warming queries since warmer has %s requests, which is less than threshold %s",
numWarmingRequests, numQueriesThreshold))
.asRuntimeException();
}
int uptimeMinutesThreshold = request.getUptimeMinutesThreshold();
int currUptimeMinutes = (int) (ManagementFactory.getRuntimeMXBean().getUptime() / 1000L / 60L);
if (uptimeMinutesThreshold > 0 && currUptimeMinutes < uptimeMinutesThreshold) {
logger.warn(
"Unable to backup warming queries since uptime is {} minutes, which is less than threshold {}",
currUptimeMinutes,
uptimeMinutesThreshold);
throw Status.UNKNOWN
.withDescription(
String.format(
"Unable to backup warming queries since uptime is %s minutes, which is less than threshold %s",
currUptimeMinutes, uptimeMinutesThreshold))
.asRuntimeException();
}
warmer.backupWarmingQueriesToS3(request.getServiceName());
return BackupWarmingQueriesResponse.newBuilder().build();
}
}
Loading

0 comments on commit 6849a4e

Please sign in to comment.