Commit

Resharding: consumer reshards for list/set/map in response to num shards encoded in delta

Sunjeet committed Sep 23, 2024
1 parent 74cd4fa commit c8e3d2d
Showing 72 changed files with 2,475 additions and 1,490 deletions.
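
In outline, the consumer-side behavior this commit enables: when the shard count encoded in an incoming delta differs from a type state's current shard count, the reader reshards that type state in place before applying the delta. A minimal sketch of that decision, using only types touched in this commit (DeltaReshardSketch and reshardIfNeeded are illustrative names, not part of the change; the real entry point is readTypeStateDelta in the first file below):

import com.netflix.hollow.core.read.engine.HollowTypeReadState;
import com.netflix.hollow.core.read.engine.HollowTypeReshardingStrategy;

class DeltaReshardSketch {
    static void reshardIfNeeded(HollowTypeReadState typeState, int deltaNumShards) {
        int currNumShards = typeState.numShards();
        // reshard only when both counts are known (non-zero) and they actually differ
        if (currNumShards != 0 && deltaNumShards != 0 && currNumShards != deltaNumShards) {
            // picks the object/list/set/map splitter-joiner implementation for this type state
            HollowTypeReshardingStrategy strategy = HollowTypeReshardingStrategy.getInstance(typeState);
            strategy.reshard(typeState, currNumShards, deltaNumShards);
        }
    }
}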
@@ -337,30 +337,25 @@ private String readTypeStateSnapshot(HollowBlobInput in, TypeFilter filter) thro
if(!filter.includes(typeName)) {
HollowListTypeReadState.discardSnapshot(in, numShards);
} else {
- populateTypeStateSnapshot(in, new HollowListTypeReadState(stateEngine, memoryMode, (HollowListSchema)schema, numShards));
+ populateTypeStateSnapshotWithNumShards(in, new HollowListTypeReadState(stateEngine, memoryMode, (HollowListSchema)schema), numShards);
}
} else if(schema instanceof HollowSetSchema) {
if(!filter.includes(typeName)) {
HollowSetTypeReadState.discardSnapshot(in, numShards);
} else {
- populateTypeStateSnapshot(in, new HollowSetTypeReadState(stateEngine, memoryMode, (HollowSetSchema)schema, numShards));
+ populateTypeStateSnapshotWithNumShards(in, new HollowSetTypeReadState(stateEngine, memoryMode, (HollowSetSchema)schema), numShards);
}
} else if(schema instanceof HollowMapSchema) {
if(!filter.includes(typeName)) {
HollowMapTypeReadState.discardSnapshot(in, numShards);
} else {
- populateTypeStateSnapshot(in, new HollowMapTypeReadState(stateEngine, memoryMode, (HollowMapSchema)schema, numShards));
+ populateTypeStateSnapshotWithNumShards(in, new HollowMapTypeReadState(stateEngine, memoryMode, (HollowMapSchema)schema), numShards);
}
}

return typeName;
}

- private void populateTypeStateSnapshot(HollowBlobInput in, HollowTypeReadState typeState) throws IOException {
-     stateEngine.addTypeState(typeState);
-     typeState.readSnapshot(in, stateEngine.getMemoryRecycler());
- }

private void populateTypeStateSnapshotWithNumShards(HollowBlobInput in, HollowTypeReadState typeState, int numShards) throws IOException {
if (numShards<=0 || ((numShards&(numShards-1))!=0)) {
throw new IllegalArgumentException("Number of shards must be a power of 2!");
@@ -376,6 +371,10 @@ private String readTypeStateDelta(HollowBlobInput in) throws IOException {
int numShards = readNumShards(in);
HollowTypeReadState typeState = stateEngine.getTypeState(schema.getName());
if(typeState != null) {
+ if (shouldReshard(typeState, typeState.numShards(), numShards)) {
+     HollowTypeReshardingStrategy reshardingStrategy = HollowTypeReshardingStrategy.getInstance(typeState);
+     reshardingStrategy.reshard(typeState, typeState.numShards(), numShards);
+ }
typeState.applyDelta(in, schema, stateEngine.getMemoryRecycler(), numShards);
} else {
discardDelta(in, schema, numShards);
@@ -384,6 +383,14 @@ private String readTypeStateDelta(HollowBlobInput in) throws IOException {
return schema.getName();
}

+ private boolean shouldReshard(HollowTypeReadState typeState, int currNumShards, int deltaNumShards) {
+     if (typeState instanceof HollowObjectTypeReadState) {
+         return currNumShards != 0 && deltaNumShards != 0 && currNumShards != deltaNumShards;
+     } else {
+         return false;
+     }
+ }

private int readNumShards(HollowBlobInput in) throws IOException {
int backwardsCompatibilityBytes = VarInt.readVInt(in);

@@ -3,11 +3,8 @@
import com.netflix.hollow.core.memory.MemoryMode;
import com.netflix.hollow.core.memory.encoding.GapEncodedVariableLengthIntegerReader;
import com.netflix.hollow.core.memory.pool.ArraySegmentRecycler;
- import com.netflix.hollow.core.read.HollowBlobInput;
- import com.netflix.hollow.core.read.engine.map.HollowMapTypeDataElements;
- import java.io.IOException;

- public abstract class AbstractHollowTypeDataElements {
+ public abstract class HollowTypeDataElements {

public int maxOrdinal;

@@ -17,7 +14,7 @@ public abstract class AbstractHollowTypeDataElements {
public final ArraySegmentRecycler memoryRecycler;
public final MemoryMode memoryMode;

- public AbstractHollowTypeDataElements(MemoryMode memoryMode, ArraySegmentRecycler memoryRecycler) {
+ public HollowTypeDataElements(MemoryMode memoryMode, ArraySegmentRecycler memoryRecycler) {
this.memoryMode = memoryMode;
this.memoryRecycler = memoryRecycler;
}
@@ -2,14 +2,14 @@

import com.netflix.hollow.core.memory.encoding.GapEncodedVariableLengthIntegerReader;

- public abstract class AbstractHollowTypeDataElementsJoiner <T extends AbstractHollowTypeDataElements> {
+ public abstract class HollowTypeDataElementsJoiner<T extends HollowTypeDataElements> {
public final int fromMask;
public final int fromOrdinalShift;
public final T[] from;

public T to;

- public AbstractHollowTypeDataElementsJoiner(T[] from) {
+ public HollowTypeDataElementsJoiner(T[] from) {
this.from = from;
this.fromMask = from.length - 1;
this.fromOrdinalShift = 31 - Integer.numberOfLeadingZeros(from.length);
@@ -29,7 +29,7 @@ public AbstractHollowTypeDataElementsJoiner(T[] from) {
}
}

- for (AbstractHollowTypeDataElements elements : from) {
+ for (HollowTypeDataElements elements : from) {
if (elements.encodedAdditions != null) {
throw new IllegalStateException("Encountered encodedAdditions in data elements joiner- this is not expected " +
"since encodedAdditions only exist on delta data elements and they dont carry over to target data elements, " +
@@ -8,15 +8,15 @@
* The original data elements are not destroyed.
* The no. of passed data elements must be a power of 2.
*/
- public abstract class AbstractHollowTypeDataElementsSplitter<T extends AbstractHollowTypeDataElements> {
+ public abstract class HollowTypeDataElementsSplitter<T extends HollowTypeDataElements> {
public final int numSplits;
public final int toMask;
public final int toOrdinalShift;
public final T from;

public T[] to;

- public AbstractHollowTypeDataElementsSplitter(T from, int numSplits) {
+ public HollowTypeDataElementsSplitter(T from, int numSplits) {
this.from = from;
this.numSplits = numSplits;
this.toMask = numSplits - 1;
@@ -31,7 +31,7 @@
import java.util.stream.Stream;

/**
- * A HollowTypeReadState contains and is the root handle to all of the records of a specific type in
+ * A HollowTypeReadState contains and is the root handle to all the records of a specific type in
* a {@link HollowReadStateEngine}.
*/
public abstract class HollowTypeReadState implements HollowTypeDataAccess {
@@ -121,16 +121,10 @@ public BitSet getPreviousOrdinals() {
*/
public abstract int maxOrdinal();

- public abstract void readSnapshot(HollowBlobInput in, ArraySegmentRecycler recycler) throws IOException;

public abstract void readSnapshot(HollowBlobInput in, ArraySegmentRecycler recycler, int numShards) throws IOException;

public abstract void applyDelta(HollowBlobInput in, HollowSchema deltaSchema, ArraySegmentRecycler memoryRecycler, int deltaNumShards) throws IOException;

- protected boolean shouldReshard(int currNumShards, int deltaNumShards) {
-     return currNumShards!=0 && deltaNumShards!=0 && currNumShards!=deltaNumShards;
- }

public HollowSchema getSchema() {
return schema;
}
@@ -206,4 +200,18 @@ public long getApproximateShardSizeInBytes() {
*/
public abstract int numShards();

+ public abstract ShardsHolder getShardsVolatile();

+ public abstract void updateShardsVolatile(HollowTypeReadStateShard[] shards);

+ public abstract HollowTypeDataElements[] createTypeDataElements(int len);

+ public abstract HollowTypeReadStateShard createTypeReadStateShard(HollowSchema schema, HollowTypeDataElements dataElements, int shardOrdinalShift);

+ public void destroyOriginalDataElements(HollowTypeDataElements dataElements) {
+     dataElements.destroy();
+     if (dataElements.encodedRemovals != null) {
+         dataElements.encodedRemovals.destroy();
+     }
+ }
}
@@ -0,0 +1,8 @@
package com.netflix.hollow.core.read.engine;

public interface HollowTypeReadStateShard {

HollowTypeDataElements getDataElements();

int getShardOrdinalShift();
}
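
A shard, then, is just a pairing of data elements with the ordinal shift used to map a type ordinal to a shard-local ordinal. A minimal, hypothetical implementation might look like the following (ExampleShard is illustrative, not part of the commit, and assumes the same package; concrete shards in the codebase also carry type-specific read logic):

class ExampleShard implements HollowTypeReadStateShard {
    private final HollowTypeDataElements dataElements;
    private final int shardOrdinalShift;   // log2(total shard count) at the time the shard was built

    ExampleShard(HollowTypeDataElements dataElements, int shardOrdinalShift) {
        this.dataElements = dataElements;
        this.shardOrdinalShift = shardOrdinalShift;
    }

    @Override
    public HollowTypeDataElements getDataElements() {
        return dataElements;
    }

    @Override
    public int getShardOrdinalShift() {
        return shardOrdinalShift;
    }
}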
@@ -0,0 +1,191 @@
package com.netflix.hollow.core.read.engine;

import com.netflix.hollow.core.read.engine.list.HollowListTypeReadState;
import com.netflix.hollow.core.read.engine.list.HollowListTypeReshardingStrategy;
import com.netflix.hollow.core.read.engine.map.HollowMapTypeReadState;
import com.netflix.hollow.core.read.engine.map.HollowMapTypeReshardingStrategy;
import com.netflix.hollow.core.read.engine.object.HollowObjectTypeReadState;
import com.netflix.hollow.core.read.engine.object.HollowObjectTypeReshardingStrategy;
import com.netflix.hollow.core.read.engine.set.HollowSetTypeReadState;
import com.netflix.hollow.core.read.engine.set.HollowSetTypeReshardingStrategy;
import java.util.Arrays;

public abstract class HollowTypeReshardingStrategy {
private final static HollowTypeReshardingStrategy OBJECT_RESHARDING_STRATEGY = new HollowObjectTypeReshardingStrategy();
private final static HollowTypeReshardingStrategy LIST_RESHARDING_STRATEGY = new HollowListTypeReshardingStrategy();
private final static HollowTypeReshardingStrategy SET_RESHARDING_STRATEGY = new HollowSetTypeReshardingStrategy();
private final static HollowTypeReshardingStrategy MAP_RESHARDING_STRATEGY = new HollowMapTypeReshardingStrategy();

public abstract HollowTypeDataElementsSplitter createDataElementsSplitter(HollowTypeDataElements from, int shardingFactor);

public abstract HollowTypeDataElementsJoiner createDataElementsJoiner(HollowTypeDataElements[] from);

public static HollowTypeReshardingStrategy getInstance(HollowTypeReadState typeState) {
if (typeState instanceof HollowObjectTypeReadState) {
return OBJECT_RESHARDING_STRATEGY;
} else if (typeState instanceof HollowListTypeReadState) {
return LIST_RESHARDING_STRATEGY;
} else if (typeState instanceof HollowSetTypeReadState) {
return SET_RESHARDING_STRATEGY;
} else if (typeState instanceof HollowMapTypeReadState) {
return MAP_RESHARDING_STRATEGY;
} else {
throw new IllegalArgumentException("Unsupported type state: " + typeState.getClass().getName());
}
}

/**
* Reshards this type state to the desired shard count using O(shard size) space while supporting concurrent reads
* into the underlying data elements.
*
* @param typeState The type state to reshard
* @param prevNumShards The current number of shards in typeState
* @param newNumShards The desired number of shards for typeState
*/
public void reshard(HollowTypeReadState typeState, int prevNumShards, int newNumShards) {
int shardingFactor = shardingFactor(prevNumShards, newNumShards);
HollowTypeDataElements[] newDataElements;
int[] shardOrdinalShifts;

try {
if (newNumShards > prevNumShards) { // split existing shards
// Step 1: Grow the number of shards. Each original shard will result in N child shards where N is the sharding factor.
// The child shards will reference into the existing data elements as-is, and reuse existing shardOrdinalShift.
// However since the shards array is resized, a read will map into the new shard index, as a result a subset of
// ordinals in each shard will be accessed. In the next "splitting" step, the data elements in these new shards
// will be filtered to only retain the subset of ordinals that are actually accessed.
//
// This is an atomic update to shardsVolatile: full construction happens-before the store to shardsVolatile,
// in other words a fully constructed object as visible to this thread will be visible to other threads that
// load the new shardsVolatile.
typeState.updateShardsVolatile(expandWithOriginalDataElements(typeState.getShardsVolatile(), shardingFactor));

// Step 2: Split each original data element into N child data elements where N is the sharding factor.
// Then update each of the N child shards with the respective split of data element, this will be
// sufficient to serve all reads into this shard. Once all child shards for a pre-split parent
// shard have been assigned the split data elements, the parent data elements can be discarded.
for (int i = 0; i < prevNumShards; i++) {
HollowTypeDataElements originalDataElements = typeState.getShardsVolatile().getShards()[i].getDataElements();

typeState.updateShardsVolatile(splitDataElementsForOneShard(typeState, i, prevNumShards, shardingFactor));

typeState.destroyOriginalDataElements(originalDataElements);
}
// Re-sharding done.
// shardsVolatile now contains newNumShards shards where each shard contains
// a split of original data elements.

} else { // join existing shards
// Step 1: Join N data elements to create one, where N is the sharding factor. Then update each of the
// N shards to reference the joined result, but with a new shardOrdinalShift.
// Reads will continue to reference the same shard index as before, but the new shardOrdinalShift
// will help these reads land at the right ordinal in the joined shard. When all N old shards
// corresponding to one new shard have been updated, the N pre-join data elements can be destroyed.
for (int i = 0; i < newNumShards; i++) {
HollowTypeDataElements destroyCandidates[] = joinCandidates(typeState, i, shardingFactor);

typeState.updateShardsVolatile(joinDataElementsForOneShard(typeState, i, shardingFactor)); // atomic update to shardsVolatile

for (int j = 0; j < shardingFactor; j++) {
typeState.destroyOriginalDataElements(destroyCandidates[j]);
}
}

// Step 2: Resize the shards array to only keep the first newNumShards shards.
newDataElements = typeState.createTypeDataElements(typeState.getShardsVolatile().getShards().length);
shardOrdinalShifts = new int[typeState.getShardsVolatile().getShards().length];
copyShardDataElements(typeState.getShardsVolatile(), newDataElements, shardOrdinalShifts);

HollowTypeReadStateShard[] newShards = Arrays.copyOfRange(typeState.getShardsVolatile().getShards(), 0, newNumShards);
typeState.updateShardsVolatile(newShards);

// Re-sharding done.
// shardsVolatile now contains newNumShards shards where each shard contains
// a join of original data elements.
}
} catch (Exception e) {
throw new RuntimeException("Error in re-sharding", e);
}
}
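    // Worked example (illustrative, not part of the commit): growing from 2 shards to 4,
    // i.e. shardingFactor == 2. Readers locate a record by shard = (ordinal & shardNumberMask)
    // and shard-local ordinal = (ordinal >>> shardOrdinalShift).
    //
    //   before:        shards = [A, B]             shardOrdinalShift = 1
    //   after step 1:  shards = [A, B, A, B]       children reuse the parent's data elements
    //                                              and the parent's shardOrdinalShift
    //   after step 2:  shards = [A0, B0, A1, B1]   each parent's data elements split in two,
    //                                              children use shardOrdinalShift = 2
    //
    // The join path is the same transformation in reverse: the data elements of N shards are
    // joined into one, then the shards array is truncated to the first newNumShards entries.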

/**
* Given old and new numShards, this method returns the shard resizing multiplier.
*/
public static int shardingFactor(int oldNumShards, int newNumShards) {
if (newNumShards <= 0 || oldNumShards <= 0 || newNumShards == oldNumShards) {
throw new IllegalStateException("Invalid shard resizing, oldNumShards=" + oldNumShards + ", newNumShards=" + newNumShards);
}

boolean isNewGreater = newNumShards > oldNumShards;
int dividend = isNewGreater ? newNumShards : oldNumShards;
int divisor = isNewGreater ? oldNumShards : newNumShards;

if (dividend % divisor != 0) {
throw new IllegalStateException("Invalid shard resizing, oldNumShards=" + oldNumShards + ", newNumShards=" + newNumShards);
}
return dividend / divisor;
}
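    // Quick check of the arithmetic above (illustrative): shardingFactor(2, 8) == 4 (4x split),
    // shardingFactor(8, 2) == 4 (4x join), while shardingFactor(4, 6) throws
    // IllegalStateException because the counts are not exact multiples of one another.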

private void copyShardDataElements(ShardsHolder from, HollowTypeDataElements[] newDataElements, int[] shardOrdinalShifts) {
for (int i=0; i<from.getShards().length; i++) {
newDataElements[i] = from.getShards()[i].getDataElements();
shardOrdinalShifts[i] = from.getShards()[i].getShardOrdinalShift();
}
}

private HollowTypeDataElements[] joinCandidates(HollowTypeReadState typeState, int indexIntoShards, int shardingFactor) {
HollowTypeReadStateShard[] shards = typeState.getShardsVolatile().getShards();
HollowTypeDataElements[] result = typeState.createTypeDataElements(shardingFactor);
int newNumShards = shards.length / shardingFactor;
for (int i=0; i<shardingFactor; i++) {
result[i] = shards[indexIntoShards + (newNumShards*i)].getDataElements();
}
return result;
}

public HollowTypeReadStateShard[] joinDataElementsForOneShard(HollowTypeReadState typeState, int currentIndex, int shardingFactor) {
ShardsHolder shardsHolder = typeState.getShardsVolatile();
int newNumShards = shardsHolder.getShards().length / shardingFactor;
int newShardOrdinalShift = 31 - Integer.numberOfLeadingZeros(newNumShards);

HollowTypeDataElements[] joinCandidates = joinCandidates(typeState, currentIndex, shardingFactor);
HollowTypeDataElementsJoiner joiner = createDataElementsJoiner(joinCandidates);
HollowTypeDataElements joined = joiner.join();

HollowTypeReadStateShard[] newShards = Arrays.copyOf(shardsHolder.getShards(), shardsHolder.getShards().length);
for (int i=0; i<shardingFactor; i++) {
newShards[currentIndex + (newNumShards*i)] = typeState.createTypeReadStateShard(typeState.getSchema(), joined, newShardOrdinalShift);
}

return newShards;
}

public HollowTypeReadStateShard[] expandWithOriginalDataElements(ShardsHolder shardsHolder, int shardingFactor) {
int prevNumShards = shardsHolder.getShards().length;
int newNumShards = prevNumShards * shardingFactor;
HollowTypeReadStateShard[] newShards = new HollowTypeReadStateShard[newNumShards];

for(int i=0; i<prevNumShards; i++) {
for (int j=0; j<shardingFactor; j++) {
newShards[i+(prevNumShards*j)] = shardsHolder.getShards()[i];
}
}
return newShards;
}

public HollowTypeReadStateShard[] splitDataElementsForOneShard(HollowTypeReadState typeState, int currentIndex, int prevNumShards, int shardingFactor) {
ShardsHolder shardsHolder = typeState.getShardsVolatile();
int newNumShards = shardsHolder.getShards().length;
int newShardOrdinalShift = 31 - Integer.numberOfLeadingZeros(newNumShards);

HollowTypeDataElements dataElementsToSplit = shardsHolder.getShards()[currentIndex].getDataElements();
HollowTypeDataElementsSplitter splitter = createDataElementsSplitter(dataElementsToSplit, shardingFactor);
HollowTypeDataElements[] splits = splitter.split();

HollowTypeReadStateShard[] newShards = Arrays.copyOf(shardsHolder.getShards(), shardsHolder.getShards().length);
for (int i = 0; i < shardingFactor; i ++) {
newShards[currentIndex + (prevNumShards*i)] = typeState.createTypeReadStateShard(typeState.getSchema(), splits[i], newShardOrdinalShift);
}
return newShards;
}
}
@@ -0,0 +1,9 @@
package com.netflix.hollow.core.read.engine;

public abstract class ShardsHolder {

public abstract HollowTypeReadStateShard[] getShards();

public abstract int getShardNumberMask();

}
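
Together with HollowTypeReadStateShard, this holder is what makes the atomic shard swaps in HollowTypeReshardingStrategy workable for concurrent readers: a reader takes one snapshot of the shards and derives both the shard and the shard-local ordinal from that snapshot. A sketch of that read path under those assumptions (ShardLookupSketch and dataElementsFor are illustrative names, not part of the commit; concrete type states layer their own type-specific reads and validation on top):

import com.netflix.hollow.core.read.engine.HollowTypeDataElements;
import com.netflix.hollow.core.read.engine.HollowTypeReadState;
import com.netflix.hollow.core.read.engine.HollowTypeReadStateShard;
import com.netflix.hollow.core.read.engine.ShardsHolder;

class ShardLookupSketch {
    static HollowTypeDataElements dataElementsFor(HollowTypeReadState typeState, int ordinal) {
        ShardsHolder holder = typeState.getShardsVolatile();         // single snapshot of the shards
        HollowTypeReadStateShard shard = holder.getShards()[ordinal & holder.getShardNumberMask()];
        int shardOrdinal = ordinal >>> shard.getShardOrdinalShift(); // shard-local ordinal
        // type-specific logic would read the record at shardOrdinal from these data elements
        return shard.getDataElements();
    }
}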