Add partial result to AggregateCursor continuation #3254

Open. Wants to merge 29 commits into base: main.

pengpeng-lu
Contributor

@pengpeng-lu pengpeng-lu commented Mar 17, 2025

  1. Add AggregateCursor.AggregateCursorContinuation, which contains partialAggregationResult.
  2. AggregateCursor now returns AggregateCursor.AggregateCursorContinuation.
  3. Add tests in record-layer and relational-layer to verify the new behavior.

@pengpeng-lu pengpeng-lu added the enhancement New feature or request label Mar 17, 2025
@pengpeng-lu
Contributor Author

The mixed-mode test is failing; working on it.

(prefixLength < highBytes.length) &&
(lowBytes[prefixLength] == highBytes[prefixLength])) {
(prefixLength < highBytes.length) &&
(lowBytes[prefixLength] == highBytes[prefixLength])) {
Contributor Author

Weird auto-formatting; I will remove it in the next commit.

Collaborator

@alecgrieser alecgrieser left a comment

There were a few things that looked a little concerning to me. I had some suggestions for a few more tests, but in places where I thought that there may be something wrong with the implementation, it may be a good idea to validate with a test whether the behavior is as I think it is, because I could be wrong about how some of these fit together!

@@ -52,29 +58,43 @@ public class AggregateCursor<M extends Message> implements RecordCursor<QueryRes
// Previous record processed by this cursor
@Nullable
private RecordCursorResult<QueryResult> previousResult;
@Nullable
// when previousResult = row x, lastResult = row (x-1); when previousResult = null, lastResult = null
private RecordCursorResult<QueryResult> lastResult;
Collaborator

Having both previousResult and lastResult is a bit confusing, although there may not be a great name for what is here called lastResult. I actually think we can remove it entirely, as I explain in a comment below.

Comment on lines +61 to +62
@Nullable
// when previousResult = row x, lastResult = row (x-1); when previousResult = null, lastResult = null
Collaborator

Suggested change
- @Nullable
- // when previousResult = row x, lastResult = row (x-1); when previousResult = null, lastResult = null
+ // when previousResult = row x, lastResult = row (x-1); when previousResult = null, lastResult = null
+ @Nullable

In general, I think the annotation has to come after any comments

@Nullable
private RecordCursorProto.PartialAggregationResult partialAggregationResult;
@Nullable
private final byte[] continuation;
Collaborator

It may be (slightly) better to make this a RecordCursorContinuation instead of a byte[], though it may not actually matter. The benefit of using the class is that you can differentiate between START and END, and you can copy the initial byte[] into an immutable wrapper sooner, which can reduce copying. That said, I don't think it makes much of a difference overall.


public static class AggregateCursorContinuation implements RecordCursorContinuation {
@Nullable
private final ByteString innerContinuation;
Collaborator

This should probably be a RecordCursorContinuation. The main benefit of this being a RecordCursorContinuation instead of a ByteString is that we will only actually serialize the inner continuation into bytes if someone calls toBytes() or toByteString() on this continuation object. When parsing the first continuation, this probably doesn't matter (as this will just end up being a ByteArrayContinuation anyway), but it can make a difference for intermediate results.
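
The lazy-serialization point could be sketched roughly as follows. This is a minimal, self-contained illustration rather than the PR's code: `Continuation` is a hypothetical stand-in for RecordCursorContinuation, `CountingContinuation` exists only to make the effect visible, and the partial aggregation result and protobuf wrapping are left out.

```java
import java.nio.charset.StandardCharsets;

public class LazyContinuationSketch {
    /** Hypothetical stand-in for RecordCursorContinuation; only the method needed here. */
    interface Continuation {
        byte[] toBytes();
    }

    /** Inner continuation that counts how often it is asked to serialize itself. */
    static final class CountingContinuation implements Continuation {
        int serializations = 0;

        @Override
        public byte[] toBytes() {
            serializations++;
            return "inner".getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Wrapper that keeps the inner continuation as an object and serializes it on demand. */
    static final class WrappingContinuation implements Continuation {
        private final Continuation inner;
        private byte[] cached; // filled on the first toBytes() call, reused afterwards

        WrappingContinuation(Continuation inner) {
            this.inner = inner;
        }

        @Override
        public byte[] toBytes() {
            if (cached == null) {
                cached = inner.toBytes();
            }
            return cached.clone(); // defensive copy so callers cannot mutate the cache
        }
    }
}
```

With this shape, intermediate results whose continuation is never turned into bytes never pay for serializing the inner continuation.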

@Nullable
private RecordCursorProto.AggregateCursorContinuation cachedProto;

private final boolean isEnd;
Collaborator

Having isEnd here is a bit of an anti-pattern. We don't always follow this rule, but the only RecordCursorContinuation that should return true for isEnd is the RecordCursorEndContinuation. If we are creating continuations here where isEnd is true, we should be able to replace those with RecordCursorEndContinuation.END.
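
One way to picture that suggestion is the sketch below. It is illustrative only: `Continuation` and `EndContinuation` are hypothetical stand-ins for RecordCursorContinuation and RecordCursorEndContinuation.END, `partialState` is a placeholder for the inner continuation plus partial aggregation result, and the `finished` flag is an assumption about how the cursor would signal exhaustion.

```java
public class EndContinuationSketch {
    /** Hypothetical stand-in for RecordCursorContinuation. */
    interface Continuation {
        boolean isEnd();
    }

    /** Hypothetical stand-in for the shared RecordCursorEndContinuation.END instance. */
    enum EndContinuation implements Continuation {
        END;

        @Override
        public boolean isEnd() {
            return true;
        }
    }

    /** Continuation for a cursor that can still be resumed; it carries no isEnd field. */
    static final class AggregateContinuation implements Continuation {
        private final String partialState; // placeholder for inner continuation + partial result

        AggregateContinuation(String partialState) {
            this.partialState = partialState;
        }

        String partialState() {
            return partialState;
        }

        @Override
        public boolean isEnd() {
            return false; // the end of the cursor is represented only by END
        }
    }

    /** Returns the shared END instance when the cursor is finished, otherwise wraps the state. */
    static Continuation continuationFor(boolean finished, String partialState) {
        return finished ? EndContinuation.END : new AggregateContinuation(partialState);
    }
}
```

Callers can then rely on `isEnd()` being true for exactly one object, which keeps end-of-cursor checks uniform across cursor types.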

@@ -455,9 +578,9 @@ public RecordQueryPlan build(final boolean useNestedResult) {
final var groupingKeyValue = RecordConstructorValue.ofUnnamed(groupValues);
final var aggregateValue = RecordConstructorValue.ofUnnamed(aggregateValues);
if (useNestedResult) {
- return RecordQueryStreamingAggregationPlan.ofNested(quantifier, groupingKeyValue, aggregateValue);
+ return RecordQueryStreamingAggregationPlan.ofNested(quantifier, groupingKeyValue, aggregateValue, RecordQueryStreamingAggregationPlan.SerializationMode.TO_NEW);
Collaborator

We probably want more of a mix here. Until we're done rolling this out, we need both TO_NEW and TO_OLD tests, which likely means adding this as a builder property and then setting it where appropriate. (That might mean parameterizing many of the existing tests.)
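
A rough sketch of what a builder property plus a run over both modes could look like is shown below. Everything here is hypothetical: `PlanBuilder`, `withSerializationMode`, and the string returned by `build()` are illustrative stand-ins for the test's AggregationPlanBuilder, and the default mode is an arbitrary choice made for the sketch.

```java
import java.util.ArrayList;
import java.util.List;

public class SerializationModeTestSketch {
    enum SerializationMode { TO_OLD, TO_NEW }

    /** Hypothetical, heavily simplified stand-in for the test's plan builder. */
    static final class PlanBuilder {
        private SerializationMode mode = SerializationMode.TO_OLD; // arbitrary default for the sketch

        PlanBuilder withSerializationMode(SerializationMode mode) {
            this.mode = mode;
            return this;
        }

        /** Returns a description of the plan; a real builder would return the plan object. */
        String build() {
            return "StreamingAggregation[" + mode + "]";
        }
    }

    /** Builds one plan per serialization mode, as a parameterized test would. */
    static List<String> buildForEveryMode() {
        List<String> plans = new ArrayList<>();
        for (SerializationMode mode : SerializationMode.values()) {
            plans.add(new PlanBuilder().withSerializationMode(mode).build());
        }
        return plans;
    }
}
```

In a JUnit 5 suite the loop would more naturally become a `@ParameterizedTest` with `@EnumSource(SerializationMode.class)`, so each existing test body runs once per mode.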

final var plan =
new AggregationPlanBuilder(recordStore.getRecordMetaData(), "MySimpleRecord")
.withAggregateValue("num_value_2", value -> new NumericAggregationValue.BitmapConstructAgg(NumericAggregationValue.PhysicalOperator.BITMAP_CONSTRUCT_AGG_I, value))
.withGroupCriterion("str_value_indexed")
Collaborator

I think we could use a test here that uses the record scan limit but that doesn't have a grouping key.

Another interesting test might be: one where the aggregation plan sits on top of a filter cursor. I think as written, we won't make progress if, say, the filter cursor removes all of (or a lot of) the records below, but it would be nice to test that (and/or fix that bug with an appropriate test)

@@ -97,6 +98,7 @@ public class RecordQueryStreamingAggregationPlan implements RecordQueryPlanWithC
private final CorrelationIdentifier aggregateAlias;
@Nonnull
private final Value completeResultValue;
private final SerializationMode serializationMode;
Collaborator

Should serializationMode be added to the plan's equals method? I think that would be useful in test assertions to ensure that the plan's serialization mode is correctly encoded.
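
As an illustration of the question, the sketch below shows equality that takes the mode into account. It is an assumption-laden simplification: `Plan` and its `groupingKey` field are hypothetical stand-ins, and the real plan class structures its equality and hashing methods differently.

```java
import java.util.Objects;

public class PlanEqualitySketch {
    enum SerializationMode { TO_OLD, TO_NEW }

    /** Hypothetical, simplified plan with one ordinary field plus the serialization mode. */
    static final class Plan {
        private final String groupingKey; // stand-in for the plan's other fields
        private final SerializationMode serializationMode;

        Plan(String groupingKey, SerializationMode serializationMode) {
            this.groupingKey = Objects.requireNonNull(groupingKey);
            this.serializationMode = Objects.requireNonNull(serializationMode);
        }

        @Override
        public boolean equals(Object other) {
            if (this == other) {
                return true;
            }
            if (!(other instanceof Plan)) {
                return false;
            }
            Plan that = (Plan) other;
            // Two plans that differ only in serialization mode are not equal.
            return groupingKey.equals(that.groupingKey)
                    && serializationMode == that.serializationMode;
        }

        @Override
        public int hashCode() {
            return Objects.hash(groupingKey, serializationMode);
        }
    }
}
```

With the mode included, a test asserting that a round-tripped plan equals the original would fail if the mode were dropped or changed along the way.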

@@ -83,6 +83,7 @@ public boolean hasNext() {

private void fetchNextResult() {
if (result != null) {
continuation = ContinuationImpl.fromRecordCursorContinuation(result.getContinuation());
Collaborator

What's this about? This change is notable because it would affect more than just aggregate queries.

.hasNoNextRow();
continuation = resultSet.getContinuation();
}
String postfix = " WITH CONTINUATION B64'" + Base64.getEncoder().encodeToString(continuation.serialize()) + "'";
Collaborator

These tests should probably use EXECUTE CONTINUATION instead of WITH CONTINUATION
