diff --git a/metadata/src/test/java/org/apache/kafka/image/AclsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/AclsImageTest.java index 01910ab28b3d..930271387650 100644 --- a/metadata/src/test/java/org/apache/kafka/image/AclsImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/AclsImageTest.java @@ -32,6 +32,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import static org.apache.kafka.metadata.authorizer.StandardAclWithIdTest.TEST_ACLS; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -73,31 +74,48 @@ public class AclsImageTest { } @Test - public void testEmptyImageRoundTrip() throws Throwable { - testToImageAndBack(AclsImage.EMPTY); + public void testEmptyImageRoundTrip() { + testToImage(AclsImage.EMPTY); } @Test - public void testImage1RoundTrip() throws Throwable { - testToImageAndBack(IMAGE1); + public void testImage1RoundTrip() { + testToImage(IMAGE1); } @Test - public void testApplyDelta1() throws Throwable { + public void testApplyDelta1() { assertEquals(IMAGE2, DELTA1.apply()); + // check image1 + delta1 = image2, since records for image1 + delta1 might differ from records from image2 + List records = getImageRecords(IMAGE1); + records.addAll(DELTA1_RECORDS); + testToImage(IMAGE2, records); } @Test - public void testImage2RoundTrip() throws Throwable { - testToImageAndBack(IMAGE2); + public void testImage2RoundTrip() { + testToImage(IMAGE2); } - private void testToImageAndBack(AclsImage image) throws Throwable { + private static void testToImage(AclsImage image) { + testToImage(image, Optional.empty()); + } + + private static void testToImage(AclsImage image, Optional> fromRecords) { + testToImage(image, fromRecords.orElseGet(() -> getImageRecords(image))); + } + + private static void testToImage(AclsImage image, List fromRecords) { + // test from empty image stopping each of the various intermediate images along the way + new RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper<>( + () -> AclsImage.EMPTY, + AclsDelta::new + ).test(image, fromRecords); + } + + private static List getImageRecords(AclsImage image) { RecordListWriter writer = new RecordListWriter(); image.write(writer, new ImageWriterOptions.Builder().build()); - AclsDelta delta = new AclsDelta(AclsImage.EMPTY); - RecordTestUtils.replayAll(delta, writer.records()); - AclsImage nextImage = delta.apply(); - assertEquals(image, nextImage); + return writer.records(); } } diff --git a/metadata/src/test/java/org/apache/kafka/image/ClientQuotasImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ClientQuotasImageTest.java index 1f656baa2e62..8d1a5883cc41 100644 --- a/metadata/src/test/java/org/apache/kafka/image/ClientQuotasImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/ClientQuotasImageTest.java @@ -33,6 +33,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import static org.apache.kafka.common.metadata.MetadataRecordType.CLIENT_QUOTA_RECORD; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -89,31 +90,48 @@ public class ClientQuotasImageTest { } @Test - public void testEmptyImageRoundTrip() throws Throwable { - testToImageAndBack(ClientQuotasImage.EMPTY); + public void testEmptyImageRoundTrip() { + testToImage(ClientQuotasImage.EMPTY); } @Test - public void testImage1RoundTrip() throws Throwable { - testToImageAndBack(IMAGE1); + public void testImage1RoundTrip() { + testToImage(IMAGE1); } @Test - public void testApplyDelta1() throws Throwable { + public void testApplyDelta1() { assertEquals(IMAGE2, DELTA1.apply()); + // check image1 + delta1 = image2, since records for image1 + delta1 might differ from records from image2 + List records = getImageRecords(IMAGE1); + records.addAll(DELTA1_RECORDS); + testToImage(IMAGE2, records); } @Test - public void testImage2RoundTrip() throws Throwable { - testToImageAndBack(IMAGE2); + public void testImage2RoundTrip() { + testToImage(IMAGE2); } - private void testToImageAndBack(ClientQuotasImage image) throws Throwable { + private static void testToImage(ClientQuotasImage image) { + testToImage(image, Optional.empty()); + } + + private static void testToImage(ClientQuotasImage image, Optional> fromRecords) { + testToImage(image, fromRecords.orElseGet(() -> getImageRecords(image))); + } + + private static void testToImage(ClientQuotasImage image, List fromRecords) { + // test from empty image stopping each of the various intermediate images along the way + new RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper<>( + () -> ClientQuotasImage.EMPTY, + ClientQuotasDelta::new + ).test(image, fromRecords); + } + + private static List getImageRecords(ClientQuotasImage image) { RecordListWriter writer = new RecordListWriter(); image.write(writer, new ImageWriterOptions.Builder().build()); - ClientQuotasDelta delta = new ClientQuotasDelta(ClientQuotasImage.EMPTY); - RecordTestUtils.replayAll(delta, writer.records()); - ClientQuotasImage nextImage = delta.apply(); - assertEquals(image, nextImage); + return writer.records(); } } diff --git a/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java index 256410631aec..e12e1143c881 100644 --- a/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java @@ -127,31 +127,48 @@ public class ClusterImageTest { } @Test - public void testEmptyImageRoundTrip() throws Throwable { - testToImageAndBack(ClusterImage.EMPTY); + public void testEmptyImageRoundTrip() { + testToImage(ClusterImage.EMPTY); } @Test - public void testImage1RoundTrip() throws Throwable { - testToImageAndBack(IMAGE1); + public void testImage1RoundTrip() { + testToImage(IMAGE1); } @Test - public void testApplyDelta1() throws Throwable { + public void testApplyDelta1() { assertEquals(IMAGE2, DELTA1.apply()); + // check image1 + delta1 = image2, since records for image1 + delta1 might differ from records from image2 + List records = getImageRecords(IMAGE1); + records.addAll(DELTA1_RECORDS); + testToImage(IMAGE2, records); } @Test - public void testImage2RoundTrip() throws Throwable { - testToImageAndBack(IMAGE2); + public void testImage2RoundTrip() { + testToImage(IMAGE2); } - private void testToImageAndBack(ClusterImage image) throws Throwable { + private static void testToImage(ClusterImage image) { + testToImage(image, Optional.empty()); + } + + private static void testToImage(ClusterImage image, Optional> fromRecords) { + testToImage(image, fromRecords.orElseGet(() -> getImageRecords(image))); + } + + private static void testToImage(ClusterImage image, List fromRecords) { + // test from empty image stopping each of the various intermediate images along the way + new RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper<>( + () -> ClusterImage.EMPTY, + ClusterDelta::new + ).test(image, fromRecords); + } + + private static List getImageRecords(ClusterImage image) { RecordListWriter writer = new RecordListWriter(); image.write(writer, new ImageWriterOptions.Builder().build()); - ClusterDelta delta = new ClusterDelta(ClusterImage.EMPTY); - RecordTestUtils.replayAll(delta, writer.records()); - ClusterImage nextImage = delta.apply(); - assertEquals(image, nextImage); + return writer.records(); } } diff --git a/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java index 429fd8d9aa2c..9b7cd39dcd6e 100644 --- a/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java @@ -30,6 +30,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import static org.apache.kafka.common.config.ConfigResource.Type.BROKER; import static org.apache.kafka.common.metadata.MetadataRecordType.CONFIG_RECORD; @@ -84,31 +85,48 @@ public class ConfigurationsImageTest { } @Test - public void testEmptyImageRoundTrip() throws Throwable { - testToImageAndBack(ConfigurationsImage.EMPTY); + public void testEmptyImageRoundTrip() { + testToImage(ConfigurationsImage.EMPTY); } @Test - public void testImage1RoundTrip() throws Throwable { - testToImageAndBack(IMAGE1); + public void testImage1RoundTrip() { + testToImage(IMAGE1); } @Test - public void testApplyDelta1() throws Throwable { + public void testApplyDelta1() { assertEquals(IMAGE2, DELTA1.apply()); + // check image1 + delta1 = image2, since records for image1 + delta1 might differ from records from image2 + List records = getImageRecords(IMAGE1); + records.addAll(DELTA1_RECORDS); + testToImage(IMAGE2, records); } @Test - public void testImage2RoundTrip() throws Throwable { - testToImageAndBack(IMAGE2); + public void testImage2RoundTrip() { + testToImage(IMAGE2); } - private void testToImageAndBack(ConfigurationsImage image) throws Throwable { + private static void testToImage(ConfigurationsImage image) { + testToImage(image, Optional.empty()); + } + + private static void testToImage(ConfigurationsImage image, Optional> fromRecords) { + testToImage(image, fromRecords.orElseGet(() -> getImageRecords(image))); + } + + private static void testToImage(ConfigurationsImage image, List fromRecords) { + // test from empty image stopping each of the various intermediate images along the way + new RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper<>( + () -> ConfigurationsImage.EMPTY, + ConfigurationsDelta::new + ).test(image, fromRecords); + } + + private static List getImageRecords(ConfigurationsImage image) { RecordListWriter writer = new RecordListWriter(); image.write(writer, new ImageWriterOptions.Builder().build()); - ConfigurationsDelta delta = new ConfigurationsDelta(ConfigurationsImage.EMPTY); - RecordTestUtils.replayAll(delta, writer.records()); - ConfigurationsImage nextImage = delta.apply(); - assertEquals(image, nextImage); + return writer.records(); } } diff --git a/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java b/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java index a510bf1d8553..1ec1d24b6513 100644 --- a/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java @@ -32,6 +32,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -72,32 +73,49 @@ public class FeaturesImageTest { } @Test - public void testEmptyImageRoundTrip() throws Throwable { - testToImageAndBack(FeaturesImage.EMPTY); + public void testEmptyImageRoundTrip() { + testToImage(FeaturesImage.EMPTY); } @Test - public void testImage1RoundTrip() throws Throwable { - testToImageAndBack(IMAGE1); + public void testImage1RoundTrip() { + testToImage(IMAGE1); } @Test - public void testApplyDelta1() throws Throwable { + public void testApplyDelta1() { assertEquals(IMAGE2, DELTA1.apply()); + // check image1 + delta1 = image2, since records for image1 + delta1 might differ from records from image2 + List records = getImageRecords(IMAGE1); + records.addAll(DELTA1_RECORDS); + testToImage(IMAGE2, records); } @Test - public void testImage2RoundTrip() throws Throwable { - testToImageAndBack(IMAGE2); + public void testImage2RoundTrip() { + testToImage(IMAGE2); } - private void testToImageAndBack(FeaturesImage image) throws Throwable { + private static void testToImage(FeaturesImage image) { + testToImage(image, Optional.empty()); + } + + private static void testToImage(FeaturesImage image, Optional> fromRecords) { + testToImage(image, fromRecords.orElseGet(() -> getImageRecords(image))); + } + + private static void testToImage(FeaturesImage image, List fromRecords) { + // test from empty image stopping each of the various intermediate images along the way + new RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper<>( + () -> FeaturesImage.EMPTY, + FeaturesDelta::new + ).test(image, fromRecords); + } + + private static List getImageRecords(FeaturesImage image) { RecordListWriter writer = new RecordListWriter(); image.write(writer, new ImageWriterOptions.Builder().setMetadataVersion(image.metadataVersion()).build()); - FeaturesDelta delta = new FeaturesDelta(FeaturesImage.EMPTY); - RecordTestUtils.replayAll(delta, writer.records()); - FeaturesImage nextImage = delta.apply(); - assertEquals(image, nextImage); + return writer.records(); } @Test diff --git a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java index 2a6e3e6f3e55..ae247108fd4c 100644 --- a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java @@ -20,9 +20,13 @@ import org.apache.kafka.image.writer.ImageWriterOptions; import org.apache.kafka.image.writer.RecordListWriter; import org.apache.kafka.metadata.RecordTestUtils; +import org.apache.kafka.server.common.ApiMessageAndVersion; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import java.util.List; +import java.util.Optional; + import static org.junit.jupiter.api.Assertions.assertEquals; @@ -71,31 +75,69 @@ public class MetadataImageTest { } @Test - public void testEmptyImageRoundTrip() throws Throwable { - testToImageAndBack(MetadataImage.EMPTY); + public void testEmptyImageRoundTrip() { + testToImage(MetadataImage.EMPTY); } @Test - public void testImage1RoundTrip() throws Throwable { - testToImageAndBack(IMAGE1); + public void testImage1RoundTrip() { + testToImage(IMAGE1); } @Test - public void testApplyDelta1() throws Throwable { + public void testApplyDelta1() { assertEquals(IMAGE2, DELTA1.apply(IMAGE2.provenance())); + // check image1 + delta1 = image2, since records for image1 + delta1 might differ from records from image2 + ImageWriterOptions options = new ImageWriterOptions.Builder() + .setMetadataVersion(IMAGE1.features().metadataVersion()) + .build(); + List records = getImageRecords(IMAGE1, options); + records.addAll(FeaturesImageTest.DELTA1_RECORDS); + records.addAll(ClusterImageTest.DELTA1_RECORDS); + records.addAll(TopicsImageTest.DELTA1_RECORDS); + records.addAll(ConfigurationsImageTest.DELTA1_RECORDS); + records.addAll(ClientQuotasImageTest.DELTA1_RECORDS); + records.addAll(ProducerIdsImageTest.DELTA1_RECORDS); + records.addAll(AclsImageTest.DELTA1_RECORDS); + records.addAll(ScramImageTest.DELTA1_RECORDS); + testToImage(IMAGE2, records); } @Test - public void testImage2RoundTrip() throws Throwable { - testToImageAndBack(IMAGE2); + public void testImage2RoundTrip() { + testToImage(IMAGE2); + } + + private static void testToImage(MetadataImage image) { + testToImage(image, new ImageWriterOptions.Builder() + .setMetadataVersion(image.features().metadataVersion()) + .build(), Optional.empty()); + } + + private static void testToImage(MetadataImage image, ImageWriterOptions options) { + testToImage(image, options, Optional.empty()); + } + + static void testToImage(MetadataImage image, ImageWriterOptions options, Optional> fromRecords) { + testToImage(image, fromRecords.orElseGet(() -> getImageRecords(image, options))); + } + + private static void testToImage(MetadataImage image, List fromRecords) { + // test from empty image stopping each of the various intermediate images along the way + new RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper( + () -> MetadataImage.EMPTY, + MetadataDelta::new + ) { + @Override + public MetadataImage createImageByApplyingDelta(MetadataDelta delta) { + return delta.apply(image.provenance()); + } + }.test(image, fromRecords); } - private void testToImageAndBack(MetadataImage image) throws Throwable { + private static List getImageRecords(MetadataImage image, ImageWriterOptions options) { RecordListWriter writer = new RecordListWriter(); - image.write(writer, new ImageWriterOptions.Builder(image).build()); - MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY); - RecordTestUtils.replayAll(delta, writer.records()); - MetadataImage nextImage = delta.apply(image.provenance()); - assertEquals(image, nextImage); + image.write(writer, options); + return writer.records(); } } diff --git a/metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java index 69695473d23d..738582fc108c 100644 --- a/metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Optional; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -61,31 +62,48 @@ public class ProducerIdsImageTest { } @Test - public void testEmptyImageRoundTrip() throws Throwable { - testToImageAndBack(ProducerIdsImage.EMPTY); + public void testEmptyImageRoundTrip() { + testToImage(ProducerIdsImage.EMPTY); } @Test - public void testImage1RoundTrip() throws Throwable { - testToImageAndBack(IMAGE1); + public void testImage1RoundTrip() { + testToImage(IMAGE1); } @Test - public void testApplyDelta1() throws Throwable { + public void testApplyDelta1() { assertEquals(IMAGE2, DELTA1.apply()); + // check image1 + delta1 = image2, since records for image1 + delta1 might differ from records from image2 + List records = getImageRecords(IMAGE1); + records.addAll(DELTA1_RECORDS); + testToImage(IMAGE2, records); } @Test - public void testImage2RoundTrip() throws Throwable { - testToImageAndBack(IMAGE2); + public void testImage2RoundTrip() { + testToImage(IMAGE2); } - private void testToImageAndBack(ProducerIdsImage image) throws Throwable { + private static void testToImage(ProducerIdsImage image) { + testToImage(image, Optional.empty()); + } + + private static void testToImage(ProducerIdsImage image, Optional> fromRecords) { + testToImage(image, fromRecords.orElseGet(() -> getImageRecords(image))); + } + + private static void testToImage(ProducerIdsImage image, List fromRecords) { + // test from empty image stopping each of the various intermediate images along the way + new RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper<>( + () -> ProducerIdsImage.EMPTY, + ProducerIdsDelta::new + ).test(image, fromRecords); + } + + private static List getImageRecords(ProducerIdsImage image) { RecordListWriter writer = new RecordListWriter(); image.write(writer, new ImageWriterOptions.Builder().build()); - ProducerIdsDelta delta = new ProducerIdsDelta(ProducerIdsImage.EMPTY); - RecordTestUtils.replayAll(delta, writer.records()); - ProducerIdsImage nextImage = delta.apply(); - assertEquals(image, nextImage); + return writer.records(); } } diff --git a/metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java index 3400be47b38c..038a5c956c34 100644 --- a/metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java @@ -34,6 +34,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Random; import static org.apache.kafka.clients.admin.ScramMechanism.SCRAM_SHA_256; @@ -113,36 +114,53 @@ static ScramCredentialData randomScramCredentialData(Random random) { } @Test - public void testEmptyImageRoundTrip() throws Throwable { - testToImageAndBack(ScramImage.EMPTY); + public void testEmptyImageRoundTrip() { + testToImage(ScramImage.EMPTY); } @Test - public void testImage1RoundTrip() throws Throwable { - testToImageAndBack(IMAGE1); + public void testImage1RoundTrip() { + testToImage(IMAGE1); } @Test - public void testApplyDelta1() throws Throwable { + public void testApplyDelta1() { assertEquals(IMAGE2, DELTA1.apply()); + // check image1 + delta1 = image2, since records for image1 + delta1 might differ from records from image2 + List records = getImageRecords(IMAGE1); + records.addAll(DELTA1_RECORDS); + testToImage(IMAGE2, records); } @Test - public void testImage2RoundTrip() throws Throwable { - testToImageAndBack(IMAGE2); + public void testImage2RoundTrip() { + testToImage(IMAGE2); } - private void testToImageAndBack(ScramImage image) throws Throwable { + private static void testToImage(ScramImage image) { + testToImage(image, Optional.empty()); + } + + private static void testToImage(ScramImage image, Optional> fromRecords) { + testToImage(image, fromRecords.orElseGet(() -> getImageRecords(image))); + } + + private static void testToImage(ScramImage image, List fromRecords) { + // test from empty image stopping each of the various intermediate images along the way + new RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper<>( + () -> ScramImage.EMPTY, + ScramDelta::new + ).test(image, fromRecords); + } + + private static List getImageRecords(ScramImage image) { RecordListWriter writer = new RecordListWriter(); image.write(writer, new ImageWriterOptions.Builder().build()); - ScramDelta delta = new ScramDelta(ScramImage.EMPTY); - RecordTestUtils.replayAll(delta, writer.records()); - ScramImage nextImage = delta.apply(); - assertEquals(image, nextImage); + return writer.records(); } @Test - public void testEmptyWithInvalidIBP() throws Throwable { + public void testEmptyWithInvalidIBP() { ImageWriterOptions imageWriterOptions = new ImageWriterOptions.Builder(). setMetadataVersion(MetadataVersion.IBP_3_4_IV0).build(); RecordListWriter writer = new RecordListWriter(); @@ -150,7 +168,7 @@ public void testEmptyWithInvalidIBP() throws Throwable { } @Test - public void testImage1withInvalidIBP() throws Throwable { + public void testImage1withInvalidIBP() { ImageWriterOptions imageWriterOptions = new ImageWriterOptions.Builder(). setMetadataVersion(MetadataVersion.IBP_3_4_IV0).build(); RecordListWriter writer = new RecordListWriter(); diff --git a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java index b3e964da85d1..d9bf08767143 100644 --- a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java @@ -41,6 +41,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD; import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_RECORD; @@ -225,6 +226,11 @@ public void testBasicLocalChanges() { ), changes.followers().keySet() ); + + TopicsImage finalImage = delta.apply(); + List imageRecords = getImageRecords(IMAGE1); + imageRecords.addAll(topicRecords); + testToImage(finalImage, Optional.of(imageRecords)); } @Test @@ -265,6 +271,11 @@ public void testDeleteAfterChanges() { assertEquals(new HashSet<>(Arrays.asList(new TopicPartition("zoo", 0))), changes.deletes()); assertEquals(Collections.emptyMap(), changes.leaders()); assertEquals(Collections.emptyMap(), changes.followers()); + + TopicsImage finalImage = delta.apply(); + List imageRecords = getImageRecords(image); + imageRecords.addAll(topicRecords); + testToImage(finalImage, Optional.of(imageRecords)); } @Test @@ -365,35 +376,58 @@ public void testLocalReassignmentChanges() { new HashSet<>(Arrays.asList(new TopicPartition("zoo", 1), new TopicPartition("zoo", 5))), changes.followers().keySet() ); + + + TopicsImage finalImage = delta.apply(); + List imageRecords = getImageRecords(image); + imageRecords.addAll(topicRecords); + testToImage(finalImage, Optional.of(imageRecords)); } @Test - public void testEmptyImageRoundTrip() throws Throwable { - testToImageAndBack(TopicsImage.EMPTY); + public void testEmptyImageRoundTrip() { + testToImage(TopicsImage.EMPTY); } @Test - public void testImage1RoundTrip() throws Throwable { - testToImageAndBack(IMAGE1); + public void testImage1RoundTrip() { + testToImage(IMAGE1); } @Test - public void testApplyDelta1() throws Throwable { + public void testApplyDelta1() { assertEquals(IMAGE2, DELTA1.apply()); + // check image1 + delta1 = image2, since records for image1 + delta1 might differ from records from image2 + List records = getImageRecords(IMAGE1); + records.addAll(DELTA1_RECORDS); + testToImage(IMAGE2, records); } @Test - public void testImage2RoundTrip() throws Throwable { - testToImageAndBack(IMAGE2); + public void testImage2RoundTrip() { + testToImage(IMAGE2); + } + + private static void testToImage(TopicsImage image) { + testToImage(image, Optional.empty()); + } + + private static void testToImage(TopicsImage image, Optional> fromRecords) { + testToImage(image, fromRecords.orElseGet(() -> getImageRecords(image))); + } + + private static void testToImage(TopicsImage image, List fromRecords) { + // test from empty image stopping each of the various intermediate images along the way + new RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper<>( + () -> TopicsImage.EMPTY, + TopicsDelta::new + ).test(image, fromRecords); } - private void testToImageAndBack(TopicsImage image) throws Throwable { + private static List getImageRecords(TopicsImage image) { RecordListWriter writer = new RecordListWriter(); image.write(writer, new ImageWriterOptions.Builder().build()); - TopicsDelta delta = new TopicsDelta(TopicsImage.EMPTY); - RecordTestUtils.replayAll(delta, writer.records()); - TopicsImage nextImage = delta.apply(); - assertEquals(image, nextImage); + return writer.records(); } @Test diff --git a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java index 20569dc11732..a1dc2ce4fbb2 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java @@ -38,8 +38,11 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.function.Function; +import java.util.function.Supplier; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -95,6 +98,74 @@ public static void replayOne( replayAll(target, Collections.singletonList(recordAndVersion)); } + public static class TestThroughAllIntermediateImagesLeadingToFinalImageHelper { + private final Supplier emptyImageSupplier; + private final Function deltaUponImageCreator; + + public TestThroughAllIntermediateImagesLeadingToFinalImageHelper( + Supplier emptyImageSupplier, Function deltaUponImageCreator + ) { + this.emptyImageSupplier = Objects.requireNonNull(emptyImageSupplier); + this.deltaUponImageCreator = Objects.requireNonNull(deltaUponImageCreator); + } + + public I getEmptyImage() { + return this.emptyImageSupplier.get(); + } + + public D createDeltaUponImage(I image) { + return this.deltaUponImageCreator.apply(image); + } + + @SuppressWarnings("unchecked") + public I createImageByApplyingDelta(D delta) { + try { + try { + Method method = delta.getClass().getMethod("apply"); + return (I) method.invoke(delta); + } catch (NoSuchMethodException e) { + throw new RuntimeException(e); + } + } catch (InvocationTargetException e) { + throw new RuntimeException(e.getCause()); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + } + + public void test(I finalImage, List fromRecords) { + for (int numRecordsForfirstImage = 1; numRecordsForfirstImage <= fromRecords.size(); ++numRecordsForfirstImage) { + // create first image from first numRecordsForfirstImage records + D delta = createDeltaUponImage(getEmptyImage()); + RecordTestUtils.replayAll(delta, fromRecords.subList(0, numRecordsForfirstImage)); + I firstImage = createImageByApplyingDelta(delta); + // for all possible further batch sizes, apply as many batches as it takes to get to the final image + int remainingRecords = fromRecords.size() - numRecordsForfirstImage; + if (remainingRecords == 0) { + assertEquals(finalImage, firstImage); + } else { + // for all possible further batch sizes... + for (int maxRecordsForSuccessiveBatches = 1; maxRecordsForSuccessiveBatches <= remainingRecords; ++maxRecordsForSuccessiveBatches) { + I latestIntermediateImage = firstImage; + // ... apply as many batches as it takes to get to the final image + int numAdditionalBatches = (int) Math.ceil(remainingRecords * 1.0 / maxRecordsForSuccessiveBatches); + for (int additionalBatchNum = 0; additionalBatchNum < numAdditionalBatches; ++additionalBatchNum) { + // apply up to maxRecordsForSuccessiveBatches records on top of the latest intermediate image + // to obtain the next intermediate image. + delta = createDeltaUponImage(latestIntermediateImage); + int applyFromIndex = numRecordsForfirstImage + additionalBatchNum * maxRecordsForSuccessiveBatches; + int applyToIndex = Math.min(fromRecords.size(), applyFromIndex + maxRecordsForSuccessiveBatches); + RecordTestUtils.replayAll(delta, fromRecords.subList(applyFromIndex, applyToIndex)); + latestIntermediateImage = createImageByApplyingDelta(delta); + } + // The final intermediate image received should be the expected final image + assertEquals(finalImage, latestIntermediateImage); + } + } + } + } + } + /** * Replay a list of record batches. *