Skip to content

Commit

Permalink
Merge pull request #3024 from gchq/2875-system-test-rust-compaction
Browse files Browse the repository at this point in the history
Issue 2875 - System test Rust compaction
  • Loading branch information
patchwork01 authored Aug 7, 2024
2 parents c2b19ea + 6b2a8a5 commit 823ec6d
Show file tree
Hide file tree
Showing 14 changed files with 181 additions and 193 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
import sleeper.configuration.jars.ObjectFactory;
import sleeper.configuration.properties.table.TableProperties;
import sleeper.configuration.properties.table.TablePropertiesProvider;
import sleeper.configuration.properties.table.TableProperty;
import sleeper.configuration.properties.validation.CompactionMethod;
import sleeper.statestore.StateStoreProvider;

import java.util.Locale;
import static sleeper.configuration.properties.table.TableProperty.COMPACTION_METHOD;

/**
* Determines which compaction algorithm should be run based on the table and instance configuration properties and
Expand All @@ -55,33 +55,30 @@ public DefaultSelector(
/**
 * Chooses the compaction runner to use for the given job, based on the table's
 * configured compaction method.
 *
 * @param  job the compaction job to run
 * @return     the runner to execute the job with
 */
public CompactionRunner chooseCompactor(CompactionJob job) {
    TableProperties tableProperties = tablePropertiesProvider
            .getById(job.getTableId());
    // Read the table's configured compaction method as a validated enum value.
    CompactionMethod method = tableProperties.getEnumValue(COMPACTION_METHOD, CompactionMethod.class);
    CompactionRunner runner = createRunnerForMethod(method);

    // Is an iterator specified? If so can we support this?
    if (job.getIteratorClassName() != null && !runner.supportsIterators()) {
        LOGGER.debug("Table has an iterator set, which compactor {} doesn't support, falling back to default", runner.getClass().getSimpleName());
        runner = createJavaRunner();
    }

    LOGGER.info("Selecting {} compactor (language {}) for job ID {} table ID {}", runner.getClass().getSimpleName(), runner.implementationLanguage(), job.getId(), job.getTableId());
    return runner;
}

/**
 * Creates a compaction runner implementing the given compaction method.
 *
 * @param  method the compaction method configured for the table
 * @return        a runner for that method
 */
private CompactionRunner createRunnerForMethod(CompactionMethod method) {
    switch (method) {
        case RUST:
            return new RustCompaction(tablePropertiesProvider, stateStoreProvider);
        default:
            // JAVA, and any method with no dedicated runner, uses the Java implementation.
            return createJavaRunner();
    }
}

/**
 * Creates the default Java-based compaction runner. Also used as the fallback
 * when the selected runner cannot support the job (e.g. iterators are set).
 *
 * @return a standard Java compactor
 */
private CompactionRunner createJavaRunner() {
    return new StandardCompactor(tablePropertiesProvider, stateStoreProvider, objectFactory, configuration);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import sleeper.configuration.Utils;
import sleeper.configuration.properties.SleeperPropertyIndex;
import sleeper.configuration.properties.validation.CompactionECSLaunchType;
import sleeper.configuration.properties.validation.CompactionMethod;

import java.util.List;

Expand Down Expand Up @@ -262,7 +263,8 @@ public interface CompactionProperty {
UserDefinedInstanceProperty DEFAULT_COMPACTION_METHOD = Index.propertyBuilder("sleeper.default.table.compaction.method")
.description("Select what compaction method to use on a table. Current options are JAVA and RUST. Rust compaction support is " +
"experimental.")
.defaultValue("JAVA")
.defaultValue(CompactionMethod.JAVA.toString())
.validationPredicate(CompactionMethod::isValid)
.propertyGroup(InstancePropertyGroup.COMPACTION).build();

static List<UserDefinedInstanceProperty> getAll() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sleeper.compaction.job.execution;
package sleeper.configuration.properties.validation;

import org.apache.commons.lang3.EnumUtils;

/**
* Different compaction methods for Sleeper which support different capabilities and must be
Expand All @@ -29,4 +31,8 @@ public enum CompactionMethod {
RUST;

public static final CompactionMethod DEFAULT = CompactionMethod.JAVA;

/**
 * Checks whether the given string names a compaction method, ignoring case.
 * A null value is not valid. Equivalent to EnumUtils.isValidEnumIgnoreCase.
 *
 * @param  value the candidate method name, may be null
 * @return       true if the value matches a constant of this enum ignoring case
 */
public static boolean isValid(String value) {
    if (value == null) {
        return false;
    }
    for (CompactionMethod method : values()) {
        if (method.name().equalsIgnoreCase(value)) {
            return true;
        }
    }
    return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,11 @@

package sleeper.systemtest.suite;

import org.approvaltests.Approvals;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import sleeper.compaction.strategy.impl.BasicCompactionStrategy;
import sleeper.core.partition.PartitionsBuilder;
import sleeper.systemtest.dsl.SleeperSystemTest;
import sleeper.systemtest.dsl.extension.AfterTestPurgeQueues;
import sleeper.systemtest.dsl.extension.AfterTestReports;
Expand All @@ -41,16 +36,9 @@
import static org.assertj.core.api.Assertions.assertThat;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.COMPACTION_JOB_QUEUE_URL;
import static sleeper.configuration.properties.table.TableProperty.COMPACTION_FILES_BATCH_SIZE;
import static sleeper.configuration.properties.table.TableProperty.COMPACTION_STRATEGY_CLASS;
import static sleeper.configuration.properties.table.TableProperty.INGEST_FILE_WRITING_STRATEGY;
import static sleeper.configuration.properties.validation.IngestFileWritingStrategy.ONE_FILE_PER_LEAF;
import static sleeper.core.testutils.printers.FileReferencePrinter.printFiles;
import static sleeper.systemtest.dsl.sourcedata.GenerateNumberedValue.addPrefix;
import static sleeper.systemtest.dsl.sourcedata.GenerateNumberedValue.numberStringAndZeroPadTo;
import static sleeper.systemtest.dsl.sourcedata.GenerateNumberedValueOverrides.overrideField;
import static sleeper.systemtest.suite.fixtures.SystemTestInstance.COMPACTION_ON_EC2;
import static sleeper.systemtest.suite.fixtures.SystemTestSchema.DEFAULT_SCHEMA;
import static sleeper.systemtest.suite.fixtures.SystemTestSchema.ROW_KEY_FIELD_NAME;
import static sleeper.systemtest.suite.testutil.TestResources.exampleString;

@SystemTest
@Slow
Expand All @@ -70,124 +58,27 @@ void tearDown(SleeperSystemTest sleeper) {
sleeper.compaction().scaleToZero();
}

/**
 * Test cases that merge whole input files together in a single compaction per partition.
 */
@Nested
@DisplayName("Merge whole files together")
class MergeFiles {

    @Test
    void shouldCompactFilesUsingDefaultCompactionStrategy(SleeperSystemTest sleeper) {
        // Given
        sleeper.updateTableProperties(Map.of(
                COMPACTION_FILES_BATCH_SIZE, "5"));
        // Files with records 9, 9, 9, 9, 10 (which match SizeRatioStrategy criteria)
        RecordNumbers numbers = sleeper.scrambleNumberedRecords(LongStream.range(0, 46));
        sleeper.ingest().direct(tempDir)
                .numberedRecords(numbers.range(0, 9))
                .numberedRecords(numbers.range(9, 18))
                .numberedRecords(numbers.range(18, 27))
                .numberedRecords(numbers.range(27, 36))
                .numberedRecords(numbers.range(36, 46));

        // When
        sleeper.compaction().createJobs(1).invokeTasks(1).waitForJobs();

        // Then all 46 records survive the compaction unchanged
        assertThat(sleeper.directQuery().allRecordsInTable())
                .containsExactlyInAnyOrderElementsOf(sleeper.generateNumberedRecords(LongStream.range(0, 46)));
        // NOTE(review): presumably compares the printed file layout against an approved
        // ApprovalTests snapshot checked into the repository — confirm snapshot location
        Approvals.verify(printFiles(sleeper.partitioning().tree(), sleeper.tableFiles().all()));
    }

    @Test
    void shouldCompactFilesUsingBasicCompactionStrategy(SleeperSystemTest sleeper) {
        // Given a strategy that batches exactly two files per compaction job
        sleeper.updateTableProperties(Map.of(
                COMPACTION_STRATEGY_CLASS, BasicCompactionStrategy.class.getName(),
                COMPACTION_FILES_BATCH_SIZE, "2"));
        RecordNumbers numbers = sleeper.scrambleNumberedRecords(LongStream.range(0, 100));
        // Four input files of 25 records each -> two compaction jobs of two files each
        sleeper.ingest().direct(tempDir)
                .numberedRecords(numbers.range(0, 25))
                .numberedRecords(numbers.range(25, 50))
                .numberedRecords(numbers.range(50, 75))
                .numberedRecords(numbers.range(75, 100));

        // When
        sleeper.compaction().createJobs(2).invokeTasks(1).waitForJobs();

        // Then all 100 records survive the compaction unchanged
        assertThat(sleeper.directQuery().allRecordsInTable())
                .containsExactlyInAnyOrderElementsOf(sleeper.generateNumberedRecords(LongStream.range(0, 100)));
        Approvals.verify(printFiles(sleeper.partitioning().tree(), sleeper.tableFiles().all()));
    }
}

@Nested
@DisplayName("Merge parts of files referenced on multiple partitions")
class MergePartialFiles {

/**
 * Builds a fixed partition tree with four leaf partitions (LL, LR, RL, RR), with row
 * keys generated as "row-" plus a two-digit zero-padded number so that string ordering
 * of keys matches numeric ordering against the split points row-25/row-50/row-75.
 */
@BeforeEach
void setUp(SleeperSystemTest sleeper) {
    sleeper.setGeneratorOverrides(overrideField(
            ROW_KEY_FIELD_NAME, numberStringAndZeroPadTo(2).then(addPrefix("row-"))));
    sleeper.partitioning().setPartitions(new PartitionsBuilder(DEFAULT_SCHEMA)
            .rootFirst("root")
            .splitToNewChildren("root", "L", "R", "row-50")
            .splitToNewChildren("L", "LL", "LR", "row-25")
            .splitToNewChildren("R", "RL", "RR", "row-75")
            .buildTree());
}

@Test
void shouldCompactOneFileIntoExistingFilesOnLeafPartitions(SleeperSystemTest sleeper) throws Exception {
    // Given a compaction strategy which will always compact two files together
    sleeper.updateTableProperties(Map.of(
            COMPACTION_STRATEGY_CLASS, BasicCompactionStrategy.class.getName(),
            COMPACTION_FILES_BATCH_SIZE, "2"));
    // A file which we add to all 4 leaf partitions (even-numbered records 0..98)
    sleeper.sourceFiles().inDataBucket().writeSketches()
            .createWithNumberedRecords("file.parquet", LongStream.range(0, 50).map(n -> n * 2));
    sleeper.ingest().toStateStore().addFileWithRecordEstimatesOnPartitions(
            "file.parquet", Map.of(
                    "LL", 12L,
                    "LR", 12L,
                    "RL", 12L,
                    "RR", 12L));
    // And a file in each leaf partition (odd-numbered records 1..99)
    sleeper.updateTableProperties(Map.of(INGEST_FILE_WRITING_STRATEGY, ONE_FILE_PER_LEAF.toString()));
    sleeper.ingest().direct(tempDir).numberedRecords(LongStream.range(0, 50).map(n -> n * 2 + 1));

    // When we run compaction (one job per leaf partition)
    sleeper.compaction().createJobs(4).invokeTasks(1).waitForJobs();

    // Then the same records should be present, in one file on each leaf partition
    assertThat(sleeper.directQuery().allRecordsInTable())
            .containsExactlyInAnyOrderElementsOf(sleeper.generateNumberedRecords(LongStream.range(0, 100)));
    Approvals.verify(printFiles(sleeper.partitioning().tree(), sleeper.tableFiles().all()));
}

@Test
void shouldCompactOneFileFromRootIntoExistingFilesOnLeafPartitions(SleeperSystemTest sleeper) throws Exception {
    // Given a compaction strategy which will always compact two files together
    sleeper.updateTableProperties(Map.of(
            COMPACTION_STRATEGY_CLASS, BasicCompactionStrategy.class.getName(),
            COMPACTION_FILES_BATCH_SIZE, "2"));
    // And a file which we add to the root partition (even-numbered records 0..98)
    sleeper.sourceFiles().inDataBucket().writeSketches()
            .createWithNumberedRecords("file.parquet", LongStream.range(0, 50).map(n -> n * 2));
    sleeper.ingest().toStateStore().addFileOnPartition("file.parquet", "root", 50);
    // And a file in each leaf partition (odd-numbered records 1..99)
    sleeper.updateTableProperties(Map.of(INGEST_FILE_WRITING_STRATEGY, ONE_FILE_PER_LEAF.toString()));
    sleeper.ingest().direct(tempDir).numberedRecords(LongStream.range(0, 50).map(n -> n * 2 + 1));

    // When we split the file from the root partition into separate references in the leaf partitions
    // And we run compaction
    sleeper.compaction()
            .createJobs(0).createJobs(4) // Split down two levels of the tree
            .invokeTasks(1).waitForJobs();

    // Then the same records should be present, in one file on each leaf partition
    assertThat(sleeper.directQuery().allRecordsInTable())
            .containsExactlyInAnyOrderElementsOf(sleeper.generateNumberedRecords(LongStream.range(0, 100)));
    Approvals.verify(printFiles(sleeper.partitioning().tree(), sleeper.tableFiles().all()));
}
@Test
void shouldCompactFilesUsingDefaultCompactionStrategy(SleeperSystemTest sleeper) {
    // Given
    sleeper.updateTableProperties(Map.of(
            COMPACTION_FILES_BATCH_SIZE, "5"));
    // Files with records 9, 9, 9, 9, 10 (which match SizeRatioStrategy criteria)
    RecordNumbers numbers = sleeper.scrambleNumberedRecords(LongStream.range(0, 46));
    sleeper.ingest().direct(tempDir)
            .numberedRecords(numbers.range(0, 9))
            .numberedRecords(numbers.range(9, 18))
            .numberedRecords(numbers.range(18, 27))
            .numberedRecords(numbers.range(27, 36))
            .numberedRecords(numbers.range(36, 46));

    // When
    sleeper.compaction().createJobs(1).invokeTasks(1).waitForJobs();

    // Then all 46 records survive the compaction unchanged
    assertThat(sleeper.directQuery().allRecordsInTable())
            .containsExactlyInAnyOrderElementsOf(sleeper.generateNumberedRecords(LongStream.range(0, 46)));
    // Compares the printed file layout against a checked-in example file
    assertThat(printFiles(sleeper.partitioning().tree(), sleeper.tableFiles().all()))
            .isEqualTo(exampleString("compaction/compacted5ToSingleFile.txt"));
}
}
Loading

0 comments on commit 823ec6d

Please sign in to comment.