Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use handleUnaryRequest utility method for most server handlers #768

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 46 additions & 30 deletions src/main/java/com/yelp/nrtsearch/server/grpc/NrtsearchServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -457,56 +457,61 @@ public GlobalState getGlobalState() {
@Override
public void createIndex(
CreateIndexRequest req, StreamObserver<CreateIndexResponse> responseObserver) {
createIndexHandler.handle(req, responseObserver);
Handler.handleUnaryRequest("createIndex", req, responseObserver, createIndexHandler);
}

@Override
public void liveSettings(
LiveSettingsRequest req, StreamObserver<LiveSettingsResponse> responseObserver) {
liveSettingsHandler.handle(req, responseObserver);
Handler.handleUnaryRequest("liveSettings", req, responseObserver, liveSettingsHandler);
}

@Override
public void liveSettingsV2(
LiveSettingsV2Request req, StreamObserver<LiveSettingsV2Response> responseObserver) {
liveSettingsV2Handler.handle(req, responseObserver);
Handler.handleUnaryRequest("liveSettingsV2", req, responseObserver, liveSettingsV2Handler);
}

@Override
public void registerFields(
FieldDefRequest fieldDefRequest, StreamObserver<FieldDefResponse> responseObserver) {
registerFieldsHandler.handle(fieldDefRequest, responseObserver);
Handler.handleUnaryRequest(
"registerFields", fieldDefRequest, responseObserver, registerFieldsHandler);
}

@Override
public void updateFields(
FieldDefRequest fieldDefRequest, StreamObserver<FieldDefResponse> responseObserver) {
updateFieldsHandler.handle(fieldDefRequest, responseObserver);
Handler.handleUnaryRequest(
"updateFields", fieldDefRequest, responseObserver, updateFieldsHandler);
}

@Override
public void settings(
SettingsRequest settingsRequest, StreamObserver<SettingsResponse> responseObserver) {
settingsHandler.handle(settingsRequest, responseObserver);
Handler.handleUnaryRequest("settings", settingsRequest, responseObserver, settingsHandler);
}

@Override
public void settingsV2(
SettingsV2Request settingsRequest, StreamObserver<SettingsV2Response> responseObserver) {
settingsV2Handler.handle(settingsRequest, responseObserver);
Handler.handleUnaryRequest(
"settingsV2", settingsRequest, responseObserver, settingsV2Handler);
}

@Override
public void startIndex(
StartIndexRequest startIndexRequest, StreamObserver<StartIndexResponse> responseObserver) {
startIndexHandler.handle(startIndexRequest, responseObserver);
Handler.handleUnaryRequest(
"startIndex", startIndexRequest, responseObserver, startIndexHandler);
}

@Override
public void startIndexV2(
StartIndexV2Request startIndexRequest,
StreamObserver<StartIndexResponse> responseObserver) {
startIndexV2Handler.handle(startIndexRequest, responseObserver);
Handler.handleUnaryRequest(
"startIndexV2", startIndexRequest, responseObserver, startIndexV2Handler);
}

@Override
Expand All @@ -519,7 +524,8 @@ public StreamObserver<AddDocumentRequest> addDocuments(
public void refresh(
RefreshRequest refreshRequest,
StreamObserver<RefreshResponse> refreshResponseStreamObserver) {
refreshHandler.handle(refreshRequest, refreshResponseStreamObserver);
Handler.handleUnaryRequest(
"refresh", refreshRequest, refreshResponseStreamObserver, refreshHandler);
}

@Override
Expand All @@ -531,7 +537,7 @@ public void commit(
@Override
public void stats(
StatsRequest statsRequest, StreamObserver<StatsResponse> statsResponseStreamObserver) {
statsHandler.handle(statsRequest, statsResponseStreamObserver);
Handler.handleUnaryRequest("stats", statsRequest, statsResponseStreamObserver, statsHandler);
}

@Override
Expand All @@ -550,46 +556,50 @@ public void searchV2(
public void delete(
AddDocumentRequest addDocumentRequest,
StreamObserver<AddDocumentResponse> responseObserver) {
deleteDocumentsHandler.handle(addDocumentRequest, responseObserver);
Handler.handleUnaryRequest(
"delete", addDocumentRequest, responseObserver, deleteDocumentsHandler);
}

@Override
public void deleteByQuery(
DeleteByQueryRequest deleteByQueryRequest,
StreamObserver<AddDocumentResponse> responseObserver) {
deleteByQueryHandler.handle(deleteByQueryRequest, responseObserver);
Handler.handleUnaryRequest(
"deleteByQuery", deleteByQueryRequest, responseObserver, deleteByQueryHandler);
}

@Override
public void deleteAll(
DeleteAllDocumentsRequest deleteAllDocumentsRequest,
StreamObserver<DeleteAllDocumentsResponse> responseObserver) {
deleteAllDocumentsHandler.handle(deleteAllDocumentsRequest, responseObserver);
Handler.handleUnaryRequest(
"deleteAll", deleteAllDocumentsRequest, responseObserver, deleteAllDocumentsHandler);
}

@Override
public void deleteIndex(
DeleteIndexRequest deleteIndexRequest,
StreamObserver<DeleteIndexResponse> responseObserver) {
deleteIndexHandler.handle(deleteIndexRequest, responseObserver);
Handler.handleUnaryRequest(
"deleteIndex", deleteIndexRequest, responseObserver, deleteIndexHandler);
}

@Override
public void stopIndex(
StopIndexRequest stopIndexRequest, StreamObserver<DummyResponse> responseObserver) {
stopIndexHandler.handle(stopIndexRequest, responseObserver);
Handler.handleUnaryRequest("stopIndex", stopIndexRequest, responseObserver, stopIndexHandler);
}

@Override
public void reloadState(
ReloadStateRequest request, StreamObserver<ReloadStateResponse> responseObserver) {
reloadStateHandler.handle(request, responseObserver);
Handler.handleUnaryRequest("reloadState", request, responseObserver, reloadStateHandler);
}

@Override
public void status(
HealthCheckRequest request, StreamObserver<HealthCheckResponse> responseObserver) {
statusHandler.handle(request, responseObserver);
Handler.handleUnaryRequest("status", request, responseObserver, statusHandler);
}

/**
Expand All @@ -600,45 +610,49 @@ public void status(
@Override
public void ready(
ReadyCheckRequest request, StreamObserver<HealthCheckResponse> responseObserver) {
readyHandler.handle(request, responseObserver);
Handler.handleUnaryRequest("ready", request, responseObserver, readyHandler);
}

@Override
public void createSnapshot(
CreateSnapshotRequest createSnapshotRequest,
StreamObserver<CreateSnapshotResponse> responseObserver) {
createSnapshotHandler.handle(createSnapshotRequest, responseObserver);
Handler.handleUnaryRequest(
"createSnapshot", createSnapshotRequest, responseObserver, createSnapshotHandler);
}

@Override
public void releaseSnapshot(
ReleaseSnapshotRequest releaseSnapshotRequest,
StreamObserver<ReleaseSnapshotResponse> responseObserver) {
releaseSnapshotHandler.handle(releaseSnapshotRequest, responseObserver);
Handler.handleUnaryRequest(
"releaseSnapshot", releaseSnapshotRequest, responseObserver, releaseSnapshotHandler);
}

@Override
public void getAllSnapshotIndexGen(
GetAllSnapshotGenRequest request,
StreamObserver<GetAllSnapshotGenResponse> responseObserver) {
getAllSnapshotIndexGenHandler.handle(request, responseObserver);
Handler.handleUnaryRequest(
"getAllSnapshotIndexGen", request, responseObserver, getAllSnapshotIndexGenHandler);
}

@Override
public void backupWarmingQueries(
BackupWarmingQueriesRequest request,
StreamObserver<BackupWarmingQueriesResponse> responseObserver) {
backupWarmingQueriesHandler.handle(request, responseObserver);
Handler.handleUnaryRequest(
"backupWarmingQueries", request, responseObserver, backupWarmingQueriesHandler);
}

@Override
public void metrics(Empty request, StreamObserver<HttpBody> responseObserver) {
metricsHandler.handle(request, responseObserver);
Handler.handleUnaryRequest("metrics", request, responseObserver, metricsHandler);
}

@Override
public void indices(IndicesRequest request, StreamObserver<IndicesResponse> responseObserver) {
indicesHandler.handle(request, responseObserver);
Handler.handleUnaryRequest("indices", request, responseObserver, indicesHandler);
}

@Override
Expand All @@ -650,30 +664,32 @@ public void nodeInfo(
@Override
public void globalState(
GlobalStateRequest request, StreamObserver<GlobalStateResponse> responseObserver) {
globalStateHandler.handle(request, responseObserver);
Handler.handleUnaryRequest("globalState", request, responseObserver, globalStateHandler);
}

@Override
public void state(StateRequest request, StreamObserver<StateResponse> responseObserver) {
getStateHandler.handle(request, responseObserver);
Handler.handleUnaryRequest("state", request, responseObserver, getStateHandler);
}

@Override
public void forceMerge(
ForceMergeRequest forceMergeRequest, StreamObserver<ForceMergeResponse> responseObserver) {
forceMergeHandler.handle(forceMergeRequest, responseObserver);
Handler.handleUnaryRequest(
"forceMerge", forceMergeRequest, responseObserver, forceMergeHandler);
}

@Override
public void forceMergeDeletes(
ForceMergeDeletesRequest forceMergeRequest,
StreamObserver<ForceMergeDeletesResponse> responseObserver) {
forceMergeDeletesHandler.handle(forceMergeRequest, responseObserver);
Handler.handleUnaryRequest(
"forceMergeDeletes", forceMergeRequest, responseObserver, forceMergeDeletesHandler);
}

@Override
public void custom(CustomRequest request, StreamObserver<CustomResponse> responseObserver) {
customHandler.handle(request, responseObserver);
Handler.handleUnaryRequest("custom", request, responseObserver, customHandler);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import com.yelp.nrtsearch.server.state.GlobalState;
import com.yelp.nrtsearch.server.warming.Warmer;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -36,74 +34,47 @@ public BackupWarmingQueriesHandler(GlobalState globalState) {
}

@Override
public void handle(
BackupWarmingQueriesRequest request,
StreamObserver<BackupWarmingQueriesResponse> responseObserver) {
public BackupWarmingQueriesResponse handle(BackupWarmingQueriesRequest request) throws Exception {
logger.info("Received backup warming queries request: {}", request);
String index = request.getIndex();
try {
IndexState indexState = getGlobalState().getIndexOrThrow(index);
Warmer warmer = indexState.getWarmer();
if (warmer == null) {
logger.warn("Unable to backup warming queries as warmer not found for index: {}", index);
responseObserver.onError(
Status.UNKNOWN
.withDescription(
"Unable to backup warming queries as warmer not found for index: " + index)
.asRuntimeException());
return;
}
int numQueriesThreshold = request.getNumQueriesThreshold();
int numWarmingRequests = warmer.getNumWarmingRequests();
if (numQueriesThreshold > 0 && numWarmingRequests < numQueriesThreshold) {
logger.warn(
"Unable to backup warming queries since warmer has {} requests, which is less than threshold {}",
numWarmingRequests,
numQueriesThreshold);
responseObserver.onError(
Status.UNKNOWN
.withDescription(
String.format(
"Unable to backup warming queries since warmer has %s requests, which is less than threshold %s",
numWarmingRequests, numQueriesThreshold))
.asRuntimeException());
return;
}
int uptimeMinutesThreshold = request.getUptimeMinutesThreshold();
int currUptimeMinutes =
(int) (ManagementFactory.getRuntimeMXBean().getUptime() / 1000L / 60L);
if (uptimeMinutesThreshold > 0 && currUptimeMinutes < uptimeMinutesThreshold) {
logger.warn(
"Unable to backup warming queries since uptime is {} minutes, which is less than threshold {}",
currUptimeMinutes,
uptimeMinutesThreshold);
responseObserver.onError(
Status.UNKNOWN
.withDescription(
String.format(
"Unable to backup warming queries since uptime is %s minutes, which is less than threshold %s",
currUptimeMinutes, uptimeMinutesThreshold))
.asRuntimeException());
return;
}
warmer.backupWarmingQueriesToS3(request.getServiceName());
responseObserver.onNext(BackupWarmingQueriesResponse.newBuilder().build());
responseObserver.onCompleted();
} catch (IOException e) {
logger.error(
"Unable to backup warming queries for index: {}, service: {}",
index,
request.getServiceName(),
e);
responseObserver.onError(
Status.UNKNOWN
.withCause(e)
.withDescription(
String.format(
"Unable to backup warming queries for index: %s, service: %s",
index, request.getServiceName()))
.augmentDescription(e.getMessage())
.asRuntimeException());
IndexState indexState = getIndexState(index);
Warmer warmer = indexState.getWarmer();
if (warmer == null) {
logger.warn("Unable to backup warming queries as warmer not found for index: {}", index);
throw Status.UNKNOWN
.withDescription(
"Unable to backup warming queries as warmer not found for index: " + index)
.asRuntimeException();
}
int numQueriesThreshold = request.getNumQueriesThreshold();
int numWarmingRequests = warmer.getNumWarmingRequests();
if (numQueriesThreshold > 0 && numWarmingRequests < numQueriesThreshold) {
logger.warn(
"Unable to backup warming queries since warmer has {} requests, which is less than threshold {}",
numWarmingRequests,
numQueriesThreshold);
throw Status.UNKNOWN
.withDescription(
String.format(
"Unable to backup warming queries since warmer has %s requests, which is less than threshold %s",
numWarmingRequests, numQueriesThreshold))
.asRuntimeException();
}
int uptimeMinutesThreshold = request.getUptimeMinutesThreshold();
int currUptimeMinutes = (int) (ManagementFactory.getRuntimeMXBean().getUptime() / 1000L / 60L);
if (uptimeMinutesThreshold > 0 && currUptimeMinutes < uptimeMinutesThreshold) {
logger.warn(
"Unable to backup warming queries since uptime is {} minutes, which is less than threshold {}",
currUptimeMinutes,
uptimeMinutesThreshold);
throw Status.UNKNOWN
.withDescription(
String.format(
"Unable to backup warming queries since uptime is %s minutes, which is less than threshold %s",
currUptimeMinutes, uptimeMinutesThreshold))
.asRuntimeException();
}
warmer.backupWarmingQueriesToS3(request.getServiceName());
return BackupWarmingQueriesResponse.newBuilder().build();
}
}
Loading
Loading