Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
Sunjeet committed Oct 7, 2023
1 parent 368e360 commit 83f0544
Showing 1 changed file with 56 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,27 +46,29 @@ public class HollowObjectTypeReadState extends HollowTypeReadState implements Ho
private final HollowObjectSchema unfilteredSchema;
private final HollowObjectSampler sampler;

static class ShardsHolder {
private static class ShardsHolder {
final HollowObjectTypeReadStateShard shards[];
final int shardNumberMask;

public ShardsHolder(int numShards) {
this.shards = new HollowObjectTypeReadStateShard[numShards];
private ShardsHolder(HollowSchema schema, int numShards) {
HollowObjectTypeReadStateShard[] shards = new HollowObjectTypeReadStateShard[numShards];
int shardOrdinalShift = 31 - Integer.numberOfLeadingZeros(numShards);
for(int i=0; i<numShards; i++) {
shards[i] = new HollowObjectTypeReadStateShard((HollowObjectSchema) schema, shardOrdinalShift);
}
this.shards = shards;
this.shardNumberMask = numShards - 1;
}

public ShardsHolder(HollowSchema schema, HollowObjectTypeDataElements[] dataElements, int[] shardOrdinalShifts) {
private ShardsHolder(HollowSchema schema, HollowObjectTypeDataElements[] dataElements, int[] shardOrdinalShifts) {
int numShards = dataElements.length;
this.shardNumberMask = numShards - 1;
this.shards = new HollowObjectTypeReadStateShard[numShards];
HollowObjectTypeReadStateShard[] shards = new HollowObjectTypeReadStateShard[numShards];
for (int i=0; i<numShards; i++) {
this.shards[i] = new HollowObjectTypeReadStateShard((HollowObjectSchema) schema, shardOrdinalShifts[i]);
this.shards[i].setCurrentData(dataElements[i]);
shards[i] = new HollowObjectTypeReadStateShard((HollowObjectSchema) schema, shardOrdinalShifts[i]);
shards[i].setCurrentData(dataElements[i]);
}
}

public HollowObjectTypeReadStateShard[] getShards() { // TODO: package private
return shards;
this.shards = shards;
this.shardNumberMask = numShards - 1;
}
}

Expand All @@ -83,14 +85,11 @@ public HollowObjectTypeReadState(HollowReadStateEngine fileEngine, MemoryMode me
this.sampler = new HollowObjectSampler(schema, DisabledSamplingDirector.INSTANCE);
this.unfilteredSchema = unfilteredSchema;

int shardOrdinalShift = 31 - Integer.numberOfLeadingZeros(numShards); // numShards = 4 => shardOrdinalShift = 2. Ordinal 4 = 100, shardOrdinal = 100 >> 2 == 1 (in shard 0). Ordinal 10 = 1010, shardOrdinal = 1010 >> 2 = 2 (in shard 2)
int shardOrdinalShift = 31 - Integer.numberOfLeadingZeros(numShards);
if(numShards < 1 || 1 << shardOrdinalShift != numShards)
throw new IllegalArgumentException("Number of shards must be a power of 2!");

this.shardsVolatile = new ShardsHolder(numShards);
for(int i=0;i<numShards;i++) {
this.shardsVolatile.shards[i] = new HollowObjectTypeReadStateShard(schema, shardOrdinalShift);
}
this.shardsVolatile = new ShardsHolder(schema, numShards);
}

@Override
Expand All @@ -108,7 +107,7 @@ public void readSnapshot(HollowBlobInput in, ArraySegmentRecycler memoryRecycler
if(shardsVolatile.shards.length > 1)
maxOrdinal = VarInt.readVInt(in);

for(int i = 0; i< shardsVolatile.shards.length; i++) {
for(int i=0; i<shardsVolatile.shards.length; i++) {
HollowObjectTypeDataElements snapshotData = new HollowObjectTypeDataElements(getSchema(), memoryMode, memoryRecycler);
snapshotData.readSnapshot(in, unfilteredSchema);
shardsVolatile.shards[i].setCurrentData(snapshotData);
Expand All @@ -128,7 +127,7 @@ public void applyDelta(HollowBlobInput in, HollowSchema deltaSchema, ArraySegmen
if(shardsVolatile.shards.length > 1)
maxOrdinal = VarInt.readVInt(in);

for(int i = 0; i< shardsVolatile.shards.length; i++) {
for(int i=0; i<shardsVolatile.shards.length; i++) {
HollowObjectTypeDataElements deltaData = new HollowObjectTypeDataElements((HollowObjectSchema)deltaSchema, memoryMode, memoryRecycler);
deltaData.readDelta(in);
if(stateEngine.isSkipTypeShardUpdateWithNoAdditions() && deltaData.encodedAdditions.isEmpty()) {
Expand Down Expand Up @@ -213,7 +212,6 @@ public void reshard(int newNumShards) { // TODO: package private

shardsVolatile = splitDataElementsForOneShard(shardsVolatile, i, prevNumShards, shardingFactor);

// it is now safe to destroy pre-split data elements
destroyOriginalDataElements(originalDataElements);
}
// Re-sharding done.
Expand All @@ -237,7 +235,6 @@ public void reshard(int newNumShards) { // TODO: package private

shardsVolatile = joinDataElementsForOneShard(shardsVolatile, i, shardingFactor); // atomic update to shardsVolatile

// it is now safe to destroy original data elements
for (int j = 0; j < shardingFactor; j ++) {
destroyOriginalDataElements(dataElementsToJoin[j]);
};
Expand Down Expand Up @@ -293,8 +290,8 @@ ShardsHolder expandWithOriginalDataElements(ShardsHolder shardsHolder, int shard
HollowObjectTypeDataElements[] newDataElements = new HollowObjectTypeDataElements[newNumShards];
int[] shardOrdinalShifts = new int[newNumShards];

for(int i = 0; i< prevNumShards; i++) {
for (int j = 0; j < shardingFactor; j ++) {
for(int i=0; i<prevNumShards; i++) {
for (int j=0; j<shardingFactor; j++) {
newDataElements[i+(prevNumShards*j)] = shardsHolder.shards[i].currentDataElements();
shardOrdinalShifts[i+(prevNumShards*j)] = 31 - Integer.numberOfLeadingZeros(prevNumShards);
}
Expand Down Expand Up @@ -347,15 +344,31 @@ public static void discardType(HollowBlobInput in, HollowObjectSchema schema, in
@Override
public boolean isNull(int ordinal, int fieldIndex) {
sampler.recordFieldAccess(fieldIndex);
HollowObjectTypeReadStateShard shard = shardsVolatile.shards[ordinal & shardsVolatile.shardNumberMask];
return shard.isNull(ordinal >> shard.shardOrdinalShift, fieldIndex);
HollowObjectTypeReadState.ShardsHolder shardsHolder;
boolean result;

do {
shardsHolder = this.shardsVolatile; // this read can return a stale shardHolder with greater or lesser than current
// num shards but maxOrdinal remains same across stale vs current. So given this
// atomic assignment below operations on a stale shards holder will be legal
HollowObjectTypeReadStateShard shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask];
result = shard.isNull(ordinal >> shard.shardOrdinalShift, fieldIndex);
} while(readWasUnsafe(shardsHolder));
return result;
}

@Override
public int readOrdinal(int ordinal, int fieldIndex) {
sampler.recordFieldAccess(fieldIndex);
HollowObjectTypeReadStateShard shard = shardsVolatile.shards[ordinal & shardsVolatile.shardNumberMask];
return shard.readOrdinal(ordinal >> shard.shardOrdinalShift, fieldIndex);
HollowObjectTypeReadState.ShardsHolder shardsHolder;
int result;

do {
shardsHolder = this.shardsVolatile;
HollowObjectTypeReadStateShard shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask];
result = shard.readOrdinal(ordinal >> shard.shardOrdinalShift, fieldIndex);
} while(readWasUnsafe(shardsHolder));
return result;
}

@Override
Expand All @@ -365,9 +378,7 @@ public int readInt(int ordinal, int fieldIndex) {
int result;

do {
shardsHolder = this.shardsVolatile; // this read can return a stale shardHolder with greater or lesser than current
// num shards but maxOrdinal remains same across stale vs current. So given this
// atomic assignment below operations on a stale shards holder will be legal
shardsHolder = this.shardsVolatile;
HollowObjectTypeReadStateShard shard = shardsHolder.shards[ordinal & shardsHolder.shardNumberMask];
result = shard.readInt(ordinal >> shard.shardOrdinalShift, fieldIndex);
} while(readWasUnsafe(shardsHolder));
Expand Down Expand Up @@ -497,7 +508,7 @@ private boolean readWasUnsafe(ShardsHolder shardsHolder) {
* @return the number of bits required for the field
*/
public int bitsRequiredForField(String fieldName) {
HollowObjectTypeReadStateShard[] shards = this.shardsVolatile.shards;
final HollowObjectTypeReadStateShard[] shards = this.shardsVolatile.shards;
int maxBitsRequiredForField = shards[0].bitsRequiredForField(fieldName);

for(int i=1;i<shards.length;i++) {
Expand All @@ -516,10 +527,10 @@ public HollowSampler getSampler() {

@Override
protected void invalidate() {
HollowObjectTypeReadStateShard[] shards = this.shardsVolatile.shards;
ShardsHolder shardsHolder = this.shardsVolatile;
stateListeners = EMPTY_LISTENERS;
for(int i=0;i<shards.length;i++)
shards[i].invalidate();
for(int i=0;i<shardsHolder.shards.length;i++)
shardsHolder.shards[i].invalidate();
}

@Override
Expand All @@ -536,9 +547,12 @@ public void setFieldSpecificSamplingDirector(HollowFilterConfig fieldSpec, Hollo
public void ignoreUpdateThreadForSampling(Thread t) {
sampler.setUpdateThread(t);
}


/**
* Warning: Not thread-safe. Should only be called within the update thread.
*/
HollowObjectTypeDataElements[] currentDataElements() {
HollowObjectTypeReadStateShard[] shards = this.shardsVolatile.shards;
final HollowObjectTypeReadStateShard[] shards = this.shardsVolatile.shards;
HollowObjectTypeDataElements currentDataElements[] = new HollowObjectTypeDataElements[shards.length];

for(int i=0;i<shards.length;i++)
Expand All @@ -549,15 +563,14 @@ HollowObjectTypeDataElements[] currentDataElements() {

@Override
protected void applyToChecksum(HollowChecksum checksum, HollowSchema withSchema) {
HollowObjectTypeReadStateShard[] shards = this.shardsVolatile.shards;
final HollowObjectTypeReadStateShard[] shards = this.shardsVolatile.shards;
if(!(withSchema instanceof HollowObjectSchema))
throw new IllegalArgumentException("HollowObjectTypeReadState can only calculate checksum with a HollowObjectSchema: " + getSchema().getName());

BitSet populatedOrdinals = getPopulatedOrdinals();

for(int i=0;i<shards.length;i++) {
// if (shards[i].shardOrdinalShift != 0) { // SNAP: TODO: detect virtual shard
if (false) {
if (shards[i].shardOrdinalShift != (31 - Integer.numberOfLeadingZeros(shards.length))) {
throw new UnsupportedOperationException("applyToChecksum called for virtual shard, unexpected"); // SNAP: TODO: remove this altogether, or support applyToChecksum for virtual shards
}
shards[i].applyToChecksum(checksum, withSchema, populatedOrdinals, i, shards.length);
Expand All @@ -566,7 +579,7 @@ protected void applyToChecksum(HollowChecksum checksum, HollowSchema withSchema)

@Override
public long getApproximateHeapFootprintInBytes() {
HollowObjectTypeReadStateShard[] shards = this.shardsVolatile.shards;
final HollowObjectTypeReadStateShard[] shards = this.shardsVolatile.shards;
long totalApproximateHeapFootprintInBytes = 0;

for(int i=0;i<shards.length;i++) {
Expand All @@ -578,7 +591,7 @@ public long getApproximateHeapFootprintInBytes() {

@Override
public long getApproximateHoleCostInBytes() {
HollowObjectTypeReadStateShard[] shards = this.shardsVolatile.shards;
final HollowObjectTypeReadStateShard[] shards = this.shardsVolatile.shards;
long totalApproximateHoleCostInBytes = 0;

BitSet populatedOrdinals = getPopulatedOrdinals();
Expand All @@ -599,8 +612,7 @@ void setCurrentData(HollowObjectTypeDataElements data) {

@Override
public int numShards() {
HollowObjectTypeReadStateShard[] shards = this.shardsVolatile.shards;
return shards.length;
return this.shardsVolatile.shards.length;
}

}

0 comments on commit 83f0544

Please sign in to comment.