Skip to content

Commit

Permalink
Set up ParallelCompactionsTest
Browse files Browse the repository at this point in the history
  • Loading branch information
patchwork01 committed Mar 12, 2024
1 parent fb44ed9 commit b1b330a
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@ static GenerateNumberedValue forField(KeyType keyType, Field field) {
}
if (fieldType instanceof StringType) {
switch (keyType) {
case ROW:
default:
return numberStringAndZeroPadTo(19).then(addPrefix("row-"));
case SORT:
return numberStringAndZeroPadTo(19).then(addPrefix("sort-"));
case VALUE:
return numberStringAndZeroPadTo(19).then(addPrefix("Value "));
case ROW:
default:
return numberStringAndZeroPadTo(19).then(addPrefix("row-"));
case SORT:
return numberStringAndZeroPadTo(19).then(addPrefix("sort-"));
case VALUE:
return numberStringAndZeroPadTo(19).then(addPrefix("Value "));
}
}
if (fieldType instanceof ByteArrayType) {
Expand All @@ -65,7 +65,11 @@ static GenerateNumberedValue forField(KeyType keyType, Field field) {
}

/**
 * Creates a generator that renders each number as a string, left padded with "0" up to the given size.
 *
 * @param  size the length to pad the number string up to
 * @return      a generator producing the zero padded string for a number
 */
static GenerateNumberedValue numberStringAndZeroPadTo(int size) {
return num -> StringUtils.leftPad(num + "", size, "0");
return num -> numberStringAndZeroPadTo(size, num);
}

/**
 * Renders a number as a string, left padded with "0" up to the given size.
 *
 * @param  size   the length to pad the number string up to
 * @param  number the number to render
 * @return        the zero padded string
 */
static String numberStringAndZeroPadTo(int size, long number) {
    String digits = Long.toString(number);
    return StringUtils.leftPad(digits, size, "0");
}

static UnaryOperator<Object> addPrefix(String prefix) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@
import static sleeper.systemtest.dsl.sourcedata.GenerateNumberedValue.numberStringAndZeroPadTo;
import static sleeper.systemtest.dsl.sourcedata.GenerateNumberedValueOverrides.overrideField;
import static sleeper.systemtest.dsl.testutil.InMemoryTestInstance.DEFAULT_SCHEMA;
import static sleeper.systemtest.dsl.testutil.InMemoryTestInstance.MAIN;
import static sleeper.systemtest.dsl.testutil.InMemoryTestInstance.ROW_KEY_FIELD_NAME;
import static sleeper.systemtest.dsl.testutil.InMemoryTestInstance.withDefaultProperties;

@InMemoryDslTest
public class CompactionTest {

// Connects the DSL to the "main" in-memory test instance before each test runs
@BeforeEach
void setUp(SleeperSystemTest sleeper) throws Exception {
sleeper.connectToInstance(withDefaultProperties("main"));
sleeper.connectToInstance(MAIN);
}

@Nested
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright 2022-2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sleeper.systemtest.dsl.compaction;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import sleeper.compaction.strategy.impl.BasicCompactionStrategy;
import sleeper.core.partition.PartitionsBuilder;
import sleeper.core.schema.Schema;
import sleeper.systemtest.dsl.SleeperSystemTest;
import sleeper.systemtest.dsl.testutil.InMemoryDslTest;

import java.util.Map;
import java.util.stream.IntStream;
import java.util.stream.LongStream;

import static java.util.stream.Collectors.toUnmodifiableList;
import static org.assertj.core.api.Assertions.assertThat;
import static sleeper.configuration.properties.table.TableProperty.COMPACTION_FILES_BATCH_SIZE;
import static sleeper.configuration.properties.table.TableProperty.COMPACTION_STRATEGY_CLASS;
import static sleeper.systemtest.dsl.sourcedata.GenerateNumberedValue.addPrefix;
import static sleeper.systemtest.dsl.sourcedata.GenerateNumberedValue.numberStringAndZeroPadTo;
import static sleeper.systemtest.dsl.sourcedata.GenerateNumberedValueOverrides.overrideField;
import static sleeper.systemtest.dsl.testutil.InMemoryTestInstance.DEFAULT_SCHEMA;
import static sleeper.systemtest.dsl.testutil.InMemoryTestInstance.PARALLEL_COMPACTIONS;
import static sleeper.systemtest.dsl.testutil.InMemoryTestInstance.ROW_KEY_FIELD_NAME;

/**
 * Checks that one compaction is created and run for each of several partitions, leaving one output
 * file per partition and the same records in the table.
 */
@InMemoryDslTest
@Disabled("TODO")
public class ParallelCompactionsTest {
    public static final int NUMBER_OF_COMPACTIONS = 5;
    private static final int NUMBER_OF_RECORDS = 10000;
    // Every record number below NUMBER_OF_RECORDS must be padded to the same width, otherwise the
    // string ordering of row keys does not match numeric ordering (e.g. "row-1000" < "row-999")
    private static final int ROW_KEY_DIGITS = 4;
    // Shared by the generated row keys and the split points, so that they compare against each other
    private static final String ROW_KEY_PREFIX = "row-";
    private final Schema schema = DEFAULT_SCHEMA;

    @BeforeEach
    void setUp(SleeperSystemTest sleeper) throws Exception {
        sleeper.connectToInstance(PARALLEL_COMPACTIONS);
    }

    @Test
    void shouldApplyOneCompactionPerPartition(SleeperSystemTest sleeper) {
        // Given we have partitions split evenly across the intended range of records
        sleeper.setGeneratorOverrides(overrideField(ROW_KEY_FIELD_NAME,
                numberStringAndZeroPadTo(ROW_KEY_DIGITS).then(addPrefix(ROW_KEY_PREFIX))));
        sleeper.partitioning().setPartitions(new PartitionsBuilder(schema)
                .leavesWithSplits(
                        IntStream.range(0, NUMBER_OF_COMPACTIONS)
                                .mapToObj(i -> "" + i)
                                .collect(toUnmodifiableList()),
                        // Split points use the same format as the generated row keys
                        IntStream.range(1, NUMBER_OF_COMPACTIONS)
                                .map(i -> NUMBER_OF_RECORDS * i / NUMBER_OF_COMPACTIONS)
                                .mapToObj(i -> ROW_KEY_PREFIX + numberStringAndZeroPadTo(ROW_KEY_DIGITS, i))
                                .collect(toUnmodifiableList()))
                .anyTreeJoiningAllLeaves()
                .buildTree());
        // And we have records spread across all partitions in two files per partition
        // NOTE(review): null is passed to direct(...); presumably the in-memory driver does not use
        // this argument - confirm before enabling the test
        sleeper.ingest().direct(null)
                .numberedRecords(LongStream.range(0, NUMBER_OF_RECORDS))
                .numberedRecords(LongStream.range(0, NUMBER_OF_RECORDS));
        // And we configure to compact every partition
        sleeper.updateTableProperties(Map.of(
                COMPACTION_STRATEGY_CLASS, BasicCompactionStrategy.class.getName(),
                COMPACTION_FILES_BATCH_SIZE, "2"));

        // When we run compaction
        sleeper.compaction().createJobs(NUMBER_OF_COMPACTIONS).waitForJobs();

        // Then we have one output file per compaction
        assertThat(sleeper.tableFiles().references())
                .hasSize(NUMBER_OF_COMPACTIONS);
        // And we have the same records afterwards (each record was ingested twice)
        assertThat(sleeper.directQuery().allRecordsInTable())
                .containsExactlyInAnyOrderElementsOf(sleeper.generateNumberedRecords(
                        LongStream.range(0, NUMBER_OF_RECORDS)
                                .flatMap(i -> LongStream.of(i, i))));
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import static sleeper.systemtest.dsl.instance.SystemTestInstanceConfiguration.usingSystemTestDefaults;
import static sleeper.systemtest.dsl.testutil.InMemoryTestInstance.DEFAULT_SCHEMA;
import static sleeper.systemtest.dsl.testutil.InMemoryTestInstance.createDslInstanceProperties;
import static sleeper.systemtest.dsl.testutil.InMemoryTestInstance.createDslTableProperties;
import static sleeper.systemtest.dsl.testutil.InMemoryTestInstance.withDefaultProperties;

@InMemoryDslTest
Expand Down Expand Up @@ -99,8 +100,7 @@ void shouldSetPartitionsForMultipleTables(SleeperSystemTest sleeper) {
.buildTree();

// When
sleeper.tables().forEach(() ->
sleeper.partitioning().setPartitions(partitions));
sleeper.tables().forEach(() -> sleeper.partitioning().setPartitions(partitions));

// Then
assertThat(sleeper.partitioning().treeByTable())
Expand Down Expand Up @@ -199,7 +199,7 @@ void shouldGenerateNameForPredefinedTable(SleeperSystemTest sleeper) {
// When
sleeper.connectToInstance(usingSystemTestDefaults("predeftable", () -> {
InstanceProperties instanceProperties = createDslInstanceProperties();
TableProperties tableProperties = createTestTableProperties(instanceProperties, DEFAULT_SCHEMA);
TableProperties tableProperties = createDslTableProperties(instanceProperties);
tableProperties.set(TABLE_NAME, "predefined-test-table");
return new DeployInstanceConfiguration(instanceProperties, tableProperties);
}));
Expand All @@ -215,7 +215,7 @@ void shouldRefusePredefinedTableWithNoName(SleeperSystemTest sleeper) {
// Given
SystemTestInstanceConfiguration configuration = usingSystemTestDefaults("nonametable", () -> {
InstanceProperties instanceProperties = createDslInstanceProperties();
TableProperties tableProperties = createTestTableProperties(instanceProperties, DEFAULT_SCHEMA);
TableProperties tableProperties = createDslTableProperties(instanceProperties);
tableProperties.unset(TABLE_NAME);
return new DeployInstanceConfiguration(instanceProperties, tableProperties);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@
import sleeper.core.schema.Schema;
import sleeper.core.schema.type.LongType;
import sleeper.core.schema.type.StringType;
import sleeper.systemtest.dsl.compaction.ParallelCompactionsTest;
import sleeper.systemtest.dsl.instance.SystemTestInstanceConfiguration;

import java.util.function.Consumer;

import static sleeper.configuration.properties.InstancePropertiesTestHelper.createTestInstanceProperties;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.INGEST_JOB_QUEUE_URL;
import static sleeper.configuration.properties.instance.CommonProperty.FILE_SYSTEM;
import static sleeper.configuration.properties.instance.CommonProperty.RETAIN_INFRA_AFTER_DESTROY;
import static sleeper.configuration.properties.instance.CompactionProperty.MAXIMUM_CONCURRENT_COMPACTION_TASKS;
import static sleeper.configuration.properties.instance.DefaultProperty.DEFAULT_INGEST_PARTITION_FILE_WRITER_TYPE;
import static sleeper.configuration.properties.table.TablePropertiesTestHelper.createTestTableProperties;
import static sleeper.configuration.properties.table.TableProperty.TABLE_ID;
Expand All @@ -48,16 +52,23 @@ private InMemoryTestInstance() {
.valueFields(new Field(VALUE_FIELD_NAME, new StringType()))
.build();
public static final SystemTestInstanceConfiguration MAIN = withDefaultProperties("main");
// Instance "compact", with the maximum concurrent compaction tasks set to the number of compactions
// declared by ParallelCompactionsTest
public static final SystemTestInstanceConfiguration PARALLEL_COMPACTIONS = withInstanceProperties(
        "compact",
        properties -> properties.setNumber(
                MAXIMUM_CONCURRENT_COMPACTION_TASKS, ParallelCompactionsTest.NUMBER_OF_COMPACTIONS));

/**
 * Creates an instance configuration with the given identifier, without changing any instance properties.
 *
 * @param  identifier the identifier passed on to {@code withInstanceProperties}
 * @return            the instance configuration
 */
public static SystemTestInstanceConfiguration withDefaultProperties(String identifier) {
    Consumer<InstanceProperties> noOverrides = properties -> {
    };
    return withInstanceProperties(identifier, noOverrides);
}

/**
 * Creates an instance configuration with the given identifier. The supplied consumer is applied to
 * the freshly created instance properties before the deploy configuration is built.
 *
 * @param  identifier the identifier passed on to {@code usingSystemTestDefaults}
 * @param  config     applied to the instance properties to customise them
 * @return            the instance configuration
 */
public static SystemTestInstanceConfiguration withInstanceProperties(
String identifier, Consumer<InstanceProperties> config) {
return usingSystemTestDefaults(identifier, () -> {
InstanceProperties instanceProperties = createDslInstanceProperties();
TableProperties tableProperties = createTestTableProperties(instanceProperties, DEFAULT_SCHEMA);
tableProperties.unset(TABLE_ID);
tableProperties.set(TABLE_NAME, "system-test");
config.accept(instanceProperties);
return DeployInstanceConfiguration.builder()
.instanceProperties(instanceProperties)
.tableProperties(tableProperties)
.tableProperties(createDslTableProperties(instanceProperties))
.build();
});
}
Expand All @@ -70,4 +81,11 @@ public static InstanceProperties createDslInstanceProperties() {
instanceProperties.set(INGEST_JOB_QUEUE_URL, "in-memory-ingest-job-queue-url");
return instanceProperties;
}

/**
 * Creates table properties for the default schema, with the table ID unset and the table name set
 * to "system-test".
 *
 * @param  instanceProperties the instance properties the table properties are created against
 * @return                    the table properties
 */
public static TableProperties createDslTableProperties(InstanceProperties instanceProperties) {
    TableProperties properties = createTestTableProperties(instanceProperties, DEFAULT_SCHEMA);
    // NOTE(review): presumably the ID is cleared so one is assigned when the table is created - confirm
    properties.unset(TABLE_ID);
    properties.set(TABLE_NAME, "system-test");
    return properties;
}
}

0 comments on commit b1b330a

Please sign in to comment.