Skip to content

Commit

Permalink
Add tests for resharding with type filter
Browse files Browse the repository at this point in the history
  • Loading branch information
Sunjeet committed Oct 9, 2023
1 parent 1ea0ebc commit deeefa8
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ void populateStats(HollowObjectTypeDataElements to, HollowObjectTypeDataElements
to.bitsPerRecord += to.bitsPerField[fieldIdx];
}

to.bitsPerUnfilteredField = from[0].bitsPerUnfilteredField;
to.unfilteredFieldIsIncluded = from[0].unfilteredFieldIsIncluded;
// unused
// to.bitsPerUnfilteredField
// to.unfilteredFieldIsIncluded
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@

import com.netflix.hollow.api.objects.generic.GenericHollowObject;
import com.netflix.hollow.core.AbstractStateEngineTest;
import com.netflix.hollow.core.read.engine.HollowReadStateEngine;
import com.netflix.hollow.core.read.filter.HollowFilterConfig;
import com.netflix.hollow.core.schema.HollowObjectSchema;
import com.netflix.hollow.core.util.StateEngineRoundTripper;
import com.netflix.hollow.core.write.HollowObjectTypeWriteState;
import com.netflix.hollow.core.write.HollowObjectWriteRecord;
import java.io.IOException;
Expand Down Expand Up @@ -39,7 +42,7 @@ protected void initializeTypeStates() {
writeStateEngine.addTypeState(new HollowObjectTypeWriteState(schema));
}

protected HollowObjectTypeReadState populateTypeStateWith(int numRecords) throws IOException {
private void populateWriteStateEngine(int numRecords) {
initWriteStateEngine();
HollowObjectWriteRecord rec = new HollowObjectWriteRecord(schema);
for(int i=0;i<numRecords;i++) {
Expand All @@ -51,17 +54,33 @@ protected HollowObjectTypeReadState populateTypeStateWith(int numRecords) throws

writeStateEngine.add("TestObject", rec);
}
}

protected HollowObjectTypeReadState populateTypeStateWith(int numRecords) throws IOException {
populateWriteStateEngine(numRecords);
roundTripSnapshot();
return (HollowObjectTypeReadState) readStateEngine.getTypeState("TestObject");
}

protected HollowObjectTypeReadState populateTypeStateWithFilter(int numRecords) throws IOException {
populateWriteStateEngine(numRecords);
readStateEngine = new HollowReadStateEngine();
HollowFilterConfig readFilter = new HollowFilterConfig(true);
readFilter.addField("TestObject", "intField");
StateEngineRoundTripper.roundTripSnapshot(writeStateEngine, readStateEngine, readFilter);
return (HollowObjectTypeReadState) readStateEngine.getTypeState("TestObject");
}

protected void assertDataUnchanged(int numRecords) {
for(int i=0;i<numRecords;i++) {
GenericHollowObject obj = new GenericHollowObject(readStateEngine, "TestObject", i);
assertEquals(i, obj.getLong("longField"));
assertEquals("Value"+i, obj.getString("stringField"));
assertEquals(i, obj.getInt("intField"));
HollowObjectTypeReadState typeState = (HollowObjectTypeReadState) readStateEngine.getTypeState("TestObject");
assertEquals((double)i, obj.getDouble("doubleField"), 0);
if (typeState.getSchema().numFields() == 4) { // filtered
assertEquals(i, obj.getInt("intField"));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,28 @@ public void testSplitThenJoin() throws IOException {
}
}

@Test
public void testSplitThenJoinWithFilter() throws IOException {
HollowObjectTypeDataElementsSplitter splitter = new HollowObjectTypeDataElementsSplitter();
HollowObjectTypeDataElementsJoiner joiner = new HollowObjectTypeDataElementsJoiner();

int numSplits = 2;
for (int numRecords=0;numRecords<1*1000;numRecords++) {
HollowObjectTypeReadState typeReadState = populateTypeStateWithFilter(numRecords);
assertEquals(1, typeReadState.numShards());
assertDataUnchanged(numRecords);
HollowChecksum origChecksum = typeReadState.getChecksum(typeReadState.getSchema());

HollowObjectTypeDataElements[] splitElements = splitter.split(typeReadState.currentDataElements()[0], numSplits);
HollowObjectTypeDataElements joinedElements = joiner.join(splitElements);
typeReadState.setCurrentData(joinedElements);

assertDataUnchanged(numRecords);
HollowChecksum resultChecksum = typeReadState.getChecksum(typeReadState.getSchema());
assertEquals(origChecksum, resultChecksum);
}
}

// manually invoked
// @Test
public void testSplittingAndJoiningWithSnapshotBlob() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,41 @@ public void testResharding() throws Exception {
}
}

@Test
public void testReshardingWithFilter() throws Exception {

for (int shardingFactor : new int[]{2, 64})
{
for(int numRecords=1;numRecords<=100000;numRecords+=new Random().nextInt(10000))
{
HollowObjectTypeReadState objectTypeReadState = populateTypeStateWithFilter(numRecords);
assertDataUnchanged(numRecords);

// Splitting shards
{
int prevShardCount = objectTypeReadState.numShards();
int newShardCount = shardingFactor * prevShardCount;
objectTypeReadState.reshard(newShardCount);

assertEquals(newShardCount, objectTypeReadState.numShards());
assertEquals(newShardCount, shardingFactor * prevShardCount);
}
assertDataUnchanged(numRecords);

// Joining shards
{
int prevShardCount = objectTypeReadState.numShards();
int newShardCount = prevShardCount / shardingFactor;
objectTypeReadState.reshard(newShardCount);

assertEquals(newShardCount, objectTypeReadState.numShards());
assertEquals(shardingFactor * newShardCount, prevShardCount);
}
assertDataUnchanged(numRecords);
}
}
}

@Test
public void testReshardingIntermediateStages_expandWithOriginalDataElements() throws Exception {
for (int shardingFactor : new int[]{2, 4, 8}) {
Expand Down

0 comments on commit deeefa8

Please sign in to comment.