diff --git a/java/cdk/src/main/java/sleeper/cdk/util/Utils.java b/java/cdk/src/main/java/sleeper/cdk/util/Utils.java index 8575c903c0..ed4a145352 100644 --- a/java/cdk/src/main/java/sleeper/cdk/util/Utils.java +++ b/java/cdk/src/main/java/sleeper/cdk/util/Utils.java @@ -112,7 +112,11 @@ private static String createToolOptions(InstanceProperties instanceProperties) { .filter(instanceProperties::isSet) .forEach(s -> sb.append("-D").append(s.getPropertyName()) .append("=").append(instanceProperties.get(s)).append(" ")); - + Stream.of("java.base/java.nio=ALL-UNNAMED", + "java.base/sun.nio.ch=ALL-UNNAMED", + "java.base/java.util=ALL-UNNAMED", + "java.base/java.lang.invoke=ALL-UNNAMED") + .forEach(s -> sb.append("--add-opens=").append(s).append(" ")); return sb.toString(); } diff --git a/java/core/src/main/java/sleeper/core/statestore/AllReferencesToAllFiles.java b/java/core/src/main/java/sleeper/core/statestore/AllReferencesToAllFiles.java index b391a7fa80..78b989adc7 100644 --- a/java/core/src/main/java/sleeper/core/statestore/AllReferencesToAllFiles.java +++ b/java/core/src/main/java/sleeper/core/statestore/AllReferencesToAllFiles.java @@ -24,6 +24,7 @@ import java.util.TreeMap; import java.util.stream.Stream; +import static java.util.stream.Collectors.toMap; import static java.util.stream.Collectors.toUnmodifiableList; /** @@ -67,6 +68,18 @@ public List listFileReferences() { .collect(toUnmodifiableList()); } + /** + * Builds a map from filename to the number of referenced records. + * + * @return the map + */ + public Map recordsByFilename() { + return getFilesWithReferences().stream() + .collect(toMap( + AllReferencesToAFile::getFilename, + file -> file.getReferences().stream().mapToLong(FileReference::getNumberOfRecords).sum())); + } + public boolean isMoreThanMax() { return moreThanMax; } diff --git a/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/statestore/AwsSnapshotsDriver.java b/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/statestore/AwsSnapshotsDriver.java index 8442eb87e0..2141c5ca74 100644 --- a/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/statestore/AwsSnapshotsDriver.java +++ b/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/statestore/AwsSnapshotsDriver.java @@ -17,30 +17,71 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import software.amazon.awssdk.services.cloudwatchevents.CloudWatchEventsClient; +import sleeper.core.partition.PartitionTree; import sleeper.core.properties.instance.InstanceProperties; +import sleeper.core.properties.table.TableProperties; +import sleeper.core.statestore.AllReferencesToAllFiles; +import sleeper.core.statestore.transactionlog.StateStoreFiles; +import sleeper.core.statestore.transactionlog.StateStorePartitions; +import sleeper.core.statestore.transactionlog.TransactionLogSnapshot; +import sleeper.statestore.transactionlog.DynamoDBTransactionLogSnapshotStore; +import sleeper.systemtest.drivers.util.SystemTestClients; import sleeper.systemtest.dsl.snapshot.SnapshotsDriver; +import java.util.Optional; + +import static java.util.stream.Collectors.toUnmodifiableList; import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.TRANSACTION_LOG_SNAPSHOT_CREATION_RULE; public class AwsSnapshotsDriver implements SnapshotsDriver { public static final Logger LOGGER = LoggerFactory.getLogger(AwsSnapshotsDriver.class); - private final CloudWatchEventsClient cwClient; + private final SystemTestClients clients; - public AwsSnapshotsDriver(CloudWatchEventsClient cwClient) { - this.cwClient = cwClient; + public AwsSnapshotsDriver(SystemTestClients clients) { + this.clients = clients; } @Override public void enableCreation(InstanceProperties instanceProperties) { LOGGER.info("Enabling transaction log snapshot creation"); - cwClient.enableRule(request -> request.name(instanceProperties.get(TRANSACTION_LOG_SNAPSHOT_CREATION_RULE))); + clients.getCloudWatchEvents().enableRule(request -> request + .name(instanceProperties.get(TRANSACTION_LOG_SNAPSHOT_CREATION_RULE))); } @Override public void disableCreation(InstanceProperties instanceProperties) { LOGGER.info("Disabling transaction log snapshot creation"); - cwClient.disableRule(request -> request.name(instanceProperties.get(TRANSACTION_LOG_SNAPSHOT_CREATION_RULE))); + clients.getCloudWatchEvents().disableRule(request -> request + .name(instanceProperties.get(TRANSACTION_LOG_SNAPSHOT_CREATION_RULE))); + } + + @Override + public Optional loadLatestFilesSnapshot(InstanceProperties instanceProperties, TableProperties tableProperties) { + return snapshotStore(instanceProperties, tableProperties) + .loadLatestFilesSnapshotIfAtMinimumTransaction(0) + .map(AwsSnapshotsDriver::readFiles); + } + + @Override + public Optional loadLatestPartitionsSnapshot(InstanceProperties instanceProperties, TableProperties tableProperties) { + return snapshotStore(instanceProperties, tableProperties) + .loadLatestPartitionsSnapshotIfAtMinimumTransaction(0) + .map(AwsSnapshotsDriver::readPartitions); + } + + private DynamoDBTransactionLogSnapshotStore snapshotStore(InstanceProperties instanceProperties, TableProperties tableProperties) { + return new DynamoDBTransactionLogSnapshotStore(instanceProperties, tableProperties, + clients.getDynamoDB(), clients.createHadoopConf(instanceProperties, tableProperties)); + } + + private static AllReferencesToAllFiles readFiles(TransactionLogSnapshot snapshot) { + StateStoreFiles state = snapshot.getState(); + return new AllReferencesToAllFiles(state.referencedAndUnreferenced(), false); + } + + private static PartitionTree readPartitions(TransactionLogSnapshot snapshot) { + StateStorePartitions state = snapshot.getState(); + return new PartitionTree(state.all().stream().collect(toUnmodifiableList())); } } diff --git a/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/util/AwsSystemTestDrivers.java b/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/util/AwsSystemTestDrivers.java index 1299a2d944..e462449ed5 100644 --- a/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/util/AwsSystemTestDrivers.java +++ b/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/util/AwsSystemTestDrivers.java @@ -266,7 +266,7 @@ public PurgeQueueDriver purgeQueues(SystemTestContext context) { @Override public SnapshotsDriver snapshots() { - return new AwsSnapshotsDriver(clients.getCloudWatchEvents()); + return new AwsSnapshotsDriver(clients); } } diff --git a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/instance/DeployedSleeperTablesForTest.java b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/instance/DeployedSleeperTablesForTest.java index 399e47600a..e0b2686a1b 100644 --- a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/instance/DeployedSleeperTablesForTest.java +++ b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/instance/DeployedSleeperTablesForTest.java @@ -62,7 +62,7 @@ public void addTablesAndSetCurrent(SleeperTablesDriver driver, List tables) { + private void addTables(SleeperTablesDriver driver, List tables) { LOGGER.info("Adding {} tables with instance ID: {}", tables.size(), instanceProperties.get(ID)); tables.stream().parallel().forEach(tableProperties -> driver.addTable(instanceProperties, tableProperties)); tables.forEach(tableProperties -> { diff --git a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/instance/SystemTestInstanceContext.java b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/instance/SystemTestInstanceContext.java index 692a32b8ae..4bac615ced 100644 --- a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/instance/SystemTestInstanceContext.java +++ b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/instance/SystemTestInstanceContext.java @@ -100,10 +100,11 @@ public void createTables(int numberOfTables, Schema schema, Map setProperties) { TableProperties tableProperties = parameters.createTableProperties(getInstanceProperties(), schema); tableProperties.set(TABLE_NAME, name + "-" + UUID.randomUUID()); - currentTables().addTables(tablesDriver(), List.of(tableProperties)); + setProperties.forEach(tableProperties::set); + currentTables().addTablesAndSetCurrent(tablesDriver(), List.of(tableProperties)); tablesByTestName.put(name, tableProperties); testNameByTableId.put(tableProperties.get(TABLE_ID), name); } diff --git a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/instance/SystemTestTableFiles.java b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/instance/SystemTestTableFiles.java index 61336ea4d3..bf2aaf98b6 100644 --- a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/instance/SystemTestTableFiles.java +++ b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/instance/SystemTestTableFiles.java @@ -53,6 +53,10 @@ public List references() { } } + public Map recordsByFilename() { + return all().recordsByFilename(); + } + public Map filesByTable() { return instance.streamTableProperties().parallel() .map(this::getFiles) diff --git a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/instance/SystemTestTables.java b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/instance/SystemTestTables.java index 241f9f3f96..fd3ff78c95 100644 --- a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/instance/SystemTestTables.java +++ b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/instance/SystemTestTables.java @@ -37,12 +37,17 @@ public void createMany(int numberOfTables, Schema schema) { } public SystemTestTables create(String name, Schema schema) { - instance.createTable(name, schema); + instance.createTable(name, schema, Map.of()); + return this; + } + + public SystemTestTables createWithProperties(String name, Schema schema, Map setProperties) { + instance.createTable(name, schema, setProperties); return this; } public SystemTestTables create(List names, Schema schema) { - names.forEach(name -> instance.createTable(name, schema)); + names.forEach(name -> instance.createTable(name, schema, Map.of())); return this; } diff --git a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/snapshot/SnapshotsDriver.java b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/snapshot/SnapshotsDriver.java index a7e968f9b6..67edcbf435 100644 --- a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/snapshot/SnapshotsDriver.java +++ b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/snapshot/SnapshotsDriver.java @@ -15,10 +15,29 @@ */ package sleeper.systemtest.dsl.snapshot; +import sleeper.core.partition.PartitionTree; import sleeper.core.properties.instance.InstanceProperties; +import sleeper.core.properties.table.TableProperties; +import sleeper.core.statestore.AllReferencesToAllFiles; + +import java.util.Optional; + +import static sleeper.core.properties.instance.CommonProperty.ID; public interface SnapshotsDriver { void enableCreation(InstanceProperties instanceProperties); void disableCreation(InstanceProperties instanceProperties); + + default Optional loadLatestFilesSnapshot(InstanceProperties instanceProperties, TableProperties tableProperties) { + throw new UnsupportedOperationException( + "Requested loading files snapshot for instance " + instanceProperties.get(ID) + + ", table " + tableProperties.getStatus() + ", not currently implemented for this driver"); + } + + default Optional loadLatestPartitionsSnapshot(InstanceProperties instanceProperties, TableProperties tableProperties) { + throw new UnsupportedOperationException( + "Requested loading partitions snapshot for instance " + instanceProperties.get(ID) + + ", table " + tableProperties.getStatus() + ", not currently implemented for this driver"); + } } diff --git a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/snapshot/WaitForSnapshot.java b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/snapshot/WaitForSnapshot.java new file mode 100644 index 0000000000..eaaf13f08e --- /dev/null +++ b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/snapshot/WaitForSnapshot.java @@ -0,0 +1,77 @@ +/* + * Copyright 2022-2024 Crown Copyright + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package sleeper.systemtest.dsl.snapshot; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import sleeper.core.partition.PartitionTree; +import sleeper.core.statestore.AllReferencesToAllFiles; +import sleeper.core.util.PollWithRetries; +import sleeper.systemtest.dsl.instance.SystemTestInstanceContext; + +import java.util.Optional; +import java.util.function.Predicate; + +public class WaitForSnapshot { + public static final Logger LOGGER = LoggerFactory.getLogger(WaitForSnapshot.class); + + private final SystemTestInstanceContext instance; + private final SnapshotsDriver driver; + + public WaitForSnapshot(SystemTestInstanceContext instance, SnapshotsDriver driver) { + this.instance = instance; + this.driver = driver; + } + + public AllReferencesToAllFiles waitForFilesSnapshot(PollWithRetries poll, Predicate condition) throws InterruptedException { + LOGGER.info("Waiting for files snapshot"); + return poll.queryUntil("files snapshot is present", this::loadLatestFilesSnapshot, matches(condition)) + .orElseThrow(); + } + + public PartitionTree waitForPartitionsSnapshot(PollWithRetries poll, Predicate condition) throws InterruptedException { + LOGGER.info("Waiting for partitions snapshot"); + return poll.queryUntil("partitions snapshot is present", this::loadLatestPartitionsSnapshot, matches(condition)) + .orElseThrow(); + } + + private Optional loadLatestFilesSnapshot() { + Optional snapshotOpt = driver.loadLatestFilesSnapshot( + instance.getInstanceProperties(), instance.getTableProperties()); + if (!snapshotOpt.isPresent()) { + LOGGER.info("Found no files snapshot"); + } else { + LOGGER.info("Found {} files in snapshot", snapshotOpt.get().getFiles().size()); + } + return snapshotOpt; + } + + private Optional loadLatestPartitionsSnapshot() { + Optional snapshotOpt = driver.loadLatestPartitionsSnapshot( + instance.getInstanceProperties(), instance.getTableProperties()); + if (!snapshotOpt.isPresent()) { + LOGGER.info("Found no partitions snapshot"); + } else { + LOGGER.info("Found {} partitions in snapshot", snapshotOpt.get().getAllPartitions().size()); + } + return snapshotOpt; + } + + private static Predicate> matches(Predicate condition) { + return opt -> opt.isPresent() && condition.test(opt.get()); + } +} diff --git a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/statestore/SystemTestStateStore.java b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/statestore/SystemTestStateStore.java index 52a6df77ad..41cbe49e85 100644 --- a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/statestore/SystemTestStateStore.java +++ b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/statestore/SystemTestStateStore.java @@ -18,13 +18,19 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import sleeper.core.partition.PartitionTree; +import sleeper.core.statestore.AllReferencesToAllFiles; import sleeper.core.table.TableStatus; +import sleeper.core.util.PollWithRetries; import sleeper.systemtest.dsl.SystemTestContext; import sleeper.systemtest.dsl.SystemTestDrivers; import sleeper.systemtest.dsl.instance.SystemTestInstanceContext; +import sleeper.systemtest.dsl.snapshot.SnapshotsDriver; +import sleeper.systemtest.dsl.snapshot.WaitForSnapshot; import java.time.Instant; import java.util.Map; +import java.util.function.Predicate; import static java.util.stream.Collectors.toMap; import static java.util.stream.Collectors.toSet; @@ -36,12 +42,14 @@ public class SystemTestStateStore { private final SystemTestContext context; private final StateStoreCommitterDriver driver; private final StateStoreCommitterLogsDriver logsDriver; + private final SnapshotsDriver snapshotsDriver; public SystemTestStateStore(SystemTestContext context) { this.context = context; SystemTestDrivers adminDrivers = context.instance().adminDrivers(); driver = adminDrivers.stateStoreCommitter(context); logsDriver = adminDrivers.stateStoreCommitterLogs(context); + snapshotsDriver = adminDrivers.snapshots(); } public SystemTestStateStoreFakeCommits fakeCommits() { @@ -77,4 +85,14 @@ private Map commitsPerSecondByTableId() { .collect(toSet())); } + public AllReferencesToAllFiles waitForFilesSnapshot(PollWithRetries intervalAndPollingTimeout, Predicate condition) throws InterruptedException { + return new WaitForSnapshot(context.instance(), snapshotsDriver) + .waitForFilesSnapshot(intervalAndPollingTimeout, condition); + } + + public PartitionTree waitForPartitionsSnapshot(PollWithRetries intervalAndPollingTimeout, Predicate condition) throws InterruptedException { + return new WaitForSnapshot(context.instance(), snapshotsDriver) + .waitForPartitionsSnapshot(intervalAndPollingTimeout, condition); + } + } diff --git a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/StateStoreCommitterST.java b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/StateStoreCommitterST.java index d1e4ca0029..229d7c31c2 100644 --- a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/StateStoreCommitterST.java +++ b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/StateStoreCommitterST.java @@ -20,7 +20,6 @@ import sleeper.core.partition.PartitionTree; import sleeper.core.partition.PartitionsBuilder; -import sleeper.core.statestore.FileReference; import sleeper.core.statestore.FileReferenceFactory; import sleeper.core.util.PollWithRetries; import sleeper.systemtest.dsl.SleeperSystemTest; @@ -28,7 +27,6 @@ import sleeper.systemtest.suite.testutil.SystemTest; import java.time.Duration; -import java.util.Map; import java.util.stream.IntStream; import java.util.stream.LongStream; @@ -60,9 +58,7 @@ void shouldAddManyFiles(SleeperSystemTest sleeper) throws Exception { .waitForCommitLogs(PollWithRetries.intervalAndPollingTimeout(Duration.ofSeconds(20), Duration.ofMinutes(3))); // Then - Map recordsByFilename = sleeper.tableFiles().references().stream() - .collect(toMap(FileReference::getFilename, FileReference::getNumberOfRecords)); - assertThat(recordsByFilename).isEqualTo( + assertThat(sleeper.tableFiles().recordsByFilename()).isEqualTo( LongStream.rangeClosed(1, 1000).mapToObj(i -> i) .collect(toMap(i -> "file-" + i + ".parquet", i -> i))); } diff --git a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/StateStoreSnapshotsST.java b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/StateStoreSnapshotsST.java new file mode 100644 index 0000000000..85a8c0a30d --- /dev/null +++ b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/StateStoreSnapshotsST.java @@ -0,0 +1,78 @@ +/* + * Copyright 2022-2024 Crown Copyright + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package sleeper.systemtest.suite; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import sleeper.core.partition.PartitionTree; +import sleeper.core.partition.PartitionsBuilder; +import sleeper.core.statestore.AllReferencesToAllFiles; +import sleeper.core.statestore.FileReferenceFactory; +import sleeper.core.util.PollWithRetries; +import sleeper.statestore.transactionlog.DynamoDBTransactionLogStateStore; +import sleeper.systemtest.dsl.SleeperSystemTest; +import sleeper.systemtest.dsl.statestore.StateStoreCommitMessage; +import sleeper.systemtest.suite.testutil.SystemTest; + +import java.time.Duration; +import java.util.Map; +import java.util.stream.IntStream; +import java.util.stream.LongStream; + +import static java.util.stream.Collectors.toMap; +import static org.assertj.core.api.Assertions.assertThat; +import static sleeper.core.properties.table.TableProperty.STATESTORE_CLASSNAME; +import static sleeper.systemtest.suite.fixtures.SystemTestInstance.MAIN; +import static sleeper.systemtest.suite.fixtures.SystemTestSchema.DEFAULT_SCHEMA; + +@SystemTest +public class StateStoreSnapshotsST { + + @BeforeEach + void setUp(SleeperSystemTest sleeper) { + sleeper.connectToInstanceNoTables(MAIN); + } + + @Test + void shouldAddManyFiles(SleeperSystemTest sleeper) throws Exception { + // Given + sleeper.tables().createWithProperties("snapshots", DEFAULT_SCHEMA, + Map.of(STATESTORE_CLASSNAME, DynamoDBTransactionLogStateStore.class.getName())); + PartitionTree partitions = new PartitionsBuilder(DEFAULT_SCHEMA).singlePartition("root").buildTree(); + sleeper.partitioning().setPartitions(partitions); + + // When + FileReferenceFactory fileFactory = FileReferenceFactory.from(partitions); + sleeper.stateStore().fakeCommits() + .sendBatched(IntStream.rangeClosed(1, 1000) + .mapToObj(i -> fileFactory.rootFile("file-" + i + ".parquet", i)) + .map(StateStoreCommitMessage::addFile)); + + // Then a snapshot will be created + AllReferencesToAllFiles snapshotFiles = sleeper.stateStore().waitForFilesSnapshot( + PollWithRetries.intervalAndPollingTimeout(Duration.ofSeconds(20), Duration.ofMinutes(10)), + files -> files.getFiles().size() == 1000); + Map expectedRecordsByFilename = LongStream + .rangeClosed(1, 1000).mapToObj(i -> i) + .collect(toMap(i -> "file-" + i + ".parquet", i -> i)); + assertThat(snapshotFiles.recordsByFilename()) + .isEqualTo(expectedRecordsByFilename); + assertThat(sleeper.tableFiles().recordsByFilename()) + .isEqualTo(expectedRecordsByFilename); + } + +}