Skip to content

Commit

Permalink
Object type read state supports resharding
Browse files Browse the repository at this point in the history
  • Loading branch information
Sunjeet committed Oct 11, 2023
1 parent f72303e commit f24190b
Show file tree
Hide file tree
Showing 9 changed files with 603 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -365,10 +365,9 @@ private String readTypeStateDelta(HollowBlobInput in) throws IOException {
HollowSchema schema = HollowSchema.readFrom(in);

int numShards = readNumShards(in);

HollowTypeReadState typeState = stateEngine.getTypeState(schema.getName());
if(typeState != null) {
typeState.applyDelta(in, schema, stateEngine.getMemoryRecycler());
typeState.applyDelta(in, schema, stateEngine.getMemoryRecycler(), numShards);
} else {
discardDelta(in, schema, numShards);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,12 @@ public BitSet getPreviousOrdinals() {
public abstract int maxOrdinal();

public abstract void readSnapshot(HollowBlobInput in, ArraySegmentRecycler recycler) throws IOException;
public abstract void applyDelta(HollowBlobInput in, HollowSchema schema, ArraySegmentRecycler memoryRecycler) throws IOException;

public abstract void applyDelta(HollowBlobInput in, HollowSchema deltaSchema, ArraySegmentRecycler memoryRecycler, int deltaNumShards) throws IOException;

protected boolean shouldReshard(int currNumShards, int deltaNumShards) {
return currNumShards!=0 && deltaNumShards!=0 && currNumShards!=deltaNumShards;
}

public HollowSchema getSchema() {
return schema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,11 @@ public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler
}

@Override
public void applyDelta(HollowBlobInput in, HollowSchema schema, ArraySegmentRecycler memoryRecycler) throws IOException {
public void applyDelta(HollowBlobInput in, HollowSchema schema, ArraySegmentRecycler memoryRecycler, int deltaNumShards) throws IOException {
if (shouldReshard(shards.length, deltaNumShards)) {
throw new UnsupportedOperationException("Dynamic type sharding not supported for " + schema.getName()
+ ". Current numShards=" + shards.length + ", delta numShards=" + deltaNumShards);
}
if(shards.length > 1)
maxOrdinal = VarInt.readVInt(in);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,11 @@ public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler
}

@Override
public void applyDelta(HollowBlobInput in, HollowSchema schema, ArraySegmentRecycler memoryRecycler) throws IOException {
public void applyDelta(HollowBlobInput in, HollowSchema schema, ArraySegmentRecycler memoryRecycler, int deltaNumShards) throws IOException {
if (shouldReshard(shards.length, deltaNumShards)) {
throw new UnsupportedOperationException("Dynamic type sharding not supported for " + schema.getName()
+ ". Current numShards=" + shards.length + ", delta numShards=" + deltaNumShards);
}
if(shards.length > 1)
maxOrdinal = VarInt.readVInt(in);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
*/
package com.netflix.hollow.core.read.engine.object;

import static com.netflix.hollow.core.read.engine.object.HollowObjectTypeDataElements.writeNullField;
import static com.netflix.hollow.core.read.engine.object.HollowObjectTypeDataElements.writeNullFixedLengthField;
import static com.netflix.hollow.core.read.engine.object.HollowObjectTypeDataElements.writeNullVarLengthField;

import com.netflix.hollow.core.memory.SegmentedByteArray;
import com.netflix.hollow.core.memory.encoding.FixedLengthElementArray;
import com.netflix.hollow.core.memory.encoding.GapEncodedVariableLengthIntegerReader;
Expand Down Expand Up @@ -175,7 +179,7 @@ private void mergeOrdinal(int i) {
long readStartBit = currentFromStateReadFixedLengthStartBit + from.bitOffsetPerField[fieldIndex];
copyRecordField(fieldIndex, fieldIndex, from, readStartBit, currentWriteFixedLengthStartBit, currentFromStateReadVarLengthDataPointers, currentWriteVarLengthDataPointers, removeData);
} else if(target.varLengthData[fieldIndex] != null) {
writeNullVarLengthField(fieldIndex, currentWriteFixedLengthStartBit, currentWriteVarLengthDataPointers);
writeNullVarLengthField(target, fieldIndex, currentWriteFixedLengthStartBit, currentWriteVarLengthDataPointers);
}
}
currentWriteFixedLengthStartBit += target.bitsPerField[fieldIndex];
Expand All @@ -193,7 +197,7 @@ private void mergeOrdinal(int i) {

private void addFromDelta(boolean removeData, int fieldIndex, int deltaFieldIndex) {
if(deltaFieldIndex == -1) {
writeNullField(fieldIndex, currentWriteFixedLengthStartBit, currentWriteVarLengthDataPointers);
writeNullField(target, fieldIndex, currentWriteFixedLengthStartBit, currentWriteVarLengthDataPointers);
} else {
long readStartBit = currentDeltaStateReadFixedLengthStartBit + delta.bitOffsetPerField[deltaFieldIndex];
copyRecordField(fieldIndex, deltaFieldIndex, delta, readStartBit, currentWriteFixedLengthStartBit, currentDeltaReadVarLengthDataPointers, currentWriteVarLengthDataPointers, false);
Expand All @@ -214,7 +218,7 @@ private void copyRecordField(int fieldIndex, int fromFieldIndex, HollowObjectTyp

if(target.varLengthData[fieldIndex] != null) {
if((readValue & (1L << (copyFromData.bitsPerField[fromFieldIndex] - 1))) != 0) {
writeNullVarLengthField(fieldIndex, currentWriteFixedLengthStartBit, currentWriteVarLengthDataPointers);
writeNullVarLengthField(target, fieldIndex, currentWriteFixedLengthStartBit, currentWriteVarLengthDataPointers);
} else {
long readStart = currentReadVarLengthDataPointers[fieldIndex];
long length = readValue - readStart;
Expand All @@ -228,28 +232,9 @@ private void copyRecordField(int fieldIndex, int fromFieldIndex, HollowObjectTyp
}
} else if(!removeData) {
if(readValue == copyFromData.nullValueForField[fromFieldIndex])
writeNullFixedLengthField(fieldIndex, currentWriteFixedLengthStartBit);
writeNullFixedLengthField(target, fieldIndex, currentWriteFixedLengthStartBit);
else
target.fixedLengthData.setElementValue(currentWriteFixedLengthStartBit, target.bitsPerField[fieldIndex], readValue);
}
}

private void writeNullField(int fieldIndex, long currentWriteFixedLengthStartBit, long[] currentWriteVarLengthDataPointers) {
if(target.varLengthData[fieldIndex] != null) {
writeNullVarLengthField(fieldIndex, currentWriteFixedLengthStartBit, currentWriteVarLengthDataPointers);
} else {
writeNullFixedLengthField(fieldIndex, currentWriteFixedLengthStartBit);
}
}

private void writeNullVarLengthField(int fieldIndex, long currentWriteFixedLengthStartBit, long[] currentWriteVarLengthDataPointers) {
long writeValue = (1L << (target.bitsPerField[fieldIndex] - 1)) | currentWriteVarLengthDataPointers[fieldIndex];
target.fixedLengthData.setElementValue(currentWriteFixedLengthStartBit, target.bitsPerField[fieldIndex], writeValue);
}

private void writeNullFixedLengthField(int fieldIndex, long currentWriteFixedLengthStartBit) {
target.fixedLengthData.setElementValue(currentWriteFixedLengthStartBit, target.bitsPerField[fieldIndex], target.nullValueForField[fieldIndex]);
}


}
Loading

0 comments on commit f24190b

Please sign in to comment.