diff --git a/stats/src/main/java/com/facebook/airlift/stats/cardinality/DenseHll.java b/stats/src/main/java/com/facebook/airlift/stats/cardinality/DenseHll.java index 57fbc7d356..5a7848a5d5 100644 --- a/stats/src/main/java/com/facebook/airlift/stats/cardinality/DenseHll.java +++ b/stats/src/main/java/com/facebook/airlift/stats/cardinality/DenseHll.java @@ -22,8 +22,6 @@ import io.airlift.slice.Slice; import org.openjdk.jol.info.ClassLayout; -import javax.annotation.concurrent.NotThreadSafe; - import java.util.Arrays; import java.util.HashSet; import java.util.Set; @@ -36,7 +34,9 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; -@NotThreadSafe +/** + * This class is NOT thread safe. + */ final class DenseHll implements HllInstance { @@ -249,16 +249,14 @@ public void insert(int bucket, int value) } if (delta > MAX_DELTA) { - int overflow = delta - MAX_DELTA; - - if (!setOverflow(bucket, overflow)) { - // grow overflows arrays if necessary - overflowBuckets = Ints.ensureCapacity(overflowBuckets, overflows + 1, OVERFLOW_GROW_INCREMENT); - overflowValues = Bytes.ensureCapacity(overflowValues, overflows + 1, OVERFLOW_GROW_INCREMENT); + byte overflow = (byte) (delta - MAX_DELTA); - overflowBuckets[overflows] = bucket; - overflowValues[overflows] = (byte) overflow; - overflows++; + int overflowEntry = findOverflowEntry(bucket); + if (overflowEntry != -1) { + setOverflow(overflowEntry, overflow); + } + else { + addOverflow(bucket, overflow); } delta = MAX_DELTA; @@ -272,30 +270,6 @@ public void insert(int bucket, int value) } } - private int getOverflow(int bucket) - { - for (int i = 0; i < overflows; i++) { - if (overflowBuckets[i] == bucket) { - return overflowValues[i]; - } - } - return 0; - } - - /** - * Returns false if not overflow bucket matching the given bucket id was found - */ - private boolean setOverflow(int bucket, int overflow) - { - for (int i = 0; i < overflows; i++) { - if (overflowBuckets[i] == bucket) { - overflowValues[i] = (byte) overflow; - return true; - } - } - return false; - } - public Slice serialize() { int size = estimatedSerializedSize(); @@ -455,42 +429,54 @@ public DenseHll mergeWith(DenseHll other) numberOfBuckets(other.indexBitLength))); } - int baseline = Math.max(this.baseline, other.baseline); + int newBaseline = Math.max(this.baseline, other.baseline); int baselineCount = 0; - int overflows = 0; - int[] overflowBuckets = new int[OVERFLOW_GROW_INCREMENT]; - byte[] overflowValues = new byte[OVERFLOW_GROW_INCREMENT]; + int bucket = 0; + for (int i = 0; i < deltas.length; i++) { + int newSlot = 0; - int numberOfBuckets = numberOfBuckets(indexBitLength); - for (int i = 0; i < numberOfBuckets; i++) { - int value = Math.max(getValue(i), other.getValue(i)); + byte slot1 = deltas[i]; + byte slot2 = other.deltas[i]; - int delta = value - baseline; - if (delta == 0) { - baselineCount++; - } - else if (delta > MAX_DELTA) { - // grow overflows arrays if necessary - overflowBuckets = Ints.ensureCapacity(overflowBuckets, overflows + 1, OVERFLOW_GROW_INCREMENT); - overflowValues = Bytes.ensureCapacity(overflowValues, overflows + 1, OVERFLOW_GROW_INCREMENT); + for (int shift = 4; shift >= 0; shift -= 4) { + int delta1 = (slot1 >>> shift) & 0b1111; + int delta2 = (slot2 >>> shift) & 0b1111; + + int value1 = this.baseline + delta1; + int value2 = other.baseline + delta2; + + int overflowEntry = -1; + if (delta1 == MAX_DELTA) { + overflowEntry = findOverflowEntry(bucket); + if (overflowEntry != -1) { + value1 += overflowValues[overflowEntry]; + } + } + + if (delta2 == MAX_DELTA) { + value2 += other.getOverflow(bucket); + } + + int newValue = Math.max(value1, value2); + int newDelta = newValue - newBaseline; - overflowBuckets[overflows] = i; - overflowValues[overflows] = (byte) (delta - MAX_DELTA); + if (newDelta == 0) { + baselineCount++; + } - overflows++; + newDelta = updateOverflow(bucket, overflowEntry, newDelta); - delta = MAX_DELTA; + newSlot <<= 4; + newSlot |= newDelta; + bucket++; } - setDelta(i, delta); + this.deltas[i] = (byte) newSlot; } - this.baseline = (byte) baseline; + this.baseline = (byte) newBaseline; this.baselineCount = baselineCount; - this.overflows = overflows; - this.overflowBuckets = overflowBuckets; - this.overflowValues = overflowValues; // all baseline values in one of the HLLs lost to the values // in the other HLL, so we need to adjust the final baseline @@ -499,6 +485,87 @@ else if (delta > MAX_DELTA) { return this; } + /** + * Returns "this" for chaining + */ + public DenseHll mergeWith(SparseHll other) + { + if (indexBitLength != other.getIndexBitLength()) { + throw new IllegalArgumentException(String.format( + "Cannot merge HLLs with different number of buckets: %s vs %s", + numberOfBuckets(indexBitLength), + numberOfBuckets(other.getIndexBitLength()))); + } + + other.eachBucket(this::insert); + + return this; + } + + private int findOverflowEntry(int bucket) + { + for (int i = 0; i < overflows; i++) { + if (overflowBuckets[i] == bucket) { + return i; + } + } + return -1; + } + + private int getOverflow(int bucket) + { + for (int i = 0; i < overflows; i++) { + if (overflowBuckets[i] == bucket) { + return overflowValues[i]; + } + } + return 0; + } + + private int updateOverflow(int bucket, int overflowEntry, int delta) + { + if (delta > MAX_DELTA) { + if (overflowEntry != -1) { + // update existing overflow + setOverflow(overflowEntry, (byte) (delta - MAX_DELTA)); + } + else { + addOverflow(bucket, (byte) (delta - MAX_DELTA)); + } + delta = MAX_DELTA; + } + else if (overflowEntry != -1) { + removeOverflow(overflowEntry); + } + + return delta; + } + + private void setOverflow(int overflowEntry, byte overflow) + { + overflowValues[overflowEntry] = overflow; + } + + private void removeOverflow(int overflowEntry) + { + // remove existing overflow + overflowBuckets[overflowEntry] = overflowBuckets[overflows - 1]; + overflowValues[overflowEntry] = overflowValues[overflows - 1]; + overflows--; + } + + private void addOverflow(int bucket, byte overflow) + { + // add new delta + overflowBuckets = Ints.ensureCapacity(overflowBuckets, overflows + 1, OVERFLOW_GROW_INCREMENT); + overflowValues = Bytes.ensureCapacity(overflowValues, overflows + 1, OVERFLOW_GROW_INCREMENT); + + overflowBuckets[overflows] = bucket; + overflowValues[overflows] = overflow; + + overflows++; + } + public static int estimatedInMemorySize(int indexBitLength) { // note: we don't take into account overflow entries since their number can vary diff --git a/stats/src/main/java/com/facebook/airlift/stats/cardinality/HyperLogLog.java b/stats/src/main/java/com/facebook/airlift/stats/cardinality/HyperLogLog.java index b8104e9354..93f798baf9 100644 --- a/stats/src/main/java/com/facebook/airlift/stats/cardinality/HyperLogLog.java +++ b/stats/src/main/java/com/facebook/airlift/stats/cardinality/HyperLogLog.java @@ -85,6 +85,9 @@ public void mergeWith(HyperLogLog other) ((SparseHll) instance).mergeWith((SparseHll) other.instance); instance = makeDenseIfNecessary((SparseHll) instance); } + else if (instance instanceof DenseHll && other.instance instanceof SparseHll) { + ((DenseHll) instance).mergeWith((SparseHll) other.instance); + } else { DenseHll dense = instance.toDense(); dense.mergeWith(other.instance.toDense()); diff --git a/stats/src/main/java/com/facebook/airlift/stats/cardinality/SparseHll.java b/stats/src/main/java/com/facebook/airlift/stats/cardinality/SparseHll.java index 54012efd1e..1052d0574b 100644 --- a/stats/src/main/java/com/facebook/airlift/stats/cardinality/SparseHll.java +++ b/stats/src/main/java/com/facebook/airlift/stats/cardinality/SparseHll.java @@ -22,8 +22,6 @@ import io.airlift.slice.Slice; import org.openjdk.jol.info.ClassLayout; -import javax.annotation.concurrent.NotThreadSafe; - import java.util.Arrays; import static com.facebook.airlift.stats.cardinality.Utils.computeIndex; @@ -36,7 +34,9 @@ import static java.lang.Math.toIntExact; import static java.util.Comparator.comparingInt; -@NotThreadSafe +/** + * This class is NOT thread safe. + */ final class SparseHll implements HllInstance { @@ -171,7 +171,7 @@ public void eachBucket(BucketListener listener) // if zeros > EXTENDED_BITS_LENGTH - indexBits, it means all those bits were zeros, // so look at the entry value, which contains the number of leading 0 *after* EXTENDED_BITS_LENGTH int bits = EXTENDED_PREFIX_BITS - indexBitLength; - if (zeros > bits) { + if (zeros >= bits) { zeros = bits + decodeBucketValue(entry); } diff --git a/stats/src/test/java/com/facebook/airlift/stats/cardinality/BenchmarkDenseHll.java b/stats/src/test/java/com/facebook/airlift/stats/cardinality/BenchmarkDenseHll.java index 65f6d24f6e..284ee9be11 100644 --- a/stats/src/test/java/com/facebook/airlift/stats/cardinality/BenchmarkDenseHll.java +++ b/stats/src/test/java/com/facebook/airlift/stats/cardinality/BenchmarkDenseHll.java @@ -14,9 +14,11 @@ package com.facebook.airlift.stats.cardinality; import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; import org.openjdk.jmh.annotations.Level; import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; import org.openjdk.jmh.annotations.OutputTimeUnit; import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.Setup; @@ -32,14 +34,18 @@ import java.util.concurrent.TimeUnit; @State(Scope.Thread) -@OutputTimeUnit(TimeUnit.SECONDS) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +@BenchmarkMode(Mode.AverageTime) @Fork(5) @Warmup(iterations = 5, time = 500, timeUnit = TimeUnit.MILLISECONDS) @Measurement(iterations = 10, time = 500, timeUnit = TimeUnit.MILLISECONDS) public class BenchmarkDenseHll { + private static final int LARGE_CARDINALITY = 1_000_000; + private static final int SMALL_CARDINALITY = 100; + @Benchmark - public DenseHll benchmarkInsert(Data data) + public DenseHll benchmarkInsert(InsertData data) { for (long hash : data.hashes) { data.instance.insertHash(hash); @@ -48,10 +54,22 @@ public DenseHll benchmarkInsert(Data data) return data.instance; } + @Benchmark + public DenseHll benchmarkMergeWithDense(MergeWithDenseData data) + { + return data.base.mergeWith(data.toMerge); + } + + @Benchmark + public DenseHll benchmarkMergeWithSparse(MergeWithSparseData data) + { + return data.base.mergeWith(data.toMerge); + } + @State(Scope.Thread) - public static class Data + public static class InsertData { - public final DenseHll instance = new DenseHll(11); + public final DenseHll instance = new DenseHll(12); public final long[] hashes = new long[500]; @Setup(Level.Iteration) @@ -63,6 +81,51 @@ public void initialize() } } + @State(Scope.Thread) + public static class MergeWithDenseData + { + public DenseHll base; + public DenseHll toMerge; + + @Setup(Level.Iteration) + public void initialize() + { + base = new DenseHll(12); + for (int i = 0; i < LARGE_CARDINALITY; i++) { + base.insertHash(ThreadLocalRandom.current().nextLong()); + } + + // Small cardinality so we can do an apples-to-apples comparison + // between dense/dense vs dense/sparse merge. Sparse only supports + // small cardinalities. + toMerge = new DenseHll(12); + for (int i = 0; i < SMALL_CARDINALITY; i++) { + toMerge.insertHash(ThreadLocalRandom.current().nextLong()); + } + } + } + + @State(Scope.Thread) + public static class MergeWithSparseData + { + public DenseHll base; + public SparseHll toMerge; + + @Setup(Level.Iteration) + public void initialize() + { + base = new DenseHll(12); + for (int i = 0; i < LARGE_CARDINALITY; i++) { + base.insertHash(ThreadLocalRandom.current().nextLong()); + } + + toMerge = new SparseHll(12); + for (int i = 0; i < SMALL_CARDINALITY; i++) { + toMerge.insertHash(ThreadLocalRandom.current().nextLong()); + } + } + } + public static void main(String[] args) throws RunnerException { diff --git a/stats/src/test/java/com/facebook/airlift/stats/cardinality/TestHyperLogLog.java b/stats/src/test/java/com/facebook/airlift/stats/cardinality/TestHyperLogLog.java index 4a723f153b..6dd0b0816c 100644 --- a/stats/src/test/java/com/facebook/airlift/stats/cardinality/TestHyperLogLog.java +++ b/stats/src/test/java/com/facebook/airlift/stats/cardinality/TestHyperLogLog.java @@ -24,6 +24,7 @@ import static com.facebook.airlift.stats.cardinality.TestUtils.sequence; import static io.airlift.slice.testing.SliceAssertions.assertSlicesEqual; +import static java.lang.Math.toIntExact; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; @@ -31,7 +32,6 @@ public class TestHyperLogLog { @Test public void testEstimates() - throws Exception { int trials = 1000; for (int indexBits = 4; indexBits <= 13; indexBits++) { @@ -80,7 +80,8 @@ public void testRetainedSize() { assertEquals( HyperLogLog.newInstance(8).estimatedInMemorySize(), - ClassLayout.parseClass(HyperLogLog.class).instanceSize() + (new SparseHll(10)).estimatedInMemorySize()); + toIntExact(ClassLayout.parseClass(HyperLogLog.class).instanceSize() + + (new SparseHll(10)).estimatedInMemorySize())); } @Test @@ -127,16 +128,6 @@ private void verifyMerge(List one, List two) assertEquals(hll1.serialize(), expected.serialize()); } - @Test - public void testNumberOfBuckets() - { - int[] bucketCounts = {16, 32, 64, 128, 256, 512, 1024, 2048, 4096}; - for (int count : bucketCounts) { - HyperLogLog hll = HyperLogLog.newInstance(count); - assertEquals(hll.getNumberOfBuckets(), count); - } - } - @Test public void testRoundtrip() throws Exception diff --git a/stats/src/test/java/com/facebook/airlift/stats/cardinality/TestSparseHll.java b/stats/src/test/java/com/facebook/airlift/stats/cardinality/TestSparseHll.java index a82b523a93..3f936110f5 100644 --- a/stats/src/test/java/com/facebook/airlift/stats/cardinality/TestSparseHll.java +++ b/stats/src/test/java/com/facebook/airlift/stats/cardinality/TestSparseHll.java @@ -21,7 +21,6 @@ import java.util.List; -import static com.facebook.airlift.stats.cardinality.TestUtils.createHashForBucket; import static com.facebook.airlift.stats.cardinality.TestUtils.sequence; import static io.airlift.slice.SizeOf.sizeOf; import static io.airlift.slice.testing.SliceAssertions.assertSlicesEqual; @@ -32,41 +31,18 @@ public class TestSparseHll private static final int SPARSE_HLL_INSTANCE_SIZE = ClassLayout.parseClass(SparseHll.class).instanceSize(); @Test(dataProvider = "bits") - public void testCorrectNumberOfZeros(int indexBitLength) + public void testNumberOfZeros(int indexBitLength) { - SparseHll sparseHll = new SparseHll(indexBitLength); - // Note: the peculiar minus six in the following line reflects a surprising edge case. - // See https://github.com/prestodb/airlift/issues/56. - int limit = Math.min(Long.SIZE - indexBitLength - 6, Utils.numberOfBuckets(indexBitLength)); - for (int i = 0; i < limit; i++) { - // insert a hash for bucket i that has i leading zeros - sparseHll.insertHash(createHashForBucket(indexBitLength, i, i)); - } - - // each non-empty bucket should have value index + 1 - sparseHll.eachBucket((i, value) -> assertEquals(value, i + 1)); - } - - @Test(dataProvider = "bits") - public void testCorrectNumberOfZerosOnUpdate(int indexBitLength) - { - SparseHll sparseHll = new SparseHll(indexBitLength); - int limit = Math.min(Long.SIZE - indexBitLength - 6, Utils.numberOfBuckets(indexBitLength)); - for (int i = 0; i < limit; i++) { - // insert a hash for bucket i that has no leading zeros - sparseHll.insertHash(createHashForBucket(indexBitLength, i, 0)); - } - for (int i = 0; i < limit; i++) { - // insert a hash for bucket i that has i leading zeros - sparseHll.insertHash(createHashForBucket(indexBitLength, i, i)); - } - - // each bucket from 0 to limit should have value index + 1 - // Note: SparseHll may return multiple values for each bucket, so we keep track of the largest only. - int[] values = new int[limit]; - sparseHll.eachBucket((i, value) -> values[i] = Math.max(values[i], value)); - for (int i = 0; i < limit; i++) { - assertEquals(values[i], i + 1); + for (int i = 0; i < 64 - indexBitLength; i++) { + long hash = 1L << i; + int expectedValue = Long.numberOfLeadingZeros(hash << indexBitLength) + 1; + + SparseHll sparseHll = new SparseHll(indexBitLength); + sparseHll.insertHash(hash); + sparseHll.eachBucket((bucket, value) -> { + assertEquals(bucket, 0); + assertEquals(value, expectedValue); + }); } }