Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update HyperLogLog library to original Airlift #68

Merged
merged 2 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import io.airlift.slice.Slice;
import org.openjdk.jol.info.ClassLayout;

import javax.annotation.concurrent.NotThreadSafe;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
Expand All @@ -36,7 +34,9 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;

@NotThreadSafe
/**
Copy link
Author

@pranjalssh pranjalssh Feb 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know why they removed @NotThreadSafe. I'll let someone more familiar with Java decide what's better.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was changed as part of airlift@1eb30dc.

It looks like the background is that javax.annotations was moved and renamed to jakarta.annotations (rename was due to some licensing disagreement between oracle and eclipse see jakartaee/rest#760). But Jakarta.annotations doesn't have the NotThreadSafe annotation (see discussion here: jakartaee/common-annotations-api#91). So seems like this change was basically part of upgrading that library.

* This class is NOT thread safe.
*/
final class DenseHll
implements HllInstance
{
Expand Down Expand Up @@ -249,16 +249,14 @@ public void insert(int bucket, int value)
}

if (delta > MAX_DELTA) {
int overflow = delta - MAX_DELTA;

if (!setOverflow(bucket, overflow)) {
// grow overflows arrays if necessary
overflowBuckets = Ints.ensureCapacity(overflowBuckets, overflows + 1, OVERFLOW_GROW_INCREMENT);
overflowValues = Bytes.ensureCapacity(overflowValues, overflows + 1, OVERFLOW_GROW_INCREMENT);
byte overflow = (byte) (delta - MAX_DELTA);

overflowBuckets[overflows] = bucket;
overflowValues[overflows] = (byte) overflow;
overflows++;
int overflowEntry = findOverflowEntry(bucket);
if (overflowEntry != -1) {
setOverflow(overflowEntry, overflow);
}
else {
addOverflow(bucket, overflow);
}

delta = MAX_DELTA;
Expand All @@ -272,30 +270,6 @@ public void insert(int bucket, int value)
}
}

/**
 * Looks up the overflow value recorded for {@code bucket}.
 * Returns 0 when the bucket has no overflow entry.
 */
private int getOverflow(int bucket)
{
    // Linear scan over the (short) overflow list.
    int index = 0;
    while (index < overflows) {
        if (overflowBuckets[index] == bucket) {
            return overflowValues[index];
        }
        index++;
    }
    return 0;
}

/**
 * Stores {@code overflow} as the overflow value for {@code bucket}, if an
 * overflow entry for that bucket already exists.
 *
 * @return false if no overflow bucket matching the given bucket id was found
 */
private boolean setOverflow(int bucket, int overflow)
{
// linear scan: the overflow list is expected to stay short
for (int i = 0; i < overflows; i++) {
if (overflowBuckets[i] == bucket) {
overflowValues[i] = (byte) overflow;
return true;
}
}
return false;
}

public Slice serialize()
{
int size = estimatedSerializedSize();
Expand Down Expand Up @@ -455,42 +429,54 @@ public DenseHll mergeWith(DenseHll other)
numberOfBuckets(other.indexBitLength)));
}

int baseline = Math.max(this.baseline, other.baseline);
int newBaseline = Math.max(this.baseline, other.baseline);
int baselineCount = 0;

int overflows = 0;
int[] overflowBuckets = new int[OVERFLOW_GROW_INCREMENT];
byte[] overflowValues = new byte[OVERFLOW_GROW_INCREMENT];
int bucket = 0;
for (int i = 0; i < deltas.length; i++) {
int newSlot = 0;

int numberOfBuckets = numberOfBuckets(indexBitLength);
for (int i = 0; i < numberOfBuckets; i++) {
int value = Math.max(getValue(i), other.getValue(i));
byte slot1 = deltas[i];
byte slot2 = other.deltas[i];

int delta = value - baseline;
if (delta == 0) {
baselineCount++;
}
else if (delta > MAX_DELTA) {
// grow overflows arrays if necessary
overflowBuckets = Ints.ensureCapacity(overflowBuckets, overflows + 1, OVERFLOW_GROW_INCREMENT);
overflowValues = Bytes.ensureCapacity(overflowValues, overflows + 1, OVERFLOW_GROW_INCREMENT);
for (int shift = 4; shift >= 0; shift -= 4) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know why this number is 4? Is it because BITS_PER_BUCKET is 4 (I see that's used in calculating the size of the deltas array)? If so, we should use the static variable explicitly here so they can't get out of sync.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense - but ideally we should keep most of the code in sync with airlift/airlift so its easier to copy in future as well. So we can change there first and then copy it. Wdyt?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's fine with me.

int delta1 = (slot1 >>> shift) & 0b1111;
int delta2 = (slot2 >>> shift) & 0b1111;

int value1 = this.baseline + delta1;
int value2 = other.baseline + delta2;

int overflowEntry = -1;
if (delta1 == MAX_DELTA) {
overflowEntry = findOverflowEntry(bucket);
if (overflowEntry != -1) {
value1 += overflowValues[overflowEntry];
}
}

if (delta2 == MAX_DELTA) {
value2 += other.getOverflow(bucket);
}

int newValue = Math.max(value1, value2);
int newDelta = newValue - newBaseline;

overflowBuckets[overflows] = i;
overflowValues[overflows] = (byte) (delta - MAX_DELTA);
if (newDelta == 0) {
baselineCount++;
}

overflows++;
newDelta = updateOverflow(bucket, overflowEntry, newDelta);

delta = MAX_DELTA;
newSlot <<= 4;
newSlot |= newDelta;
bucket++;
}

setDelta(i, delta);
this.deltas[i] = (byte) newSlot;
}

this.baseline = (byte) baseline;
this.baseline = (byte) newBaseline;
this.baselineCount = baselineCount;
this.overflows = overflows;
this.overflowBuckets = overflowBuckets;
this.overflowValues = overflowValues;

// all baseline values in one of the HLLs lost to the values
// in the other HLL, so we need to adjust the final baseline
Expand All @@ -499,6 +485,87 @@ else if (delta > MAX_DELTA) {
return this;
}

/**
 * Merges the given sparse HLL into this dense HLL by inserting each of its
 * populated buckets via {@code insert}.
 * <p>
 * Returns "this" for chaining
 *
 * @throws IllegalArgumentException if the two HLLs were created with a
 *         different index bit length (i.e., a different number of buckets)
 */
public DenseHll mergeWith(SparseHll other)
{
if (indexBitLength != other.getIndexBitLength()) {
throw new IllegalArgumentException(String.format(
"Cannot merge HLLs with different number of buckets: %s vs %s",
numberOfBuckets(indexBitLength),
numberOfBuckets(other.getIndexBitLength())));
}

// insert(bucket, value) handles baseline and overflow bookkeeping per bucket
other.eachBucket(this::insert);

return this;
}

/**
 * Finds the position of the overflow entry for {@code bucket} in the
 * overflow arrays, or -1 if the bucket has no overflow entry.
 */
private int findOverflowEntry(int bucket)
{
    // Linear scan over the (short) overflow list.
    int entry = 0;
    while (entry < overflows) {
        if (overflowBuckets[entry] == bucket) {
            return entry;
        }
        entry++;
    }
    return -1;
}

/**
 * Returns the overflow value recorded for {@code bucket}, or 0 if the
 * bucket has no overflow entry.
 */
private int getOverflow(int bucket)
{
// linear scan: the overflow list is expected to stay short
for (int i = 0; i < overflows; i++) {
if (overflowBuckets[i] == bucket) {
return overflowValues[i];
}
}
return 0;
}

/**
 * Reconciles the overflow state for {@code bucket} with its new delta and
 * returns the (possibly clamped) delta to store in the bucket's slot.
 *
 * @param bucket the bucket index
 * @param overflowEntry position of the bucket's existing overflow entry,
 *        or -1 if the bucket currently has none
 * @param delta the bucket's new delta relative to the baseline
 * @return MAX_DELTA if the excess was recorded as an overflow, otherwise
 *         {@code delta} unchanged
 */
private int updateOverflow(int bucket, int overflowEntry, int delta)
{
if (delta > MAX_DELTA) {
if (overflowEntry != -1) {
// update existing overflow
setOverflow(overflowEntry, (byte) (delta - MAX_DELTA));
}
else {
addOverflow(bucket, (byte) (delta - MAX_DELTA));
}
// slot only holds MAX_DELTA; the excess lives in the overflow entry
delta = MAX_DELTA;
}
else if (overflowEntry != -1) {
// delta now fits in the slot, so the stale overflow entry must go
removeOverflow(overflowEntry);
}

return delta;
}

// Overwrites the overflow value at an existing entry position
// (the caller must have already located the entry, e.g. via findOverflowEntry).
private void setOverflow(int overflowEntry, byte overflow)
{
overflowValues[overflowEntry] = overflow;
}

// Removes the overflow entry at the given position in constant time by
// moving the last entry into its slot (entry order is not preserved).
private void removeOverflow(int overflowEntry)
{
// remove existing overflow
overflowBuckets[overflowEntry] = overflowBuckets[overflows - 1];
overflowValues[overflowEntry] = overflowValues[overflows - 1];
overflows--;
}

// Appends a new overflow entry for the given bucket, growing the parallel
// bucket/value arrays in OVERFLOW_GROW_INCREMENT steps when they are full.
private void addOverflow(int bucket, byte overflow)
{
// add new delta
overflowBuckets = Ints.ensureCapacity(overflowBuckets, overflows + 1, OVERFLOW_GROW_INCREMENT);
overflowValues = Bytes.ensureCapacity(overflowValues, overflows + 1, OVERFLOW_GROW_INCREMENT);

overflowBuckets[overflows] = bucket;
overflowValues[overflows] = overflow;

overflows++;
}

public static int estimatedInMemorySize(int indexBitLength)
{
// note: we don't take into account overflow entries since their number can vary
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ public void mergeWith(HyperLogLog other)
((SparseHll) instance).mergeWith((SparseHll) other.instance);
instance = makeDenseIfNecessary((SparseHll) instance);
}
else if (instance instanceof DenseHll && other.instance instanceof SparseHll) {
((DenseHll) instance).mergeWith((SparseHll) other.instance);
}
else {
DenseHll dense = instance.toDense();
dense.mergeWith(other.instance.toDense());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import io.airlift.slice.Slice;
import org.openjdk.jol.info.ClassLayout;

import javax.annotation.concurrent.NotThreadSafe;

import java.util.Arrays;

import static com.facebook.airlift.stats.cardinality.Utils.computeIndex;
Expand All @@ -36,7 +34,9 @@
import static java.lang.Math.toIntExact;
import static java.util.Comparator.comparingInt;

@NotThreadSafe
/**
* This class is NOT thread safe.
*/
final class SparseHll
implements HllInstance
{
Expand Down Expand Up @@ -171,7 +171,7 @@ public void eachBucket(BucketListener listener)
// if zeros > EXTENDED_BITS_LENGTH - indexBits, it means all those bits were zeros,
// so look at the entry value, which contains the number of leading 0 *after* EXTENDED_BITS_LENGTH
int bits = EXTENDED_PREFIX_BITS - indexBitLength;
if (zeros > bits) {
if (zeros >= bits) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a correctness fix. We should call this out explicitly in the commit message

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I'll put :
"Improved accuracy and performance of hyperloglog functions"

zeros = bits + decodeBucketValue(entry);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
package com.facebook.airlift.stats.cardinality;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
Expand All @@ -32,14 +34,18 @@
import java.util.concurrent.TimeUnit;

@State(Scope.Thread)
@OutputTimeUnit(TimeUnit.SECONDS)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@BenchmarkMode(Mode.AverageTime)
@Fork(5)
@Warmup(iterations = 5, time = 500, timeUnit = TimeUnit.MILLISECONDS)
@Measurement(iterations = 10, time = 500, timeUnit = TimeUnit.MILLISECONDS)
public class BenchmarkDenseHll
{
private static final int LARGE_CARDINALITY = 1_000_000;
private static final int SMALL_CARDINALITY = 100;

@Benchmark
public DenseHll benchmarkInsert(Data data)
public DenseHll benchmarkInsert(InsertData data)
{
for (long hash : data.hashes) {
data.instance.insertHash(hash);
Expand All @@ -48,10 +54,22 @@ public DenseHll benchmarkInsert(Data data)
return data.instance;
}

// Measures merging a small dense HLL into a large dense HLL.
// NOTE(review): mergeWith mutates and returns data.base, so invocations within
// a single iteration merge into an already-merged base — confirm this is the
// intended steady-state being measured.
@Benchmark
public DenseHll benchmarkMergeWithDense(MergeWithDenseData data)
{
return data.base.mergeWith(data.toMerge);
}

// Measures merging a small sparse HLL into a large dense HLL
// (counterpart to benchmarkMergeWithDense for a dense/sparse comparison).
@Benchmark
public DenseHll benchmarkMergeWithSparse(MergeWithSparseData data)
{
return data.base.mergeWith(data.toMerge);
}

@State(Scope.Thread)
public static class Data
public static class InsertData
{
public final DenseHll instance = new DenseHll(11);
public final DenseHll instance = new DenseHll(12);
public final long[] hashes = new long[500];

@Setup(Level.Iteration)
Expand All @@ -63,6 +81,51 @@ public void initialize()
}
}

/**
 * Benchmark state for dense/dense merges: a large dense HLL ({@code base})
 * and a small dense HLL ({@code toMerge}), rebuilt from random hashes at the
 * start of every iteration ({@code Level.Iteration}).
 */
@State(Scope.Thread)
public static class MergeWithDenseData
{
public DenseHll base;
public DenseHll toMerge;

@Setup(Level.Iteration)
public void initialize()
{
base = new DenseHll(12);
for (int i = 0; i < LARGE_CARDINALITY; i++) {
base.insertHash(ThreadLocalRandom.current().nextLong());
}

// Small cardinality so we can do an apples-to-apples comparison
// between dense/dense vs dense/sparse merge. Sparse only supports
// small cardinalities.
toMerge = new DenseHll(12);
for (int i = 0; i < SMALL_CARDINALITY; i++) {
toMerge.insertHash(ThreadLocalRandom.current().nextLong());
}
}
}

/**
 * Benchmark state for dense/sparse merges: a large dense HLL ({@code base})
 * and a small sparse HLL ({@code toMerge}), rebuilt from random hashes at the
 * start of every iteration ({@code Level.Iteration}).
 */
@State(Scope.Thread)
public static class MergeWithSparseData
{
public DenseHll base;
public SparseHll toMerge;

@Setup(Level.Iteration)
public void initialize()
{
base = new DenseHll(12);
for (int i = 0; i < LARGE_CARDINALITY; i++) {
base.insertHash(ThreadLocalRandom.current().nextLong());
}

// sparse representation keeps cardinality small by design
toMerge = new SparseHll(12);
for (int i = 0; i < SMALL_CARDINALITY; i++) {
toMerge.insertHash(ThreadLocalRandom.current().nextLong());
}
}
}

public static void main(String[] args)
throws RunnerException
{
Expand Down
Loading