Skip to content

Commit

Permalink
WIP: still refactoring, now builds
Browse files Browse the repository at this point in the history
  • Loading branch information
Sunjeet committed Oct 16, 2023
1 parent db93829 commit 8807ee6
Show file tree
Hide file tree
Showing 12 changed files with 126 additions and 181 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,8 @@ private String readTypeStateSnapshot(HollowBlobInput in, TypeFilter filter) thro
} else {
HollowObjectSchema unfilteredSchema = (HollowObjectSchema)schema;
HollowObjectSchema filteredSchema = unfilteredSchema.filterSchema(filter);
populateTypeStateSnapshot(in, new HollowObjectTypeReadState(stateEngine, memoryMode, filteredSchema, unfilteredSchema, numShards));
// Object types support numShards application at snapshot application time, other types allocate a fixed numShards now
populateTypeStateSnapshotWithNumShards(in, new HollowObjectTypeReadState(stateEngine, memoryMode, filteredSchema, unfilteredSchema), numShards);
}
} else if (schema instanceof HollowListSchema) {
if(!filter.includes(typeName)) {
Expand Down Expand Up @@ -361,6 +362,15 @@ private void populateTypeStateSnapshot(HollowBlobInput in, HollowTypeReadState t
typeState.readSnapshot(in, stateEngine.getMemoryRecycler());
}

private void populateTypeStateSnapshotWithNumShards(HollowBlobInput in, HollowTypeReadState typeState, int numShards) throws IOException {
int shardOrdinalShift = 31 - Integer.numberOfLeadingZeros(numShards); // SNAP: TODO: can simplify to power of 2 ch
if(numShards < 1 || 1 << shardOrdinalShift != numShards)
throw new IllegalArgumentException("Number of shards must be a power of 2!");

stateEngine.addTypeState(typeState);
typeState.readSnapshot(in, stateEngine.getMemoryRecycler(), numShards);
}

private String readTypeStateDelta(HollowBlobInput in) throws IOException {
HollowSchema schema = HollowSchema.readFrom(in);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ public BitSet getPreviousOrdinals() {

public abstract void readSnapshot(HollowBlobInput in, ArraySegmentRecycler recycler) throws IOException;

public abstract void readSnapshot(HollowBlobInput in, ArraySegmentRecycler recycler, int numShards) throws IOException;

public abstract void applyDelta(HollowBlobInput in, HollowSchema deltaSchema, ArraySegmentRecycler memoryRecycler, int deltaNumShards) throws IOException;

protected boolean shouldReshard(int currNumShards, int deltaNumShards) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ public HollowListTypeReadState(HollowReadStateEngine stateEngine, MemoryMode mem
this.shards = shards;
}

@Override
public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler, int numShards) throws IOException {
throw new UnsupportedOperationException("This type does not yet support numShards specification when reading snapshot");
}

@Override
public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler) throws IOException {
if(shards.length > 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ public HollowMapTypeReadState(HollowReadStateEngine stateEngine, MemoryMode memo

}

@Override
public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler, int numShards) throws IOException {
throw new UnsupportedOperationException("This type does not yet support numShards specification when reading snapshot");
}

@Override
public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler) throws IOException {
if(shards.length > 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ public IntMap getOrdinalMapping() {
}

public HollowObjectTypeReadState createHistoricalTypeReadState() {
HollowObjectTypeReadState historicalTypeState = new HollowObjectTypeReadState(null, typeState.getSchema());
historicalTypeState.setCurrentData(historicalDataElements);
HollowObjectTypeReadState historicalTypeState = new HollowObjectTypeReadState(typeState.getSchema(), historicalDataElements);

return historicalTypeState;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,65 +49,55 @@ public class HollowObjectTypeReadState extends HollowTypeReadState implements Ho

private final HollowObjectSchema unfilteredSchema;
private final HollowObjectSampler sampler;
private int maxOrdinal;
volatile ShardsHolder shardsVolatile;

static class ShardsHolder {
final HollowObjectTypeReadStateShard shards[];
final int shardNumberMask;

private ShardsHolder(HollowSchema schema, int numShards) {
HollowObjectTypeReadStateShard[] shards = new HollowObjectTypeReadStateShard[numShards];
int shardOrdinalShift = 31 - Integer.numberOfLeadingZeros(numShards);
for(int i=0; i<numShards; i++) {
shards[i] = new HollowObjectTypeReadStateShard((HollowObjectSchema) schema, shardOrdinalShift);
}
this.shards = shards;
this.shardNumberMask = numShards - 1;
}

private ShardsHolder(HollowSchema schema, HollowObjectTypeDataElements[] dataElements, int[] shardOrdinalShifts) {
int numShards = dataElements.length;
HollowObjectTypeReadStateShard[] shards = new HollowObjectTypeReadStateShard[numShards];
for (int i=0; i<numShards; i++) {
shards[i] = new HollowObjectTypeReadStateShard((HollowObjectSchema) schema, shardOrdinalShifts[i]);
shards[i].setCurrentData(dataElements[i]);
shards[i] = new HollowObjectTypeReadStateShard((HollowObjectSchema) schema, dataElements[i], shardOrdinalShifts[i]);
}
this.shards = shards;
this.shardNumberMask = numShards - 1;
}

private ShardsHolder(HollowObjectTypeReadStateShard[] oldShards, HollowObjectTypeReadStateShard updatedShard, int updatedShardIndex) {
private ShardsHolder(HollowObjectTypeReadStateShard[] oldShards, HollowObjectTypeReadStateShard newShard, int newShardIndex) {
int numShards = oldShards.length;
HollowObjectTypeReadStateShard[] shards = new HollowObjectTypeReadStateShard[numShards];
for (int i=0; i<numShards; i++) {
if (i == updatedShardIndex) {
shards[i] = new HollowObjectTypeReadStateShard(updatedShard, this);
if (i == newShardIndex) {
shards[i] = newShard;
} else {
shards[i] = new HollowObjectTypeReadStateShard(oldShards[i], this);
shards[i] = oldShards[i];
}
}
this.shards = shards;
this.shardNumberMask = numShards - 1;
}
}

volatile ShardsHolder shardsVolatile;

private int maxOrdinal;

public HollowObjectTypeReadState(HollowReadStateEngine fileEngine, HollowObjectSchema schema) {
this(fileEngine, MemoryMode.ON_HEAP, schema, schema, 1);
}

public HollowObjectTypeReadState(HollowReadStateEngine fileEngine, MemoryMode memoryMode, HollowObjectSchema schema, HollowObjectSchema unfilteredSchema, int numShards) {
public HollowObjectTypeReadState(HollowReadStateEngine fileEngine, MemoryMode memoryMode, HollowObjectSchema schema, HollowObjectSchema unfilteredSchema) {
super(fileEngine, memoryMode, schema);
this.sampler = new HollowObjectSampler(schema, DisabledSamplingDirector.INSTANCE);
this.unfilteredSchema = unfilteredSchema;

int shardOrdinalShift = 31 - Integer.numberOfLeadingZeros(numShards);
if(numShards < 1 || 1 << shardOrdinalShift != numShards)
throw new IllegalArgumentException("Number of shards must be a power of 2!");
// this.shardsVolatile = new ShardsHolder(schema, numShards); // SNAP: TODO: Remove
this.shardsVolatile = null;
}

public HollowObjectTypeReadState(HollowObjectSchema schema, HollowObjectTypeDataElements dataElements) {
super(null, MemoryMode.ON_HEAP, schema);
this.sampler = new HollowObjectSampler(schema, DisabledSamplingDirector.INSTANCE);
this.unfilteredSchema = schema;

this.shardsVolatile = new ShardsHolder(schema, numShards);
int shardOrdinalShift = 0;
this.shardsVolatile = new ShardsHolder(schema, new HollowObjectTypeDataElements[] {dataElements}, new int[] {shardOrdinalShift});
this.maxOrdinal = dataElements.maxOrdinal;
}

@Override
Expand All @@ -122,17 +112,26 @@ public int maxOrdinal() {

@Override
public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler) throws IOException {
if(shardsVolatile.shards.length > 1)
throw new IllegalStateException("Object type read state requires numShards when reading snapshot");
}

@Override
public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler, int numShards) throws IOException {
if(numShards > 1)
maxOrdinal = VarInt.readVInt(in);

for(int i=0; i<shardsVolatile.shards.length; i++) {
HollowObjectTypeDataElements snapshotData = new HollowObjectTypeDataElements(getSchema(), memoryMode, memoryRecycler);
snapshotData.readSnapshot(in, unfilteredSchema);
shardsVolatile.shards[i].setCurrentData(snapshotData);
HollowObjectTypeDataElements[] snapshotData = new HollowObjectTypeDataElements[numShards];
int shardOrdinalShifts[] = new int[numShards];
for(int i=0; i<numShards; i++) {
snapshotData[i] = new HollowObjectTypeDataElements(getSchema(), memoryMode, memoryRecycler);
snapshotData[i].readSnapshot(in, unfilteredSchema);
shardOrdinalShifts[i] = 31 - Integer.numberOfLeadingZeros(numShards);
}

shardsVolatile = new ShardsHolder(getSchema(), snapshotData, shardOrdinalShifts);

if(shardsVolatile.shards.length == 1)
maxOrdinal = shardsVolatile.shards[0].currentDataElements().maxOrdinal;
maxOrdinal = shardsVolatile.shards[0].dataElements.maxOrdinal;

SnapshotPopulatedOrdinalsReader.readOrdinals(in, stateListeners);
}
Expand All @@ -153,7 +152,7 @@ public void applyDelta(HollowBlobInput in, HollowSchema deltaSchema, ArraySegmen
if(!deltaData.encodedRemovals.isEmpty())
notifyListenerAboutDeltaChanges(deltaData.encodedRemovals, deltaData.encodedAdditions, i, shardsVolatile.shards.length);

HollowObjectTypeDataElements currentData = shardsVolatile.shards[i].currentDataElements();
HollowObjectTypeDataElements currentData = shardsVolatile.shards[i].dataElements;
GapEncodedVariableLengthIntegerReader oldRemovals = currentData.encodedRemovals == null ? GapEncodedVariableLengthIntegerReader.EMPTY_READER : currentData.encodedRemovals;
if(oldRemovals.isEmpty()) {
currentData.encodedRemovals = deltaData.encodedRemovals;
Expand All @@ -169,7 +168,7 @@ public void applyDelta(HollowBlobInput in, HollowSchema deltaSchema, ArraySegmen
deltaData.encodedAdditions.destroy();
} else {
HollowObjectTypeDataElements nextData = new HollowObjectTypeDataElements(getSchema(), memoryMode, memoryRecycler);
HollowObjectTypeDataElements oldData = shardsVolatile.shards[i].currentDataElements();
HollowObjectTypeDataElements oldData = shardsVolatile.shards[i].dataElements;
nextData.applyDelta(oldData, deltaData);

HollowObjectTypeReadStateShard newShard = new HollowObjectTypeReadStateShard(getSchema(), nextData, shardsVolatile.shards[i].shardOrdinalShift);
Expand All @@ -184,7 +183,7 @@ public void applyDelta(HollowBlobInput in, HollowSchema deltaSchema, ArraySegmen
}

if(shardsVolatile.shards.length == 1)
maxOrdinal = shardsVolatile.shards[0].currentDataElements().maxOrdinal;
maxOrdinal = shardsVolatile.shards[0].dataElements.maxOrdinal;
}

/**
Expand Down Expand Up @@ -234,7 +233,7 @@ void reshard(int newNumShards) {
// sufficient to serve all reads into this shard. Once all child shards for a pre-split parent
// shard have been assigned the split data elements, the parent data elements can be discarded.
for(int i=0; i<prevNumShards; i++) {
HollowObjectTypeDataElements originalDataElements = shardsVolatile.shards[i].currentDataElements();
HollowObjectTypeDataElements originalDataElements = shardsVolatile.shards[i].dataElements;

shardsVolatile = splitDataElementsForOneShard(shardsVolatile, i, prevNumShards, shardingFactor);

Expand Down Expand Up @@ -275,7 +274,7 @@ void reshard(int newNumShards) {

private void copyShardElements(ShardsHolder from, HollowObjectTypeDataElements[] newDataElements, int[] shardOrdinalShifts) {
for (int i=0; i<from.shards.length; i++) {
newDataElements[i] = from.shards[i].currentDataElements();
newDataElements[i] = from.shards[i].dataElements;
shardOrdinalShifts[i] = from.shards[i].shardOrdinalShift;
}
}
Expand All @@ -284,7 +283,7 @@ private HollowObjectTypeDataElements[] joinCandidates(HollowObjectTypeReadStateS
HollowObjectTypeDataElements[] result = new HollowObjectTypeDataElements[shardingFactor];
int newNumShards = shards.length / shardingFactor;
for (int i=0; i<shardingFactor; i++) {
result[i] = shards[indexIntoShards + (newNumShards*i)].currentDataElements();
result[i] = shards[indexIntoShards + (newNumShards*i)].dataElements;
};
return result;
}
Expand Down Expand Up @@ -317,7 +316,7 @@ ShardsHolder expandWithOriginalDataElements(ShardsHolder shardsHolder, int shard

for(int i=0; i<prevNumShards; i++) {
for (int j=0; j<shardingFactor; j++) {
newDataElements[i+(prevNumShards*j)] = shardsHolder.shards[i].currentDataElements();
newDataElements[i+(prevNumShards*j)] = shardsHolder.shards[i].dataElements;
shardOrdinalShifts[i+(prevNumShards*j)] = 31 - Integer.numberOfLeadingZeros(prevNumShards);
}
}
Expand All @@ -328,7 +327,7 @@ ShardsHolder splitDataElementsForOneShard(ShardsHolder shardsHolder, int current
int newNumShards = shardsHolder.shards.length;
int newShardOrdinalShift = 31 - Integer.numberOfLeadingZeros(newNumShards);

HollowObjectTypeDataElements dataElementsToSplit = shardsHolder.shards[currentIndex].currentDataElements();
HollowObjectTypeDataElements dataElementsToSplit = shardsHolder.shards[currentIndex].dataElements;

HollowObjectTypeDataElementsSplitter splitter = new HollowObjectTypeDataElementsSplitter();
HollowObjectTypeDataElements[] splits = splitter.split(dataElementsToSplit, shardingFactor);
Expand Down Expand Up @@ -462,7 +461,7 @@ public double readDouble(int ordinal, int fieldIndex) {
do {
shardsHolder = this.shardsVolatile;
shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask];
value = shard.readDouble(ordinal >> shard.shardOrdinalShift, fieldIndex); // SNAP: Here:
value = shard.readDouble(ordinal >> shard.shardOrdinalShift, fieldIndex);
} while(readWasUnsafe(shardsHolder));

if(value == HollowObjectWriteRecord.NULL_DOUBLE_BITS)
Expand Down Expand Up @@ -621,8 +620,7 @@ private boolean readWasUnsafe(ShardsHolder shardsHolder) {
// whereas the latter is scoped to the particular load or store.
//
// For more details see http://gee.cs.oswego.edu/dl/html/j9mm.html
//
// [Credit: Paul Sandoz is the original author of this comment]
// [Comment credit: Paul Sandoz]
//
HollowUnsafeHandle.getUnsafe().loadFence();
return shardsHolder != shardsVolatile;
Expand Down Expand Up @@ -656,7 +654,7 @@ protected void invalidate() {
HollowObjectTypeReadStateShard[] shards = this.shardsVolatile.shards;
stateListeners = EMPTY_LISTENERS;
for(int i=0;i<shards.length;i++)
shards[i].invalidate();
shards[i] = null; // SNAP: TODO: Confirm that this has the desired effect, usage in object longevity
}

@Override
Expand All @@ -679,7 +677,7 @@ HollowObjectTypeDataElements[] currentDataElements() {
HollowObjectTypeDataElements currentDataElements[] = new HollowObjectTypeDataElements[shards.length];

for(int i=0;i<shards.length;i++)
currentDataElements[i] = shards[i].currentDataElements();
currentDataElements[i] = shards[i].dataElements;

return currentDataElements;
}
Expand Down Expand Up @@ -722,14 +720,6 @@ public long getApproximateHoleCostInBytes() {

return totalApproximateHoleCostInBytes;
}

void setCurrentData(HollowObjectTypeDataElements data) {
HollowObjectTypeReadStateShard[] shards = this.shardsVolatile.shards;
if(shards.length > 1)
throw new UnsupportedOperationException("Cannot directly set data on sharded type state");
shards[0].setCurrentData(data);
maxOrdinal = data.maxOrdinal;
}

@Override
public int numShards() {
Expand Down
Loading

0 comments on commit 8807ee6

Please sign in to comment.