From 42201884e92561022853e721f64371c3d1d213f8 Mon Sep 17 00:00:00 2001 From: Sunjeet Singh Date: Mon, 9 Sep 2024 05:57:31 -0700 Subject: [PATCH] Some producer-side tests for resharding collections --- .../write/HollowObjectTypeWriteState.java | 11 - .../core/write/HollowTypeWriteState.java | 16 ++ .../write/HollowObjectTypeWriteStateTest.java | 181 -------------- .../core/write/HollowTypeWriteStateTest.java | 226 ++++++++++++++++++ 4 files changed, 242 insertions(+), 192 deletions(-) diff --git a/hollow/src/main/java/com/netflix/hollow/core/write/HollowObjectTypeWriteState.java b/hollow/src/main/java/com/netflix/hollow/core/write/HollowObjectTypeWriteState.java index 387256813d..36e22e14e7 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/write/HollowObjectTypeWriteState.java +++ b/hollow/src/main/java/com/netflix/hollow/core/write/HollowObjectTypeWriteState.java @@ -67,17 +67,6 @@ public HollowObjectSchema getSchema() { return (HollowObjectSchema)schema; } - boolean allowTypeResharding() { - if (stateEngine.allowTypeResharding()) { - if (isNumShardsPinned()) { - LOG.warning("Type re-sharding feature was enabled but num shards is pinned (likely using the " + - "HollowShardLargeType annotation in the data model). Proceeding with fixed num shards."); - return false; - } - } - return stateEngine.allowTypeResharding(); - } - /** * Called to perform a state transition.

* diff --git a/hollow/src/main/java/com/netflix/hollow/core/write/HollowTypeWriteState.java b/hollow/src/main/java/com/netflix/hollow/core/write/HollowTypeWriteState.java index 9c653082c5..c0308a52c2 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/write/HollowTypeWriteState.java +++ b/hollow/src/main/java/com/netflix/hollow/core/write/HollowTypeWriteState.java @@ -32,12 +32,14 @@ import java.io.DataOutputStream; import java.io.IOException; import java.util.BitSet; +import java.util.logging.Logger; /** * The {@link HollowTypeWriteState} contains and is the root handle to all of the records of a specific type in * a {@link HollowWriteStateEngine}. */ public abstract class HollowTypeWriteState { + private static final Logger LOG = Logger.getLogger(HollowTypeWriteState.class.getName()); protected final HollowSchema schema; @@ -389,4 +391,18 @@ public HollowWriteStateEngine getStateEngine() { return stateEngine; } + public boolean allowTypeResharding() { + if (this instanceof HollowObjectTypeWriteState) { + if (stateEngine.allowTypeResharding()) { + if (isNumShardsPinned()) { + LOG.warning("Type re-sharding feature was enabled but num shards is pinned (likely using the " + + "HollowShardLargeType annotation in the data model). Proceeding with fixed num shards."); + return false; + } + } + return stateEngine.allowTypeResharding(); + } else { + return false; // only supported for object types + } + } } diff --git a/hollow/src/test/java/com/netflix/hollow/core/write/HollowObjectTypeWriteStateTest.java b/hollow/src/test/java/com/netflix/hollow/core/write/HollowObjectTypeWriteStateTest.java index 698078e793..9ac822d769 100644 --- a/hollow/src/test/java/com/netflix/hollow/core/write/HollowObjectTypeWriteStateTest.java +++ b/hollow/src/test/java/com/netflix/hollow/core/write/HollowObjectTypeWriteStateTest.java @@ -28,185 +28,4 @@ public void testCalcMaxShardOrdinal() { assertTrue(Arrays.equals(new int[] {0, 0, 0, 0}, testState.calcMaxShardOrdinal(3, 4))); assertTrue(Arrays.equals(new int[] {1, 1, 1, 1}, testState.calcMaxShardOrdinal(7, 4))); } - - - @Test - public void testReverseDeltaNumShardsWhenNewType() { - InMemoryBlobStore blobStore = new InMemoryBlobStore(); - HollowInMemoryBlobStager blobStager = new HollowInMemoryBlobStager(); - - HollowProducer p1 = HollowProducer.withPublisher(blobStore).withBlobStager(blobStager).withTargetMaxTypeShardSize(32).build(); - p1.initializeDataModel(String.class); - long v1 = p1.runCycle(ws -> { - ws.add("A"); - }); - - HollowProducer p2 = HollowProducer.withPublisher(blobStore).withBlobStager(blobStager).withTargetMaxTypeShardSize(32).build(); - p2.initializeDataModel(String.class, Long.class); - p2.restore(v1, blobStore); - long v2 = p2.runCycle(ws -> { - ws.add("A"); - ws.add("B"); - for (int i=0; i<50; i++) { - ws.add(new Long(i)); - } - }); - - HollowConsumer consumer = HollowConsumer.withBlobRetriever(blobStore) - .withDoubleSnapshotConfig(new HollowConsumer.DoubleSnapshotConfig() { - @Override - public boolean allowDoubleSnapshot() { - return false; - } - @Override - public int maxDeltasBeforeDoubleSnapshot() { - return Integer.MAX_VALUE; - } - }) - .build(); - consumer.triggerRefreshTo(v2); - assertEquals(2, consumer.getStateEngine().getTypeState("Long").numShards()); - - consumer.triggerRefreshTo(v1); // reverse delta transition for new type with customNumShards - assertEquals(v1, consumer.getCurrentVersionId()); - assertEquals(2, consumer.getStateEngine().getTypeState("Long").numShards()); - } - - // SNAP: TODO: add test for collections - @Test - public void testReverseDeltaNumShardsWhenTypeDropsToZeroRecords() { - InMemoryBlobStore blobStore = new InMemoryBlobStore(); - HollowInMemoryBlobStager blobStager = new HollowInMemoryBlobStager(); - - HollowProducer p1 = HollowProducer.withPublisher(blobStore).withBlobStager(blobStager) - .withTypeResharding(true).withTargetMaxTypeShardSize(32).build(); - p1.initializeDataModel(String.class, Long.class); - long v1 = p1.runCycle(ws -> { - // override cycle start time with a strictly incrementing count to work around clock skew - ws.add("A"); - for (int i=0; i<50; i++) { // results in 2 shards at shard size 32 - ws.add(new Long(i)); - } - }); - - HollowConsumer consumer = HollowConsumer.withBlobRetriever(blobStore) - .withDoubleSnapshotConfig(new HollowConsumer.DoubleSnapshotConfig() { - @Override - public boolean allowDoubleSnapshot() { - return false; - } - @Override - public int maxDeltasBeforeDoubleSnapshot() { - return Integer.MAX_VALUE; - } - }) - .build(); - consumer.triggerRefreshTo(v1); - assertEquals(v1, consumer.getCurrentVersionId()); - assertEquals(2, consumer.getStateEngine().getTypeState("Long").numShards()); - - long v2 = p1.runCycle(ws -> { - ws.add("A"); - }); - consumer.triggerRefreshTo(v2); - assertEquals(v2, consumer.getCurrentVersionId()); - assertEquals(2, consumer.getStateEngine().getTypeState("Long").numShards()); // Long type has a ghost record - - long v3 = p1.runCycle(ws -> { - // override cycle start time with a strictly incrementing count to work around clock skew - ws.add("A"); - ws.add("B"); - }); - consumer.triggerRefreshTo(v3); - assertEquals(v3, consumer.getCurrentVersionId()); - assertEquals(2, consumer.getStateEngine().getTypeState("Long").numShards()); // Long type dropped all records - - long v4 = p1.runCycle(ws -> { - // override cycle start time with a strictly incrementing count to work around clock skew - ws.add("A"); - for (int i=0; i<50; i++) { // results in 2 shards at shard size 32 - ws.add(new Long(i)); - } - }); - consumer.triggerRefreshTo(v4); - assertEquals(v4, consumer.getCurrentVersionId()); - assertEquals(2, consumer.getStateEngine().getTypeState("Long").numShards()); // Long type has 1 record again - } - - // SNAP: TODO: add test for collections - @Test - public void testNoReshardingIfNumShardsPinnedByAnnotation() { - HollowWriteStateEngine wse = new HollowWriteStateEngine(); - new HollowObjectMapper(wse).initializeTypeState(TypeWithPinnedNumShards.class); - HollowObjectTypeWriteState typeWriteState = (HollowObjectTypeWriteState) wse.getTypeState("TypeWithPinnedNumShards"); - assertFalse(typeWriteState.allowTypeResharding()); - } - - // SNAP: TODO: add test for collections - @Test - public void testRestoreNumShardsButDoNotPin() { - InMemoryBlobStore blobStore = new InMemoryBlobStore(); - HollowInMemoryBlobStager blobStager = new HollowInMemoryBlobStager(); - - HollowProducer p1 = HollowProducer.withPublisher(blobStore).withBlobStager(blobStager) - .withTargetMaxTypeShardSize(32).build(); - p1.initializeDataModel(Long.class); - long v1 = p1.runCycle(ws -> { - // override cycle start time with a strictly incrementing count to work around clock skew - for (int i=0; i<50; i++) { // results in 2 shards at shard size 32 - ws.add(new Long(i)); - } - }); - - HollowConsumer consumer = HollowConsumer.withBlobRetriever(blobStore) - .withDoubleSnapshotConfig(new HollowConsumer.DoubleSnapshotConfig() { - @Override - public boolean allowDoubleSnapshot() { - return false; - } - @Override - public int maxDeltasBeforeDoubleSnapshot() { - return Integer.MAX_VALUE; - } - }) - .build(); - consumer.triggerRefreshTo(v1); - assertEquals(v1, consumer.getCurrentVersionId()); - assertEquals(2, consumer.getStateEngine().getTypeState("Long").numShards()); - - HollowProducer p2 = HollowProducer.withPublisher(blobStore).withBlobStager(blobStager) - .withTypeResharding(true).withTargetMaxTypeShardSize(32).build(); - p2.initializeDataModel(Long.class); - p2.restore(v1, blobStore); - assertEquals(2, p2.getWriteEngine().getTypeState("Long").numShards); - assertFalse(p2.getWriteEngine().getTypeState("Long").isNumShardsPinned()); - - long v2 = p2.runCycle(ws -> { - for (int i=0; i<100; i++) { // results in 2 shards at shard size 32 - ws.add(new Long(i)); - } - }); - - HollowConsumer consumer2 = HollowConsumer.withBlobRetriever(blobStore) - .withDoubleSnapshotConfig(new HollowConsumer.DoubleSnapshotConfig() { - @Override - public boolean allowDoubleSnapshot() { - return false; - } - @Override - public int maxDeltasBeforeDoubleSnapshot() { - return Integer.MAX_VALUE; - } - }) - .build(); - consumer2.triggerRefreshTo(v2); - int newNumShards = consumer2.getStateEngine().getTypeState("Long").numShards(); - assertEquals(v2, consumer2.getCurrentVersionId()); - assertEquals(4, newNumShards); - } - - @HollowShardLargeType(numShards=4) - private static class TypeWithPinnedNumShards { - private int value; - } } diff --git a/hollow/src/test/java/com/netflix/hollow/core/write/HollowTypeWriteStateTest.java b/hollow/src/test/java/com/netflix/hollow/core/write/HollowTypeWriteStateTest.java index 5590beace4..9602d15a14 100644 --- a/hollow/src/test/java/com/netflix/hollow/core/write/HollowTypeWriteStateTest.java +++ b/hollow/src/test/java/com/netflix/hollow/core/write/HollowTypeWriteStateTest.java @@ -1,12 +1,16 @@ package com.netflix.hollow.core.write; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import com.netflix.hollow.api.consumer.HollowConsumer; import com.netflix.hollow.api.producer.HollowProducer; import com.netflix.hollow.api.producer.fs.HollowInMemoryBlobStager; +import com.netflix.hollow.core.write.objectmapper.HollowObjectMapper; +import com.netflix.hollow.core.write.objectmapper.HollowShardLargeType; import com.netflix.hollow.test.InMemoryBlobStore; +import com.netflix.hollow.tools.checksum.HollowChecksum; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; @@ -70,6 +74,205 @@ public int maxDeltasBeforeDoubleSnapshot() { assertEquals(numShardsMap, consumer.getStateEngine().getTypeState("MapOfStringToLong").numShards()); } + @Test + public void testNumShardsWhenTypeDropsToZeroRecords() { + InMemoryBlobStore blobStore = new InMemoryBlobStore(); + HollowInMemoryBlobStager blobStager = new HollowInMemoryBlobStager(); + + HollowProducer p1 = HollowProducer.withPublisher(blobStore).withBlobStager(blobStager) + .withTypeResharding(true).withTargetMaxTypeShardSize(32).build(); + p1.initializeDataModel(HasAllTypes.class); + long v1 = p1.runCycle(ws -> { + ws.add("A"); + for (int i=0; i<50; i++) { // results in 2 shards at shard size 32 + final long val = new Long(i); + ws.add(new HasAllTypes( + new CustomReferenceType(val), + new HashSet<>(Arrays.asList("e" + val)), + Arrays.asList(i), + new HashMap(){{put("k"+val, new Long(val));}} + )); + } + }); + + HollowConsumer consumer = HollowConsumer.withBlobRetriever(blobStore) + .withDoubleSnapshotConfig(new HollowConsumer.DoubleSnapshotConfig() { + @Override + public boolean allowDoubleSnapshot() { + return false; + } + @Override + public int maxDeltasBeforeDoubleSnapshot() { + return Integer.MAX_VALUE; + } + }) + .build(); + consumer.triggerRefreshTo(v1); + assertEquals(v1, consumer.getCurrentVersionId()); + assertEquals(2, consumer.getStateEngine().getTypeState("Long").numShards()); + assertEquals(2, consumer.getStateEngine().getTypeState("CustomReferenceType").numShards()); + assertEquals(8, consumer.getStateEngine().getTypeState("SetOfString").numShards()); + assertEquals(4, consumer.getStateEngine().getTypeState("ListOfInteger").numShards()); + assertEquals(8, consumer.getStateEngine().getTypeState("MapOfStringToLong").numShards()); + HollowChecksum origChecksum = new HollowChecksum().forStateEngineWithCommonSchemas(consumer.getStateEngine(), consumer.getStateEngine()); + + long v2 = p1.runCycle(ws -> { + ws.add("A"); + }); + consumer.triggerRefreshTo(v2); + assertEquals(v2, consumer.getCurrentVersionId()); + assertEquals(2, consumer.getStateEngine().getTypeState("Long").numShards()); // all types contain ghost records + assertEquals(2, consumer.getStateEngine().getTypeState("CustomReferenceType").numShards()); + assertEquals(8, consumer.getStateEngine().getTypeState("SetOfString").numShards()); + assertEquals(4, consumer.getStateEngine().getTypeState("ListOfInteger").numShards()); + assertEquals(8, consumer.getStateEngine().getTypeState("MapOfStringToLong").numShards()); + + long v3 = p1.runCycle(ws -> { + ws.add("A"); + ws.add("B"); + }); + consumer.triggerRefreshTo(v3); + assertEquals(v3, consumer.getCurrentVersionId()); + // All types dropped all records, no serialization in delta for these types (irrespective of dynamic type sharding) + assertEquals(2, consumer.getStateEngine().getTypeState("Long").numShards()); + assertEquals(2, consumer.getStateEngine().getTypeState("CustomReferenceType").numShards()); + assertEquals(8, consumer.getStateEngine().getTypeState("SetOfString").numShards()); + assertEquals(4, consumer.getStateEngine().getTypeState("ListOfInteger").numShards()); + assertEquals(8, consumer.getStateEngine().getTypeState("MapOfStringToLong").numShards()); + + long v4 = p1.runCycle(ws -> { + ws.add("A"); + for (int i=0; i<50; i++) { // back up to the original shard counts + final long val = new Long(i); + ws.add(new HasAllTypes( + new CustomReferenceType(val), + new HashSet<>(Arrays.asList("e" + val)), + Arrays.asList(i), + new HashMap(){{put("k"+val, new Long(val));}} + )); + } + }); + consumer.triggerRefreshTo(v4); + assertEquals(v4, consumer.getCurrentVersionId()); + assertEquals(2, consumer.getStateEngine().getTypeState("Long").numShards()); + assertEquals(2, consumer.getStateEngine().getTypeState("CustomReferenceType").numShards()); + assertEquals(8, consumer.getStateEngine().getTypeState("SetOfString").numShards()); + assertEquals(4, consumer.getStateEngine().getTypeState("ListOfInteger").numShards()); + assertEquals(8, consumer.getStateEngine().getTypeState("MapOfStringToLong").numShards()); + + consumer.triggerRefreshTo(v1); + HollowChecksum finalChecksum = new HollowChecksum().forStateEngineWithCommonSchemas(consumer.getStateEngine(), consumer.getStateEngine()); + + assertEquals(finalChecksum, origChecksum); + } + + @Test + public void testNoReshardingIfNumShardsPinnedByAnnotation() { + HollowWriteStateEngine wse = new HollowWriteStateEngine(); + new HollowObjectMapper(wse).initializeTypeState(TypeWithPinnedNumShards.class); + HollowObjectTypeWriteState typeWriteState = (HollowObjectTypeWriteState) wse.getTypeState("TypeWithPinnedNumShards"); + assertFalse(typeWriteState.allowTypeResharding()); + + wse = new HollowWriteStateEngine(); + new HollowObjectMapper(wse).initializeTypeState(HasAllTypesWithPinnedNumShards.class); + for (HollowTypeWriteState writeState : wse.getOrderedTypeStates()) { + assertFalse(writeState.allowTypeResharding()); + } + } + + @Test + public void testRestoreNumShardsButDoNotPin() { + InMemoryBlobStore blobStore = new InMemoryBlobStore(); + HollowInMemoryBlobStager blobStager = new HollowInMemoryBlobStager(); + + HollowProducer p1 = HollowProducer.withPublisher(blobStore).withBlobStager(blobStager) + .withTargetMaxTypeShardSize(32).build(); + p1.initializeDataModel(HasAllTypes.class); + long v1 = p1.runCycle(ws -> { + for (int i=0; i<50; i++) { + final long val = new Long(i); + ws.add(new HasAllTypes( + new CustomReferenceType(val), + new HashSet<>(Arrays.asList("e" + val)), + Arrays.asList(i), + new HashMap(){{put("k"+val, new Long(val));}} + )); + } + }); + + HollowConsumer consumer = HollowConsumer.withBlobRetriever(blobStore) + .withDoubleSnapshotConfig(new HollowConsumer.DoubleSnapshotConfig() { + @Override + public boolean allowDoubleSnapshot() { + return false; + } + @Override + public int maxDeltasBeforeDoubleSnapshot() { + return Integer.MAX_VALUE; + } + }) + .build(); + consumer.triggerRefreshTo(v1); + assertEquals(v1, consumer.getCurrentVersionId()); + // results in following numShards per type at shard size of 32 + assertEquals(2, consumer.getStateEngine().getTypeState("Long").numShards()); + assertEquals(2, consumer.getStateEngine().getTypeState("CustomReferenceType").numShards()); + assertEquals(8, consumer.getStateEngine().getTypeState("SetOfString").numShards()); + assertEquals(4, consumer.getStateEngine().getTypeState("ListOfInteger").numShards()); + assertEquals(8, consumer.getStateEngine().getTypeState("MapOfStringToLong").numShards()); + + HollowProducer p2 = HollowProducer.withPublisher(blobStore).withBlobStager(blobStager) + .withTypeResharding(true).withTargetMaxTypeShardSize(32).build(); + p2.initializeDataModel(HasAllTypes.class); + p2.restore(v1, blobStore); + assertEquals(2, p2.getWriteEngine().getTypeState("Long").numShards); + assertEquals(2, consumer.getStateEngine().getTypeState("CustomReferenceType").numShards()); + assertEquals(8, consumer.getStateEngine().getTypeState("SetOfString").numShards()); + assertEquals(4, consumer.getStateEngine().getTypeState("ListOfInteger").numShards()); + assertEquals(8, consumer.getStateEngine().getTypeState("MapOfStringToLong").numShards()); + + for (HollowTypeWriteState writeState : p2.getWriteEngine().getOrderedTypeStates()) { + assertFalse(writeState.isNumShardsPinned()); + } + + long v2 = p2.runCycle(ws -> { + for (int i=0; i<1000; i++) { // results more shards at same shard size + final long val = new Long(i); + ws.add(new HasAllTypes( + new CustomReferenceType(val), + new HashSet<>(Arrays.asList("e" + val)), + Arrays.asList(i), + new HashMap(){{put("k"+val, new Long(val));}} + )); + } + }); + + HollowConsumer consumer2 = HollowConsumer.withBlobRetriever(blobStore) + .withDoubleSnapshotConfig(new HollowConsumer.DoubleSnapshotConfig() { + @Override + public boolean allowDoubleSnapshot() { + return false; + } + @Override + public int maxDeltasBeforeDoubleSnapshot() { + return Integer.MAX_VALUE; + } + }) + .build(); + consumer2.triggerRefreshTo(v2); + int newNumShards = consumer2.getStateEngine().getTypeState("Long").numShards(); + assertEquals(v2, consumer2.getCurrentVersionId()); + assertTrue(2 < consumer2.getStateEngine().getTypeState("Long").numShards()); + assertTrue(2 < consumer2.getStateEngine().getTypeState("CustomReferenceType").numShards()); + + // producer doesn't support resharding for these types yet + assertEquals(8, consumer.getStateEngine().getTypeState("SetOfString").numShards()); + assertEquals(4, consumer.getStateEngine().getTypeState("ListOfInteger").numShards()); + assertEquals(8, consumer.getStateEngine().getTypeState("MapOfStringToLong").numShards()); + + + } + private class HasAllTypes { CustomReferenceType customReferenceType; Set setOfStrings; @@ -90,4 +293,27 @@ private CustomReferenceType(long id) { this.id = id; } } + + @HollowShardLargeType(numShards=4) + private static class TypeWithPinnedNumShards { + private int value; + } + + private class HasAllTypesWithPinnedNumShards { + @HollowShardLargeType(numShards = 32) + CustomReferenceType customReferenceType; + @HollowShardLargeType(numShards = 32) + Set setOfStrings; + @HollowShardLargeType(numShards = 32) + List listOfInt; + @HollowShardLargeType(numShards = 32) + Map mapOfStringToLong; + + private HasAllTypesWithPinnedNumShards(CustomReferenceType customReferenceType, Set setOfStrings, List listOfInt, Map mapOfStringToLong) { + this.customReferenceType = customReferenceType; + this.setOfStrings = setOfStrings; + this.listOfInt = listOfInt; + this.mapOfStringToLong = mapOfStringToLong; + } + } }