Skip to content

Commit

Permalink
add test for List type resharding
Browse files Browse the repository at this point in the history
  • Loading branch information
Sunjeet committed Sep 7, 2024
1 parent afa9513 commit 9e377a4
Show file tree
Hide file tree
Showing 12 changed files with 275 additions and 169 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ private String readTypeStateDelta(HollowBlobInput in) throws IOException {
if(typeState != null) {
if (shouldReshard(typeState, typeState.numShards(), numShards)) {
HollowTypeReshardingStrategy reshardingStrategy = HollowTypeReshardingStrategy.getInstance(typeState);
reshardingStrategy.reshard(typeState, numShards);
reshardingStrategy.reshard(typeState, typeState.numShards(), numShards);
}
typeState.applyDelta(in, schema, stateEngine.getMemoryRecycler(), numShards);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ public static HollowTypeReshardingStrategy getInstance(HollowTypeReadState typeS
* Reshards this type state to the desired shard count using O(shard size) space while supporting concurrent reads
* into the underlying data elements.
*
* @param newNumShards The desired number of shards
* @param typeState The type state to reshard
* @param prevNumShards The current number of shards in typeState
* @param newNumShards The desired number of shards for typeState
*/
public void reshard(HollowTypeReadState typeState, int newNumShards) {
int prevNumShards = typeState.getShardsVolatile().getShards().length; // SNAP: TODO: or .shards.length
public void reshard(HollowTypeReadState typeState, int prevNumShards, int newNumShards) {
int shardingFactor = shardingFactor(prevNumShards, newNumShards);
HollowTypeDataElements[] newDataElements;
int[] shardOrdinalShifts;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ public HollowListTypeShardsHolder(HollowTypeReadStateShard[] fromShards) {

@Override
public HollowTypeReadStateShard[] getShards() {
throw new UnsupportedOperationException("Not implemented yet");
// return shards;
return shards;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ public HollowMapTypeShardsHolder(HollowTypeReadStateShard[] fromShards) {

@Override
public HollowTypeReadStateShard[] getShards() {
throw new UnsupportedOperationException("Not implemented yet");
// return shards;
return shards;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ public HollowSetTypeShardsHolder(HollowTypeReadStateShard[] fromShards) {

@Override
public HollowTypeReadStateShard[] getShards() {
throw new UnsupportedOperationException("Not implemented yet");
// return shards;
return shards;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
package com.netflix.hollow.core.read.engine;

import static com.netflix.hollow.core.read.engine.HollowTypeReshardingStrategy.shardingFactor;
import static junit.framework.TestCase.assertEquals;

import com.netflix.hollow.api.objects.generic.GenericHollowObject;
import com.netflix.hollow.core.memory.MemoryMode;
import com.netflix.hollow.core.read.engine.object.HollowObjectTypeDataElements;
import com.netflix.hollow.core.read.engine.object.HollowObjectTypeReadState;
import com.netflix.hollow.core.read.engine.object.HollowObjectTypeShardsHolder;
import com.netflix.hollow.core.schema.HollowObjectSchema;
import com.netflix.hollow.core.util.StateEngineRoundTripper;
import com.netflix.hollow.core.write.HollowObjectTypeWriteState;
import com.netflix.hollow.core.write.HollowObjectWriteRecord;
import com.netflix.hollow.core.write.HollowWriteStateEngine;
import java.io.IOException;
import java.util.Random;
import java.util.function.Supplier;
import org.junit.Assert;
import org.junit.Test;

public class HollowTypeReshardingStrategyTest {

private HollowReadStateEngine readStateEngine;
private HollowObjectSchema schema;

@Test
public void testMappingAnOrdinalToAShardAndBack() {
int maxOrdinal = 1000;
int numShards = 4;
int minRecordLocationsPerShard = (maxOrdinal + 1) / numShards;
int[][] shardOrdinals = new int[numShards][];
for(int i=0;i<numShards;i++) {
int maxShardOrdinal = (i < ((maxOrdinal + 1) & (numShards - 1))) ? minRecordLocationsPerShard : minRecordLocationsPerShard - 1;
shardOrdinals[i] = new int[maxShardOrdinal + 1];
}

int shardNumberMask = numShards - 1;
int shardOrdinalShift = 31 - Integer.numberOfLeadingZeros(numShards);

for (int ordinal=0; ordinal<=maxOrdinal; ordinal++) {
int shardIndex = ordinal & shardNumberMask;
int shardOrdinal = ordinal >> shardOrdinalShift;
shardOrdinals[shardIndex][shardOrdinal] = ordinal;
}

for (int shardIndex=0; shardIndex<numShards; shardIndex++) {
for (int shardOrdinal=0; shardOrdinal<shardOrdinals[shardIndex].length; shardOrdinal++) {
int ordinal = (shardOrdinal * numShards) + shardIndex;
assertEquals(shardOrdinals[shardIndex][shardOrdinal], ordinal);
}
}
}

@Test
public void testShardingFactor() {
assertEquals(2, shardingFactor(1, 2));
assertEquals(2, shardingFactor(2, 1));

assertEquals(2, shardingFactor(4, 2));
assertEquals(2, shardingFactor(2, 4));

assertEquals(16, shardingFactor(1, 16));
assertEquals(16, shardingFactor(32, 2));

assertIllegalStateException(() -> shardingFactor(0, 1));
assertIllegalStateException(() -> shardingFactor(2, 0));
assertIllegalStateException(() -> shardingFactor(1, 1));
assertIllegalStateException(() -> shardingFactor(1, -1));
assertIllegalStateException(() -> shardingFactor(2, 3));
}

@Test
public void testReshardingIntermediateStages_expandWithOriginalDataElements() throws Exception {
for (int shardingFactor : new int[]{2, 4}) {
for(int numRecords=1;numRecords<=100000;numRecords+=new Random().nextInt(5000))
{
HollowObjectTypeReadState expectedTypeState = populateTypeStateWith(numRecords);
HollowTypeReshardingStrategy reshardingStrategy = HollowTypeReshardingStrategy.getInstance(expectedTypeState);

HollowObjectTypeShardsHolder original = expectedTypeState.getShardsVolatile();
HollowObjectTypeReadState actualTypeState = new HollowObjectTypeReadState(readStateEngine, MemoryMode.ON_HEAP, schema, schema);
actualTypeState.updateShardsVolatile(reshardingStrategy.expandWithOriginalDataElements(original, shardingFactor));

assertEquals(shardingFactor * expectedTypeState.numShards(), actualTypeState.numShards());
assertDataUnchanged(actualTypeState, numRecords);
}
}
}

@Test
public void testReshardingIntermediateStages_splitDataElementsForOneShard() throws Exception {
for (int shardingFactor : new int[]{2, 4}) {
for(int numRecords=1;numRecords<=100000;numRecords+=new Random().nextInt(5000))
{
HollowObjectTypeReadState typeState = populateTypeStateWith(numRecords);
HollowTypeReshardingStrategy reshardingStrategy = HollowTypeReshardingStrategy.getInstance(typeState);

HollowObjectTypeShardsHolder originalShardsHolder = typeState.getShardsVolatile();
int originalNumShards = typeState.numShards();

// expand shards
typeState.updateShardsVolatile(reshardingStrategy.expandWithOriginalDataElements(originalShardsHolder, shardingFactor));

for(int i=0; i<originalNumShards; i++) {
HollowTypeDataElements originalDataElements = typeState.getShardsVolatile().getShards()[i].getDataElements();

typeState.updateShardsVolatile(reshardingStrategy.splitDataElementsForOneShard(typeState, i, originalNumShards, shardingFactor));

assertEquals(shardingFactor * originalNumShards, typeState.numShards());
assertDataUnchanged(typeState, numRecords); // as each original shard is processed

originalDataElements.destroy();
}
}
}
}

@Test
public void testReshardingIntermediateStages_joinDataElementsForOneShard() throws Exception {
for (int shardingFactor : new int[]{2, 4, 8}) {
for (int numRecords = 75000; numRecords <= 100000; numRecords += new Random().nextInt(1000)) {
HollowObjectTypeReadState typeState = populateTypeStateWith(numRecords);
HollowTypeReshardingStrategy reshardingStrategy = HollowTypeReshardingStrategy.getInstance(typeState);

HollowObjectTypeShardsHolder originalShardsHolder = typeState.getShardsVolatile();
int originalNumShards = typeState.numShards();
assertEquals(8, originalNumShards);

int newNumShards = originalNumShards / shardingFactor;
for (int i=0; i<newNumShards; i++) {
HollowTypeDataElements dataElementsToJoin[] = new HollowObjectTypeDataElements[shardingFactor];
for (int j=0; j<shardingFactor; j++) {
dataElementsToJoin[j] = originalShardsHolder.getShards()[i + (newNumShards*j)].getDataElements();
};

typeState.updateShardsVolatile(reshardingStrategy.joinDataElementsForOneShard(typeState, i, shardingFactor));

for (int j = 0; j < shardingFactor; j ++) {
dataElementsToJoin[j].destroy();
};

assertEquals(originalNumShards, typeState.numShards()); // numShards remains unchanged
assertDataUnchanged(typeState, numRecords); // as each original shard is processed
}
}
}
}

private HollowObjectTypeReadState populateTypeStateWith(int numRecords) throws IOException {
HollowWriteStateEngine writeStateEngine = new HollowWriteStateEngine();
schema = new HollowObjectSchema("TestObject", 4);
schema.addField("longField", HollowObjectSchema.FieldType.LONG);
schema.addField("stringField", HollowObjectSchema.FieldType.STRING);
schema.addField("intField", HollowObjectSchema.FieldType.INT);
schema.addField("doubleField", HollowObjectSchema.FieldType.DOUBLE);
writeStateEngine.addTypeState(new HollowObjectTypeWriteState(schema));
writeStateEngine.setTargetMaxTypeShardSize(4 * 100 * 1024);

HollowObjectWriteRecord rec = new HollowObjectWriteRecord(schema);
for(int i=0;i<numRecords;i++) {
rec.reset();
rec.setLong("longField", i);
rec.setString("stringField", "Value" + i);
rec.setInt("intField", i);
rec.setDouble("doubleField", i);

writeStateEngine.add("TestObject", rec);
}
readStateEngine = new HollowReadStateEngine();
StateEngineRoundTripper.roundTripSnapshot(writeStateEngine, readStateEngine);
return (HollowObjectTypeReadState) readStateEngine.getTypeState("TestObject");
}

private void assertDataUnchanged(HollowObjectTypeReadState typeState, int numRecords) {
for(int i=0;i<numRecords;i++) {
GenericHollowObject obj = new GenericHollowObject(typeState, i);
Assert.assertEquals(i, obj.getLong("longField"));
Assert.assertEquals("Value"+i, obj.getString("stringField"));
Assert.assertEquals((double)i, obj.getDouble("doubleField"), 0);
if (typeState.getSchema().numFields() == 4) { // filtered
Assert.assertEquals(i, obj.getInt("intField"));
}
}
}

private void assertIllegalStateException(Supplier<Integer> invocation) {
try {
invocation.get();
Assert.fail();
} catch (IllegalStateException e) {
// expected
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,17 @@ protected HollowListTypeReadState populateTypeStateWith(int[][] listContents) th
return (HollowListTypeReadState) readStateEngine.getTypeState("TestList");
}

protected int[][] generateListContents(int numRecords) {
int[][] listContents = new int[numRecords][];
for (int i=0;i<numRecords;i++) {
listContents[i] = new int[i+1];
for (int j=0;j<i+1;j++) {
listContents[i][j] = j;
}
}
return listContents;
}

protected void assertDataUnchanged(HollowListTypeReadState typeState, int[][] listContents) {
int numListRecords = listContents.length;
for(int i=0;i<numListRecords;i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,15 @@ public class HollowListTypeDataElementsSplitJoinTest extends AbstractHollowListT
@Test
public void testSplitThenJoin() throws IOException {

int numListRecords = 100;
int[][] listContents = new int[numListRecords][];
for (int i=0;i<numListRecords;i++) {
listContents[i] = new int[i+1];
for (int j=0;j<i+1;j++) {
listContents[i][j] = j;
}
}
int maxNumListRecords = 100;

// 1->2->1, 1->4->1, ...
for (int listRecord=0;listRecord<numListRecords;listRecord++) {
for (int numRecords=0;numRecords<maxNumListRecords;numRecords++) { // SNAP: TODO: what is this iteration for?

int[][] listContents = generateListContents(numRecords);
HollowListTypeReadState typeReadState = populateTypeStateWith(listContents);
assertEquals(1, typeReadState.numShards());
assertEquals(numListRecords, typeReadState.getPopulatedOrdinals().cardinality());
assertEquals(numRecords, typeReadState.getPopulatedOrdinals().cardinality());
assertDataUnchanged(typeReadState, listContents);

for (int numSplits : new int[]{1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024}) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.netflix.hollow.core.read.engine.list;

import static junit.framework.TestCase.assertEquals;

import com.netflix.hollow.core.read.engine.HollowTypeReshardingStrategy;
import java.util.Random;
import org.junit.Test;

public class HollowListTypeReadStateTest extends AbstractHollowListTypeDataElementsSplitJoinTest {

@Test
public void testResharding() throws Exception {

for (int shardingFactor : new int[]{2, 4, 8, 16, 32, 64, 128, 256, 512, 1024})
{
for(int numRecords=1;numRecords<=1000;numRecords+=new Random().nextInt(100))
{
int[][] listContents = generateListContents(numRecords);
HollowListTypeReadState listTypeReadState = populateTypeStateWith(listContents);
assertDataUnchanged(listTypeReadState, listContents);
HollowTypeReshardingStrategy reshardingStrategy = HollowTypeReshardingStrategy.getInstance(listTypeReadState);

// Splitting shards
{
int prevShardCount = listTypeReadState.numShards();
int newShardCount = shardingFactor * prevShardCount;
reshardingStrategy.reshard(listTypeReadState, listTypeReadState.numShards(), newShardCount);

assertEquals(newShardCount, listTypeReadState.numShards());
assertEquals(newShardCount, shardingFactor * prevShardCount);
}
assertDataUnchanged(listTypeReadState, listContents);

// Joining shards
{
int prevShardCount = listTypeReadState.numShards();
int newShardCount = prevShardCount / shardingFactor;
reshardingStrategy.reshard(listTypeReadState, listTypeReadState.numShards(), newShardCount);

assertEquals(newShardCount, listTypeReadState.numShards());
assertEquals(shardingFactor * newShardCount, prevShardCount);
}
assertDataUnchanged(listTypeReadState, listContents);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public void testSplitThenJoin() throws IOException {
};

// 1->2->1, 1->4->1, ...
for (int listRecord=0;listRecord<maps.length;listRecord++) {
for (int listRecord=0;listRecord<maps.length;listRecord++) { // SNAP: TODO: what is this iteration for?
HollowMapTypeReadState typeReadState = populateTypeStateWith(maps);
assertEquals(1, typeReadState.numShards());
assertEquals(maps.length, typeReadState.getPopulatedOrdinals().cardinality());
Expand Down
Loading

0 comments on commit 9e377a4

Please sign in to comment.