From f8c537237a9f93e0ed91be8eec92ec3342b8a0d2 Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Tue, 6 Aug 2024 09:20:34 +0000 Subject: [PATCH 1/6] Remove duplicate tests from CompactionOnEC2ST --- .../systemtest/suite/CompactionOnEC2ST.java | 152 +++--------------- ...tionOnFargateST.java => CompactionST.java} | 2 +- 2 files changed, 22 insertions(+), 132 deletions(-) rename java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/{CompactionOnFargateST.java => CompactionST.java} (99%) diff --git a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/CompactionOnEC2ST.java b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/CompactionOnEC2ST.java index 0c97cf479c..0783993099 100644 --- a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/CompactionOnEC2ST.java +++ b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/CompactionOnEC2ST.java @@ -19,13 +19,9 @@ import org.approvaltests.Approvals; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; -import sleeper.compaction.strategy.impl.BasicCompactionStrategy; -import sleeper.core.partition.PartitionsBuilder; import sleeper.systemtest.dsl.SleeperSystemTest; import sleeper.systemtest.dsl.extension.AfterTestPurgeQueues; import sleeper.systemtest.dsl.extension.AfterTestReports; @@ -41,16 +37,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.COMPACTION_JOB_QUEUE_URL; import static sleeper.configuration.properties.table.TableProperty.COMPACTION_FILES_BATCH_SIZE; -import static sleeper.configuration.properties.table.TableProperty.COMPACTION_STRATEGY_CLASS; -import static sleeper.configuration.properties.table.TableProperty.INGEST_FILE_WRITING_STRATEGY; -import static sleeper.configuration.properties.validation.IngestFileWritingStrategy.ONE_FILE_PER_LEAF; import static sleeper.core.testutils.printers.FileReferencePrinter.printFiles; -import static sleeper.systemtest.dsl.sourcedata.GenerateNumberedValue.addPrefix; -import static sleeper.systemtest.dsl.sourcedata.GenerateNumberedValue.numberStringAndZeroPadTo; -import static sleeper.systemtest.dsl.sourcedata.GenerateNumberedValueOverrides.overrideField; import static sleeper.systemtest.suite.fixtures.SystemTestInstance.COMPACTION_ON_EC2; -import static sleeper.systemtest.suite.fixtures.SystemTestSchema.DEFAULT_SCHEMA; -import static sleeper.systemtest.suite.fixtures.SystemTestSchema.ROW_KEY_FIELD_NAME; @SystemTest @Slow @@ -70,124 +58,26 @@ void tearDown(SleeperSystemTest sleeper) { sleeper.compaction().scaleToZero(); } - @Nested - @DisplayName("Merge whole files together") - class MergeFiles { - - @Test - void shouldCompactFilesUsingDefaultCompactionStrategy(SleeperSystemTest sleeper) { - // Given - sleeper.updateTableProperties(Map.of( - COMPACTION_FILES_BATCH_SIZE, "5")); - // Files with records 9, 9, 9, 9, 10 (which match SizeRatioStrategy criteria) - RecordNumbers numbers = sleeper.scrambleNumberedRecords(LongStream.range(0, 46)); - sleeper.ingest().direct(tempDir) - .numberedRecords(numbers.range(0, 9)) - .numberedRecords(numbers.range(9, 18)) - .numberedRecords(numbers.range(18, 27)) - .numberedRecords(numbers.range(27, 36)) - .numberedRecords(numbers.range(36, 46)); - - // When - sleeper.compaction().createJobs(1).invokeTasks(1).waitForJobs(); - - // Then - assertThat(sleeper.directQuery().allRecordsInTable()) - .containsExactlyInAnyOrderElementsOf(sleeper.generateNumberedRecords(LongStream.range(0, 46))); - Approvals.verify(printFiles(sleeper.partitioning().tree(), sleeper.tableFiles().all())); - } - - @Test - void shouldCompactFilesUsingBasicCompactionStrategy(SleeperSystemTest sleeper) { - // Given - sleeper.updateTableProperties(Map.of( - COMPACTION_STRATEGY_CLASS, BasicCompactionStrategy.class.getName(), - COMPACTION_FILES_BATCH_SIZE, "2")); - RecordNumbers numbers = sleeper.scrambleNumberedRecords(LongStream.range(0, 100)); - sleeper.ingest().direct(tempDir) - .numberedRecords(numbers.range(0, 25)) - .numberedRecords(numbers.range(25, 50)) - .numberedRecords(numbers.range(50, 75)) - .numberedRecords(numbers.range(75, 100)); - - // When - sleeper.compaction().createJobs(2).invokeTasks(1).waitForJobs(); - - // Then - assertThat(sleeper.directQuery().allRecordsInTable()) - .containsExactlyInAnyOrderElementsOf(sleeper.generateNumberedRecords(LongStream.range(0, 100))); - Approvals.verify(printFiles(sleeper.partitioning().tree(), sleeper.tableFiles().all())); - } - } - - @Nested - @DisplayName("Merge parts of files referenced on multiple partitions") - class MergePartialFiles { - - @BeforeEach - void setUp(SleeperSystemTest sleeper) { - sleeper.setGeneratorOverrides(overrideField( - ROW_KEY_FIELD_NAME, numberStringAndZeroPadTo(2).then(addPrefix("row-")))); - sleeper.partitioning().setPartitions(new PartitionsBuilder(DEFAULT_SCHEMA) - .rootFirst("root") - .splitToNewChildren("root", "L", "R", "row-50") - .splitToNewChildren("L", "LL", "LR", "row-25") - .splitToNewChildren("R", "RL", "RR", "row-75") - .buildTree()); - } - - @Test - void shouldCompactOneFileIntoExistingFilesOnLeafPartitions(SleeperSystemTest sleeper) throws Exception { - // Given a compaction strategy which will always compact two files together - sleeper.updateTableProperties(Map.of( - COMPACTION_STRATEGY_CLASS, BasicCompactionStrategy.class.getName(), - COMPACTION_FILES_BATCH_SIZE, "2")); - // A file which we add to all 4 leaf partitions - sleeper.sourceFiles().inDataBucket().writeSketches() - .createWithNumberedRecords("file.parquet", LongStream.range(0, 50).map(n -> n * 2)); - sleeper.ingest().toStateStore().addFileWithRecordEstimatesOnPartitions( - "file.parquet", Map.of( - "LL", 12L, - "LR", 12L, - "RL", 12L, - "RR", 12L)); - // And a file in each leaf partition - sleeper.updateTableProperties(Map.of(INGEST_FILE_WRITING_STRATEGY, ONE_FILE_PER_LEAF.toString())); - sleeper.ingest().direct(tempDir).numberedRecords(LongStream.range(0, 50).map(n -> n * 2 + 1)); - - // When we run compaction - sleeper.compaction().createJobs(4).invokeTasks(1).waitForJobs(); - - // Then the same records should be present, in one file on each leaf partition - assertThat(sleeper.directQuery().allRecordsInTable()) - .containsExactlyInAnyOrderElementsOf(sleeper.generateNumberedRecords(LongStream.range(0, 100))); - Approvals.verify(printFiles(sleeper.partitioning().tree(), sleeper.tableFiles().all())); - } - - @Test - void shouldCompactOneFileFromRootIntoExistingFilesOnLeafPartitions(SleeperSystemTest sleeper) throws Exception { - // Given a compaction strategy which will always compact two files together - sleeper.updateTableProperties(Map.of( - COMPACTION_STRATEGY_CLASS, BasicCompactionStrategy.class.getName(), - COMPACTION_FILES_BATCH_SIZE, "2")); - // And a file which we add to the root partition - sleeper.sourceFiles().inDataBucket().writeSketches() - .createWithNumberedRecords("file.parquet", LongStream.range(0, 50).map(n -> n * 2)); - sleeper.ingest().toStateStore().addFileOnPartition("file.parquet", "root", 50); - // And a file in each leaf partition - sleeper.updateTableProperties(Map.of(INGEST_FILE_WRITING_STRATEGY, ONE_FILE_PER_LEAF.toString())); - sleeper.ingest().direct(tempDir).numberedRecords(LongStream.range(0, 50).map(n -> n * 2 + 1)); - - // When we split the file from the root partition into separate references in the leaf partitions - // And we run compaction - sleeper.compaction() - .createJobs(0).createJobs(4) // Split down two levels of the tree - .invokeTasks(1).waitForJobs(); - - // Then the same records should be present, in one file on each leaf partition - assertThat(sleeper.directQuery().allRecordsInTable()) - .containsExactlyInAnyOrderElementsOf(sleeper.generateNumberedRecords(LongStream.range(0, 100))); - Approvals.verify(printFiles(sleeper.partitioning().tree(), sleeper.tableFiles().all())); - } + @Test + void shouldCompactFilesUsingDefaultCompactionStrategy(SleeperSystemTest sleeper) { + // Given + sleeper.updateTableProperties(Map.of( + COMPACTION_FILES_BATCH_SIZE, "5")); + // Files with records 9, 9, 9, 9, 10 (which match SizeRatioStrategy criteria) + RecordNumbers numbers = sleeper.scrambleNumberedRecords(LongStream.range(0, 46)); + sleeper.ingest().direct(tempDir) + .numberedRecords(numbers.range(0, 9)) + .numberedRecords(numbers.range(9, 18)) + .numberedRecords(numbers.range(18, 27)) + .numberedRecords(numbers.range(27, 36)) + .numberedRecords(numbers.range(36, 46)); + + // When + sleeper.compaction().createJobs(1).invokeTasks(1).waitForJobs(); + + // Then + assertThat(sleeper.directQuery().allRecordsInTable()) + .containsExactlyInAnyOrderElementsOf(sleeper.generateNumberedRecords(LongStream.range(0, 46))); + Approvals.verify(printFiles(sleeper.partitioning().tree(), sleeper.tableFiles().all())); } } diff --git a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/CompactionOnFargateST.java b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/CompactionST.java similarity index 99% rename from java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/CompactionOnFargateST.java rename to java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/CompactionST.java index d234150bf2..65c453052f 100644 --- a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/CompactionOnFargateST.java +++ b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/CompactionST.java @@ -51,7 +51,7 @@ import static sleeper.systemtest.suite.fixtures.SystemTestSchema.ROW_KEY_FIELD_NAME; @SystemTest -public class CompactionOnFargateST { +public class CompactionST { @TempDir private Path tempDir; From 04a6d731723025b8e2807c50a2a0b9b62dcd0c5b Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Tue, 6 Aug 2024 09:31:20 +0000 Subject: [PATCH 2/6] Move CompactionMethod enum to configuration module --- .../job/execution/DefaultSelector.java | 41 +++++++++---------- .../instance/CompactionProperty.java | 4 +- .../validation}/CompactionMethod.java | 8 +++- 3 files changed, 29 insertions(+), 24 deletions(-) rename java/{compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution => configuration/src/main/java/sleeper/configuration/properties/validation}/CompactionMethod.java (81%) diff --git a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/DefaultSelector.java b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/DefaultSelector.java index f40813c010..43964383b1 100644 --- a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/DefaultSelector.java +++ b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/DefaultSelector.java @@ -25,10 +25,10 @@ import sleeper.configuration.jars.ObjectFactory; import sleeper.configuration.properties.table.TableProperties; import sleeper.configuration.properties.table.TablePropertiesProvider; -import sleeper.configuration.properties.table.TableProperty; +import sleeper.configuration.properties.validation.CompactionMethod; import sleeper.statestore.StateStoreProvider; -import java.util.Locale; +import static sleeper.configuration.properties.table.TableProperty.COMPACTION_METHOD; /** * Determines which compaction algorithm should be run based on the table and instance configuration properties and @@ -55,33 +55,30 @@ public DefaultSelector( public CompactionRunner chooseCompactor(CompactionJob job) { TableProperties tableProperties = tablePropertiesProvider .getById(job.getTableId()); - String method = tableProperties.get(TableProperty.COMPACTION_METHOD).toUpperCase(Locale.UK); - - // Convert to enum value and default to Java - CompactionMethod desired; - try { - desired = CompactionMethod.valueOf(method); - } catch (IllegalArgumentException e) { - desired = CompactionMethod.DEFAULT; - } - - CompactionRunner defaultRunner = new StandardCompactor(tablePropertiesProvider, stateStoreProvider, objectFactory, configuration); - CompactionRunner runner = defaultRunner; - switch (desired) { - case RUST: - runner = new RustCompaction(tablePropertiesProvider, stateStoreProvider); - break; - default: - break; - } + CompactionMethod method = tableProperties.getEnumValue(COMPACTION_METHOD, CompactionMethod.class); + CompactionRunner runner = getRunnerForMethod(method); // Is an iterator specifed? If so can we support this? if (job.getIteratorClassName() != null && !runner.supportsIterators()) { LOGGER.debug("Table has an iterator set, which compactor {} doesn't support, falling back to default", runner.getClass().getSimpleName()); - runner = defaultRunner; + runner = getJavaRunner(); } LOGGER.info("Selecting {} compactor (language {}) for job ID {} table ID {}", runner.getClass().getSimpleName(), runner.implementationLanguage(), job.getId(), job.getTableId()); return runner; } + + private CompactionRunner getRunnerForMethod(CompactionMethod method) { + switch (method) { + case RUST: + return new RustCompaction(tablePropertiesProvider, stateStoreProvider); + case JAVA: + default: + return getJavaRunner(); + } + } + + private CompactionRunner getJavaRunner() { + return new StandardCompactor(tablePropertiesProvider, stateStoreProvider, objectFactory, configuration); + } } diff --git a/java/configuration/src/main/java/sleeper/configuration/properties/instance/CompactionProperty.java b/java/configuration/src/main/java/sleeper/configuration/properties/instance/CompactionProperty.java index 30442bf249..f737cafc27 100644 --- a/java/configuration/src/main/java/sleeper/configuration/properties/instance/CompactionProperty.java +++ b/java/configuration/src/main/java/sleeper/configuration/properties/instance/CompactionProperty.java @@ -19,6 +19,7 @@ import sleeper.configuration.Utils; import sleeper.configuration.properties.SleeperPropertyIndex; import sleeper.configuration.properties.validation.CompactionECSLaunchType; +import sleeper.configuration.properties.validation.CompactionMethod; import java.util.List; @@ -262,7 +263,8 @@ public interface CompactionProperty { UserDefinedInstanceProperty DEFAULT_COMPACTION_METHOD = Index.propertyBuilder("sleeper.default.table.compaction.method") .description("Select what compaction method to use on a table. Current options are JAVA and RUST. Rust compaction support is " + "experimental.") - .defaultValue("JAVA") + .defaultValue(CompactionMethod.JAVA.toString()) + .validationPredicate(CompactionMethod::isValid) .propertyGroup(InstancePropertyGroup.COMPACTION).build(); static List getAll() { diff --git a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionMethod.java b/java/configuration/src/main/java/sleeper/configuration/properties/validation/CompactionMethod.java similarity index 81% rename from java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionMethod.java rename to java/configuration/src/main/java/sleeper/configuration/properties/validation/CompactionMethod.java index d03600e412..3745f62de7 100644 --- a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionMethod.java +++ b/java/configuration/src/main/java/sleeper/configuration/properties/validation/CompactionMethod.java @@ -13,7 +13,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package sleeper.compaction.job.execution; +package sleeper.configuration.properties.validation; + +import org.apache.commons.lang3.EnumUtils; /** * Different compaction methods for Sleeper which support different capabilities and must be @@ -29,4 +31,8 @@ public enum CompactionMethod { RUST; public static final CompactionMethod DEFAULT = CompactionMethod.JAVA; + + public static boolean isValid(String value) { + return EnumUtils.isValidEnumIgnoreCase(CompactionMethod.class, value); + } } From 9acdfc46d7cebb4a09f784067e229abe76284551 Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Tue, 6 Aug 2024 09:41:21 +0000 Subject: [PATCH 3/6] Test compaction with Rust --- .../systemtest/suite/CompactionST.java | 62 +++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/CompactionST.java b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/CompactionST.java index 65c453052f..af99daa718 100644 --- a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/CompactionST.java +++ b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/CompactionST.java @@ -24,6 +24,7 @@ import org.junit.jupiter.api.io.TempDir; import sleeper.compaction.strategy.impl.BasicCompactionStrategy; +import sleeper.configuration.properties.validation.CompactionMethod; import sleeper.core.partition.PartitionsBuilder; import sleeper.systemtest.dsl.SleeperSystemTest; import sleeper.systemtest.dsl.extension.AfterTestPurgeQueues; @@ -39,6 +40,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.COMPACTION_JOB_QUEUE_URL; import static sleeper.configuration.properties.table.TableProperty.COMPACTION_FILES_BATCH_SIZE; +import static sleeper.configuration.properties.table.TableProperty.COMPACTION_METHOD; import static sleeper.configuration.properties.table.TableProperty.COMPACTION_STRATEGY_CLASS; import static sleeper.configuration.properties.table.TableProperty.INGEST_FILE_WRITING_STRATEGY; import static sleeper.configuration.properties.validation.IngestFileWritingStrategy.ONE_FILE_PER_LEAF; @@ -182,4 +184,64 @@ void shouldCompactOneFileFromRootIntoExistingFilesOnLeafPartitions(SleeperSystem Approvals.verify(printFiles(sleeper.partitioning().tree(), sleeper.tableFiles().all())); } } + + @Nested + @DisplayName("Run with Rust") + class RunWithRust { + + @BeforeEach + void setUp(SleeperSystemTest sleeper) { + sleeper.updateTableProperties(Map.of( + COMPACTION_METHOD, CompactionMethod.RUST.toString(), + COMPACTION_STRATEGY_CLASS, BasicCompactionStrategy.class.getName(), + COMPACTION_FILES_BATCH_SIZE, "2")); + } + + @Test + void shouldCompactFilesInSinglePartition(SleeperSystemTest sleeper) { + // Given + RecordNumbers numbers = sleeper.scrambleNumberedRecords(LongStream.range(0, 100)); + sleeper.ingest().direct(tempDir) + .numberedRecords(numbers.range(0, 25)) + .numberedRecords(numbers.range(25, 50)) + .numberedRecords(numbers.range(50, 75)) + .numberedRecords(numbers.range(75, 100)); + + // When + sleeper.compaction().createJobs(2).invokeTasks(1).waitForJobs(); + + // Then + assertThat(sleeper.directQuery().allRecordsInTable()) + .containsExactlyInAnyOrderElementsOf(sleeper.generateNumberedRecords(LongStream.range(0, 100))); + Approvals.verify(printFiles(sleeper.partitioning().tree(), sleeper.tableFiles().all())); + } + + @Test + void shouldCompactFilesFromMultiplePartitions(SleeperSystemTest sleeper) throws Exception { + // Given + sleeper.partitioning().setPartitions(new PartitionsBuilder(DEFAULT_SCHEMA) + .rootFirst("root") + .splitToNewChildren("root", "L", "R", "row-50") + .splitToNewChildren("L", "LL", "LR", "row-25") + .splitToNewChildren("R", "RL", "RR", "row-75") + .buildTree()); + sleeper.updateTableProperties(Map.of(INGEST_FILE_WRITING_STRATEGY, ONE_FILE_PER_LEAF.toString())); + RecordNumbers numbers = sleeper.scrambleNumberedRecords(LongStream.range(0, 100)); + sleeper.ingest().direct(tempDir) + .numberedRecords(numbers.range(0, 50)); + sleeper.sourceFiles().inDataBucket().writeSketches() + .createWithNumberedRecords("file.parquet", numbers.range(50, 100)); + sleeper.ingest().toStateStore().addFileOnPartition("file.parquet", "root", 50); + + // When + sleeper.compaction() + .createJobs(0).createJobs(4) // Split down two levels of the tree + .invokeTasks(1).waitForJobs(); + + // Then + assertThat(sleeper.directQuery().allRecordsInTable()) + .containsExactlyInAnyOrderElementsOf(sleeper.generateNumberedRecords(LongStream.range(0, 100))); + Approvals.verify(printFiles(sleeper.partitioning().tree(), sleeper.tableFiles().all())); + } + } } From 22ebd70b196e265d2780ab36a93ba6606a96005d Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Tue, 6 Aug 2024 09:55:11 +0000 Subject: [PATCH 4/6] Removed use of Approvals from compaction system tests --- .../systemtest/suite/CompactionOnEC2ST.java | 5 +- .../systemtest/suite/CompactionST.java | 20 ++++--- ...ExistingFilesOnLeafPartitions.approved.txt | 7 --- ...sUsingBasicCompactionStrategy.approved.txt | 6 --- ...singDefaultCompactionStrategy.approved.txt | 4 -- ...ExistingFilesOnLeafPartitions.approved.txt | 7 --- ...ExistingFilesOnLeafPartitions.approved.txt | 7 --- .../suite/testutil/TestResources.java | 53 +++++++++++++++++++ .../compaction/compacted4To2Files.txt} | 0 .../compaction/compacted5ToSingleFile.txt} | 0 .../compactedFileIntoLeafPartitions.txt} | 0 11 files changed, 69 insertions(+), 40 deletions(-) delete mode 100644 java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/approvals/CompactionOnEC2ST.MergePartialFiles.shouldCompactOneFileIntoExistingFilesOnLeafPartitions.approved.txt delete mode 100644 java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/approvals/CompactionOnFargateST.MergeFiles.shouldCompactFilesUsingBasicCompactionStrategy.approved.txt delete mode 100644 java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/approvals/CompactionOnFargateST.MergeFiles.shouldCompactFilesUsingDefaultCompactionStrategy.approved.txt delete mode 100644 java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/approvals/CompactionOnFargateST.MergePartialFiles.shouldCompactOneFileFromRootIntoExistingFilesOnLeafPartitions.approved.txt delete mode 100644 java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/approvals/CompactionOnFargateST.MergePartialFiles.shouldCompactOneFileIntoExistingFilesOnLeafPartitions.approved.txt create mode 100644 java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/testutil/TestResources.java rename java/system-test/system-test-suite/src/test/{java/sleeper/systemtest/suite/approvals/CompactionOnEC2ST.MergeFiles.shouldCompactFilesUsingBasicCompactionStrategy.approved.txt => resources/compaction/compacted4To2Files.txt} (100%) rename java/system-test/system-test-suite/src/test/{java/sleeper/systemtest/suite/approvals/CompactionOnEC2ST.MergeFiles.shouldCompactFilesUsingDefaultCompactionStrategy.approved.txt => resources/compaction/compacted5ToSingleFile.txt} (100%) rename java/system-test/system-test-suite/src/test/{java/sleeper/systemtest/suite/approvals/CompactionOnEC2ST.MergePartialFiles.shouldCompactOneFileFromRootIntoExistingFilesOnLeafPartitions.approved.txt => resources/compaction/compactedFileIntoLeafPartitions.txt} (100%) diff --git a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/CompactionOnEC2ST.java b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/CompactionOnEC2ST.java index 0783993099..e0e0e1e4f7 100644 --- a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/CompactionOnEC2ST.java +++ b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/CompactionOnEC2ST.java @@ -16,7 +16,6 @@ package sleeper.systemtest.suite; -import org.approvaltests.Approvals; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -39,6 +38,7 @@ import static sleeper.configuration.properties.table.TableProperty.COMPACTION_FILES_BATCH_SIZE; import static sleeper.core.testutils.printers.FileReferencePrinter.printFiles; import static sleeper.systemtest.suite.fixtures.SystemTestInstance.COMPACTION_ON_EC2; +import static sleeper.systemtest.suite.testutil.TestResources.exampleString; @SystemTest @Slow @@ -78,6 +78,7 @@ void shouldCompactFilesUsingDefaultCompactionStrategy(SleeperSystemTest sleeper) // Then assertThat(sleeper.directQuery().allRecordsInTable()) .containsExactlyInAnyOrderElementsOf(sleeper.generateNumberedRecords(LongStream.range(0, 46))); - Approvals.verify(printFiles(sleeper.partitioning().tree(), sleeper.tableFiles().all())); + assertThat(printFiles(sleeper.partitioning().tree(), sleeper.tableFiles().all())) + .isEqualTo(exampleString("compaction/compacted5ToSingleFile.txt")); } } diff --git a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/CompactionST.java b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/CompactionST.java index af99daa718..73badd8aad 100644 --- a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/CompactionST.java +++ b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/CompactionST.java @@ -16,7 +16,6 @@ package sleeper.systemtest.suite; -import org.approvaltests.Approvals; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Nested; @@ -51,6 +50,7 @@ import static sleeper.systemtest.suite.fixtures.SystemTestInstance.MAIN; import static sleeper.systemtest.suite.fixtures.SystemTestSchema.DEFAULT_SCHEMA; import static sleeper.systemtest.suite.fixtures.SystemTestSchema.ROW_KEY_FIELD_NAME; +import static sleeper.systemtest.suite.testutil.TestResources.exampleString; @SystemTest public class CompactionST { @@ -88,7 +88,8 @@ void shouldCompactFilesUsingDefaultCompactionStrategy(SleeperSystemTest sleeper) // Then assertThat(sleeper.directQuery().allRecordsInTable()) .containsExactlyInAnyOrderElementsOf(sleeper.generateNumberedRecords(LongStream.range(0, 46))); - Approvals.verify(printFiles(sleeper.partitioning().tree(), sleeper.tableFiles().all())); + assertThat(printFiles(sleeper.partitioning().tree(), sleeper.tableFiles().all())) + .isEqualTo(exampleString("compaction/compacted5ToSingleFile.txt")); } @Test @@ -110,7 +111,8 @@ void shouldCompactFilesUsingBasicCompactionStrategy(SleeperSystemTest sleeper) { // Then assertThat(sleeper.directQuery().allRecordsInTable()) .containsExactlyInAnyOrderElementsOf(sleeper.generateNumberedRecords(LongStream.range(0, 100))); - Approvals.verify(printFiles(sleeper.partitioning().tree(), sleeper.tableFiles().all())); + assertThat(printFiles(sleeper.partitioning().tree(), sleeper.tableFiles().all())) + .isEqualTo(exampleString("compaction/compacted4To2Files.txt")); } } @@ -155,7 +157,8 @@ void shouldCompactOneFileIntoExistingFilesOnLeafPartitions(SleeperSystemTest sle // Then the same records should be present, in one file on each leaf partition assertThat(sleeper.directQuery().allRecordsInTable()) .containsExactlyInAnyOrderElementsOf(sleeper.generateNumberedRecords(LongStream.range(0, 100))); - Approvals.verify(printFiles(sleeper.partitioning().tree(), sleeper.tableFiles().all())); + assertThat(printFiles(sleeper.partitioning().tree(), sleeper.tableFiles().all())) + .isEqualTo(exampleString("compaction/compactedFileIntoLeafPartitions.txt")); } @Test @@ -181,7 +184,8 @@ void shouldCompactOneFileFromRootIntoExistingFilesOnLeafPartitions(SleeperSystem // Then the same records should be present, in one file on each leaf partition assertThat(sleeper.directQuery().allRecordsInTable()) .containsExactlyInAnyOrderElementsOf(sleeper.generateNumberedRecords(LongStream.range(0, 100))); - Approvals.verify(printFiles(sleeper.partitioning().tree(), sleeper.tableFiles().all())); + assertThat(printFiles(sleeper.partitioning().tree(), sleeper.tableFiles().all())) + .isEqualTo(exampleString("compaction/compactedFileIntoLeafPartitions.txt")); } } @@ -213,7 +217,8 @@ void shouldCompactFilesInSinglePartition(SleeperSystemTest sleeper) { // Then assertThat(sleeper.directQuery().allRecordsInTable()) .containsExactlyInAnyOrderElementsOf(sleeper.generateNumberedRecords(LongStream.range(0, 100))); - Approvals.verify(printFiles(sleeper.partitioning().tree(), sleeper.tableFiles().all())); + assertThat(printFiles(sleeper.partitioning().tree(), sleeper.tableFiles().all())) + .isEqualTo(exampleString("compaction/compacted4To2Files.txt")); } @Test @@ -241,7 +246,8 @@ void shouldCompactFilesFromMultiplePartitions(SleeperSystemTest sleeper) throws // Then assertThat(sleeper.directQuery().allRecordsInTable()) .containsExactlyInAnyOrderElementsOf(sleeper.generateNumberedRecords(LongStream.range(0, 100))); - Approvals.verify(printFiles(sleeper.partitioning().tree(), sleeper.tableFiles().all())); + assertThat(printFiles(sleeper.partitioning().tree(), sleeper.tableFiles().all())) + .isEqualTo(exampleString("compaction/compactedFileIntoLeafPartitions.txt")); } } } diff --git a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/approvals/CompactionOnEC2ST.MergePartialFiles.shouldCompactOneFileIntoExistingFilesOnLeafPartitions.approved.txt b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/approvals/CompactionOnEC2ST.MergePartialFiles.shouldCompactOneFileIntoExistingFilesOnLeafPartitions.approved.txt deleted file mode 100644 index 07c7ba6c84..0000000000 --- a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/approvals/CompactionOnEC2ST.MergePartialFiles.shouldCompactOneFileIntoExistingFilesOnLeafPartitions.approved.txt +++ /dev/null @@ -1,7 +0,0 @@ -Unreferenced files: 5 -Referenced files: 4 -File references: 4 -Partition at LL: 25 records in file 1 -Partition at LR: 25 records in file 2 -Partition at RL: 25 records in file 3 -Partition at RR: 25 records in file 4 diff --git a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/approvals/CompactionOnFargateST.MergeFiles.shouldCompactFilesUsingBasicCompactionStrategy.approved.txt b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/approvals/CompactionOnFargateST.MergeFiles.shouldCompactFilesUsingBasicCompactionStrategy.approved.txt deleted file mode 100644 index b8dfb3f73b..0000000000 --- a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/approvals/CompactionOnFargateST.MergeFiles.shouldCompactFilesUsingBasicCompactionStrategy.approved.txt +++ /dev/null @@ -1,6 +0,0 @@ -Unreferenced files: 4 -Referenced files: 2 -File references: 2 -Partition at root: -50 records in file 1 -50 records in file 2 diff --git a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/approvals/CompactionOnFargateST.MergeFiles.shouldCompactFilesUsingDefaultCompactionStrategy.approved.txt b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/approvals/CompactionOnFargateST.MergeFiles.shouldCompactFilesUsingDefaultCompactionStrategy.approved.txt deleted file mode 100644 index a90862bc97..0000000000 --- a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/approvals/CompactionOnFargateST.MergeFiles.shouldCompactFilesUsingDefaultCompactionStrategy.approved.txt +++ /dev/null @@ -1,4 +0,0 @@ -Unreferenced files: 5 -Referenced files: 1 -File references: 1 -Partition at root: 46 records in file 1 diff --git a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/approvals/CompactionOnFargateST.MergePartialFiles.shouldCompactOneFileFromRootIntoExistingFilesOnLeafPartitions.approved.txt b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/approvals/CompactionOnFargateST.MergePartialFiles.shouldCompactOneFileFromRootIntoExistingFilesOnLeafPartitions.approved.txt deleted file mode 100644 index 07c7ba6c84..0000000000 --- a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/approvals/CompactionOnFargateST.MergePartialFiles.shouldCompactOneFileFromRootIntoExistingFilesOnLeafPartitions.approved.txt +++ /dev/null @@ -1,7 +0,0 @@ -Unreferenced files: 5 -Referenced files: 4 -File references: 4 -Partition at LL: 25 records in file 1 -Partition at LR: 25 records in file 2 -Partition at RL: 25 records in file 3 -Partition at RR: 25 records in file 4 diff --git a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/approvals/CompactionOnFargateST.MergePartialFiles.shouldCompactOneFileIntoExistingFilesOnLeafPartitions.approved.txt b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/approvals/CompactionOnFargateST.MergePartialFiles.shouldCompactOneFileIntoExistingFilesOnLeafPartitions.approved.txt deleted file mode 100644 index 07c7ba6c84..0000000000 --- a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/approvals/CompactionOnFargateST.MergePartialFiles.shouldCompactOneFileIntoExistingFilesOnLeafPartitions.approved.txt +++ /dev/null @@ -1,7 +0,0 @@ -Unreferenced files: 5 -Referenced files: 4 -File references: 4 -Partition at LL: 25 records in file 1 -Partition at LR: 25 records in file 2 -Partition at RL: 25 records in file 3 -Partition at RR: 25 records in file 4 diff --git a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/testutil/TestResources.java b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/testutil/TestResources.java new file mode 100644 index 0000000000..157dcaaff6 --- /dev/null +++ b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/testutil/TestResources.java @@ -0,0 +1,53 @@ +/* + * Copyright 2022-2024 Crown Copyright + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package sleeper.systemtest.suite.testutil; + +import org.apache.commons.io.IOUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.net.URL; +import java.util.Objects; + +public class TestResources { + + private TestResources() { + } + + public static String exampleString(String path) { + try (Reader reader = exampleReader(path)) { + return IOUtils.toString(reader); + } catch (IOException e) { + throw new IllegalStateException("Failed to load example: " + path, e); + } + } + + public static Reader exampleReader(String path) { + return new InputStreamReader(exampleInputStream(path)); + } + + public static InputStream exampleInputStream(String path) { + URL resource = Objects.requireNonNull(TestResources.class.getClassLoader().getResource(path)); + try { + return resource.openStream(); + } catch (IOException e) { + throw new IllegalStateException("Failed to load test example: " + path, e); + } + } + +} diff --git a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/approvals/CompactionOnEC2ST.MergeFiles.shouldCompactFilesUsingBasicCompactionStrategy.approved.txt b/java/system-test/system-test-suite/src/test/resources/compaction/compacted4To2Files.txt similarity index 100% rename from java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/approvals/CompactionOnEC2ST.MergeFiles.shouldCompactFilesUsingBasicCompactionStrategy.approved.txt rename to java/system-test/system-test-suite/src/test/resources/compaction/compacted4To2Files.txt diff --git a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/approvals/CompactionOnEC2ST.MergeFiles.shouldCompactFilesUsingDefaultCompactionStrategy.approved.txt b/java/system-test/system-test-suite/src/test/resources/compaction/compacted5ToSingleFile.txt similarity index 100% rename from java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/approvals/CompactionOnEC2ST.MergeFiles.shouldCompactFilesUsingDefaultCompactionStrategy.approved.txt rename to java/system-test/system-test-suite/src/test/resources/compaction/compacted5ToSingleFile.txt diff --git a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/approvals/CompactionOnEC2ST.MergePartialFiles.shouldCompactOneFileFromRootIntoExistingFilesOnLeafPartitions.approved.txt b/java/system-test/system-test-suite/src/test/resources/compaction/compactedFileIntoLeafPartitions.txt similarity index 100% rename from java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/approvals/CompactionOnEC2ST.MergePartialFiles.shouldCompactOneFileFromRootIntoExistingFilesOnLeafPartitions.approved.txt rename to java/system-test/system-test-suite/src/test/resources/compaction/compactedFileIntoLeafPartitions.txt From 336a03b0a17a0855c363ddbf407d41f28c8de7aa Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Tue, 6 Aug 2024 12:13:19 +0000 Subject: [PATCH 5/6] Rename methods to create compaction runners --- .../compaction/job/execution/DefaultSelector.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/DefaultSelector.java b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/DefaultSelector.java index 43964383b1..99b0fd287b 100644 --- a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/DefaultSelector.java +++ b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/DefaultSelector.java @@ -56,29 +56,29 @@ public CompactionRunner chooseCompactor(CompactionJob job) { TableProperties tableProperties = tablePropertiesProvider .getById(job.getTableId()); CompactionMethod method = tableProperties.getEnumValue(COMPACTION_METHOD, CompactionMethod.class); - CompactionRunner runner = getRunnerForMethod(method); + CompactionRunner runner = createRunnerForMethod(method); // Is an iterator specifed? If so can we support this? if (job.getIteratorClassName() != null && !runner.supportsIterators()) { LOGGER.debug("Table has an iterator set, which compactor {} doesn't support, falling back to default", runner.getClass().getSimpleName()); - runner = getJavaRunner(); + runner = createJavaRunner(); } LOGGER.info("Selecting {} compactor (language {}) for job ID {} table ID {}", runner.getClass().getSimpleName(), runner.implementationLanguage(), job.getId(), job.getTableId()); return runner; } - private CompactionRunner getRunnerForMethod(CompactionMethod method) { + private CompactionRunner createRunnerForMethod(CompactionMethod method) { switch (method) { case RUST: return new RustCompaction(tablePropertiesProvider, stateStoreProvider); case JAVA: default: - return getJavaRunner(); + return createJavaRunner(); } } - private CompactionRunner getJavaRunner() { + private CompactionRunner createJavaRunner() { return new StandardCompactor(tablePropertiesProvider, stateStoreProvider, objectFactory, configuration); } } From 6b2a8a5124c5f5bb0b4a716ecc77a54cb4a15cef Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Tue, 6 Aug 2024 14:17:36 +0000 Subject: [PATCH 6/6] Fix record generator for shouldCompactFilesFromMultiplePartitions --- .../src/test/java/sleeper/systemtest/suite/CompactionST.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/CompactionST.java b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/CompactionST.java index 73badd8aad..cab8732740 100644 --- a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/CompactionST.java +++ b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/CompactionST.java @@ -224,6 +224,8 @@ void shouldCompactFilesInSinglePartition(SleeperSystemTest sleeper) { @Test void shouldCompactFilesFromMultiplePartitions(SleeperSystemTest sleeper) throws Exception { // Given + sleeper.setGeneratorOverrides(overrideField( + ROW_KEY_FIELD_NAME, numberStringAndZeroPadTo(2).then(addPrefix("row-")))); sleeper.partitioning().setPartitions(new PartitionsBuilder(DEFAULT_SCHEMA) .rootFirst("root") .splitToNewChildren("root", "L", "R", "row-50")