Skip to content

Commit

Permalink
Some producer-side tests for resharding collections
Browse files Browse the repository at this point in the history
  • Loading branch information
Sunjeet committed Sep 9, 2024
1 parent 295e8ef commit 4220188
Show file tree
Hide file tree
Showing 4 changed files with 242 additions and 192 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,6 @@ public HollowObjectSchema getSchema() {
return (HollowObjectSchema)schema;
}

boolean allowTypeResharding() {
if (stateEngine.allowTypeResharding()) {
if (isNumShardsPinned()) {
LOG.warning("Type re-sharding feature was enabled but num shards is pinned (likely using the " +
"HollowShardLargeType annotation in the data model). Proceeding with fixed num shards.");
return false;
}
}
return stateEngine.allowTypeResharding();
}

/**
* Called to perform a state transition.<p>
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.BitSet;
import java.util.logging.Logger;

/**
* The {@link HollowTypeWriteState} contains and is the root handle to all of the records of a specific type in
* a {@link HollowWriteStateEngine}.
*/
public abstract class HollowTypeWriteState {
private static final Logger LOG = Logger.getLogger(HollowTypeWriteState.class.getName());

protected final HollowSchema schema;

Expand Down Expand Up @@ -389,4 +391,18 @@ public HollowWriteStateEngine getStateEngine() {
return stateEngine;
}

public boolean allowTypeResharding() {
if (this instanceof HollowObjectTypeWriteState) {
if (stateEngine.allowTypeResharding()) {
if (isNumShardsPinned()) {
LOG.warning("Type re-sharding feature was enabled but num shards is pinned (likely using the " +
"HollowShardLargeType annotation in the data model). Proceeding with fixed num shards.");
return false;
}
}
return stateEngine.allowTypeResharding();
} else {
return false; // only supported for object types
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,185 +28,4 @@ public void testCalcMaxShardOrdinal() {
assertTrue(Arrays.equals(new int[] {0, 0, 0, 0}, testState.calcMaxShardOrdinal(3, 4)));
assertTrue(Arrays.equals(new int[] {1, 1, 1, 1}, testState.calcMaxShardOrdinal(7, 4)));
}


@Test
public void testReverseDeltaNumShardsWhenNewType() {
InMemoryBlobStore blobStore = new InMemoryBlobStore();
HollowInMemoryBlobStager blobStager = new HollowInMemoryBlobStager();

HollowProducer p1 = HollowProducer.withPublisher(blobStore).withBlobStager(blobStager).withTargetMaxTypeShardSize(32).build();
p1.initializeDataModel(String.class);
long v1 = p1.runCycle(ws -> {
ws.add("A");
});

HollowProducer p2 = HollowProducer.withPublisher(blobStore).withBlobStager(blobStager).withTargetMaxTypeShardSize(32).build();
p2.initializeDataModel(String.class, Long.class);
p2.restore(v1, blobStore);
long v2 = p2.runCycle(ws -> {
ws.add("A");
ws.add("B");
for (int i=0; i<50; i++) {
ws.add(new Long(i));
}
});

HollowConsumer consumer = HollowConsumer.withBlobRetriever(blobStore)
.withDoubleSnapshotConfig(new HollowConsumer.DoubleSnapshotConfig() {
@Override
public boolean allowDoubleSnapshot() {
return false;
}
@Override
public int maxDeltasBeforeDoubleSnapshot() {
return Integer.MAX_VALUE;
}
})
.build();
consumer.triggerRefreshTo(v2);
assertEquals(2, consumer.getStateEngine().getTypeState("Long").numShards());

consumer.triggerRefreshTo(v1); // reverse delta transition for new type with customNumShards
assertEquals(v1, consumer.getCurrentVersionId());
assertEquals(2, consumer.getStateEngine().getTypeState("Long").numShards());
}

// SNAP: TODO: add test for collections
@Test
public void testReverseDeltaNumShardsWhenTypeDropsToZeroRecords() {
InMemoryBlobStore blobStore = new InMemoryBlobStore();
HollowInMemoryBlobStager blobStager = new HollowInMemoryBlobStager();

HollowProducer p1 = HollowProducer.withPublisher(blobStore).withBlobStager(blobStager)
.withTypeResharding(true).withTargetMaxTypeShardSize(32).build();
p1.initializeDataModel(String.class, Long.class);
long v1 = p1.runCycle(ws -> {
// override cycle start time with a strictly incrementing count to work around clock skew
ws.add("A");
for (int i=0; i<50; i++) { // results in 2 shards at shard size 32
ws.add(new Long(i));
}
});

HollowConsumer consumer = HollowConsumer.withBlobRetriever(blobStore)
.withDoubleSnapshotConfig(new HollowConsumer.DoubleSnapshotConfig() {
@Override
public boolean allowDoubleSnapshot() {
return false;
}
@Override
public int maxDeltasBeforeDoubleSnapshot() {
return Integer.MAX_VALUE;
}
})
.build();
consumer.triggerRefreshTo(v1);
assertEquals(v1, consumer.getCurrentVersionId());
assertEquals(2, consumer.getStateEngine().getTypeState("Long").numShards());

long v2 = p1.runCycle(ws -> {
ws.add("A");
});
consumer.triggerRefreshTo(v2);
assertEquals(v2, consumer.getCurrentVersionId());
assertEquals(2, consumer.getStateEngine().getTypeState("Long").numShards()); // Long type has a ghost record

long v3 = p1.runCycle(ws -> {
// override cycle start time with a strictly incrementing count to work around clock skew
ws.add("A");
ws.add("B");
});
consumer.triggerRefreshTo(v3);
assertEquals(v3, consumer.getCurrentVersionId());
assertEquals(2, consumer.getStateEngine().getTypeState("Long").numShards()); // Long type dropped all records

long v4 = p1.runCycle(ws -> {
// override cycle start time with a strictly incrementing count to work around clock skew
ws.add("A");
for (int i=0; i<50; i++) { // results in 2 shards at shard size 32
ws.add(new Long(i));
}
});
consumer.triggerRefreshTo(v4);
assertEquals(v4, consumer.getCurrentVersionId());
assertEquals(2, consumer.getStateEngine().getTypeState("Long").numShards()); // Long type has 1 record again
}

// SNAP: TODO: add test for collections
@Test
public void testNoReshardingIfNumShardsPinnedByAnnotation() {
HollowWriteStateEngine wse = new HollowWriteStateEngine();
new HollowObjectMapper(wse).initializeTypeState(TypeWithPinnedNumShards.class);
HollowObjectTypeWriteState typeWriteState = (HollowObjectTypeWriteState) wse.getTypeState("TypeWithPinnedNumShards");
assertFalse(typeWriteState.allowTypeResharding());
}

// SNAP: TODO: add test for collections
@Test
public void testRestoreNumShardsButDoNotPin() {
InMemoryBlobStore blobStore = new InMemoryBlobStore();
HollowInMemoryBlobStager blobStager = new HollowInMemoryBlobStager();

HollowProducer p1 = HollowProducer.withPublisher(blobStore).withBlobStager(blobStager)
.withTargetMaxTypeShardSize(32).build();
p1.initializeDataModel(Long.class);
long v1 = p1.runCycle(ws -> {
// override cycle start time with a strictly incrementing count to work around clock skew
for (int i=0; i<50; i++) { // results in 2 shards at shard size 32
ws.add(new Long(i));
}
});

HollowConsumer consumer = HollowConsumer.withBlobRetriever(blobStore)
.withDoubleSnapshotConfig(new HollowConsumer.DoubleSnapshotConfig() {
@Override
public boolean allowDoubleSnapshot() {
return false;
}
@Override
public int maxDeltasBeforeDoubleSnapshot() {
return Integer.MAX_VALUE;
}
})
.build();
consumer.triggerRefreshTo(v1);
assertEquals(v1, consumer.getCurrentVersionId());
assertEquals(2, consumer.getStateEngine().getTypeState("Long").numShards());

HollowProducer p2 = HollowProducer.withPublisher(blobStore).withBlobStager(blobStager)
.withTypeResharding(true).withTargetMaxTypeShardSize(32).build();
p2.initializeDataModel(Long.class);
p2.restore(v1, blobStore);
assertEquals(2, p2.getWriteEngine().getTypeState("Long").numShards);
assertFalse(p2.getWriteEngine().getTypeState("Long").isNumShardsPinned());

long v2 = p2.runCycle(ws -> {
for (int i=0; i<100; i++) { // results in 2 shards at shard size 32
ws.add(new Long(i));
}
});

HollowConsumer consumer2 = HollowConsumer.withBlobRetriever(blobStore)
.withDoubleSnapshotConfig(new HollowConsumer.DoubleSnapshotConfig() {
@Override
public boolean allowDoubleSnapshot() {
return false;
}
@Override
public int maxDeltasBeforeDoubleSnapshot() {
return Integer.MAX_VALUE;
}
})
.build();
consumer2.triggerRefreshTo(v2);
int newNumShards = consumer2.getStateEngine().getTypeState("Long").numShards();
assertEquals(v2, consumer2.getCurrentVersionId());
assertEquals(4, newNumShards);
}

@HollowShardLargeType(numShards=4)
private static class TypeWithPinnedNumShards {
private int value;
}
}
Loading

0 comments on commit 4220188

Please sign in to comment.