Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Allows clients to expect that query performance results have been logged before they receive a response #6334

Merged
merged 5 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 23 additions & 22 deletions server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ public static void DoGetCustom(
.queryPerformanceRecorder(queryPerformanceRecorder)
.require(tableExport)
.onError(observer)
.onSuccess(observer::onCompleted)
.submit(() -> {
metrics.queueNanos = System.nanoTime() - queueStartTm;
Object export = tableExport.get();
Expand All @@ -146,8 +147,6 @@ public static void DoGetCustom(
// shared code between `DoGet` and `BarrageSnapshotRequest`
BarrageUtil.createAndSendSnapshot(streamGeneratorFactory, table, null, null, false,
DEFAULT_SNAPSHOT_DESER_OPTIONS, listener, metrics);

listener.onCompleted();
});
}
}
Expand Down Expand Up @@ -544,6 +543,27 @@ public void handleMessage(@NotNull final BarrageProtoUtil.MessageInfo message) {
.queryPerformanceRecorder(queryPerformanceRecorder)
.require(tableExport)
.onError(listener)
.onSuccess(() -> {
final HalfClosedState newState = halfClosedState.updateAndGet(current -> {
switch (current) {
case DONT_CLOSE:
// record that we have finished sending
return HalfClosedState.FINISHED_SENDING;
case CLIENT_HALF_CLOSED:
// since streaming has now finished, and client already half-closed,
// time to half close from server
return HalfClosedState.CLOSED;
case FINISHED_SENDING:
case CLOSED:
throw new IllegalStateException("Can't finish streaming twice");
default:
throw new IllegalStateException("Unknown state " + current);
}
});
if (newState == HalfClosedState.CLOSED) {
listener.onCompleted();
}
})
.submit(() -> {
metrics.queueNanos = System.nanoTime() - queueStartTm;
Object export = tableExport.get();
Expand Down Expand Up @@ -586,25 +606,6 @@ public void handleMessage(@NotNull final BarrageProtoUtil.MessageInfo message) {
BarrageUtil.createAndSendSnapshot(streamGeneratorFactory, table, columns, viewport,
reverseViewport, snapshotOptAdapter.adapt(snapshotRequest), listener,
metrics);
HalfClosedState newState = halfClosedState.updateAndGet(current -> {
switch (current) {
case DONT_CLOSE:
// record that we have finished sending
return HalfClosedState.FINISHED_SENDING;
case CLIENT_HALF_CLOSED:
// since streaming has now finished, and client already half-closed,
// time to half close from server
return HalfClosedState.CLOSED;
case FINISHED_SENDING:
case CLOSED:
throw new IllegalStateException("Can't finish streaming twice");
default:
throw new IllegalStateException("Unknown state " + current);
}
});
if (newState == HalfClosedState.CLOSED) {
listener.onCompleted();
}
});
}
}
Expand All @@ -614,7 +615,7 @@ public void handleMessage(@NotNull final BarrageProtoUtil.MessageInfo message) {
public void close() {
// no work to do for DoGetRequest close
// possibly safely complete if finished sending data
HalfClosedState newState = halfClosedState.updateAndGet(current -> {
final HalfClosedState newState = halfClosedState.updateAndGet(current -> {
switch (current) {
case DONT_CLOSE:
// record that we have half closed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,8 @@ public void getFlightInfo(
.queryPerformanceRecorder(queryPerformanceRecorder)
.require(export)
.onError(responseObserver)
.submit(() -> {
responseObserver.onNext(export.get());
responseObserver.onCompleted();
});
.onSuccess(responseObserver::onCompleted)
.submit(() -> responseObserver.onNext(export.get()));
return;
}

Expand Down Expand Up @@ -273,12 +271,10 @@ public void getSchema(
.queryPerformanceRecorder(queryPerformanceRecorder)
.require(export)
.onError(responseObserver)
.submit(() -> {
responseObserver.onNext(Flight.SchemaResult.newBuilder()
.setSchema(export.get().getSchema())
.build());
responseObserver.onCompleted();
});
.onSuccess(responseObserver::onCompleted)
.submit(() -> responseObserver.onNext(Flight.SchemaResult.newBuilder()
.setSchema(export.get().getSchema())
.build()));
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,16 +183,17 @@ public void executeCommand(
final SessionState.ExportObject<ScriptSession> exportedConsole =
ticketRouter.resolve(session, consoleId, "consoleId");

session.nonExport()
session.<ExecuteCommandResponse>nonExport()
.queryPerformanceRecorder(queryPerformanceRecorder)
.requiresSerialQueue()
.require(exportedConsole)
.onError(responseObserver)
.onSuccess((final ExecuteCommandResponse response) -> safelyComplete(responseObserver, response))
.submit(() -> {
ScriptSession scriptSession = exportedConsole.get();
ScriptSession.Changes changes = scriptSession.evaluateScript(request.getCode());
ExecuteCommandResponse.Builder diff = ExecuteCommandResponse.newBuilder();
FieldsChangeUpdate.Builder fieldChanges = FieldsChangeUpdate.newBuilder();
final ScriptSession scriptSession = exportedConsole.get();
final ScriptSession.Changes changes = scriptSession.evaluateScript(request.getCode());
final ExecuteCommandResponse.Builder diff = ExecuteCommandResponse.newBuilder();
final FieldsChangeUpdate.Builder fieldChanges = FieldsChangeUpdate.newBuilder();
changes.created.entrySet()
.forEach(entry -> fieldChanges.addCreated(makeVariableDefinition(entry)));
changes.updated.entrySet()
Expand All @@ -203,7 +204,7 @@ public void executeCommand(
diff.setErrorMessage(Throwables.getStackTraceAsString(changes.error));
log.error().append("Error running script: ").append(changes.error).endl();
}
safelyComplete(responseObserver, diff.setChanges(fieldChanges).build());
return diff.setChanges(fieldChanges).build();
});
}
}
Expand Down Expand Up @@ -276,7 +277,8 @@ public void bindTableToVariable(
ExportBuilder<?> exportBuilder = session.nonExport()
.queryPerformanceRecorder(queryPerformanceRecorder)
.requiresSerialQueue()
.onError(responseObserver);
.onError(responseObserver)
.onSuccess(responseObserver::onCompleted);

if (request.hasConsoleId()) {
exportedConsole = ticketRouter.resolve(session, request.getConsoleId(), "consoleId");
Expand All @@ -293,7 +295,6 @@ public void bindTableToVariable(
Table table = exportedTable.get();
queryScope.putParam(request.getVariableName(), table);
responseObserver.onNext(BindTableToVariableResponse.getDefaultInstance());
responseObserver.onCompleted();
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,12 @@ public void rollup(
final SessionState.ExportObject<Table> sourceTableExport =
ticketRouter.resolve(session, request.getSourceTableId(), "sourceTableId");

session.newExport(request.getResultRollupTableId(), "resultRollupTableId")
session.<RollupTable>newExport(request.getResultRollupTableId(), "resultRollupTableId")
.queryPerformanceRecorder(queryPerformanceRecorder)
.require(sourceTableExport)
.onError(responseObserver)
.onSuccess((final RollupTable ignoredResult) -> safelyComplete(responseObserver,
RollupResponse.getDefaultInstance()))
.submit(() -> {
final Table sourceTable = sourceTableExport.get();

Expand All @@ -109,7 +111,6 @@ public void rollup(
throw Exceptions.statusRuntimeException(
Code.FAILED_PRECONDITION, "Not authorized to rollup hierarchical table");
}
safelyComplete(responseObserver, RollupResponse.getDefaultInstance());
return transformedResult;
});
}
Expand Down Expand Up @@ -141,10 +142,12 @@ public void tree(
final SessionState.ExportObject<Table> sourceTableExport =
ticketRouter.resolve(session, request.getSourceTableId(), "sourceTableId");

session.newExport(request.getResultTreeTableId(), "resultTreeTableId")
session.<TreeTable>newExport(request.getResultTreeTableId(), "resultTreeTableId")
.queryPerformanceRecorder(queryPerformanceRecorder)
.require(sourceTableExport)
.onError(responseObserver)
.onSuccess((final TreeTable ignoredResult) -> safelyComplete(responseObserver,
TreeResponse.getDefaultInstance()))
.submit(() -> {
final Table sourceTable = sourceTableExport.get();

Expand All @@ -169,7 +172,6 @@ public void tree(
throw Exceptions.statusRuntimeException(
Code.FAILED_PRECONDITION, "Not authorized to tree hierarchical table");
}
safelyComplete(responseObserver, TreeResponse.getDefaultInstance());
return transformedResult;
});
}
Expand Down Expand Up @@ -202,10 +204,12 @@ public void apply(
final SessionState.ExportObject<HierarchicalTable<?>> inputHierarchicalTableExport =
ticketRouter.resolve(session, request.getInputHierarchicalTableId(), "inputHierarchicalTableId");

session.newExport(request.getResultHierarchicalTableId(), "resultHierarchicalTableId")
session.<HierarchicalTable<?>>newExport(request.getResultHierarchicalTableId(), "resultHierarchicalTableId")
.queryPerformanceRecorder(queryPerformanceRecorder)
.require(inputHierarchicalTableExport)
.onError(responseObserver)
.onSuccess((final HierarchicalTable<?> ignoredResult) -> safelyComplete(responseObserver,
HierarchicalTableApplyResponse.getDefaultInstance()))
.submit(() -> {
final HierarchicalTable<?> inputHierarchicalTable = inputHierarchicalTableExport.get();

Expand Down Expand Up @@ -274,7 +278,6 @@ public void apply(
throw Exceptions.statusRuntimeException(
Code.FAILED_PRECONDITION, "Not authorized to apply to hierarchical table");
}
safelyComplete(responseObserver, HierarchicalTableApplyResponse.getDefaultInstance());
return transformedResult;
});
}
Expand Down Expand Up @@ -395,6 +398,8 @@ public void view(
resultExportBuilder
.queryPerformanceRecorder(queryPerformanceRecorder)
.onError(responseObserver)
.onSuccess((final HierarchicalTableView ignoredResult) -> safelyComplete(responseObserver,
HierarchicalTableViewResponse.getDefaultInstance()))
.submit(() -> {
final Table keyTable = keyTableExport == null ? null : keyTableExport.get();
final Object target = targetExport.get();
Expand Down Expand Up @@ -439,7 +444,6 @@ public void view(
throw Exceptions.statusRuntimeException(
Code.FAILED_PRECONDITION, "Not authorized to view hierarchical table");
}
safelyComplete(responseObserver, HierarchicalTableViewResponse.getDefaultInstance());
return transformedResult;
});
}
Expand Down Expand Up @@ -483,10 +487,12 @@ public void exportSource(
final SessionState.ExportObject<HierarchicalTable<?>> hierarchicalTableExport =
ticketRouter.resolve(session, request.getHierarchicalTableId(), "hierarchicalTableId");

session.newExport(request.getResultTableId(), "resultTableId")
session.<Table>newExport(request.getResultTableId(), "resultTableId")
.queryPerformanceRecorder(queryPerformanceRecorder)
.require(hierarchicalTableExport)
.onError(responseObserver)
.onSuccess((final Table transformedResult) -> safelyComplete(responseObserver,
ExportUtil.buildTableCreationResponse(request.getResultTableId(), transformedResult)))
.submit(() -> {
final HierarchicalTable<?> hierarchicalTable = hierarchicalTableExport.get();

Expand All @@ -499,9 +505,6 @@ public void exportSource(
Code.FAILED_PRECONDITION,
"Not authorized to export source from hierarchical table");
}
final ExportedTableCreationResponse response =
ExportUtil.buildTableCreationResponse(request.getResultTableId(), transformedResult);
safelyComplete(responseObserver, response);
return transformedResult;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public ObjectServiceGrpcImpl(SessionService sessionService, TicketRouter ticketR
private enum EnqueuedState {
WAITING, RUNNING, CLOSED
}

private final class SendMessageObserver implements StreamObserver<StreamRequest> {
private final SessionState session;
private final StreamObserver<StreamResponse> responseObserver;
Expand Down Expand Up @@ -268,10 +269,12 @@ public void fetchObject(
final SessionState.ExportObject<Object> object =
ticketRouter.resolve(session, request.getSourceId().getTicket(), "sourceId");

session.nonExport()
session.<FetchObjectResponse>nonExport()
.queryPerformanceRecorder(queryPerformanceRecorder)
.require(object)
.onError(responseObserver)
.onSuccess(
(final FetchObjectResponse response) -> GrpcUtil.safelyComplete(responseObserver, response))
.submit(() -> {
final Object o = object.get();
ObjectType objectTypeInstance = getObjectTypeInstance(type, o);
Expand Down Expand Up @@ -312,9 +315,7 @@ public void onCompleted() {
throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT,
"Plugin didn't close response, use MessageStream instead for this object");
}
GrpcUtil.safelyComplete(responseObserver, message);

return null;
return message;
});
}
}
Expand Down
Loading
Loading