Skip to content

Commit

Permalink
fix: Alows clients to expect that query performance results have been…
Browse files Browse the repository at this point in the history
… logged before they receive a response (#6334)

Defer response messaging to `onSuccess` for all `ExportBuilders` that
use `QueryPerformanceRecorder`. This allows clients to expect that query
performance results have been logged before they receive a response.

Change summary:
* Adjust all `SessionState.ExportBuilder` uses that set a
`QueryPerformanceRecorder` to use `onSuccess` for response delivery if
possible
* Exclusions: `DoExchange` subscriptions and async input table
operations
* Note: batch already used `onSuccess`, but completion now happens after
performance results are reported
* Some cleanup to `GrpcUtil` usage
  • Loading branch information
rcaudy authored Nov 5, 2024
1 parent b603fc7 commit 7657ff5
Show file tree
Hide file tree
Showing 12 changed files with 152 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.UUID;
import java.util.function.Supplier;

public class GrpcUtil {
private static final Logger log = LoggerFactory.getLogger(GrpcUtil.class);
Expand Down Expand Up @@ -56,7 +56,7 @@ public static <T> void safelyOnNext(StreamObserver<T> observer, T message) {
* @param message the last message to send on this stream before completing
* @param <T> the type of message that the stream handles
*/
public static <T> void safelyComplete(StreamObserver<T> observer, T message) {
public static <T> void safelyOnNextAndComplete(StreamObserver<T> observer, T message) {
safelyExecuteLocked(observer, () -> {
observer.onNext(message);
observer.onCompleted();
Expand Down
47 changes: 24 additions & 23 deletions server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ public static void DoGetCustom(
.queryPerformanceRecorder(queryPerformanceRecorder)
.require(tableExport)
.onError(observer)
.onSuccess(observer)
.submit(() -> {
metrics.queueNanos = System.nanoTime() - queueStartTm;
Object export = tableExport.get();
Expand All @@ -146,8 +147,6 @@ public static void DoGetCustom(
// shared code between `DoGet` and `BarrageSnapshotRequest`
BarrageUtil.createAndSendSnapshot(streamGeneratorFactory, table, null, null, false,
DEFAULT_SNAPSHOT_DESER_OPTIONS, listener, metrics);

listener.onCompleted();
});
}
}
Expand Down Expand Up @@ -544,6 +543,27 @@ public void handleMessage(@NotNull final BarrageProtoUtil.MessageInfo message) {
.queryPerformanceRecorder(queryPerformanceRecorder)
.require(tableExport)
.onError(listener)
.onSuccess(() -> {
final HalfClosedState newState = halfClosedState.updateAndGet(current -> {
switch (current) {
case DONT_CLOSE:
// record that we have finished sending
return HalfClosedState.FINISHED_SENDING;
case CLIENT_HALF_CLOSED:
// since streaming has now finished, and client already half-closed,
// time to half close from server
return HalfClosedState.CLOSED;
case FINISHED_SENDING:
case CLOSED:
throw new IllegalStateException("Can't finish streaming twice");
default:
throw new IllegalStateException("Unknown state " + current);
}
});
if (newState == HalfClosedState.CLOSED) {
GrpcUtil.safelyComplete(listener);
}
})
.submit(() -> {
metrics.queueNanos = System.nanoTime() - queueStartTm;
Object export = tableExport.get();
Expand Down Expand Up @@ -586,25 +606,6 @@ public void handleMessage(@NotNull final BarrageProtoUtil.MessageInfo message) {
BarrageUtil.createAndSendSnapshot(streamGeneratorFactory, table, columns, viewport,
reverseViewport, snapshotOptAdapter.adapt(snapshotRequest), listener,
metrics);
HalfClosedState newState = halfClosedState.updateAndGet(current -> {
switch (current) {
case DONT_CLOSE:
// record that we have finished sending
return HalfClosedState.FINISHED_SENDING;
case CLIENT_HALF_CLOSED:
// since streaming has now finished, and client already half-closed,
// time to half close from server
return HalfClosedState.CLOSED;
case FINISHED_SENDING:
case CLOSED:
throw new IllegalStateException("Can't finish streaming twice");
default:
throw new IllegalStateException("Unknown state " + current);
}
});
if (newState == HalfClosedState.CLOSED) {
listener.onCompleted();
}
});
}
}
Expand All @@ -614,7 +615,7 @@ public void handleMessage(@NotNull final BarrageProtoUtil.MessageInfo message) {
public void close() {
// no work to do for DoGetRequest close
// possibly safely complete if finished sending data
HalfClosedState newState = halfClosedState.updateAndGet(current -> {
final HalfClosedState newState = halfClosedState.updateAndGet(current -> {
switch (current) {
case DONT_CLOSE:
// record that we have half closed
Expand All @@ -630,7 +631,7 @@ public void close() {
}
});
if (newState == HalfClosedState.CLOSED) {
listener.onCompleted();
GrpcUtil.safelyComplete(listener);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;

import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyComplete;
import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyError;
import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyOnNextAndComplete;

@Singleton
public class FlightServiceGrpcImpl extends FlightServiceGrpc.FlightServiceImplBase {
private static final Logger log = LoggerFactory.getLogger(FlightServiceGrpcImpl.class);
Expand Down Expand Up @@ -97,7 +101,7 @@ public void onError(Throwable t) {

@Override
public void onCompleted() {
GrpcUtil.safelyComplete(responseObserver);
safelyComplete(responseObserver);
}
};
}
Expand All @@ -118,7 +122,7 @@ private HandshakeObserver(StreamObserver<Flight.HandshakeResponse> responseObser
public void onNext(final Flight.HandshakeRequest value) {
final AuthenticationRequestHandler.HandshakeResponseListener handshakeResponseListener =
(protocol, response) -> {
GrpcUtil.safelyComplete(responseObserver, Flight.HandshakeResponse.newBuilder()
safelyOnNextAndComplete(responseObserver, Flight.HandshakeResponse.newBuilder()
.setProtocolVersion(protocol)
.setPayload(ByteStringAccess.wrap(response))
.build());
Expand Down Expand Up @@ -222,30 +226,28 @@ public void getFlightInfo(
ticketRouter.flightInfoFor(session, request, "request");

if (session != null) {
session.nonExport()
session.<Flight.FlightInfo>nonExport()
.queryPerformanceRecorder(queryPerformanceRecorder)
.require(export)
.onError(responseObserver)
.submit(() -> {
responseObserver.onNext(export.get());
responseObserver.onCompleted();
});
.onSuccess((final Flight.FlightInfo resultFlightInfo) -> safelyOnNextAndComplete(
responseObserver, resultFlightInfo))
.submit(export::get);
return;
}

StatusRuntimeException exception = null;
if (export.tryRetainReference()) {
try {
if (export.getState() == ExportNotification.State.EXPORTED) {
GrpcUtil.safelyOnNext(responseObserver, export.get());
GrpcUtil.safelyComplete(responseObserver);
safelyOnNextAndComplete(responseObserver, export.get());
}
} finally {
export.dropReference();
}
} else {
exception = Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION, "Could not find flight info");
GrpcUtil.safelyError(responseObserver, exception);
safelyError(responseObserver, exception);
}

if (queryPerformanceRecorder.endQuery() || exception != null) {
Expand All @@ -269,27 +271,26 @@ public void getSchema(
ticketRouter.flightInfoFor(session, request, "request");

if (session != null) {
session.nonExport()
session.<Flight.SchemaResult>nonExport()
.queryPerformanceRecorder(queryPerformanceRecorder)
.require(export)
.onError(responseObserver)
.submit(() -> {
responseObserver.onNext(Flight.SchemaResult.newBuilder()
.setSchema(export.get().getSchema())
.build());
responseObserver.onCompleted();
});
.onSuccess((final Flight.SchemaResult resultSchema) -> safelyOnNextAndComplete(
responseObserver,
resultSchema))
.submit(() -> Flight.SchemaResult.newBuilder()
.setSchema(export.get().getSchema())
.build());
return;
}

StatusRuntimeException exception = null;
if (export.tryRetainReference()) {
try {
if (export.getState() == ExportNotification.State.EXPORTED) {
GrpcUtil.safelyOnNext(responseObserver, Flight.SchemaResult.newBuilder()
safelyOnNextAndComplete(responseObserver, Flight.SchemaResult.newBuilder()
.setSchema(export.get().getSchema())
.build());
GrpcUtil.safelyComplete(responseObserver);
}
} finally {
export.dropReference();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyComplete;
import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyOnNext;
import static io.deephaven.extensions.barrage.util.GrpcUtil.*;

@Singleton
public class ConsoleServiceGrpcImpl extends ConsoleServiceGrpc.ConsoleServiceImplBase {
Expand Down Expand Up @@ -139,11 +138,9 @@ public void startConsole(
.onError(responseObserver)
.submit(() -> {
final ScriptSession scriptSession = new DelegatingScriptSession(scriptSessionProvider.get());

safelyComplete(responseObserver, StartConsoleResponse.newBuilder()
safelyOnNextAndComplete(responseObserver, StartConsoleResponse.newBuilder()
.setResultId(request.getResultId())
.build());

return scriptSession;
});
}
Expand All @@ -154,7 +151,7 @@ public void subscribeToLogs(
@NotNull final StreamObserver<LogSubscriptionData> responseObserver) {
sessionService.getCurrentSession();
if (REMOTE_CONSOLE_DISABLED) {
GrpcUtil.safelyError(responseObserver, Code.FAILED_PRECONDITION, "Remote console disabled");
safelyError(responseObserver, Code.FAILED_PRECONDITION, "Remote console disabled");
return;
}
// Session close logic implicitly handled in
Expand Down Expand Up @@ -183,16 +180,18 @@ public void executeCommand(
final SessionState.ExportObject<ScriptSession> exportedConsole =
ticketRouter.resolve(session, consoleId, "consoleId");

session.nonExport()
session.<ExecuteCommandResponse>nonExport()
.queryPerformanceRecorder(queryPerformanceRecorder)
.requiresSerialQueue()
.require(exportedConsole)
.onError(responseObserver)
.onSuccess((final ExecuteCommandResponse response) -> safelyOnNextAndComplete(responseObserver,
response))
.submit(() -> {
ScriptSession scriptSession = exportedConsole.get();
ScriptSession.Changes changes = scriptSession.evaluateScript(request.getCode());
ExecuteCommandResponse.Builder diff = ExecuteCommandResponse.newBuilder();
FieldsChangeUpdate.Builder fieldChanges = FieldsChangeUpdate.newBuilder();
final ScriptSession scriptSession = exportedConsole.get();
final ScriptSession.Changes changes = scriptSession.evaluateScript(request.getCode());
final ExecuteCommandResponse.Builder diff = ExecuteCommandResponse.newBuilder();
final FieldsChangeUpdate.Builder fieldChanges = FieldsChangeUpdate.newBuilder();
changes.created.entrySet()
.forEach(entry -> fieldChanges.addCreated(makeVariableDefinition(entry)));
changes.updated.entrySet()
Expand All @@ -203,7 +202,7 @@ public void executeCommand(
diff.setErrorMessage(Throwables.getStackTraceAsString(changes.error));
log.error().append("Error running script: ").append(changes.error).endl();
}
safelyComplete(responseObserver, diff.setChanges(fieldChanges).build());
return diff.setChanges(fieldChanges).build();
});
}
}
Expand Down Expand Up @@ -276,7 +275,9 @@ public void bindTableToVariable(
ExportBuilder<?> exportBuilder = session.nonExport()
.queryPerformanceRecorder(queryPerformanceRecorder)
.requiresSerialQueue()
.onError(responseObserver);
.onError(responseObserver)
.onSuccess(() -> safelyOnNextAndComplete(responseObserver,
BindTableToVariableResponse.getDefaultInstance()));

if (request.hasConsoleId()) {
exportedConsole = ticketRouter.resolve(session, request.getConsoleId(), "consoleId");
Expand All @@ -292,8 +293,6 @@ public void bindTableToVariable(

Table table = exportedTable.get();
queryScope.putParam(request.getVariableName(), table);
responseObserver.onNext(BindTableToVariableResponse.getDefaultInstance());
responseObserver.onCompleted();
});
}
}
Expand Down Expand Up @@ -405,7 +404,7 @@ public void start() {
}

public void stop() {
GrpcUtil.safelyComplete(client);
safelyComplete(client);
}

// ------------------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -459,7 +458,7 @@ public void run() {
return;
}
if (tooSlow) {
GrpcUtil.safelyError(client, Code.RESOURCE_EXHAUSTED, String.format(
safelyError(client, Code.RESOURCE_EXHAUSTED, String.format(
"Too slow: the client or network may be too slow to keep up with the logging rates, or there may be logging bursts that exceed the available buffer size. The buffer size can be configured through the server property '%s'.",
SUBSCRIBE_TO_LOGS_BUFFER_SIZE_PROP));
return;
Expand All @@ -472,7 +471,7 @@ public void run() {
bufferIsKnownEmpty = true;
break;
}
GrpcUtil.safelyOnNext(client, payload);
safelyOnNext(client, payload);
}
} finally {
guard.set(false);
Expand Down
Loading

0 comments on commit 7657ff5

Please sign in to comment.