Skip to content

Commit

Permalink
Optimize HyperLogLog performance
Browse files Browse the repository at this point in the history
  • Loading branch information
pranjalssh committed Feb 14, 2024
1 parent a9b59df commit ce2d214
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 65 deletions.
187 changes: 127 additions & 60 deletions stats/src/main/java/com/facebook/airlift/stats/cardinality/DenseHll.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import io.airlift.slice.Slice;
import org.openjdk.jol.info.ClassLayout;

import javax.annotation.concurrent.NotThreadSafe;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
Expand All @@ -36,7 +34,9 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;

@NotThreadSafe
/**
* This class is NOT thread safe.
*/
final class DenseHll
implements HllInstance
{
Expand Down Expand Up @@ -249,16 +249,14 @@ public void insert(int bucket, int value)
}

if (delta > MAX_DELTA) {
int overflow = delta - MAX_DELTA;

if (!setOverflow(bucket, overflow)) {
// grow overflows arrays if necessary
overflowBuckets = Ints.ensureCapacity(overflowBuckets, overflows + 1, OVERFLOW_GROW_INCREMENT);
overflowValues = Bytes.ensureCapacity(overflowValues, overflows + 1, OVERFLOW_GROW_INCREMENT);
byte overflow = (byte) (delta - MAX_DELTA);

overflowBuckets[overflows] = bucket;
overflowValues[overflows] = (byte) overflow;
overflows++;
int overflowEntry = findOverflowEntry(bucket);
if (overflowEntry != -1) {
setOverflow(overflowEntry, overflow);
}
else {
addOverflow(bucket, overflow);
}

delta = MAX_DELTA;
Expand All @@ -272,30 +270,6 @@ public void insert(int bucket, int value)
}
}

/**
 * Looks up the overflow value recorded for a bucket.
 *
 * Scans the first {@code overflows} entries of {@code overflowBuckets} in order
 * and stops at the first entry whose bucket id equals {@code bucket}.
 *
 * @param bucket the bucket id to look for
 * @return the matching entry of {@code overflowValues}, or 0 when no entry matches
 */
private int getOverflow(int bucket)
{
    int result = 0;
    for (int entry = 0; entry < overflows; entry++) {
        if (overflowBuckets[entry] == bucket) {
            result = overflowValues[entry];
            break;
        }
    }
    return result;
}

/**
 * Overwrites the overflow value of the entry that belongs to the given bucket.
 *
 * @param bucket the bucket id whose overflow entry should be updated
 * @param overflow the new overflow value; it is narrowed to a byte before being stored
 * @return true if an entry was found and updated, false if no overflow entry
 * matching the given bucket id was found
 */
private boolean setOverflow(int bucket, int overflow)
{
    // advance to the first entry for this bucket, or past the live entries
    int index = 0;
    while (index < overflows && overflowBuckets[index] != bucket) {
        index++;
    }

    if (index >= overflows) {
        return false;
    }

    overflowValues[index] = (byte) overflow;
    return true;
}

public Slice serialize()
{
int size = estimatedSerializedSize();
Expand Down Expand Up @@ -455,42 +429,54 @@ public DenseHll mergeWith(DenseHll other)
numberOfBuckets(other.indexBitLength)));
}

int baseline = Math.max(this.baseline, other.baseline);
int newBaseline = Math.max(this.baseline, other.baseline);
int baselineCount = 0;

int overflows = 0;
int[] overflowBuckets = new int[OVERFLOW_GROW_INCREMENT];
byte[] overflowValues = new byte[OVERFLOW_GROW_INCREMENT];
int bucket = 0;
for (int i = 0; i < deltas.length; i++) {
int newSlot = 0;

int numberOfBuckets = numberOfBuckets(indexBitLength);
for (int i = 0; i < numberOfBuckets; i++) {
int value = Math.max(getValue(i), other.getValue(i));
byte slot1 = deltas[i];
byte slot2 = other.deltas[i];

int delta = value - baseline;
if (delta == 0) {
baselineCount++;
}
else if (delta > MAX_DELTA) {
// grow overflows arrays if necessary
overflowBuckets = Ints.ensureCapacity(overflowBuckets, overflows + 1, OVERFLOW_GROW_INCREMENT);
overflowValues = Bytes.ensureCapacity(overflowValues, overflows + 1, OVERFLOW_GROW_INCREMENT);
for (int shift = 4; shift >= 0; shift -= 4) {
int delta1 = (slot1 >>> shift) & 0b1111;
int delta2 = (slot2 >>> shift) & 0b1111;

int value1 = this.baseline + delta1;
int value2 = other.baseline + delta2;

int overflowEntry = -1;
if (delta1 == MAX_DELTA) {
overflowEntry = findOverflowEntry(bucket);
if (overflowEntry != -1) {
value1 += overflowValues[overflowEntry];
}
}

if (delta2 == MAX_DELTA) {
value2 += other.getOverflow(bucket);
}

int newValue = Math.max(value1, value2);
int newDelta = newValue - newBaseline;

overflowBuckets[overflows] = i;
overflowValues[overflows] = (byte) (delta - MAX_DELTA);
if (newDelta == 0) {
baselineCount++;
}

overflows++;
newDelta = updateOverflow(bucket, overflowEntry, newDelta);

delta = MAX_DELTA;
newSlot <<= 4;
newSlot |= newDelta;
bucket++;
}

setDelta(i, delta);
this.deltas[i] = (byte) newSlot;
}

this.baseline = (byte) baseline;
this.baseline = (byte) newBaseline;
this.baselineCount = baselineCount;
this.overflows = overflows;
this.overflowBuckets = overflowBuckets;
this.overflowValues = overflowValues;

// all baseline values in one of the HLLs lost to the values
// in the other HLL, so we need to adjust the final baseline
Expand All @@ -499,6 +485,87 @@ else if (delta > MAX_DELTA) {
return this;
}

/**
 * Merges a sparse HLL into this instance by passing each of its buckets to
 * {@code insert}.
 *
 * @param other the sparse HLL to merge in; its index bit length must equal this instance's
 * @return "this" for chaining
 * @throws IllegalArgumentException if the two HLLs have different index bit lengths
 */
public DenseHll mergeWith(SparseHll other)
{
    int otherIndexBitLength = other.getIndexBitLength();
    if (indexBitLength != otherIndexBitLength) {
        String message = String.format(
                "Cannot merge HLLs with different number of buckets: %s vs %s",
                numberOfBuckets(indexBitLength),
                numberOfBuckets(otherIndexBitLength));
        throw new IllegalArgumentException(message);
    }

    other.eachBucket(this::insert);
    return this;
}

/**
 * Finds the position of the overflow entry for a bucket.
 *
 * Performs a linear scan over the first {@code overflows} entries of
 * {@code overflowBuckets}.
 *
 * @param bucket the bucket id to look for
 * @return the index of the first matching entry, or -1 if there is none
 */
private int findOverflowEntry(int bucket)
{
    int entry = 0;
    while (entry < overflows) {
        if (overflowBuckets[entry] == bucket) {
            return entry;
        }
        entry++;
    }
    return -1;
}

/**
 * Returns the overflow value stored for a bucket.
 *
 * Only the first {@code overflows} entries are considered; the first entry
 * whose bucket id equals {@code bucket} wins.
 *
 * @param bucket the bucket id to look for
 * @return the stored overflow value, or 0 if the bucket has no overflow entry
 */
private int getOverflow(int bucket)
{
    int index = 0;
    while (index < overflows && overflowBuckets[index] != bucket) {
        index++;
    }

    if (index >= overflows) {
        // no entry matched
        return 0;
    }
    return overflowValues[index];
}

/**
 * Reconciles the overflow entry of a bucket with a newly computed delta and
 * returns the delta clamped to {@code MAX_DELTA}.
 *
 * If the delta exceeds {@code MAX_DELTA}, the excess is written to the existing
 * overflow entry or appended as a new one. Otherwise any existing entry is removed.
 *
 * @param bucket the bucket id, used only when a new overflow entry has to be added
 * @param overflowEntry index of the bucket's current overflow entry, or -1 if it has none
 * @param delta the new delta for the bucket
 * @return {@code delta} if it does not exceed {@code MAX_DELTA}, otherwise {@code MAX_DELTA}
 */
private int updateOverflow(int bucket, int overflowEntry, int delta)
{
    boolean hasEntry = overflowEntry != -1;

    if (delta <= MAX_DELTA) {
        // the value fits without overflow, so a previous entry is no longer needed
        if (hasEntry) {
            removeOverflow(overflowEntry);
        }
        return delta;
    }

    byte excess = (byte) (delta - MAX_DELTA);
    if (hasEntry) {
        setOverflow(overflowEntry, excess);
    }
    else {
        addOverflow(bucket, excess);
    }
    return MAX_DELTA;
}

/**
* Stores {@code overflow} in {@code overflowValues} at position {@code overflowEntry}.
* The index is used as given; this method does not search for the entry or check
* that the index lies within the live entries.
*/
private void setOverflow(int overflowEntry, byte overflow)
{
overflowValues[overflowEntry] = overflow;
}

/**
 * Deletes the overflow entry at the given index.
 *
 * The last live entry (index {@code overflows - 1}) is copied into the vacated
 * slot and the entry count is decremented, so the relative order of entries is
 * not preserved. When the removed entry is itself the last one, the copy is a
 * self-assignment and only the count changes.
 *
 * @param overflowEntry index of the entry to remove
 */
private void removeOverflow(int overflowEntry)
{
    int last = overflows - 1;
    overflowBuckets[overflowEntry] = overflowBuckets[last];
    overflowValues[overflowEntry] = overflowValues[last];
    overflows = last;
}

/**
 * Appends a new overflow entry for the given bucket.
 *
 * Both parallel arrays are passed through {@code ensureCapacity} so they can hold
 * one more entry, using {@code OVERFLOW_GROW_INCREMENT} as padding. The entry is
 * then written at the current end and the entry count is incremented. No check is
 * made for an existing entry with the same bucket id.
 *
 * @param bucket the bucket id to record
 * @param overflow the overflow value to record for that bucket
 */
private void addOverflow(int bucket, byte overflow)
{
    int slot = overflows;
    int required = slot + 1;

    // grow the arrays if necessary
    overflowBuckets = Ints.ensureCapacity(overflowBuckets, required, OVERFLOW_GROW_INCREMENT);
    overflowValues = Bytes.ensureCapacity(overflowValues, required, OVERFLOW_GROW_INCREMENT);

    overflowBuckets[slot] = bucket;
    overflowValues[slot] = overflow;
    overflows = required;
}

public static int estimatedInMemorySize(int indexBitLength)
{
// note: we don't take into account overflow entries since their number can vary
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import io.airlift.slice.Slice;
import org.openjdk.jol.info.ClassLayout;

import static com.facebook.airlift.stats.cardinality.Utils.indexBitLength;
import static com.facebook.airlift.stats.cardinality.Utils.numberOfBuckets;
import static com.facebook.airlift.stats.cardinality.Utils.indexBitLength;
import static com.google.common.base.Preconditions.checkArgument;

public class HyperLogLog
Expand Down Expand Up @@ -85,6 +85,9 @@ public void mergeWith(HyperLogLog other)
((SparseHll) instance).mergeWith((SparseHll) other.instance);
instance = makeDenseIfNecessary((SparseHll) instance);
}
else if (instance instanceof DenseHll && other.instance instanceof SparseHll) {
((DenseHll) instance).mergeWith((SparseHll) other.instance);
}
else {
DenseHll dense = instance.toDense();
dense.mergeWith(other.instance.toDense());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import io.airlift.slice.Slice;
import org.openjdk.jol.info.ClassLayout;

import javax.annotation.concurrent.NotThreadSafe;

import java.util.Arrays;

import static com.facebook.airlift.stats.cardinality.Utils.computeIndex;
Expand All @@ -36,7 +34,9 @@
import static java.lang.Math.toIntExact;
import static java.util.Comparator.comparingInt;

@NotThreadSafe
/**
* This class is NOT thread safe.
*/
final class SparseHll
implements HllInstance
{
Expand Down Expand Up @@ -171,7 +171,7 @@ public void eachBucket(BucketListener listener)
// if zeros >= EXTENDED_PREFIX_BITS - indexBitLength, it means all those bits were zeros,
// so look at the entry value, which contains the number of leading zeros *after* EXTENDED_PREFIX_BITS
int bits = EXTENDED_PREFIX_BITS - indexBitLength;
if (zeros > bits) {
if (zeros >= bits) {
zeros = bits + decodeBucketValue(entry);
}

Expand Down

0 comments on commit ce2d214

Please sign in to comment.