Skip to content

Commit

Permalink
Resharding: Data elements split/join utils for list, set, and map types
Browse files Browse the repository at this point in the history
  • Loading branch information
Sunjeet committed Sep 23, 2024
1 parent 4d7eba8 commit 74cd4fa
Show file tree
Hide file tree
Showing 46 changed files with 2,134 additions and 341 deletions.
2 changes: 1 addition & 1 deletion docs/advanced-topics.md
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ The number of bits used to represent a field which is one of the types (`INT`, `

32 bits are used to represent a `FLOAT`, and 64 bits are used to represent a `DOUBLE`.

`STRING` and `BYTES` fields each get a separate byte array, into which the values for all records are packed. The fixed-length value in these fields are offsets into the field’s byte array where the record’s value ends. In order to determine the begin byte for the record with ordinal n, the offset encoded into the record with ordinal (n-1) is read. The number of fixed length bits used to represent the offsets is exactly equal to the number of number of bits required to represent the maximum offset, plus one.
`STRING` and `BYTES` fields each get a separate byte array, into which the values for all records are packed. The fixed-length values in these fields are offsets into the field’s byte array where the record’s value ends. In order to determine the begin byte for the record with ordinal n, the offset encoded into the record with ordinal (n-1) is read. The number of fixed length bits used to represent the offsets is exactly equal to the number of bits required to represent the maximum offset, plus one.

Each field type may be assigned a null value. For `INT`, `LONG`, and `REFERENCE` fields, null is encoded as a value with all ones. For `FLOAT` and `DOUBLE` fields, null is encoded as special bit sequences. For `STRING` and `BYTES` fields, null is encoded by setting a designated null bit at the beginning of each field, followed by the end offset of the last populated value for that field.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public class HollowDiffUIServerTest {
public void test() throws Exception {
HollowDiff testDiff = new FakeHollowDiffGenerator().createFakeDiff();

HollowDiffUIServer server = new HollowDiffUIServer();
HollowDiffUIServer server = new HollowDiffUIServer(0);

server.addDiff("diff", testDiff);

Expand All @@ -22,7 +22,7 @@ public void test() throws Exception {
public void testBackwardsCompatibiltyWithJettyImplementation() throws Exception {
HollowDiff testDiff = new FakeHollowDiffGenerator().createFakeDiff();

com.netflix.hollow.diff.ui.jetty.HollowDiffUIServer server = new com.netflix.hollow.diff.ui.jetty.HollowDiffUIServer();
com.netflix.hollow.diff.ui.jetty.HollowDiffUIServer server = new com.netflix.hollow.diff.ui.jetty.HollowDiffUIServer(0);

server.addDiff("diff", testDiff);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.netflix.hollow.core.read.engine;

import com.netflix.hollow.core.memory.MemoryMode;
import com.netflix.hollow.core.memory.encoding.GapEncodedVariableLengthIntegerReader;
import com.netflix.hollow.core.memory.pool.ArraySegmentRecycler;
import com.netflix.hollow.core.read.HollowBlobInput;
import com.netflix.hollow.core.read.engine.map.HollowMapTypeDataElements;
import java.io.IOException;

/**
 * Base class holding the state that is common to per-type data elements
 * (for example {@code HollowListTypeDataElements}).
 *
 * <p>Fields are public so that code in other packages can read and populate them directly.
 */
public abstract class AbstractHollowTypeDataElements {

    /** Memory mode supplied at construction time. */
    public final MemoryMode memoryMode;

    /** Segment recycler supplied at construction time. */
    public final ArraySegmentRecycler memoryRecycler;

    /**
     * Max ordinal of these data elements; this class does not initialize it explicitly.
     * NOTE(review): {@code -1} appears to denote "no records" — confirm against callers.
     */
    public int maxOrdinal;

    /** Encoded additions; {@code null} unless assigned by other code. */
    public GapEncodedVariableLengthIntegerReader encodedAdditions;

    /** Encoded removals; {@code null} unless assigned by other code. */
    public GapEncodedVariableLengthIntegerReader encodedRemovals;

    /**
     * Stores the memory mode and recycler; all other fields keep their default values.
     *
     * @param memoryMode     memory mode to record on this instance
     * @param memoryRecycler recycler to record on this instance
     */
    public AbstractHollowTypeDataElements(MemoryMode memoryMode, ArraySegmentRecycler memoryRecycler) {
        this.memoryRecycler = memoryRecycler;
        this.memoryMode = memoryMode;
    }

    /**
     * Releases the resources held by the concrete data elements.
     */
    public abstract void destroy();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package com.netflix.hollow.core.read.engine;

import com.netflix.hollow.core.memory.encoding.GapEncodedVariableLengthIntegerReader;

/**
 * Template for joining several data elements of the same type into a single target data elements.
 *
 * <p>The number of sources must be a power of 2. {@link #fromMask} ({@code from.length - 1}) and
 * {@link #fromOrdinalShift} ({@code log2(from.length)}) are exposed so that subclasses can map a
 * joined ordinal back to a source index ({@code ordinal & fromMask}) and a source ordinal
 * ({@code ordinal >> fromOrdinalShift}).
 *
 * <p>Subclasses supply the type-specific steps; {@link #join()} runs them in a fixed order.
 *
 * @param <T> the concrete data elements type being joined
 */
public abstract class AbstractHollowTypeDataElementsJoiner <T extends AbstractHollowTypeDataElements> {
    /** Upper bound enforced on any ordinal produced by the join. */
    private static final int MAX_JOINED_ORDINAL = 1 << 29;

    public final int fromMask;
    public final int fromOrdinalShift;
    public final T[] from;

    public T to;

    /**
     * Validates the sources and derives the mask/shift used for ordinal remapping.
     *
     * @param from the data elements to join; its length must be a power of 2
     * @throws IllegalStateException    if {@code from.length} is not a power of 2, or if any source
     *                                  carries {@code encodedAdditions}
     * @throws IllegalArgumentException if the joined max ordinal would exceed {@code 1<<29}
     */
    public AbstractHollowTypeDataElementsJoiner(T[] from) {
        this.from = from;
        this.fromMask = from.length - 1;
        this.fromOrdinalShift = 31 - Integer.numberOfLeadingZeros(from.length);

        if (from.length <= 0 || (from.length & (from.length - 1)) != 0) {
            throw new IllegalStateException("No. of DataElements to be joined must be a power of 2");
        }

        for (int i = 0; i < from.length; i++) {
            int sourceMaxOrdinal = from[i].maxOrdinal;
            if (sourceMaxOrdinal == -1) {
                continue;
            }
            // The division check runs before the multiplication so that
            // sourceMaxOrdinal * from.length cannot overflow an int when it is evaluated.
            if (sourceMaxOrdinal > MAX_JOINED_ORDINAL
                    || sourceMaxOrdinal != 0 && (from.length > MAX_JOINED_ORDINAL / sourceMaxOrdinal)
                    || sourceMaxOrdinal * from.length + i > MAX_JOINED_ORDINAL) {
                // The enforced limit is 1<<29; the message previously claimed 2<<29.
                throw new IllegalArgumentException("Too large to join, maxOrdinal would exceed 1<<29");
            }
        }

        for (AbstractHollowTypeDataElements elements : from) {
            if (elements.encodedAdditions != null) {
                throw new IllegalStateException("Encountered encodedAdditions in data elements joiner- this is not expected " +
                        "since encodedAdditions only exist on delta data elements and they dont carry over to target data elements, " +
                        "delta data elements are never split/joined");
            }
        }
    }

    /**
     * Runs the join: initializes the target, resets its {@code maxOrdinal} to {@code -1},
     * populates stats, copies records, and finally joins the sources' {@code encodedRemovals}
     * into the target.
     *
     * @return the populated target data elements
     */
    public T join() {

        initToElements();
        to.maxOrdinal = -1;

        populateStats();

        copyRecords();

        // Source removals are passed through as-is (entries may be null); handling of nulls is
        // left to GapEncodedVariableLengthIntegerReader.join.
        GapEncodedVariableLengthIntegerReader[] fromRemovals = new GapEncodedVariableLengthIntegerReader[from.length];
        for (int i = 0; i < from.length; i++) {
            fromRemovals[i] = from[i].encodedRemovals;
        }
        to.encodedRemovals = GapEncodedVariableLengthIntegerReader.join(fromRemovals);

        return to;
    }

    /**
     * Initialize the target data elements.
     */
    public abstract void initToElements();

    /**
     * Populate the stats of the target data elements.
     */
    public abstract void populateStats();

    /**
     * Copy records from the source data elements to the target data elements.
     */
    public abstract void copyRecords();


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package com.netflix.hollow.core.read.engine;

import com.netflix.hollow.core.memory.encoding.GapEncodedVariableLengthIntegerReader;

/**
 * Template for splitting one data elements into {@code numSplits} data elements of the same type.
 *
 * <p>{@code numSplits} must be a power of 2. {@link #toMask} ({@code numSplits - 1}) and
 * {@link #toOrdinalShift} ({@code log2(numSplits)}) are exposed for subclasses.
 * NOTE(review): presumably these are used to map a source ordinal to a target index and a
 * target ordinal — confirm in the concrete splitters.
 *
 * <p>This class never calls {@code destroy()} on the source data elements.
 *
 * @param <T> the concrete data elements type being split
 */
public abstract class AbstractHollowTypeDataElementsSplitter<T extends AbstractHollowTypeDataElements> {
    public final int numSplits;
    public final int toMask;
    public final int toOrdinalShift;
    public final T from;

    public T[] to;

    /**
     * Validates the arguments and derives the mask/shift for the requested number of splits.
     *
     * @param from      the data elements to split
     * @param numSplits the number of targets to produce; must be a power of 2
     * @throws IllegalStateException if {@code numSplits} is not a power of 2, or if {@code from}
     *                               carries {@code encodedAdditions}
     */
    public AbstractHollowTypeDataElementsSplitter(T from, int numSplits) {
        // a positive int is a power of 2 exactly when it has a single bit set
        boolean powerOfTwo = numSplits > 0 && Integer.bitCount(numSplits) == 1;
        if (!powerOfTwo) {
            throw new IllegalStateException("Must split by power of 2");
        }

        if (from.encodedAdditions != null) {
            throw new IllegalStateException("Encountered encodedAdditions in data elements splitter- this is not expected " +
                    "since encodedAdditions only exist on delta data elements and they dont carry over to target data elements, " +
                    "delta data elements are never split/joined");
        }

        this.from = from;
        this.numSplits = numSplits;
        this.toMask = numSplits - 1;
        // for a power of 2 the position of the single set bit is its log2
        this.toOrdinalShift = Integer.numberOfTrailingZeros(numSplits);
    }

    /**
     * Runs the split: initializes the targets, resets each target's {@code maxOrdinal} to
     * {@code -1}, populates stats, copies records, and, when the source has
     * {@code encodedRemovals}, splits them and assigns one part to each target.
     *
     * @return the populated target data elements
     */
    public T[] split() {
        initToElements();
        for (T target : to) {
            target.maxOrdinal = -1;
        }

        populateStats();
        copyRecords();

        GapEncodedVariableLengthIntegerReader sourceRemovals = from.encodedRemovals;
        if (sourceRemovals != null) {
            GapEncodedVariableLengthIntegerReader[] removalsPerSplit = sourceRemovals.split(numSplits);
            int splitIndex = 0;
            while (splitIndex < to.length) {
                to[splitIndex].encodedRemovals = removalsPerSplit[splitIndex];
                splitIndex++;
            }
        }

        return to;
    }

    /**
     * Initialize the target data elements.
     */
    public abstract void initToElements();

    /**
     * Populate the stats of the target data elements.
     */
    public abstract void populateStats();

    /**
     * Copy records from the source data elements to the target data elements.
     */
    public abstract void copyRecords();


}
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,11 @@ private void populateStats() {
historicalDataElements.maxOrdinal = removedEntryCount - 1;
historicalDataElements.totalNumberOfElements = totalElementCount;
historicalDataElements.bitsPerListPointer = totalElementCount == 0 ? 1 : 64 - Long.numberOfLeadingZeros(totalElementCount);
historicalDataElements.bitsPerElement = stateEngineDataElements[0].bitsPerElement;

for (int i=0;i<stateEngineDataElements.length;i++) {
if (stateEngineDataElements[i].bitsPerElement > historicalDataElements.bitsPerElement) {
historicalDataElements.bitsPerElement = stateEngineDataElements[i].bitsPerElement;
}
}
ordinalMapping = new IntMap(removedEntryCount);
}

Expand All @@ -110,8 +113,8 @@ private void copyRecord(int ordinal) {
int shardOrdinal = ordinal >> shardOrdinalShift;

long bitsPerElement = stateEngineDataElements[shard].bitsPerElement;
long fromStartElement = shardOrdinal == 0 ? 0 : stateEngineDataElements[shard].listPointerData.getElementValue((long)(shardOrdinal - 1) * stateEngineDataElements[shard].bitsPerListPointer, stateEngineDataElements[shard].bitsPerListPointer);
long fromEndElement = stateEngineDataElements[shard].listPointerData.getElementValue((long)shardOrdinal * stateEngineDataElements[shard].bitsPerListPointer, stateEngineDataElements[shard].bitsPerListPointer);
long fromStartElement = stateEngineDataElements[shard].getStartElement(shardOrdinal);
long fromEndElement = stateEngineDataElements[shard].getEndElement(shardOrdinal);
long size = fromEndElement - fromStartElement;

historicalDataElements.elementData.copyBits(stateEngineDataElements[shard].elementData, fromStartElement * bitsPerElement, nextStartElement * bitsPerElement, size * bitsPerElement);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.netflix.hollow.core.memory.encoding.VarInt;
import com.netflix.hollow.core.memory.pool.ArraySegmentRecycler;
import com.netflix.hollow.core.read.HollowBlobInput;
import com.netflix.hollow.core.read.engine.AbstractHollowTypeDataElements;
import java.io.IOException;

/**
Expand All @@ -31,30 +32,21 @@
* During a delta, the HollowListTypeReadState will create a new HollowListTypeDataElements and atomically swap
* with the existing one to make sure a consistent view of the data is always available.
*/
public class HollowListTypeDataElements {

int maxOrdinal;
public class HollowListTypeDataElements extends AbstractHollowTypeDataElements {

FixedLengthData listPointerData;
FixedLengthData elementData;

GapEncodedVariableLengthIntegerReader encodedAdditions;
GapEncodedVariableLengthIntegerReader encodedRemovals;

int bitsPerListPointer;
int bitsPerElement;
int bitsPerElement = 0;
long totalNumberOfElements = 0;

final ArraySegmentRecycler memoryRecycler;
final MemoryMode memoryMode;

/**
 * Creates list data elements using {@code MemoryMode.ON_HEAP} by delegating to the
 * two-argument constructor.
 *
 * @param memoryRecycler recycler passed through to the two-argument constructor
 */
public HollowListTypeDataElements(ArraySegmentRecycler memoryRecycler) {
this(MemoryMode.ON_HEAP, memoryRecycler);
}

public HollowListTypeDataElements(MemoryMode memoryMode, ArraySegmentRecycler memoryRecycler) {
this.memoryMode = memoryMode;
this.memoryRecycler = memoryRecycler;
super(memoryMode, memoryRecycler);
}

void readSnapshot(HollowBlobInput in) throws IOException {
Expand Down Expand Up @@ -109,9 +101,31 @@ public void applyDelta(HollowListTypeDataElements fromData, HollowListTypeDataEl
new HollowListDeltaApplicator(fromData, deltaData, this).applyDelta();
}

/**
 * Hands {@code listPointerData} and {@code elementData} to
 * {@code FixedLengthDataFactory.destroy}, together with this instance's {@code memoryRecycler}.
 */
@Override
public void destroy() {
FixedLengthDataFactory.destroy(listPointerData, memoryRecycler);
FixedLengthDataFactory.destroy(elementData, memoryRecycler);
}

/**
 * Returns the index of the first element of the list stored at {@code ordinal}.
 *
 * <p>Each list pointer holds the end element of its list, so the start of a list is the end
 * pointer of the previous ordinal; ordinal 0 always starts at element 0.
 *
 * @param ordinal the list ordinal within these data elements
 * @return the start element index for {@code ordinal}
 */
long getStartElement(int ordinal) {
    if (ordinal == 0) {
        return 0;
    }
    // widen before multiplying so the bit offset cannot overflow an int
    long previousPointerBitOffset = (long) (ordinal - 1) * bitsPerListPointer;
    return listPointerData.getElementValue(previousPointerBitOffset, bitsPerListPointer);
}

/**
 * Returns the value of the list pointer stored for {@code ordinal}, i.e. the element index at
 * which that list ends.
 *
 * @param ordinal the list ordinal within these data elements
 * @return the end element index for {@code ordinal}
 */
long getEndElement(int ordinal) {
    // widen before multiplying so the bit offset cannot overflow an int
    long pointerBitOffset = (long) ordinal * bitsPerListPointer;
    return listPointerData.getElementValue(pointerBitOffset, bitsPerListPointer);
}

/**
 * Copies the elements {@code [srcStartElement, srcEndElement)} of {@code src} into this
 * instance's {@code elementData}, starting at element index {@code startElement}.
 *
 * <p>When both sides use the same {@code bitsPerElement} the range is copied in one bulk bit
 * copy; otherwise every element is read at the source width and written at this instance's width.
 *
 * @param startElement    first destination element index in this instance
 * @param src             data elements to copy from
 * @param srcStartElement first source element index (inclusive)
 * @param srcEndElement   last source element index (exclusive)
 */
void copyElementsFrom(long startElement, HollowListTypeDataElements src, long srcStartElement, long srcEndElement) {
    if (bitsPerElement != src.bitsPerElement) {
        // element widths differ, so each value has to be re-encoded individually
        long destElement = startElement;
        long srcElement = srcStartElement;
        while (srcElement < srcEndElement) {
            long value = src.elementData.getElementValue(srcElement * src.bitsPerElement, src.bitsPerElement);
            elementData.setElementValue(destElement * bitsPerElement, bitsPerElement, value);
            destElement++;
            srcElement++;
        }
        return;
    }

    // identical widths: the whole range can be moved with a single bit copy
    long elementCount = srcEndElement - srcStartElement;
    elementData.copyBits(src.elementData, srcStartElement * bitsPerElement, startElement * bitsPerElement, elementCount * bitsPerElement);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package com.netflix.hollow.core.read.engine.list;

import com.netflix.hollow.core.memory.FixedLengthDataFactory;
import com.netflix.hollow.core.read.engine.AbstractHollowTypeDataElementsJoiner;


/**
 * Join multiple {@code HollowListTypeDataElements}s into 1 {@code HollowListTypeDataElements}.
 * Ordinals are remapped and corresponding data is copied over.
 * The original data elements are not destroyed.
 * The no. of passed data elements must be a power of 2.
 */
class HollowListTypeDataElementsJoiner extends AbstractHollowTypeDataElementsJoiner<HollowListTypeDataElements> {

    public HollowListTypeDataElementsJoiner(HollowListTypeDataElements[] from) {
        super(from);
    }

    /**
     * Creates the target data elements with the memory mode and recycler of the first source.
     */
    @Override
    public void initToElements() {
        this.to = new HollowListTypeDataElements(from[0].memoryMode, from[0].memoryRecycler);
    }

    /**
     * Computes the target's {@code maxOrdinal}, {@code bitsPerElement},
     * {@code totalNumberOfElements} and {@code bitsPerListPointer} from the sources.
     */
    @Override
    public void populateStats() {
        for (int fromIndex = 0; fromIndex < from.length; fromIndex++) {
            int mappedMaxOrdinal = from[fromIndex].maxOrdinal == -1 ? -1 : (from[fromIndex].maxOrdinal * from.length) + fromIndex;
            to.maxOrdinal = Math.max(to.maxOrdinal, mappedMaxOrdinal);
            if (from[fromIndex].bitsPerElement > to.bitsPerElement) {
                // uneven bitsPerElement could be the case for consumers that skip type shards with no additions, so pick max across all shards
                to.bitsPerElement = from[fromIndex].bitsPerElement;
            }
        }

        long totalOfListSizes = 0;
        for (int ordinal = 0; ordinal <= to.maxOrdinal; ordinal++) {
            int fromIndex = ordinal & fromMask;
            int fromOrdinal = ordinal >> fromOrdinalShift;

            // Same guard as copyRecords(): with lopsided shards a joined ordinal can map past a
            // source's maxOrdinal, and reading list pointers there would corrupt the total.
            if (fromOrdinal <= from[fromIndex].maxOrdinal) {
                long startElement = from[fromIndex].getStartElement(fromOrdinal);
                long endElement = from[fromIndex].getEndElement(fromOrdinal);
                totalOfListSizes += endElement - startElement;
            }
        }
        to.bitsPerListPointer = totalOfListSizes == 0 ? 1 : 64 - Long.numberOfLeadingZeros(totalOfListSizes);
        to.totalNumberOfElements = totalOfListSizes;
    }

    /**
     * Allocates the target's pointer and element data, then walks every joined ordinal in order,
     * copying the matching source list's elements (if the source has that ordinal) and writing
     * the running end-element count as the list pointer for the joined ordinal.
     */
    @Override
    public void copyRecords() {
        long elementCounter = 0;

        to.listPointerData = FixedLengthDataFactory.get((long)to.bitsPerListPointer * (to.maxOrdinal + 1), to.memoryMode, to.memoryRecycler);
        to.elementData = FixedLengthDataFactory.get(to.bitsPerElement * to.totalNumberOfElements, to.memoryMode, to.memoryRecycler);

        for (int ordinal = 0; ordinal <= to.maxOrdinal; ordinal++) {
            int fromIndex = ordinal & fromMask;
            int fromOrdinal = ordinal >> fromOrdinalShift;

            if (fromOrdinal <= from[fromIndex].maxOrdinal) { // else lopsided shard for e.g. when consumers skip type shards with no additions
                HollowListTypeDataElements source = from[fromIndex];
                long startElement = source.getStartElement(fromOrdinal);
                long endElement = source.getEndElement(fromOrdinal);

                long numElements = endElement - startElement;
                to.copyElementsFrom(elementCounter, source, startElement, endElement);
                elementCounter += numElements;
            }
            // an ordinal missing from its source gets an empty list: its end pointer equals the previous one
            to.listPointerData.setElementValue((long)to.bitsPerListPointer * ordinal, to.bitsPerListPointer, elementCounter);
        }
    }
}
Loading

0 comments on commit 74cd4fa

Please sign in to comment.