Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consumer delta application supports re-sharding for Object type #644

Merged
merged 6 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package com.netflix.hollow.core.read.engine.object;


import com.netflix.hollow.core.read.dataaccess.HollowObjectTypeDataAccess;
import com.netflix.hollow.core.read.engine.HollowReadStateEngine;
import com.netflix.hollow.core.util.StateEngineRoundTripper;
import com.netflix.hollow.core.write.HollowWriteStateEngine;
import com.netflix.hollow.core.write.objectmapper.HollowObjectMapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

@State(Scope.Thread)
@BenchmarkMode({Mode.All})
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@Warmup(iterations = 1, time = 1)
@Measurement(iterations = 15, time = 1)
@Fork(1)
/**
* Runs delta transitions in the background while benchmarking reads. Re-sharding in delta transitions can be toggled with a param.
*/
public class HollowObjectTypeReadStateDeltaTransitionBenchmark {
HollowWriteStateEngine writeStateEngine;
HollowReadStateEngine readStateEngine;
HollowObjectTypeDataAccess dataAccess;
HollowObjectMapper objectMapper;

int countStringsToRead = 500;

@Param({ "true" })
boolean isReshardingEnabled;

@Param({ "500", "1000" })
int shardSizeKBs;

@Param({ "5", "100" })
int maxStringLength;

int countStringsDb = 100000;

int deltaChanges = 2000;

ArrayList<Integer> readOrder;

ExecutorService refreshExecutor;
Future<?> reshardingFuture;
CountDownLatch doneBenchmark;

final Random r = new Random();

@Setup(Level.Iteration)
public void setUp() throws ExecutionException, InterruptedException {
final List<String> readStrings = new ArrayList<>();
final Set<Integer> readKeys = new HashSet<>();
refreshExecutor = Executors.newSingleThreadExecutor();

refreshExecutor.submit(() -> {
writeStateEngine = new HollowWriteStateEngine();
writeStateEngine.setTargetMaxTypeShardSize((long) shardSizeKBs * 1000l);
objectMapper = new HollowObjectMapper(writeStateEngine);
objectMapper.initializeTypeState(String.class);

readOrder = new ArrayList<>(countStringsToRead);
for (int i = 0; i < countStringsToRead; i++) {
readOrder.add(r.nextInt(countStringsDb));
}
readKeys.addAll(readOrder);

for (int i = 0; i < countStringsDb; i++) {
StringBuilder sb = new StringBuilder();
sb.append("string_");
sb.append(i);
sb.append("_");
int thisStringLength = r.nextInt(maxStringLength) - sb.length() + 1;
for (int j = 0; j < thisStringLength; j++) {
sb.append((char) (r.nextInt(26) + 'a'));
}
String s = sb.toString();
objectMapper.add(s);
if (readKeys.contains(i)) {
readStrings.add(s);
}
}

readStateEngine = new HollowReadStateEngine();
try {
StateEngineRoundTripper.roundTripSnapshot(writeStateEngine, readStateEngine, null);
} catch (IOException e) {
throw new RuntimeException(e);
}
dataAccess = (HollowObjectTypeDataAccess) readStateEngine.getTypeDataAccess("String", 0);
}).get();

doneBenchmark = new CountDownLatch(1);
reshardingFuture = refreshExecutor.submit(() -> {
Random r = new Random();
long origShardSize = shardSizeKBs * 1000l;
long newShardSize = origShardSize;
do {
for (int i=0; i<readStrings.size(); i++) {
objectMapper.add(readStrings.get(i));
}
for (int i = 0; i < deltaChanges; i++) {
int changeKey = r.nextInt(countStringsDb);
if (readKeys.contains(changeKey)) {
continue;
}
StringBuilder sb = new StringBuilder();
sb.append("string_");
sb.append(changeKey);
sb.append("_");
int thisStringLength = r.nextInt(maxStringLength) - sb.length() + 1;
for (int j = 0; j < thisStringLength; j++) {
sb.append((char) (r.nextInt(26) + 'a'));
}
objectMapper.add(sb.toString());
}

try {
if (isReshardingEnabled) {
if (newShardSize == origShardSize) {
newShardSize = origShardSize / 10;
} else {
newShardSize = origShardSize;
}
writeStateEngine.setTargetMaxTypeShardSize(newShardSize);
}
StateEngineRoundTripper.roundTripDelta(writeStateEngine, readStateEngine);
} catch (IOException e) {
throw new RuntimeException(e);
}
} while (doneBenchmark.getCount() > 0);
});
}

@TearDown(Level.Iteration)
public void tearDown() {
doneBenchmark.countDown();
reshardingFuture.cancel(true);
refreshExecutor.shutdown();
try {
if (!refreshExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
refreshExecutor.shutdownNow();
}
} catch (InterruptedException e) {
refreshExecutor.shutdownNow();
Thread.currentThread().interrupt();
}
}

@Benchmark
public void testReadString(Blackhole bh) {
int j = r.nextInt(readOrder.size());
String result = dataAccess.readString(j, 0);
bh.consume(result);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ private String readTypeStateSnapshot(HollowBlobInput in, TypeFilter filter) thro
} else {
HollowObjectSchema unfilteredSchema = (HollowObjectSchema)schema;
HollowObjectSchema filteredSchema = unfilteredSchema.filterSchema(filter);
populateTypeStateSnapshot(in, new HollowObjectTypeReadState(stateEngine, memoryMode, filteredSchema, unfilteredSchema, numShards));
populateTypeStateSnapshotWithNumShards(in, new HollowObjectTypeReadState(stateEngine, memoryMode, filteredSchema, unfilteredSchema), numShards);
}
} else if (schema instanceof HollowListSchema) {
if(!filter.includes(typeName)) {
Expand Down Expand Up @@ -361,14 +361,22 @@ private void populateTypeStateSnapshot(HollowBlobInput in, HollowTypeReadState t
typeState.readSnapshot(in, stateEngine.getMemoryRecycler());
}

private void populateTypeStateSnapshotWithNumShards(HollowBlobInput in, HollowTypeReadState typeState, int numShards) throws IOException {
if (numShards<=0 || ((numShards&(numShards-1))!=0)) {
throw new IllegalArgumentException("Number of shards must be a power of 2!");
}

stateEngine.addTypeState(typeState);
typeState.readSnapshot(in, stateEngine.getMemoryRecycler(), numShards);
}

private String readTypeStateDelta(HollowBlobInput in) throws IOException {
HollowSchema schema = HollowSchema.readFrom(in);

int numShards = readNumShards(in);

HollowTypeReadState typeState = stateEngine.getTypeState(schema.getName());
if(typeState != null) {
typeState.applyDelta(in, schema, stateEngine.getMemoryRecycler());
typeState.applyDelta(in, schema, stateEngine.getMemoryRecycler(), numShards);
} else {
discardDelta(in, schema, numShards);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,14 @@ public BitSet getPreviousOrdinals() {
public abstract int maxOrdinal();

public abstract void readSnapshot(HollowBlobInput in, ArraySegmentRecycler recycler) throws IOException;
public abstract void applyDelta(HollowBlobInput in, HollowSchema schema, ArraySegmentRecycler memoryRecycler) throws IOException;

public abstract void readSnapshot(HollowBlobInput in, ArraySegmentRecycler recycler, int numShards) throws IOException;

public abstract void applyDelta(HollowBlobInput in, HollowSchema deltaSchema, ArraySegmentRecycler memoryRecycler, int deltaNumShards) throws IOException;

protected boolean shouldReshard(int currNumShards, int deltaNumShards) {
return currNumShards!=0 && deltaNumShards!=0 && currNumShards!=deltaNumShards;
}

public HollowSchema getSchema() {
return schema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ public HollowListTypeReadState(HollowReadStateEngine stateEngine, MemoryMode mem
this.shards = shards;
}

@Override
public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler, int numShards) throws IOException {
throw new UnsupportedOperationException("This type does not yet support numShards specification when reading snapshot");
}

@Override
public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler) throws IOException {
if(shards.length > 1)
Expand All @@ -91,7 +96,11 @@ public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler
}

@Override
public void applyDelta(HollowBlobInput in, HollowSchema schema, ArraySegmentRecycler memoryRecycler) throws IOException {
public void applyDelta(HollowBlobInput in, HollowSchema schema, ArraySegmentRecycler memoryRecycler, int deltaNumShards) throws IOException {
if (shouldReshard(shards.length, deltaNumShards)) {
throw new UnsupportedOperationException("Dynamic type sharding not supported for " + schema.getName()
+ ". Current numShards=" + shards.length + ", delta numShards=" + deltaNumShards);
}
if(shards.length > 1)
maxOrdinal = VarInt.readVInt(in);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ public HollowMapTypeReadState(HollowReadStateEngine stateEngine, MemoryMode memo

}

@Override
public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler, int numShards) throws IOException {
throw new UnsupportedOperationException("This type does not yet support numShards specification when reading snapshot");
}

@Override
public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler) throws IOException {
if(shards.length > 1)
Expand All @@ -98,7 +103,11 @@ public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler
}

@Override
public void applyDelta(HollowBlobInput in, HollowSchema schema, ArraySegmentRecycler memoryRecycler) throws IOException {
public void applyDelta(HollowBlobInput in, HollowSchema schema, ArraySegmentRecycler memoryRecycler, int deltaNumShards) throws IOException {
if (shouldReshard(shards.length, deltaNumShards)) {
throw new UnsupportedOperationException("Dynamic type sharding not supported for " + schema.getName()
+ ". Current numShards=" + shards.length + ", delta numShards=" + deltaNumShards);
}
if(shards.length > 1)
maxOrdinal = VarInt.readVInt(in);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
*/
package com.netflix.hollow.core.read.engine.object;

import static com.netflix.hollow.core.read.engine.object.HollowObjectTypeDataElements.writeNullField;
import static com.netflix.hollow.core.read.engine.object.HollowObjectTypeDataElements.writeNullFixedLengthField;
import static com.netflix.hollow.core.read.engine.object.HollowObjectTypeDataElements.writeNullVarLengthField;

import com.netflix.hollow.core.memory.SegmentedByteArray;
import com.netflix.hollow.core.memory.encoding.FixedLengthElementArray;
import com.netflix.hollow.core.memory.encoding.GapEncodedVariableLengthIntegerReader;
Expand Down Expand Up @@ -175,7 +179,7 @@ private void mergeOrdinal(int i) {
long readStartBit = currentFromStateReadFixedLengthStartBit + from.bitOffsetPerField[fieldIndex];
copyRecordField(fieldIndex, fieldIndex, from, readStartBit, currentWriteFixedLengthStartBit, currentFromStateReadVarLengthDataPointers, currentWriteVarLengthDataPointers, removeData);
} else if(target.varLengthData[fieldIndex] != null) {
writeNullVarLengthField(fieldIndex, currentWriteFixedLengthStartBit, currentWriteVarLengthDataPointers);
writeNullVarLengthField(target, fieldIndex, currentWriteFixedLengthStartBit, currentWriteVarLengthDataPointers);
}
}
currentWriteFixedLengthStartBit += target.bitsPerField[fieldIndex];
Expand All @@ -193,7 +197,7 @@ private void mergeOrdinal(int i) {

private void addFromDelta(boolean removeData, int fieldIndex, int deltaFieldIndex) {
if(deltaFieldIndex == -1) {
writeNullField(fieldIndex, currentWriteFixedLengthStartBit, currentWriteVarLengthDataPointers);
writeNullField(target, fieldIndex, currentWriteFixedLengthStartBit, currentWriteVarLengthDataPointers);
} else {
long readStartBit = currentDeltaStateReadFixedLengthStartBit + delta.bitOffsetPerField[deltaFieldIndex];
copyRecordField(fieldIndex, deltaFieldIndex, delta, readStartBit, currentWriteFixedLengthStartBit, currentDeltaReadVarLengthDataPointers, currentWriteVarLengthDataPointers, false);
Expand All @@ -214,7 +218,7 @@ private void copyRecordField(int fieldIndex, int fromFieldIndex, HollowObjectTyp

if(target.varLengthData[fieldIndex] != null) {
if((readValue & (1L << (copyFromData.bitsPerField[fromFieldIndex] - 1))) != 0) {
writeNullVarLengthField(fieldIndex, currentWriteFixedLengthStartBit, currentWriteVarLengthDataPointers);
writeNullVarLengthField(target, fieldIndex, currentWriteFixedLengthStartBit, currentWriteVarLengthDataPointers);
} else {
long readStart = currentReadVarLengthDataPointers[fieldIndex];
long length = readValue - readStart;
Expand All @@ -228,28 +232,9 @@ private void copyRecordField(int fieldIndex, int fromFieldIndex, HollowObjectTyp
}
} else if(!removeData) {
if(readValue == copyFromData.nullValueForField[fromFieldIndex])
writeNullFixedLengthField(fieldIndex, currentWriteFixedLengthStartBit);
writeNullFixedLengthField(target, fieldIndex, currentWriteFixedLengthStartBit);
else
target.fixedLengthData.setElementValue(currentWriteFixedLengthStartBit, target.bitsPerField[fieldIndex], readValue);
}
}

private void writeNullField(int fieldIndex, long currentWriteFixedLengthStartBit, long[] currentWriteVarLengthDataPointers) {
if(target.varLengthData[fieldIndex] != null) {
writeNullVarLengthField(fieldIndex, currentWriteFixedLengthStartBit, currentWriteVarLengthDataPointers);
} else {
writeNullFixedLengthField(fieldIndex, currentWriteFixedLengthStartBit);
}
}

private void writeNullVarLengthField(int fieldIndex, long currentWriteFixedLengthStartBit, long[] currentWriteVarLengthDataPointers) {
long writeValue = (1L << (target.bitsPerField[fieldIndex] - 1)) | currentWriteVarLengthDataPointers[fieldIndex];
target.fixedLengthData.setElementValue(currentWriteFixedLengthStartBit, target.bitsPerField[fieldIndex], writeValue);
}

private void writeNullFixedLengthField(int fieldIndex, long currentWriteFixedLengthStartBit) {
target.fixedLengthData.setElementValue(currentWriteFixedLengthStartBit, target.bitsPerField[fieldIndex], target.nullValueForField[fieldIndex]);
}


}
Loading