Skip to content

Commit

Permalink
Cleanup of object read state
Browse files Browse the repository at this point in the history
  • Loading branch information
Sunjeet committed Oct 11, 2023
1 parent 7bd70c7 commit b3dd295
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ public class HollowObjectTypeReadState extends HollowTypeReadState implements Ho
private final HollowObjectSchema unfilteredSchema;
private final HollowObjectSampler sampler;

public static class ShardsHolder { // SNAP: TODO: remove
public final HollowObjectTypeReadStateShard shards[]; // SNAP: TODO: remove
static class ShardsHolder {
final HollowObjectTypeReadStateShard shards[];
final int shardNumberMask;

private ShardsHolder(HollowSchema schema, int numShards) {
Expand All @@ -71,7 +71,7 @@ private ShardsHolder(HollowSchema schema, HollowObjectTypeDataElements[] dataEle
}
}

volatile ShardsHolder shardsVolatile; // SNAP: TODO: remove
volatile ShardsHolder shardsVolatile;

private int maxOrdinal;

Expand Down Expand Up @@ -189,7 +189,7 @@ static int shardingFactor(int oldNumShards, int newNumShards) {
*
* @param newNumShards The desired number of shards
*/
public void reshard(int newNumShards) { // TODO: package private
void reshard(int newNumShards) {
int prevNumShards = shardsVolatile.shards.length;
int shardingFactor = shardingFactor(prevNumShards, newNumShards);
HollowObjectTypeDataElements[] newDataElements;
Expand Down Expand Up @@ -558,17 +558,16 @@ HollowObjectTypeDataElements[] currentDataElements() {

@Override
protected void applyToChecksum(HollowChecksum checksum, HollowSchema withSchema) {
final HollowObjectTypeReadStateShard[] shards = this.shardsVolatile.shards;
final ShardsHolder shardsHolder = this.shardsVolatile;
final HollowObjectTypeReadStateShard[] shards = shardsHolder.shards;
int shardNumberMask = shardsHolder.shardNumberMask;
if(!(withSchema instanceof HollowObjectSchema))
throw new IllegalArgumentException("HollowObjectTypeReadState can only calculate checksum with a HollowObjectSchema: " + getSchema().getName());

BitSet populatedOrdinals = getPopulatedOrdinals();

for(int i=0;i<shards.length;i++) {
if (shards[i].shardOrdinalShift != (31 - Integer.numberOfLeadingZeros(shards.length))) {
throw new UnsupportedOperationException("applyToChecksum called for virtual shard, unexpected"); // SNAP: TODO: remove this altogether, or support applyToChecksum for virtual shards
}
shards[i].applyToChecksum(checksum, withSchema, populatedOrdinals, i, shards.length);
shards[i].applyToChecksum(checksum, withSchema, populatedOrdinals, i, shardNumberMask);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ void setCurrentData(HollowObjectTypeDataElements data) {
this.currentDataVolatile = data;
}

protected void applyToChecksum(HollowChecksum checksum, HollowSchema withSchema, BitSet populatedOrdinals, int shardNumber, int numShards) {
protected void applyToChecksum(HollowChecksum checksum, HollowSchema withSchema, BitSet populatedOrdinals, int shardNumber, int shardNumberMask) {
if(!(withSchema instanceof HollowObjectSchema))
throw new IllegalArgumentException("HollowObjectTypeReadState can only calculate checksum with a HollowObjectSchema: " + schema.getName());

Expand All @@ -412,8 +412,8 @@ protected void applyToChecksum(HollowChecksum checksum, HollowSchema withSchema,
HollowObjectTypeDataElements currentData = currentDataVolatile;
int ordinal = populatedOrdinals.nextSetBit(0);
while(ordinal != ORDINAL_NONE) {
if((ordinal & (numShards - 1)) == shardNumber) {
int shardOrdinal = ordinal / numShards;
if((ordinal & shardNumberMask) == shardNumber) {
int shardOrdinal = ordinal >> shardOrdinalShift;
checksum.applyInt(ordinal);
for(int i=0;i<fieldIndexes.length;i++) {
int fieldIdx = fieldIndexes[i];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public HollowObjectSchema getSchema() {
}

private boolean isDynamicTypeShardingEnabled() { // SNAP: TODO: configurable
// SNAP: TODO: enforce integrity check
// SNAP: TODO: publish metric
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,81 +78,81 @@ public void testJoinDifferentFieldWidths() throws IOException {
}

// TODO: manually invoked, depends on producer side changes for supporting changing numShards in a delta chain
// @Test
// public void testLopsidedShards() {
// InMemoryBlobStore blobStore = new InMemoryBlobStore();
// HollowProducer p = HollowProducer.withPublisher(blobStore)
// .withBlobStager(new HollowInMemoryBlobStager())
// .build();
//
// p.initializeDataModel(schema);
// int targetSize = 64;
// p.getWriteEngine().setTargetMaxTypeShardSize(targetSize);
// long v1 = oneRunCycle(p, new int[] {0, 1, 2, 3, 4, 5, 6, 7});
//
// HollowConsumer c = HollowConsumer
// .withBlobRetriever(blobStore)
// .withDoubleSnapshotConfig(new HollowConsumer.DoubleSnapshotConfig() {
// @Override
// public boolean allowDoubleSnapshot() {
// return false;
// }
//
// @Override
// public int maxDeltasBeforeDoubleSnapshot() {
// return Integer.MAX_VALUE;
// }
// })
// .withSkipTypeShardUpdateWithNoAdditions()
// .build();
// c.triggerRefreshTo(v1);
//
// assertEquals(2, c.getStateEngine().getTypeState("TestObject").numShards());
// assertEquals(true, c.getStateEngine().isSkipTypeShardUpdateWithNoAdditions());
//
// long v2 = oneRunCycle(p, new int[] {0, 1, 2, 3, 5, 7});
// c.triggerRefreshTo(v2);
// assertEquals(2, c.getStateEngine().getTypeState("TestObject").numShards());
//
// long v3 = oneRunCycle(p, new int[] { 0, 1, 3, 5}); // drop to 1 ordinal per shard, skipTypeShardWithNoAdds will make it so that maxOrdinal is adjusted
// c.triggerRefreshTo(v3);
// assertEquals(2, c.getStateEngine().getTypeState("TestObject").numShards());
//
// long v4 = oneRunCycle(p, new int[] { 0, 1, 2, 3}); // now add another ordinal to one shard, maxOrdinals will be lopsided
// c.triggerRefreshTo(v4);
// assertEquals(2, c.getStateEngine().getTypeState("TestObject").numShards());
//
// readStateEngine = c.getStateEngine();
// assertDataUnchanged(3);
//
// long v5 = oneRunCycle(p, new int[] {0, 1});
//
// // assert lopsided shards before join
// assertEquals(2, ((HollowObjectTypeReadState) c.getStateEngine().getTypeState("TestObject")).shardsVolatile.shards[0].currentDataElements().maxOrdinal);
// assertEquals(3, ((HollowObjectTypeReadState) c.getStateEngine().getTypeState("TestObject")).shardsVolatile.shards[1].currentDataElements().maxOrdinal);
// c.triggerRefreshTo(v5);
// assertEquals(1, c.getStateEngine().getTypeState("TestObject").numShards()); // joined to 1 shard
// readStateEngine = c.getStateEngine();
// assertDataUnchanged(2);
//
// long v6 = oneRunCycle(p, new int[] {0, 1, 2, 3, 4, 5 });
// c.triggerRefreshTo(v6);
// assertEquals(2, c.getStateEngine().getTypeState("TestObject").numShards()); // split to 2 shards
//
// long v7 = oneRunCycle(p, new int[] {8, 9});
// c.triggerRefreshTo(v7);
// assertEquals(4, c.getStateEngine().getTypeState("TestObject").numShards()); // still 2 shards
//
// long v8 = oneRunCycle(p, new int[] {8});
// c.triggerRefreshTo(v8);
// assertEquals(2, c.getStateEngine().getTypeState("TestObject").numShards()); // down to 1 shard
//
// c.triggerRefreshTo(v1);
// assertEquals(v1, c.getCurrentVersionId());
//
// c.triggerRefreshTo(v8);
// assertEquals(v8, c.getCurrentVersionId());
// }
@Test
public void testLopsidedShards() {
InMemoryBlobStore blobStore = new InMemoryBlobStore();
HollowProducer p = HollowProducer.withPublisher(blobStore)
.withBlobStager(new HollowInMemoryBlobStager())
.build();

p.initializeDataModel(schema);
int targetSize = 64;
p.getWriteEngine().setTargetMaxTypeShardSize(targetSize);
long v1 = oneRunCycle(p, new int[] {0, 1, 2, 3, 4, 5, 6, 7});

HollowConsumer c = HollowConsumer
.withBlobRetriever(blobStore)
.withDoubleSnapshotConfig(new HollowConsumer.DoubleSnapshotConfig() {
@Override
public boolean allowDoubleSnapshot() {
return false;
}

@Override
public int maxDeltasBeforeDoubleSnapshot() {
return Integer.MAX_VALUE;
}
})
.withSkipTypeShardUpdateWithNoAdditions()
.build();
c.triggerRefreshTo(v1);

assertEquals(2, c.getStateEngine().getTypeState("TestObject").numShards());
assertEquals(true, c.getStateEngine().isSkipTypeShardUpdateWithNoAdditions());

long v2 = oneRunCycle(p, new int[] {0, 1, 2, 3, 5, 7});
c.triggerRefreshTo(v2);
assertEquals(2, c.getStateEngine().getTypeState("TestObject").numShards());

long v3 = oneRunCycle(p, new int[] { 0, 1, 3, 5}); // drop to 1 ordinal per shard, skipTypeShardWithNoAdds will make it so that maxOrdinal is adjusted
c.triggerRefreshTo(v3);
assertEquals(2, c.getStateEngine().getTypeState("TestObject").numShards());

long v4 = oneRunCycle(p, new int[] { 0, 1, 2, 3}); // now add another ordinal to one shard, maxOrdinals will be lopsided
c.triggerRefreshTo(v4);
assertEquals(2, c.getStateEngine().getTypeState("TestObject").numShards());

readStateEngine = c.getStateEngine();
assertDataUnchanged(3);

long v5 = oneRunCycle(p, new int[] {0, 1});

// assert lopsided shards before join
assertEquals(2, ((HollowObjectTypeReadState) c.getStateEngine().getTypeState("TestObject")).shardsVolatile.shards[0].currentDataElements().maxOrdinal);
assertEquals(3, ((HollowObjectTypeReadState) c.getStateEngine().getTypeState("TestObject")).shardsVolatile.shards[1].currentDataElements().maxOrdinal);
c.triggerRefreshTo(v5);
assertEquals(1, c.getStateEngine().getTypeState("TestObject").numShards()); // joined to 1 shard
readStateEngine = c.getStateEngine();
assertDataUnchanged(2);

long v6 = oneRunCycle(p, new int[] {0, 1, 2, 3, 4, 5 });
c.triggerRefreshTo(v6);
assertEquals(2, c.getStateEngine().getTypeState("TestObject").numShards()); // split to 2 shards

long v7 = oneRunCycle(p, new int[] {8, 9});
c.triggerRefreshTo(v7);
assertEquals(4, c.getStateEngine().getTypeState("TestObject").numShards()); // still 2 shards

long v8 = oneRunCycle(p, new int[] {8});
c.triggerRefreshTo(v8);
assertEquals(2, c.getStateEngine().getTypeState("TestObject").numShards()); // down to 1 shard

c.triggerRefreshTo(v1);
assertEquals(v1, c.getCurrentVersionId());

c.triggerRefreshTo(v8);
assertEquals(v8, c.getCurrentVersionId());
}

private long oneRunCycle(HollowProducer p, int recordIds[]) {
return p.runCycle(state -> {
Expand Down

0 comments on commit b3dd295

Please sign in to comment.