Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Issue 2875 - System test Rust compaction #3024

Merged
merged 6 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
import sleeper.configuration.jars.ObjectFactory;
import sleeper.configuration.properties.table.TableProperties;
import sleeper.configuration.properties.table.TablePropertiesProvider;
import sleeper.configuration.properties.table.TableProperty;
import sleeper.configuration.properties.validation.CompactionMethod;
import sleeper.statestore.StateStoreProvider;

import java.util.Locale;
import static sleeper.configuration.properties.table.TableProperty.COMPACTION_METHOD;

/**
* Determines which compaction algorithm should be run based on the table and instance configuration properties and
Expand All @@ -55,33 +55,30 @@ public DefaultSelector(
/**
 * Chooses the compaction runner to use for a job, based on the table's configured
 * compaction method. If the job has an iterator configured and the chosen runner
 * does not support iterators, falls back to the default Java runner.
 *
 * @param job the compaction job to choose a runner for
 * @return the runner that should execute the job
 */
public CompactionRunner chooseCompactor(CompactionJob job) {
    TableProperties tableProperties = tablePropertiesProvider
            .getById(job.getTableId());
    // Read the configured method as an enum value (validated at property level)
    CompactionMethod method = tableProperties.getEnumValue(COMPACTION_METHOD, CompactionMethod.class);
    CompactionRunner runner = createRunnerForMethod(method);

    // Is an iterator specified? If so can we support this?
    if (job.getIteratorClassName() != null && !runner.supportsIterators()) {
        LOGGER.debug("Table has an iterator set, which compactor {} doesn't support, falling back to default", runner.getClass().getSimpleName());
        runner = createJavaRunner();
    }

    LOGGER.info("Selecting {} compactor (language {}) for job ID {} table ID {}", runner.getClass().getSimpleName(), runner.implementationLanguage(), job.getId(), job.getTableId());
    return runner;
}

private CompactionRunner createRunnerForMethod(CompactionMethod method) {
switch (method) {
case RUST:
return new RustCompaction(tablePropertiesProvider, stateStoreProvider);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any scenario where we'd need to create the RustCompaction outside this method in the future, if so moving it out to a separate private method like the java runner might be worth it

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that'd be easy enough that there's no reason to do it now, unless it'd actually read better?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was just a note for future ideas, approved as it was fine without

case JAVA:
default:
return createJavaRunner();
}
}

/**
 * Creates the standard Java compaction runner. Also used as the fallback when
 * the configured runner cannot support the job (e.g. an iterator is set).
 */
private CompactionRunner createJavaRunner() {
    return new StandardCompactor(tablePropertiesProvider, stateStoreProvider, objectFactory, configuration);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import sleeper.configuration.Utils;
import sleeper.configuration.properties.SleeperPropertyIndex;
import sleeper.configuration.properties.validation.CompactionECSLaunchType;
import sleeper.configuration.properties.validation.CompactionMethod;

import java.util.List;

Expand Down Expand Up @@ -262,7 +263,8 @@ public interface CompactionProperty {
// Default compaction method for tables on this instance. The default value is
// derived from the CompactionMethod enum, and values are validated against it
// (case-insensitively, via CompactionMethod::isValid).
UserDefinedInstanceProperty DEFAULT_COMPACTION_METHOD = Index.propertyBuilder("sleeper.default.table.compaction.method")
        .description("Select what compaction method to use on a table. Current options are JAVA and RUST. Rust compaction support is " +
                "experimental.")
        .defaultValue(CompactionMethod.JAVA.toString())
        .validationPredicate(CompactionMethod::isValid)
        .propertyGroup(InstancePropertyGroup.COMPACTION).build();

static List<UserDefinedInstanceProperty> getAll() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sleeper.compaction.job.execution;
package sleeper.configuration.properties.validation;

import org.apache.commons.lang3.EnumUtils;

/**
* Different compaction methods for Sleeper which support different capabilities and must be
Expand All @@ -29,4 +31,8 @@ public enum CompactionMethod {
RUST;

// Fallback compaction method (Java).
public static final CompactionMethod DEFAULT = CompactionMethod.JAVA;

/**
 * Checks whether a string names a valid compaction method, ignoring case.
 *
 * @param value the candidate property value
 * @return true if the value matches a CompactionMethod constant, ignoring case
 */
public static boolean isValid(String value) {
    return EnumUtils.isValidEnumIgnoreCase(CompactionMethod.class, value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,11 @@

package sleeper.systemtest.suite;

import org.approvaltests.Approvals;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import sleeper.compaction.strategy.impl.BasicCompactionStrategy;
import sleeper.core.partition.PartitionsBuilder;
import sleeper.systemtest.dsl.SleeperSystemTest;
import sleeper.systemtest.dsl.extension.AfterTestPurgeQueues;
import sleeper.systemtest.dsl.extension.AfterTestReports;
Expand All @@ -41,16 +36,9 @@
import static org.assertj.core.api.Assertions.assertThat;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.COMPACTION_JOB_QUEUE_URL;
import static sleeper.configuration.properties.table.TableProperty.COMPACTION_FILES_BATCH_SIZE;
import static sleeper.configuration.properties.table.TableProperty.COMPACTION_STRATEGY_CLASS;
import static sleeper.configuration.properties.table.TableProperty.INGEST_FILE_WRITING_STRATEGY;
import static sleeper.configuration.properties.validation.IngestFileWritingStrategy.ONE_FILE_PER_LEAF;
import static sleeper.core.testutils.printers.FileReferencePrinter.printFiles;
import static sleeper.systemtest.dsl.sourcedata.GenerateNumberedValue.addPrefix;
import static sleeper.systemtest.dsl.sourcedata.GenerateNumberedValue.numberStringAndZeroPadTo;
import static sleeper.systemtest.dsl.sourcedata.GenerateNumberedValueOverrides.overrideField;
import static sleeper.systemtest.suite.fixtures.SystemTestInstance.COMPACTION_ON_EC2;
import static sleeper.systemtest.suite.fixtures.SystemTestSchema.DEFAULT_SCHEMA;
import static sleeper.systemtest.suite.fixtures.SystemTestSchema.ROW_KEY_FIELD_NAME;
import static sleeper.systemtest.suite.testutil.TestResources.exampleString;

@SystemTest
@Slow
Expand All @@ -70,124 +58,27 @@ void tearDown(SleeperSystemTest sleeper) {
sleeper.compaction().scaleToZero();
}

/**
 * Tests that compaction merges whole files into a single output file.
 */
@Nested
@DisplayName("Merge whole files together")
class MergeFiles {

    /** Uses the default (size ratio) strategy: five files sized 9, 9, 9, 9, 10 records. */
    @Test
    void shouldCompactFilesUsingDefaultCompactionStrategy(SleeperSystemTest sleeper) {
        // Given
        sleeper.updateTableProperties(Map.of(
                COMPACTION_FILES_BATCH_SIZE, "5"));
        // Files with records 9, 9, 9, 9, 10 (which match SizeRatioStrategy criteria)
        RecordNumbers numbers = sleeper.scrambleNumberedRecords(LongStream.range(0, 46));
        sleeper.ingest().direct(tempDir)
                .numberedRecords(numbers.range(0, 9))
                .numberedRecords(numbers.range(9, 18))
                .numberedRecords(numbers.range(18, 27))
                .numberedRecords(numbers.range(27, 36))
                .numberedRecords(numbers.range(36, 46));

        // When
        sleeper.compaction().createJobs(1).invokeTasks(1).waitForJobs();

        // Then all records survive the compaction, and the file layout matches the approval file
        assertThat(sleeper.directQuery().allRecordsInTable())
                .containsExactlyInAnyOrderElementsOf(sleeper.generateNumberedRecords(LongStream.range(0, 46)));
        Approvals.verify(printFiles(sleeper.partitioning().tree(), sleeper.tableFiles().all()));
    }

    /** Uses the basic strategy with a batch size of 2, so four files become two compaction jobs. */
    @Test
    void shouldCompactFilesUsingBasicCompactionStrategy(SleeperSystemTest sleeper) {
        // Given
        sleeper.updateTableProperties(Map.of(
                COMPACTION_STRATEGY_CLASS, BasicCompactionStrategy.class.getName(),
                COMPACTION_FILES_BATCH_SIZE, "2"));
        RecordNumbers numbers = sleeper.scrambleNumberedRecords(LongStream.range(0, 100));
        sleeper.ingest().direct(tempDir)
                .numberedRecords(numbers.range(0, 25))
                .numberedRecords(numbers.range(25, 50))
                .numberedRecords(numbers.range(50, 75))
                .numberedRecords(numbers.range(75, 100));

        // When
        sleeper.compaction().createJobs(2).invokeTasks(1).waitForJobs();

        // Then all records survive the compaction, and the file layout matches the approval file
        assertThat(sleeper.directQuery().allRecordsInTable())
                .containsExactlyInAnyOrderElementsOf(sleeper.generateNumberedRecords(LongStream.range(0, 100)));
        Approvals.verify(printFiles(sleeper.partitioning().tree(), sleeper.tableFiles().all()));
    }
}

@Nested
@DisplayName("Merge parts of files referenced on multiple partitions")
class MergePartialFiles {

/**
 * Sets up a fixed, three-level partition tree (root -> L/R -> LL/LR/RL/RR) split
 * on zero-padded row keys, so the tests can place file references on known partitions.
 */
@BeforeEach
void setUp(SleeperSystemTest sleeper) {
    // Generate row keys of the form "row-NN" so split points like "row-50" line up
    sleeper.setGeneratorOverrides(overrideField(
            ROW_KEY_FIELD_NAME, numberStringAndZeroPadTo(2).then(addPrefix("row-"))));
    sleeper.partitioning().setPartitions(new PartitionsBuilder(DEFAULT_SCHEMA)
            .rootFirst("root")
            .splitToNewChildren("root", "L", "R", "row-50")
            .splitToNewChildren("L", "LL", "LR", "row-25")
            .splitToNewChildren("R", "RL", "RR", "row-75")
            .buildTree());
}

/**
 * One file is referenced on all four leaf partitions; each leaf also holds its own
 * file. Compaction should merge each partial reference with the leaf's file.
 */
@Test
void shouldCompactOneFileIntoExistingFilesOnLeafPartitions(SleeperSystemTest sleeper) throws Exception {
    // Given a compaction strategy which will always compact two files together
    sleeper.updateTableProperties(Map.of(
            COMPACTION_STRATEGY_CLASS, BasicCompactionStrategy.class.getName(),
            COMPACTION_FILES_BATCH_SIZE, "2"));
    // A file which we add to all 4 leaf partitions
    sleeper.sourceFiles().inDataBucket().writeSketches()
            .createWithNumberedRecords("file.parquet", LongStream.range(0, 50).map(n -> n * 2));
    sleeper.ingest().toStateStore().addFileWithRecordEstimatesOnPartitions(
            "file.parquet", Map.of(
                    "LL", 12L,
                    "LR", 12L,
                    "RL", 12L,
                    "RR", 12L));
    // And a file in each leaf partition
    sleeper.updateTableProperties(Map.of(INGEST_FILE_WRITING_STRATEGY, ONE_FILE_PER_LEAF.toString()));
    sleeper.ingest().direct(tempDir).numberedRecords(LongStream.range(0, 50).map(n -> n * 2 + 1));

    // When we run compaction
    sleeper.compaction().createJobs(4).invokeTasks(1).waitForJobs();

    // Then the same records should be present, in one file on each leaf partition
    assertThat(sleeper.directQuery().allRecordsInTable())
            .containsExactlyInAnyOrderElementsOf(sleeper.generateNumberedRecords(LongStream.range(0, 100)));
    Approvals.verify(printFiles(sleeper.partitioning().tree(), sleeper.tableFiles().all()));
}

/**
 * One file is referenced on the root partition; each leaf holds its own file.
 * The root reference must first be split down to the leaves (two levels), then
 * compaction merges it with each leaf's file.
 */
@Test
void shouldCompactOneFileFromRootIntoExistingFilesOnLeafPartitions(SleeperSystemTest sleeper) throws Exception {
    // Given a compaction strategy which will always compact two files together
    sleeper.updateTableProperties(Map.of(
            COMPACTION_STRATEGY_CLASS, BasicCompactionStrategy.class.getName(),
            COMPACTION_FILES_BATCH_SIZE, "2"));
    // And a file which we add to the root partition
    sleeper.sourceFiles().inDataBucket().writeSketches()
            .createWithNumberedRecords("file.parquet", LongStream.range(0, 50).map(n -> n * 2));
    sleeper.ingest().toStateStore().addFileOnPartition("file.parquet", "root", 50);
    // And a file in each leaf partition
    sleeper.updateTableProperties(Map.of(INGEST_FILE_WRITING_STRATEGY, ONE_FILE_PER_LEAF.toString()));
    sleeper.ingest().direct(tempDir).numberedRecords(LongStream.range(0, 50).map(n -> n * 2 + 1));

    // When we split the file from the root partition into separate references in the leaf partitions
    // And we run compaction
    sleeper.compaction()
            .createJobs(0).createJobs(4) // Split down two levels of the tree
            .invokeTasks(1).waitForJobs();

    // Then the same records should be present, in one file on each leaf partition
    assertThat(sleeper.directQuery().allRecordsInTable())
            .containsExactlyInAnyOrderElementsOf(sleeper.generateNumberedRecords(LongStream.range(0, 100)));
    Approvals.verify(printFiles(sleeper.partitioning().tree(), sleeper.tableFiles().all()));
}
/**
 * Compacts five files (9, 9, 9, 9, 10 records) into one using the default size
 * ratio strategy, then checks the resulting file layout against a checked-in
 * example string instead of an approval file.
 */
@Test
void shouldCompactFilesUsingDefaultCompactionStrategy(SleeperSystemTest sleeper) {
    // Given
    sleeper.updateTableProperties(Map.of(
            COMPACTION_FILES_BATCH_SIZE, "5"));
    // Files with records 9, 9, 9, 9, 10 (which match SizeRatioStrategy criteria)
    RecordNumbers numbers = sleeper.scrambleNumberedRecords(LongStream.range(0, 46));
    sleeper.ingest().direct(tempDir)
            .numberedRecords(numbers.range(0, 9))
            .numberedRecords(numbers.range(9, 18))
            .numberedRecords(numbers.range(18, 27))
            .numberedRecords(numbers.range(27, 36))
            .numberedRecords(numbers.range(36, 46));

    // When
    sleeper.compaction().createJobs(1).invokeTasks(1).waitForJobs();

    // Then no records are lost, and the file layout matches the expected example
    assertThat(sleeper.directQuery().allRecordsInTable())
            .containsExactlyInAnyOrderElementsOf(sleeper.generateNumberedRecords(LongStream.range(0, 46)));
    assertThat(printFiles(sleeper.partitioning().tree(), sleeper.tableFiles().all()))
            .isEqualTo(exampleString("compaction/compacted5ToSingleFile.txt"));
}
}
Loading
Loading