diff --git a/stats/src/main/java/com/facebook/airlift/stats/cardinality/DenseHll.java b/stats/src/main/java/com/facebook/airlift/stats/cardinality/DenseHll.java index 57fbc7d356f..5a7848a5d5d 100644 --- a/stats/src/main/java/com/facebook/airlift/stats/cardinality/DenseHll.java +++ b/stats/src/main/java/com/facebook/airlift/stats/cardinality/DenseHll.java @@ -22,8 +22,6 @@ import io.airlift.slice.Slice; import org.openjdk.jol.info.ClassLayout; -import javax.annotation.concurrent.NotThreadSafe; - import java.util.Arrays; import java.util.HashSet; import java.util.Set; @@ -36,7 +34,9 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; -@NotThreadSafe +/** + * This class is NOT thread safe. + */ final class DenseHll implements HllInstance { @@ -249,16 +249,14 @@ public void insert(int bucket, int value) } if (delta > MAX_DELTA) { - int overflow = delta - MAX_DELTA; - - if (!setOverflow(bucket, overflow)) { - // grow overflows arrays if necessary - overflowBuckets = Ints.ensureCapacity(overflowBuckets, overflows + 1, OVERFLOW_GROW_INCREMENT); - overflowValues = Bytes.ensureCapacity(overflowValues, overflows + 1, OVERFLOW_GROW_INCREMENT); + byte overflow = (byte) (delta - MAX_DELTA); - overflowBuckets[overflows] = bucket; - overflowValues[overflows] = (byte) overflow; - overflows++; + int overflowEntry = findOverflowEntry(bucket); + if (overflowEntry != -1) { + setOverflow(overflowEntry, overflow); + } + else { + addOverflow(bucket, overflow); } delta = MAX_DELTA; @@ -272,30 +270,6 @@ public void insert(int bucket, int value) } } - private int getOverflow(int bucket) - { - for (int i = 0; i < overflows; i++) { - if (overflowBuckets[i] == bucket) { - return overflowValues[i]; - } - } - return 0; - } - - /** - * Returns false if not overflow bucket matching the given bucket id was found - */ - private boolean setOverflow(int bucket, int overflow) - { - for (int i = 0; i < overflows; i++) { - if (overflowBuckets[i] == bucket) { - overflowValues[i] = (byte) overflow; - return true; - } - } - return false; - } - public Slice serialize() { int size = estimatedSerializedSize(); @@ -455,42 +429,54 @@ public DenseHll mergeWith(DenseHll other) numberOfBuckets(other.indexBitLength))); } - int baseline = Math.max(this.baseline, other.baseline); + int newBaseline = Math.max(this.baseline, other.baseline); int baselineCount = 0; - int overflows = 0; - int[] overflowBuckets = new int[OVERFLOW_GROW_INCREMENT]; - byte[] overflowValues = new byte[OVERFLOW_GROW_INCREMENT]; + int bucket = 0; + for (int i = 0; i < deltas.length; i++) { + int newSlot = 0; - int numberOfBuckets = numberOfBuckets(indexBitLength); - for (int i = 0; i < numberOfBuckets; i++) { - int value = Math.max(getValue(i), other.getValue(i)); + byte slot1 = deltas[i]; + byte slot2 = other.deltas[i]; - int delta = value - baseline; - if (delta == 0) { - baselineCount++; - } - else if (delta > MAX_DELTA) { - // grow overflows arrays if necessary - overflowBuckets = Ints.ensureCapacity(overflowBuckets, overflows + 1, OVERFLOW_GROW_INCREMENT); - overflowValues = Bytes.ensureCapacity(overflowValues, overflows + 1, OVERFLOW_GROW_INCREMENT); + for (int shift = 4; shift >= 0; shift -= 4) { + int delta1 = (slot1 >>> shift) & 0b1111; + int delta2 = (slot2 >>> shift) & 0b1111; + + int value1 = this.baseline + delta1; + int value2 = other.baseline + delta2; + + int overflowEntry = -1; + if (delta1 == MAX_DELTA) { + overflowEntry = findOverflowEntry(bucket); + if (overflowEntry != -1) { + value1 += overflowValues[overflowEntry]; + } + } + + if (delta2 == MAX_DELTA) { + value2 += other.getOverflow(bucket); + } + + int newValue = Math.max(value1, value2); + int newDelta = newValue - newBaseline; - overflowBuckets[overflows] = i; - overflowValues[overflows] = (byte) (delta - MAX_DELTA); + if (newDelta == 0) { + baselineCount++; + } - overflows++; + newDelta = updateOverflow(bucket, overflowEntry, newDelta); - delta = MAX_DELTA; + newSlot <<= 4; + newSlot |= newDelta; + bucket++; } - setDelta(i, delta); + this.deltas[i] = (byte) newSlot; } - this.baseline = (byte) baseline; + this.baseline = (byte) newBaseline; this.baselineCount = baselineCount; - this.overflows = overflows; - this.overflowBuckets = overflowBuckets; - this.overflowValues = overflowValues; // all baseline values in one of the HLLs lost to the values // in the other HLL, so we need to adjust the final baseline @@ -499,6 +485,87 @@ else if (delta > MAX_DELTA) { return this; } + /** + * Returns "this" for chaining + */ + public DenseHll mergeWith(SparseHll other) + { + if (indexBitLength != other.getIndexBitLength()) { + throw new IllegalArgumentException(String.format( + "Cannot merge HLLs with different number of buckets: %s vs %s", + numberOfBuckets(indexBitLength), + numberOfBuckets(other.getIndexBitLength()))); + } + + other.eachBucket(this::insert); + + return this; + } + + private int findOverflowEntry(int bucket) + { + for (int i = 0; i < overflows; i++) { + if (overflowBuckets[i] == bucket) { + return i; + } + } + return -1; + } + + private int getOverflow(int bucket) + { + for (int i = 0; i < overflows; i++) { + if (overflowBuckets[i] == bucket) { + return overflowValues[i]; + } + } + return 0; + } + + private int updateOverflow(int bucket, int overflowEntry, int delta) + { + if (delta > MAX_DELTA) { + if (overflowEntry != -1) { + // update existing overflow + setOverflow(overflowEntry, (byte) (delta - MAX_DELTA)); + } + else { + addOverflow(bucket, (byte) (delta - MAX_DELTA)); + } + delta = MAX_DELTA; + } + else if (overflowEntry != -1) { + removeOverflow(overflowEntry); + } + + return delta; + } + + private void setOverflow(int overflowEntry, byte overflow) + { + overflowValues[overflowEntry] = overflow; + } + + private void removeOverflow(int overflowEntry) + { + // remove existing overflow + overflowBuckets[overflowEntry] = overflowBuckets[overflows - 1]; + overflowValues[overflowEntry] = overflowValues[overflows - 1]; + overflows--; + } + + private void addOverflow(int bucket, byte overflow) + { + // add new delta + overflowBuckets = Ints.ensureCapacity(overflowBuckets, overflows + 1, OVERFLOW_GROW_INCREMENT); + overflowValues = Bytes.ensureCapacity(overflowValues, overflows + 1, OVERFLOW_GROW_INCREMENT); + + overflowBuckets[overflows] = bucket; + overflowValues[overflows] = overflow; + + overflows++; + } + public static int estimatedInMemorySize(int indexBitLength) { // note: we don't take into account overflow entries since their number can vary diff --git a/stats/src/main/java/com/facebook/airlift/stats/cardinality/HyperLogLog.java b/stats/src/main/java/com/facebook/airlift/stats/cardinality/HyperLogLog.java index b8104e9354f..e8f552e2018 100644 --- a/stats/src/main/java/com/facebook/airlift/stats/cardinality/HyperLogLog.java +++ b/stats/src/main/java/com/facebook/airlift/stats/cardinality/HyperLogLog.java @@ -18,8 +18,8 @@ import io.airlift.slice.Slice; import org.openjdk.jol.info.ClassLayout; -import static com.facebook.airlift.stats.cardinality.Utils.indexBitLength; import static com.facebook.airlift.stats.cardinality.Utils.numberOfBuckets; +import static com.facebook.airlift.stats.cardinality.Utils.indexBitLength; import static com.google.common.base.Preconditions.checkArgument; public class HyperLogLog @@ -85,6 +85,9 @@ public void mergeWith(HyperLogLog other) ((SparseHll) instance).mergeWith((SparseHll) other.instance); instance = makeDenseIfNecessary((SparseHll) instance); } + else if (instance instanceof DenseHll && other.instance instanceof SparseHll) { + ((DenseHll) instance).mergeWith((SparseHll) other.instance); + } else { DenseHll dense = instance.toDense(); dense.mergeWith(other.instance.toDense()); diff --git a/stats/src/main/java/com/facebook/airlift/stats/cardinality/SparseHll.java b/stats/src/main/java/com/facebook/airlift/stats/cardinality/SparseHll.java index 54012efd1e1..1052d0574b5 100644 --- a/stats/src/main/java/com/facebook/airlift/stats/cardinality/SparseHll.java +++ b/stats/src/main/java/com/facebook/airlift/stats/cardinality/SparseHll.java @@ -22,8 +22,6 @@ import io.airlift.slice.Slice; import org.openjdk.jol.info.ClassLayout; -import javax.annotation.concurrent.NotThreadSafe; - import java.util.Arrays; import static com.facebook.airlift.stats.cardinality.Utils.computeIndex; @@ -36,7 +34,9 @@ import static java.lang.Math.toIntExact; import static java.util.Comparator.comparingInt; -@NotThreadSafe +/** + * This class is NOT thread safe. + */ final class SparseHll implements HllInstance { @@ -171,7 +171,7 @@ public void eachBucket(BucketListener listener) // if zeros > EXTENDED_BITS_LENGTH - indexBits, it means all those bits were zeros, // so look at the entry value, which contains the number of leading 0 *after* EXTENDED_BITS_LENGTH int bits = EXTENDED_PREFIX_BITS - indexBitLength; - if (zeros > bits) { + if (zeros >= bits) { zeros = bits + decodeBucketValue(entry); }