-
Notifications
You must be signed in to change notification settings - Fork 44
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update HyperLogLog library to original Airlift #68
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,8 +22,6 @@ | |
import io.airlift.slice.Slice; | ||
import org.openjdk.jol.info.ClassLayout; | ||
|
||
import javax.annotation.concurrent.NotThreadSafe; | ||
|
||
import java.util.Arrays; | ||
import java.util.HashSet; | ||
import java.util.Set; | ||
|
@@ -36,7 +34,9 @@ | |
import static com.google.common.base.Preconditions.checkArgument; | ||
import static com.google.common.base.Preconditions.checkState; | ||
|
||
@NotThreadSafe | ||
/** | ||
* This class is NOT thread safe. | ||
*/ | ||
final class DenseHll | ||
implements HllInstance | ||
{ | ||
|
@@ -249,16 +249,14 @@ public void insert(int bucket, int value) | |
} | ||
|
||
if (delta > MAX_DELTA) { | ||
int overflow = delta - MAX_DELTA; | ||
|
||
if (!setOverflow(bucket, overflow)) { | ||
// grow overflows arrays if necessary | ||
overflowBuckets = Ints.ensureCapacity(overflowBuckets, overflows + 1, OVERFLOW_GROW_INCREMENT); | ||
overflowValues = Bytes.ensureCapacity(overflowValues, overflows + 1, OVERFLOW_GROW_INCREMENT); | ||
byte overflow = (byte) (delta - MAX_DELTA); | ||
|
||
overflowBuckets[overflows] = bucket; | ||
overflowValues[overflows] = (byte) overflow; | ||
overflows++; | ||
int overflowEntry = findOverflowEntry(bucket); | ||
if (overflowEntry != -1) { | ||
setOverflow(overflowEntry, overflow); | ||
} | ||
else { | ||
addOverflow(bucket, overflow); | ||
} | ||
|
||
delta = MAX_DELTA; | ||
|
@@ -272,30 +270,6 @@ public void insert(int bucket, int value) | |
} | ||
} | ||
|
||
private int getOverflow(int bucket) | ||
{ | ||
for (int i = 0; i < overflows; i++) { | ||
if (overflowBuckets[i] == bucket) { | ||
return overflowValues[i]; | ||
} | ||
} | ||
return 0; | ||
} | ||
|
||
/** | ||
* Returns false if no overflow bucket matching the given bucket id was found | ||
*/ | ||
private boolean setOverflow(int bucket, int overflow) | ||
{ | ||
for (int i = 0; i < overflows; i++) { | ||
if (overflowBuckets[i] == bucket) { | ||
overflowValues[i] = (byte) overflow; | ||
return true; | ||
} | ||
} | ||
return false; | ||
} | ||
|
||
public Slice serialize() | ||
{ | ||
int size = estimatedSerializedSize(); | ||
|
@@ -455,42 +429,54 @@ public DenseHll mergeWith(DenseHll other) | |
numberOfBuckets(other.indexBitLength))); | ||
} | ||
|
||
int baseline = Math.max(this.baseline, other.baseline); | ||
int newBaseline = Math.max(this.baseline, other.baseline); | ||
int baselineCount = 0; | ||
|
||
int overflows = 0; | ||
int[] overflowBuckets = new int[OVERFLOW_GROW_INCREMENT]; | ||
byte[] overflowValues = new byte[OVERFLOW_GROW_INCREMENT]; | ||
int bucket = 0; | ||
for (int i = 0; i < deltas.length; i++) { | ||
int newSlot = 0; | ||
|
||
int numberOfBuckets = numberOfBuckets(indexBitLength); | ||
for (int i = 0; i < numberOfBuckets; i++) { | ||
int value = Math.max(getValue(i), other.getValue(i)); | ||
byte slot1 = deltas[i]; | ||
byte slot2 = other.deltas[i]; | ||
|
||
int delta = value - baseline; | ||
if (delta == 0) { | ||
baselineCount++; | ||
} | ||
else if (delta > MAX_DELTA) { | ||
// grow overflows arrays if necessary | ||
overflowBuckets = Ints.ensureCapacity(overflowBuckets, overflows + 1, OVERFLOW_GROW_INCREMENT); | ||
overflowValues = Bytes.ensureCapacity(overflowValues, overflows + 1, OVERFLOW_GROW_INCREMENT); | ||
for (int shift = 4; shift >= 0; shift -= 4) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do you know why this number is 4? Is it because BITS_PER_BUCKET is 4 (I see that's used in calculating the size of the deltas array)? If so, we should use the static variable explicitly here so they can't get out of sync. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. That makes sense - but ideally we should keep most of the code in sync with … There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. That's fine with me. |
||
int delta1 = (slot1 >>> shift) & 0b1111; | ||
int delta2 = (slot2 >>> shift) & 0b1111; | ||
|
||
int value1 = this.baseline + delta1; | ||
int value2 = other.baseline + delta2; | ||
|
||
int overflowEntry = -1; | ||
if (delta1 == MAX_DELTA) { | ||
overflowEntry = findOverflowEntry(bucket); | ||
if (overflowEntry != -1) { | ||
value1 += overflowValues[overflowEntry]; | ||
} | ||
} | ||
|
||
if (delta2 == MAX_DELTA) { | ||
value2 += other.getOverflow(bucket); | ||
} | ||
|
||
int newValue = Math.max(value1, value2); | ||
int newDelta = newValue - newBaseline; | ||
|
||
overflowBuckets[overflows] = i; | ||
overflowValues[overflows] = (byte) (delta - MAX_DELTA); | ||
if (newDelta == 0) { | ||
baselineCount++; | ||
} | ||
|
||
overflows++; | ||
newDelta = updateOverflow(bucket, overflowEntry, newDelta); | ||
|
||
delta = MAX_DELTA; | ||
newSlot <<= 4; | ||
newSlot |= newDelta; | ||
bucket++; | ||
} | ||
|
||
setDelta(i, delta); | ||
this.deltas[i] = (byte) newSlot; | ||
} | ||
|
||
this.baseline = (byte) baseline; | ||
this.baseline = (byte) newBaseline; | ||
this.baselineCount = baselineCount; | ||
this.overflows = overflows; | ||
this.overflowBuckets = overflowBuckets; | ||
this.overflowValues = overflowValues; | ||
|
||
// all baseline values in one of the HLLs lost to the values | ||
// in the other HLL, so we need to adjust the final baseline | ||
|
@@ -499,6 +485,87 @@ else if (delta > MAX_DELTA) { | |
return this; | ||
} | ||
|
||
/** | ||
* Returns "this" for chaining | ||
*/ | ||
public DenseHll mergeWith(SparseHll other) | ||
{ | ||
if (indexBitLength != other.getIndexBitLength()) { | ||
throw new IllegalArgumentException(String.format( | ||
"Cannot merge HLLs with different number of buckets: %s vs %s", | ||
numberOfBuckets(indexBitLength), | ||
numberOfBuckets(other.getIndexBitLength()))); | ||
} | ||
|
||
other.eachBucket(this::insert); | ||
|
||
return this; | ||
} | ||
|
||
private int findOverflowEntry(int bucket) | ||
{ | ||
for (int i = 0; i < overflows; i++) { | ||
if (overflowBuckets[i] == bucket) { | ||
return i; | ||
} | ||
} | ||
return -1; | ||
} | ||
|
||
private int getOverflow(int bucket) | ||
{ | ||
for (int i = 0; i < overflows; i++) { | ||
if (overflowBuckets[i] == bucket) { | ||
return overflowValues[i]; | ||
} | ||
} | ||
return 0; | ||
} | ||
|
||
private int updateOverflow(int bucket, int overflowEntry, int delta) | ||
{ | ||
if (delta > MAX_DELTA) { | ||
if (overflowEntry != -1) { | ||
// update existing overflow | ||
setOverflow(overflowEntry, (byte) (delta - MAX_DELTA)); | ||
} | ||
else { | ||
addOverflow(bucket, (byte) (delta - MAX_DELTA)); | ||
} | ||
delta = MAX_DELTA; | ||
} | ||
else if (overflowEntry != -1) { | ||
removeOverflow(overflowEntry); | ||
} | ||
|
||
return delta; | ||
} | ||
|
||
private void setOverflow(int overflowEntry, byte overflow) | ||
{ | ||
overflowValues[overflowEntry] = overflow; | ||
} | ||
|
||
private void removeOverflow(int overflowEntry) | ||
{ | ||
// remove existing overflow | ||
overflowBuckets[overflowEntry] = overflowBuckets[overflows - 1]; | ||
overflowValues[overflowEntry] = overflowValues[overflows - 1]; | ||
overflows--; | ||
} | ||
|
||
private void addOverflow(int bucket, byte overflow) | ||
{ | ||
// add new delta | ||
overflowBuckets = Ints.ensureCapacity(overflowBuckets, overflows + 1, OVERFLOW_GROW_INCREMENT); | ||
overflowValues = Bytes.ensureCapacity(overflowValues, overflows + 1, OVERFLOW_GROW_INCREMENT); | ||
|
||
overflowBuckets[overflows] = bucket; | ||
overflowValues[overflows] = overflow; | ||
|
||
overflows++; | ||
} | ||
|
||
public static int estimatedInMemorySize(int indexBitLength) | ||
{ | ||
// note: we don't take into account overflow entries since their number can vary | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,8 +22,6 @@ | |
import io.airlift.slice.Slice; | ||
import org.openjdk.jol.info.ClassLayout; | ||
|
||
import javax.annotation.concurrent.NotThreadSafe; | ||
|
||
import java.util.Arrays; | ||
|
||
import static com.facebook.airlift.stats.cardinality.Utils.computeIndex; | ||
|
@@ -36,7 +34,9 @@ | |
import static java.lang.Math.toIntExact; | ||
import static java.util.Comparator.comparingInt; | ||
|
||
@NotThreadSafe | ||
/** | ||
* This class is NOT thread safe. | ||
*/ | ||
final class SparseHll | ||
implements HllInstance | ||
{ | ||
|
@@ -171,7 +171,7 @@ public void eachBucket(BucketListener listener) | |
// if zeros > EXTENDED_BITS_LENGTH - indexBits, it means all those bits were zeros, | ||
// so look at the entry value, which contains the number of leading 0 *after* EXTENDED_BITS_LENGTH | ||
int bits = EXTENDED_PREFIX_BITS - indexBitLength; | ||
if (zeros > bits) { | ||
if (zeros >= bits) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is a correctness fix. We should call this out explicitly in the commit message. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Good point, I'll put: … |
||
zeros = bits + decodeBucketValue(entry); | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know why they removed @NotThreadSafe. I'll let someone more familiar with Java decide what's better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was changed as part of airlift@1eb30dc.
It looks like the background is that javax.annotations was moved and renamed to jakarta.annotations (the rename was due to a licensing disagreement between Oracle and Eclipse; see jakartaee/rest#760). However, jakarta.annotations doesn't have the NotThreadSafe annotation (see the discussion in jakartaee/common-annotations-api#91). So it seems this change was basically part of upgrading that library.