Skip to content

Commit

Permalink
Read state concurrent access bugfix- atomicity in accessing fields of…
Browse files Browse the repository at this point in the history
… shard holder
  • Loading branch information
Sunjeet committed Oct 7, 2023
1 parent 547fcab commit 368e360
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.netflix.hollow.api.sampling.HollowObjectSampler;
import com.netflix.hollow.api.sampling.HollowSampler;
import com.netflix.hollow.api.sampling.HollowSamplingDirector;
import com.netflix.hollow.core.memory.HollowUnsafeHandle;
import com.netflix.hollow.core.memory.MemoryMode;
import com.netflix.hollow.core.memory.encoding.GapEncodedVariableLengthIntegerReader;
import com.netflix.hollow.core.memory.encoding.VarInt;
Expand Down Expand Up @@ -60,7 +61,7 @@ public ShardsHolder(HollowSchema schema, HollowObjectTypeDataElements[] dataElem
this.shards = new HollowObjectTypeReadStateShard[numShards];
for (int i=0; i<numShards; i++) {
this.shards[i] = new HollowObjectTypeReadStateShard((HollowObjectSchema) schema, shardOrdinalShifts[i]);
this.shards[i].setCurrentData(this, dataElements[i]);
this.shards[i].setCurrentData(dataElements[i]);
}
}

Expand Down Expand Up @@ -110,7 +111,7 @@ public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler
for(int i = 0; i< shardsVolatile.shards.length; i++) {
HollowObjectTypeDataElements snapshotData = new HollowObjectTypeDataElements(getSchema(), memoryMode, memoryRecycler);
snapshotData.readSnapshot(in, unfilteredSchema);
shardsVolatile.shards[i].setCurrentData(shardsVolatile, snapshotData);
shardsVolatile.shards[i].setCurrentData(snapshotData);
}

if(shardsVolatile.shards.length == 1)
Expand Down Expand Up @@ -153,7 +154,7 @@ public void applyDelta(HollowBlobInput in, HollowSchema deltaSchema, ArraySegmen
HollowObjectTypeDataElements nextData = new HollowObjectTypeDataElements(getSchema(), memoryMode, memoryRecycler);
HollowObjectTypeDataElements oldData = shardsVolatile.shards[i].currentDataElements();
nextData.applyDelta(oldData, deltaData);
shardsVolatile.shards[i].setCurrentData(shardsVolatile, nextData);
shardsVolatile.shards[i].setCurrentData(nextData);
notifyListenerAboutDeltaChanges(deltaData.encodedRemovals, deltaData.encodedAdditions, i, shardsVolatile.shards.length);
deltaData.encodedAdditions.destroy();
oldData.destroy();
Expand Down Expand Up @@ -360,64 +361,134 @@ public int readOrdinal(int ordinal, int fieldIndex) {
@Override
public int readInt(int ordinal, int fieldIndex) {
sampler.recordFieldAccess(fieldIndex);
HollowObjectTypeReadStateShard shard = shardsVolatile.shards[ordinal & shardsVolatile.shardNumberMask];
return shard.readInt(ordinal >> shard.shardOrdinalShift, fieldIndex);
HollowObjectTypeReadState.ShardsHolder shardsHolder;
int result;

do {
shardsHolder = this.shardsVolatile; // this read can return a stale shardHolder with greater or lesser than current
// num shards but maxOrdinal remains same across stale vs current. So given this
// atomic assignment below operations on a stale shards holder will be legal
HollowObjectTypeReadStateShard shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask];
result = shard.readInt(ordinal >> shard.shardOrdinalShift, fieldIndex);
} while(readWasUnsafe(shardsHolder));
return result;
}

@Override
public float readFloat(int ordinal, int fieldIndex) {
sampler.recordFieldAccess(fieldIndex);
HollowObjectTypeReadStateShard shard = shardsVolatile.shards[ordinal & shardsVolatile.shardNumberMask];
return shard.readFloat(ordinal >> shard.shardOrdinalShift, fieldIndex);
HollowObjectTypeReadState.ShardsHolder shardsHolder;
float result;

do {
shardsHolder = this.shardsVolatile;
HollowObjectTypeReadStateShard shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask];
result = shard.readFloat(ordinal >> shard.shardOrdinalShift, fieldIndex);
} while(readWasUnsafe(shardsHolder));
return result;
}

@Override
public double readDouble(int ordinal, int fieldIndex) {
sampler.recordFieldAccess(fieldIndex);
HollowObjectTypeReadStateShard shard = shardsVolatile.shards[ordinal & shardsVolatile.shardNumberMask];
return shard.readDouble(ordinal >> shard.shardOrdinalShift, fieldIndex);
HollowObjectTypeReadState.ShardsHolder shardsHolder;
double result;

do {
shardsHolder = this.shardsVolatile;
HollowObjectTypeReadStateShard shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask];
result = shard.readDouble(ordinal >> shard.shardOrdinalShift, fieldIndex);
} while(readWasUnsafe(shardsHolder));
return result;
}

@Override
public long readLong(int ordinal, int fieldIndex) {
sampler.recordFieldAccess(fieldIndex);
HollowObjectTypeReadStateShard shard = shardsVolatile.shards[ordinal & shardsVolatile.shardNumberMask];
return shard.readLong(ordinal >> shard.shardOrdinalShift, fieldIndex);
HollowObjectTypeReadState.ShardsHolder shardsHolder;
long result;

do {
shardsHolder = this.shardsVolatile;
HollowObjectTypeReadStateShard shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask];
result = shard.readLong(ordinal >> shard.shardOrdinalShift, fieldIndex);
} while(readWasUnsafe(shardsHolder));
return result;
}

@Override
public Boolean readBoolean(int ordinal, int fieldIndex) {
sampler.recordFieldAccess(fieldIndex);
HollowObjectTypeReadStateShard shard = shardsVolatile.shards[ordinal & shardsVolatile.shardNumberMask];
return shard.readBoolean(ordinal >> shard.shardOrdinalShift, fieldIndex);
HollowObjectTypeReadState.ShardsHolder shardsHolder;
Boolean result;

do {
shardsHolder = this.shardsVolatile;
HollowObjectTypeReadStateShard shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask];
result = shard.readBoolean(ordinal >> shard.shardOrdinalShift, fieldIndex);
} while(readWasUnsafe(shardsHolder));
return result;
}

@Override
public byte[] readBytes(int ordinal, int fieldIndex) {
sampler.recordFieldAccess(fieldIndex);
HollowObjectTypeReadStateShard shard = shardsVolatile.shards[ordinal & shardsVolatile.shardNumberMask];
return shard.readBytes(ordinal >> shard.shardOrdinalShift, fieldIndex);
HollowObjectTypeReadState.ShardsHolder shardsHolder;
byte[] result;

do {
shardsHolder = this.shardsVolatile;
HollowObjectTypeReadStateShard shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask];
result = shard.readBytes(ordinal >> shard.shardOrdinalShift, fieldIndex);
} while(readWasUnsafe(shardsHolder));
return result;
}

@Override
public String readString(int ordinal, int fieldIndex) {
sampler.recordFieldAccess(fieldIndex);
HollowObjectTypeReadStateShard shard = shardsVolatile.shards[ordinal & shardsVolatile.shardNumberMask];
return shard.readString(ordinal >> shard.shardOrdinalShift, fieldIndex);
HollowObjectTypeReadState.ShardsHolder shardsHolder;
String result;

do {
shardsHolder = this.shardsVolatile;
HollowObjectTypeReadStateShard shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask];
result = shard.readString(ordinal >> shard.shardOrdinalShift, fieldIndex);
} while(readWasUnsafe(shardsHolder));
return result;
}

@Override
public boolean isStringFieldEqual(int ordinal, int fieldIndex, String testValue) {
sampler.recordFieldAccess(fieldIndex);
HollowObjectTypeReadStateShard shard = shardsVolatile.shards[ordinal & shardsVolatile.shardNumberMask];
return shard.isStringFieldEqual(ordinal >> shard.shardOrdinalShift, fieldIndex, testValue);
HollowObjectTypeReadState.ShardsHolder shardsHolder;
boolean result;

do {
shardsHolder = this.shardsVolatile;
HollowObjectTypeReadStateShard shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask];
result = shard.isStringFieldEqual(ordinal >> shard.shardOrdinalShift, fieldIndex, testValue);
} while(readWasUnsafe(shardsHolder));
return result;
}

@Override
public int findVarLengthFieldHashCode(int ordinal, int fieldIndex) {
sampler.recordFieldAccess(fieldIndex);
HollowObjectTypeReadStateShard shard = shardsVolatile.shards[ordinal & shardsVolatile.shardNumberMask];
return shard.findVarLengthFieldHashCode(ordinal >> shard.shardOrdinalShift, fieldIndex);
HollowObjectTypeReadState.ShardsHolder shardsHolder;
int hashCode;

do {
shardsHolder = this.shardsVolatile;
HollowObjectTypeReadStateShard shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask];
hashCode = shard.findVarLengthFieldHashCode(ordinal >> shard.shardOrdinalShift, fieldIndex);
} while(readWasUnsafe(shardsHolder));
return hashCode;
}

private boolean readWasUnsafe(ShardsHolder shardsHolder) {
HollowUnsafeHandle.getUnsafe().loadFence(); // for why, see explanation in HollowObjectTypeReadStateShard::readWasUnsafe
return shardsHolder != shardsVolatile;
}

/**
Expand Down Expand Up @@ -522,7 +593,7 @@ void setCurrentData(HollowObjectTypeDataElements data) {
HollowObjectTypeReadStateShard[] shards = this.shardsVolatile.shards;
if(shards.length > 1)
throw new UnsupportedOperationException("Cannot directly set data on sharded type state");
shards[0].setCurrentData(this.shardsVolatile, data);
shards[0].setCurrentData(data);
maxOrdinal = data.maxOrdinal;
}

Expand Down
Loading

0 comments on commit 368e360

Please sign in to comment.