Commit af674a3

Enable time-series block hash

1 parent feb44c5

8 files changed, +257 -47 lines

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/TimeSeriesBlockHash.java

Lines changed: 65 additions & 30 deletions
@@ -30,8 +30,6 @@
 import org.elasticsearch.core.ReleasableIterator;
 import org.elasticsearch.core.Releasables;
 
-import java.util.Objects;
-
 /**
  * An optimized block hash that receives two blocks: tsid and timestamp, which are sorted.
  * Since the incoming data is sorted, this block hash appends the incoming data to the internal arrays without lookup.
@@ -41,7 +39,7 @@ public final class TimeSeriesBlockHash extends BlockHash {
     private final int tsHashChannel;
     private final int timestampIntervalChannel;
 
-    private final BytesRef lastTsid = new BytesRef();
+    private int lastTsidPosition = 0;
     private final BytesRefArrayWithSize tsidArray;
 
     private long lastTimestamp;
@@ -64,51 +62,88 @@ public void close() {
         Releasables.close(tsidArray, timestampArray, perTsidCountArray);
     }
 
+    private OrdinalBytesRefVector getTsidVector(Page page) {
+        BytesRefBlock block = page.getBlock(tsHashChannel);
+        var ordinalBlock = block.asOrdinals();
+        if (ordinalBlock == null) {
+            throw new IllegalStateException("expected ordinal block for tsid");
+        }
+        var ordinalVector = ordinalBlock.asVector();
+        if (ordinalVector == null) {
+            throw new IllegalStateException("expected ordinal vector for tsid");
+        }
+        return ordinalVector;
+    }
+
+    private LongVector getTimestampVector(Page page) {
+        final LongBlock timestampsBlock = page.getBlock(timestampIntervalChannel);
+        LongVector timestampsVector = timestampsBlock.asVector();
+        if (timestampsVector == null) {
+            throw new IllegalStateException("expected long vector for timestamp");
+        }
+        return timestampsVector;
+    }
+
     @Override
     public void add(Page page, GroupingAggregatorFunction.AddInput addInput) {
-        final BytesRefBlock tsidBlock = page.getBlock(tsHashChannel);
-        final BytesRefVector tsidVector = Objects.requireNonNull(tsidBlock.asVector(), "tsid input must be a vector");
-        final LongBlock timestampBlock = page.getBlock(timestampIntervalChannel);
-        final LongVector timestampVector = Objects.requireNonNull(timestampBlock.asVector(), "timestamp input must be a vector");
-        try (var ordsBuilder = blockFactory.newIntVectorBuilder(tsidVector.getPositionCount())) {
+        final BytesRefVector tsidDict;
+        final IntVector tsidOrdinals;
+        {
+            final var tsidVector = getTsidVector(page);
+            tsidDict = tsidVector.getDictionaryVector();
+            tsidOrdinals = tsidVector.getOrdinalsVector();
+        }
+        try (var ordsBuilder = blockFactory.newIntVectorBuilder(tsidOrdinals.getPositionCount())) {
             final BytesRef spare = new BytesRef();
-            // TODO: optimize incoming ordinal block
-            for (int i = 0; i < tsidVector.getPositionCount(); i++) {
-                final BytesRef tsid = tsidVector.getBytesRef(i, spare);
+            final LongVector timestampVector = getTimestampVector(page);
+            int lastOrd = -1;
+            for (int i = 0; i < tsidOrdinals.getPositionCount(); i++) {
+                final int newOrd = tsidOrdinals.getInt(i);
+                boolean newGroup = false;
+                if (lastOrd != newOrd) {
+                    final var newTsid = tsidDict.getBytesRef(newOrd, spare);
+                    if (positionCount() == 0) {
+                        newGroup = true;
+                    } else if (lastOrd == -1) {
+                        newGroup = lastTsid().equals(newTsid) == false;
+                    } else {
+                        newGroup = true;
+                    }
+                    if (newGroup) {
+                        endTsidGroup();
+                        lastTsidPosition = tsidArray.count;
+                        tsidArray.append(newTsid);
+                    }
+                    lastOrd = newOrd;
+                }
                 final long timestamp = timestampVector.getLong(i);
-                ordsBuilder.appendInt(addOnePosition(tsid, timestamp));
+                if (newGroup || timestamp != lastTimestamp) {
+                    assert newGroup || lastTimestamp >= timestamp : "@timestamp goes backward " + lastTimestamp + " < " + timestamp;
+                    timestampArray.append(timestamp);
+                    lastTimestamp = timestamp;
+                    currentTimestampCount++;
+                }
+                ordsBuilder.appendInt(timestampArray.count - 1);
             }
             try (var ords = ordsBuilder.build()) {
                 addInput.add(0, ords);
             }
         }
     }
 
-    private int addOnePosition(BytesRef tsid, long timestamp) {
-        boolean newGroup = false;
-        if (positionCount() == 0 || lastTsid.equals(tsid) == false) {
-            assert positionCount() == 0 || lastTsid.compareTo(tsid) < 0 : "tsid goes backward ";
-            endTsidGroup();
-            tsidArray.append(tsid);
-            tsidArray.get(tsidArray.count - 1, lastTsid);
-            newGroup = true;
-        }
-        if (newGroup || timestamp != lastTimestamp) {
-            assert newGroup || lastTimestamp >= timestamp : "@timestamp goes backward " + lastTimestamp + " < " + timestamp;
-            timestampArray.append(timestamp);
-            lastTimestamp = timestamp;
-            currentTimestampCount++;
-        }
-        return positionCount() - 1;
-    }
-
     private void endTsidGroup() {
         if (currentTimestampCount > 0) {
             perTsidCountArray.append(currentTimestampCount);
            currentTimestampCount = 0;
         }
     }
 
+    private BytesRef lastTsid() {
+        final BytesRef bytesRef = new BytesRef();
+        tsidArray.get(lastTsidPosition, bytesRef);
+        return bytesRef;
+    }
+
     @Override
     public ReleasableIterator<IntBlock> lookup(Page page, ByteSizeValue targetBlockSize) {
         throw new UnsupportedOperationException("TODO");

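The rewritten add() packs its grouping rule into Block plumbing, so a distilled restatement may help. The following is a minimal, self-contained sketch in plain Java (hypothetical names, none of the Elasticsearch Block types): group ids are dense and monotonically increasing, and a new group opens whenever the tsid changes or the timestamp differs from the previous row of the same tsid.

    import java.util.ArrayList;
    import java.util.List;

    // Sketch only: restates the grouping rule of TimeSeriesBlockHash.add on plain arrays.
    class SortedGroupingSketch {
        public static void main(String[] args) {
            // Precondition from the class javadoc: rows are sorted by tsid, and
            // timestamps are non-increasing within each tsid.
            String[] tsids = { "a", "a", "a", "b", "b" };
            long[] timestamps = { 300, 300, 200, 300, 100 };
            List<Integer> groupIds = new ArrayList<>();
            String lastTsid = null;
            long lastTimestamp = Long.MIN_VALUE;
            int lastGroup = -1;
            for (int i = 0; i < tsids.length; i++) {
                boolean newGroup = lastTsid == null || lastTsid.equals(tsids[i]) == false;
                if (newGroup || timestamps[i] != lastTimestamp) {
                    lastGroup++; // analogous to timestampArray.append(...) adding a position
                }
                lastTsid = tsids[i];
                lastTimestamp = timestamps[i];
                groupIds.add(lastGroup); // analogous to ordsBuilder.appendInt(timestampArray.count - 1)
            }
            System.out.println(groupIds); // prints [0, 0, 1, 2, 3]
        }
    }

The ordinal comparison in the real code is a fast path for the tsid equality check: while consecutive rows carry the same dictionary ordinal, the tsid bytes are never materialized.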
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/OrdinalBytesRefBlock.java

Lines changed: 1 addition & 1 deletion
@@ -75,7 +75,7 @@ public BytesRef getBytesRef(int valueIndex, BytesRef dest) {
     }
 
     @Override
-    public BytesRefVector asVector() {
+    public OrdinalBytesRefVector asVector() {
         IntVector vector = ordinals.asVector();
         if (vector != null) {
             return new OrdinalBytesRefVector(vector, bytes);

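This one-line change narrows a covariant return type: the method already returned an OrdinalBytesRefVector instance at runtime (visible in the surrounding context), but the declared BytesRefVector forced callers to cast. A hedged caller-side sketch follows; the helper itself is hypothetical, though the types and accessors are the ones this commit uses.

    // Hypothetical helper: destructure a dictionary-encoded column without casts.
    private static IntVector ordinalsOrNull(BytesRefBlock block) {
        OrdinalBytesRefBlock ordinalBlock = block.asOrdinals();
        if (ordinalBlock == null) {
            return null; // not dictionary-encoded
        }
        OrdinalBytesRefVector vector = ordinalBlock.asVector(); // typed since this change; no cast
        return vector == null ? null : vector.getOrdinalsVector();
    }

TimeSeriesBlockHash.getTsidVector above relies on exactly this typing.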
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperator.java

Lines changed: 14 additions & 11 deletions
@@ -14,6 +14,7 @@
 import org.elasticsearch.compute.aggregation.GroupingAggregatorEvaluationContext;
 import org.elasticsearch.compute.aggregation.TimeSeriesGroupingAggregatorEvaluationContext;
 import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
+import org.elasticsearch.compute.aggregation.blockhash.TimeSeriesBlockHash;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.LongBlock;
@@ -30,6 +31,7 @@ public class TimeSeriesAggregationOperator extends HashAggregationOperator {
 
     public record Factory(
         Rounding.Prepared timeBucket,
+        boolean sortedInput,
         List<BlockHash.GroupSpec> groups,
         AggregatorMode aggregatorMode,
         List<GroupingAggregator.Factory> aggregators,
@@ -38,17 +40,18 @@ public record Factory(
         @Override
         public Operator get(DriverContext driverContext) {
             // TODO: use TimeSeriesBlockHash when possible
-            return new TimeSeriesAggregationOperator(
-                timeBucket,
-                aggregators,
-                () -> BlockHash.build(
-                    groups,
-                    driverContext.blockFactory(),
-                    maxPageSize,
-                    true // we can enable optimizations as the inputs are vectors
-                ),
-                driverContext
-            );
+            return new TimeSeriesAggregationOperator(timeBucket, aggregators, () -> {
+                if (sortedInput && groups.size() == 2) {
+                    return new TimeSeriesBlockHash(groups.get(0).channel(), groups.get(1).channel(), driverContext.blockFactory());
+                } else {
+                    return BlockHash.build(
+                        groups,
+                        driverContext.blockFactory(),
+                        maxPageSize,
+                        true // we can enable optimizations as the inputs are vectors
+                    );
+                }
+            }, driverContext);
         }
 
         @Override

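The supplier routes onto the specialized hash only when both guards hold, which is worth stating as a rule on its own (hypothetical helper, not part of the commit):

    // TimeSeriesBlockHash is only correct when pages arrive sorted by tsid, with
    // timestamps in descending order within each tsid, and the grouping is exactly
    // (tsid channel, timestamp channel).
    static boolean useTimeSeriesBlockHash(boolean sortedInput, List<BlockHash.GroupSpec> groups) {
        return sortedInput && groups.size() == 2;
    }

Any other shape falls back to BlockHash.build, so a call site that cannot prove the ordering can always pass sortedInput = false; the result is correct, just slower.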
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/BlockHashTests.java

Lines changed: 112 additions & 0 deletions
@@ -22,19 +22,22 @@
 import org.elasticsearch.compute.data.IntBlock;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.LongBlock;
+import org.elasticsearch.compute.data.LongVector;
 import org.elasticsearch.compute.data.OrdinalBytesRefBlock;
 import org.elasticsearch.compute.data.OrdinalBytesRefVector;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.test.TestBlockFactory;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.ReleasableIterator;
 import org.elasticsearch.core.Releasables;
+import org.elasticsearch.xpack.esql.core.util.Holder;
 import org.junit.After;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Set;
 import java.util.function.Consumer;
 import java.util.stream.IntStream;
@@ -1326,6 +1329,115 @@ public void close() {
         }
     }
 
+    public void testTimeSeriesBlockHash() {
+        long endTime = randomLongBetween(10_000_000, 20_000_000);
+        var hash1 = new TimeSeriesBlockHash(0, 1, blockFactory);
+        var hash2 = BlockHash.build(
+            List.of(new BlockHash.GroupSpec(0, ElementType.BYTES_REF), new BlockHash.GroupSpec(1, ElementType.LONG)),
+            blockFactory,
+            32 * 1024,
+            forcePackedHash
+        );
+        int numPages = between(1, 100);
+        int globalTsid = -1;
+        long timestamp = endTime;
+        try (hash1; hash2) {
+            for (int p = 0; p < numPages; p++) {
+                int numRows = between(1, 1000);
+                if (randomBoolean()) {
+                    timestamp -= between(0, 100);
+                }
+                try (
+                    BytesRefVector.Builder dictBuilder = blockFactory.newBytesRefVectorBuilder(numRows);
+                    IntVector.Builder ordinalBuilder = blockFactory.newIntVectorBuilder(numRows);
+                    LongVector.Builder timestampsBuilder = blockFactory.newLongVectorBuilder(numRows)
+                ) {
+                    int perPageOrd = -1;
+                    for (int i = 0; i < numRows; i++) {
+                        boolean newGroup = globalTsid == -1 || randomInt(100) < 10;
+                        if (newGroup) {
+                            globalTsid++;
+                            timestamp = endTime;
+                            if (randomBoolean()) {
+                                timestamp -= between(0, 1000);
+                            }
+                        }
+                        if (perPageOrd == -1 || newGroup) {
+                            perPageOrd++;
+                            dictBuilder.appendBytesRef(new BytesRef(String.format(Locale.ROOT, "id-%06d", globalTsid)));
+                        }
+                        ordinalBuilder.appendInt(perPageOrd);
+                        if (randomInt(100) < 20) {
+                            timestamp -= between(1, 10);
+                        }
+                        timestampsBuilder.appendLong(timestamp);
+                    }
+                    try (
+                        var tsidBlock = new OrdinalBytesRefVector(ordinalBuilder.build(), dictBuilder.build()).asBlock();
+                        var timestampBlock = timestampsBuilder.build().asBlock()
+                    ) {
+                        Page page = new Page(tsidBlock, timestampBlock);
+                        Holder<IntVector> ords1 = new Holder<>();
+                        hash1.add(page, new GroupingAggregatorFunction.AddInput() {
+                            @Override
+                            public void add(int positionOffset, IntBlock groupIds) {
+                                throw new AssertionError("time-series block hash should emit a vector");
+                            }
+
+                            @Override
+                            public void add(int positionOffset, IntVector groupIds) {
+                                groupIds.incRef();
+                                ords1.set(groupIds);
+                            }
+
+                            @Override
+                            public void close() {
+
+                            }
+                        });
+                        Holder<IntVector> ords2 = new Holder<>();
+                        hash2.add(page, new GroupingAggregatorFunction.AddInput() {
+                            @Override
+                            public void add(int positionOffset, IntBlock groupIds) {
+                                // TODO: check why PackedValuesBlockHash doesn't emit a vector?
+                                IntVector vector = groupIds.asVector();
+                                assertNotNull("should emit a vector", vector);
+                                vector.incRef();
+                                ords2.set(vector);
+                            }
+
+                            @Override
+                            public void add(int positionOffset, IntVector groupIds) {
+                                groupIds.incRef();
+                                ords2.set(groupIds);
+                            }
+
+                            @Override
+                            public void close() {
+
+                            }
+                        });
+                        try {
+                            assertThat("input=" + page, ords1.get(), equalTo(ords2.get()));
+                        } finally {
+                            Releasables.close(ords1.get(), ords2.get());
+                        }
+                    }
+                }
+            }
+            Block[] keys1 = null;
+            Block[] keys2 = null;
+            try {
+                keys1 = hash1.getKeys();
+                keys2 = hash2.getKeys();
+                assertThat(keys1, equalTo(keys2));
+            } finally {
+                Releasables.close(keys1);
+                Releasables.close(keys2);
+            }
+        }
+    }
+
     record OrdsAndKeys(String description, int positionOffset, IntBlock ords, Block[] keys, IntVector nonEmpty) {}
 
     /**

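The new test is a differential test: the specialized TimeSeriesBlockHash and a generic BlockHash consume identical randomized pages and must agree on the per-row group ids and on the final key blocks. The pattern, reduced to a self-contained sketch (everything here is hypothetical; the real test compares IntVector group ids and key blocks):

    import java.util.List;
    import java.util.function.Function;

    // Sketch of the differential-testing pattern: run the same inputs through a
    // specialized and a reference implementation and require identical output.
    class DifferentialSketch {
        static <I, O> void assertAgree(Function<I, O> specialized, Function<I, O> reference, List<I> inputs) {
            for (I input : inputs) {
                O a = specialized.apply(input);
                O b = reference.apply(input);
                if (a.equals(b) == false) {
                    throw new AssertionError("mismatch on input=" + input + ": " + a + " != " + b);
                }
            }
        }

        public static void main(String[] args) {
            // Trivial demonstration: a bit trick checked against the library method.
            assertAgree(x -> x & -x, Integer::lowestOneBit, List.of(1, 6, 12, 1024));
        }
    }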
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java

Lines changed: 12 additions & 5 deletions
@@ -18,7 +18,6 @@
 import org.elasticsearch.compute.operator.EvalOperator;
 import org.elasticsearch.compute.operator.HashAggregationOperator.HashAggregationOperatorFactory;
 import org.elasticsearch.compute.operator.Operator;
-import org.elasticsearch.compute.operator.TimeSeriesAggregationOperator;
 import org.elasticsearch.index.analysis.AnalysisRegistry;
 import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
 import org.elasticsearch.xpack.esql.core.InvalidArgumentException;
@@ -175,12 +174,12 @@ else if (aggregatorMode.isOutputPartial()) {
             );
             // time-series aggregation
             if (aggregateExec instanceof TimeSeriesAggregateExec ts) {
-                operatorFactory = new TimeSeriesAggregationOperator.Factory(
-                    ts.timeBucketRounding(context.foldCtx()),
-                    groupSpecs.stream().map(GroupSpec::toHashGroupSpec).toList(),
+                operatorFactory = timeSeriesAggregatorOperatorFactor(
+                    ts,
                     aggregatorMode,
                     aggregatorFactories,
-                    context.pageSize(aggregateExec.estimatedRowSize())
+                    groupSpecs.stream().map(GroupSpec::toHashGroupSpec).toList(),
+                    context
                 );
                 // ordinal grouping
             } else if (groupSpecs.size() == 1 && groupSpecs.get(0).channel == null) {
@@ -379,4 +378,12 @@ public abstract Operator.OperatorFactory ordinalGroupingOperatorFactory(
         ElementType groupType,
         LocalExecutionPlannerContext context
     );
+
+    public abstract Operator.OperatorFactory timeSeriesAggregatorOperatorFactor(
+        TimeSeriesAggregateExec ts,
+        AggregatorMode aggregatorMode,
+        List<GroupingAggregator.Factory> aggregatorFactories,
+        List<BlockHash.GroupSpec> groupSpecs,
+        LocalExecutionPlannerContext context
+    );
 }
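The new abstract hook moves factory construction into subclasses, which are where the input-ordering guarantee can actually be decided. A hedged sketch of one possible implementation, reassembling the call site removed above with the record's new sortedInput component; the trailing maxPageSize component is inferred from the get() body in TimeSeriesAggregationOperator and is not visible in this excerpt's hunks:

    @Override
    public Operator.OperatorFactory timeSeriesAggregatorOperatorFactor(
        TimeSeriesAggregateExec ts,
        AggregatorMode aggregatorMode,
        List<GroupingAggregator.Factory> aggregatorFactories,
        List<BlockHash.GroupSpec> groupSpecs,
        LocalExecutionPlannerContext context
    ) {
        return new TimeSeriesAggregationOperator.Factory(
            ts.timeBucketRounding(context.foldCtx()),
            false, // sortedInput: conservative default; pass true only when the plan guarantees tsid/timestamp order
            groupSpecs,
            aggregatorMode,
            aggregatorFactories,
            context.pageSize(ts.estimatedRowSize())
        );
    }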
