Add partial result to AggregateCursor continuation #3254
base: main
The mixed-mode test is failing; working on it.
(prefixLength < highBytes.length) &&
(lowBytes[prefixLength] == highBytes[prefixLength])) {
(prefixLength < highBytes.length) &&
(lowBytes[prefixLength] == highBytes[prefixLength])) {
Weird auto-formatting; I will remove it in the next commit.
There were a few things that looked a little concerning to me. I had some suggestions for a few more tests, but in places where I thought that there may be something wrong with the implementation, it may be a good idea to validate with a test whether the behavior is as I think it is, because I could be wrong about how some of these fit together!
@@ -52,29 +58,43 @@ public class AggregateCursor<M extends Message> implements RecordCursor<QueryRes
// Previous record processed by this cursor
@Nullable
private RecordCursorResult<QueryResult> previousResult;
@Nullable
// when previousResult = row x, lastResult = row (x-1); when previousResult = null, lastResult = null
private RecordCursorResult<QueryResult> lastResult;
Having both previousResult and lastResult is a bit confusing. There may not be a great name for what is here called lastResult, though. I actually think we can remove this entirely, as I explain in a comment below.
@Nullable
// when previousResult = row x, lastResult = row (x-1); when previousResult = null, lastResult = null
@Nullable
// when previousResult = row x, lastResult = row (x-1); when previousResult = null, lastResult = null
// when previousResult = row x, lastResult = row (x-1); when previousResult = null, lastResult = null
@Nullable
In general, I think the annotation has to come after any comments.
@Nullable
private RecordCursorProto.PartialAggregationResult partialAggregationResult;
@Nullable
private final byte[] continuation;
It may be (slightly) better to make this a RecordCursorContinuation instead of a byte[], though it may not actually matter. The benefit of using the class is that you can differentiate between START and END, and you can copy the initial byte[] into an immutable wrapper sooner, which can reduce copying. That being said, I don't think it makes much of a difference overall.
public static class AggregateCursorContinuation implements RecordCursorContinuation {
@Nullable
private final ByteString innerContinuation;
This should probably be a RecordCursorContinuation. The main benefit of this being a RecordCursorContinuation instead of a ByteString is that we will only actually serialize the inner continuation into bytes if someone calls toBytes() or toByteString() on this continuation object. When parsing the first continuation, this probably doesn't matter (as this will just end up being a ByteArrayContinuation anyway), but it can make a difference for intermediate results.
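To illustrate the lazy-serialization point, here is a minimal sketch. It does not use the Record Layer classes: SketchContinuation, CountingContinuation, and LazyWrapperContinuation are hypothetical stand-ins introduced only for this example, and the real AggregateCursorContinuation would also carry the partial aggregation result in its proto.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a cursor continuation interface (not the Record Layer API).
interface SketchContinuation {
    byte[] toBytes();

    boolean isEnd();
}

// Hypothetical inner continuation that counts how often it is serialized.
final class CountingContinuation implements SketchContinuation {
    private final String position;
    int serializations = 0;

    CountingContinuation(String position) {
        this.position = position;
    }

    @Override
    public byte[] toBytes() {
        serializations++;
        return position.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEnd() {
        return false;
    }
}

// Wrapper that keeps the inner continuation as an object and only
// serializes it when toBytes() is first called, caching the result.
final class LazyWrapperContinuation implements SketchContinuation {
    private final SketchContinuation inner;
    private byte[] cachedBytes; // stays null until toBytes() is first called

    LazyWrapperContinuation(SketchContinuation inner) {
        this.inner = inner;
    }

    @Override
    public byte[] toBytes() {
        if (cachedBytes == null) {
            cachedBytes = inner.toBytes();
        }
        // Defensive copy so callers cannot mutate the cached array.
        return cachedBytes.clone();
    }

    @Override
    public boolean isEnd() {
        return inner.isEnd();
    }
}
```

With this shape, intermediate results that are never handed back to a client never pay the serialization cost; only the continuation that is actually returned gets turned into bytes.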
@Nullable
private RecordCursorProto.AggregateCursorContinuation cachedProto;

private final boolean isEnd;
Having isEnd here is a bit of an anti-pattern. We don't always follow this rule, but the only RecordCursorContinuation that should return true for isEnd is the RecordCursorEndContinuation. If we are creating continuations here where isEnd is true, we should be able to replace those with RecordCursorEndContinuation.END.
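A rough sketch of that pattern, using hypothetical stand-in types rather than the actual Record Layer classes (SketchCursorContinuation, SketchEndContinuation, and SketchAggregateContinuation are made up for illustration): the cursor-specific continuation never reports isEnd() as true, and the factory hands back a shared END sentinel when the cursor is exhausted.

```java
// Hypothetical stand-in for a cursor continuation interface (not the Record Layer API).
interface SketchCursorContinuation {
    byte[] toBytes();

    boolean isEnd();
}

// Shared sentinel: the only continuation in this sketch whose isEnd() is true.
final class SketchEndContinuation implements SketchCursorContinuation {
    static final SketchEndContinuation END = new SketchEndContinuation();

    private SketchEndContinuation() {
    }

    @Override
    public byte[] toBytes() {
        return null; // assumption of this sketch: an end continuation has no bytes
    }

    @Override
    public boolean isEnd() {
        return true;
    }
}

// Cursor-specific continuation with no isEnd field at all.
final class SketchAggregateContinuation implements SketchCursorContinuation {
    private final byte[] state;

    private SketchAggregateContinuation(byte[] state) {
        this.state = state.clone();
    }

    // Factory: returns the END sentinel instead of an instance flagged as "end".
    static SketchCursorContinuation of(byte[] state, boolean exhausted) {
        return exhausted ? SketchEndContinuation.END : new SketchAggregateContinuation(state);
    }

    @Override
    public byte[] toBytes() {
        return state.clone();
    }

    @Override
    public boolean isEnd() {
        return false;
    }
}
```

Callers can then compare against the sentinel or call isEnd() without needing to know which cursor produced the continuation.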
@@ -455,9 +578,9 @@ public RecordQueryPlan build(final boolean useNestedResult) {
final var groupingKeyValue = RecordConstructorValue.ofUnnamed(groupValues);
final var aggregateValue = RecordConstructorValue.ofUnnamed(aggregateValues);
if (useNestedResult) {
return RecordQueryStreamingAggregationPlan.ofNested(quantifier, groupingKeyValue, aggregateValue);
return RecordQueryStreamingAggregationPlan.ofNested(quantifier, groupingKeyValue, aggregateValue, RecordQueryStreamingAggregationPlan.SerializationMode.TO_NEW);
We probably want to have a bit more of a mix here. Until we're done rolling this out, we probably need both TO_NEW and TO_OLD tests, which probably means adding this as a builder property and then setting it where appropriate. (That might mean parameterizing many of the existing tests.)
final var plan =
new AggregationPlanBuilder(recordStore.getRecordMetaData(), "MySimpleRecord")
.withAggregateValue("num_value_2", value -> new NumericAggregationValue.BitmapConstructAgg(NumericAggregationValue.PhysicalOperator.BITMAP_CONSTRUCT_AGG_I, value))
.withGroupCriterion("str_value_indexed")
I think we could use a test here that uses the record scan limit but that doesn't have a grouping key.
Another interesting test might be one where the aggregation plan sits on top of a filter cursor. I think as written, we won't make progress if, say, the filter cursor removes all of (or a lot of) the records below, but it would be nice to test that (and/or fix that bug with an appropriate test).
@@ -97,6 +98,7 @@ public class RecordQueryStreamingAggregationPlan implements RecordQueryPlanWithC
private final CorrelationIdentifier aggregateAlias;
@Nonnull
private final Value completeResultValue;
private final SerializationMode serializationMode;
Should serializationMode be added to the plan equals method? I think that would be useful in test asserts to ensure that the plan's serialization mode is correctly encoded.
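As a simplified illustration of what that would buy in test assertions, here is a sketch with a hypothetical SketchAggregationPlan class. It is not the real RecordQueryStreamingAggregationPlan, whose equality logic is more involved; the single groupingKey field and the TO_OLD/TO_NEW enum here are assumptions made to keep the example self-contained.

```java
import java.util.Objects;

// Hypothetical, heavily simplified plan class; not the real RecordQueryStreamingAggregationPlan.
final class SketchAggregationPlan {
    enum SerializationMode { TO_OLD, TO_NEW }

    private final String groupingKey;
    private final SerializationMode serializationMode;

    SketchAggregationPlan(String groupingKey, SerializationMode serializationMode) {
        this.groupingKey = Objects.requireNonNull(groupingKey);
        this.serializationMode = Objects.requireNonNull(serializationMode);
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof SketchAggregationPlan)) {
            return false;
        }
        SketchAggregationPlan that = (SketchAggregationPlan) other;
        // Including the mode means plans that differ only in serialization mode are not equal.
        return groupingKey.equals(that.groupingKey)
                && serializationMode == that.serializationMode;
    }

    @Override
    public int hashCode() {
        // Kept consistent with equals: both fields participate.
        return Objects.hash(groupingKey, serializationMode);
    }
}
```

With the mode included, a test that round-trips a plan through serialization and asserts equality would fail if the mode were dropped or defaulted along the way.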
@@ -83,6 +83,7 @@ public boolean hasNext() {

private void fetchNextResult() {
if (result != null) {
continuation = ContinuationImpl.fromRecordCursorContinuation(result.getContinuation());
What's this about? This change is a little bit notable given that it would affect more than just aggregate queries.
.hasNoNextRow();
continuation = resultSet.getContinuation();
}
String postfix = " WITH CONTINUATION B64'" + Base64.getEncoder().encodeToString(continuation.serialize()) + "'";
These tests should probably use EXECUTE CONTINUATION instead of WITH CONTINUATION.
AggregateCursor.AggregateCursorContinuation, which contains partialAggregationResult.