Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 3527 - Fix snapshot creation #3530

Merged
merged 10 commits into from
Oct 23, 2024
6 changes: 5 additions & 1 deletion java/cdk/src/main/java/sleeper/cdk/util/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,11 @@ private static String createToolOptions(InstanceProperties instanceProperties) {
.filter(instanceProperties::isSet)
.forEach(s -> sb.append("-D").append(s.getPropertyName())
.append("=").append(instanceProperties.get(s)).append(" "));

Stream.of("java.base/java.nio=ALL-UNNAMED",
"java.base/sun.nio.ch=ALL-UNNAMED",
"java.base/java.util=ALL-UNNAMED",
"java.base/java.lang.invoke=ALL-UNNAMED")
.forEach(s -> sb.append("--add-opens=").append(s).append(" "));
return sb.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.TreeMap;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toMap;
import static java.util.stream.Collectors.toUnmodifiableList;

/**
Expand Down Expand Up @@ -67,6 +68,18 @@ public List<FileReference> listFileReferences() {
.collect(toUnmodifiableList());
}

/**
* Builds a map from filename to the number of referenced records.
*
* @return the map
*/
public Map<String, Long> recordsByFilename() {
return getFilesWithReferences().stream()
.collect(toMap(
AllReferencesToAFile::getFilename,
file -> file.getReferences().stream().mapToLong(FileReference::getNumberOfRecords).sum()));
}

public boolean isMoreThanMax() {
return moreThanMax;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,71 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.cloudwatchevents.CloudWatchEventsClient;

import sleeper.core.partition.PartitionTree;
import sleeper.core.properties.instance.InstanceProperties;
import sleeper.core.properties.table.TableProperties;
import sleeper.core.statestore.AllReferencesToAllFiles;
import sleeper.core.statestore.transactionlog.StateStoreFiles;
import sleeper.core.statestore.transactionlog.StateStorePartitions;
import sleeper.core.statestore.transactionlog.TransactionLogSnapshot;
import sleeper.statestore.transactionlog.DynamoDBTransactionLogSnapshotStore;
import sleeper.systemtest.drivers.util.SystemTestClients;
import sleeper.systemtest.dsl.snapshot.SnapshotsDriver;

import java.util.Optional;

import static java.util.stream.Collectors.toUnmodifiableList;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.TRANSACTION_LOG_SNAPSHOT_CREATION_RULE;

public class AwsSnapshotsDriver implements SnapshotsDriver {
public static final Logger LOGGER = LoggerFactory.getLogger(AwsSnapshotsDriver.class);
private final CloudWatchEventsClient cwClient;
private final SystemTestClients clients;

public AwsSnapshotsDriver(CloudWatchEventsClient cwClient) {
this.cwClient = cwClient;
public AwsSnapshotsDriver(SystemTestClients clients) {
this.clients = clients;
}

@Override
public void enableCreation(InstanceProperties instanceProperties) {
LOGGER.info("Enabling transaction log snapshot creation");
cwClient.enableRule(request -> request.name(instanceProperties.get(TRANSACTION_LOG_SNAPSHOT_CREATION_RULE)));
clients.getCloudWatchEvents().enableRule(request -> request
.name(instanceProperties.get(TRANSACTION_LOG_SNAPSHOT_CREATION_RULE)));
}

@Override
public void disableCreation(InstanceProperties instanceProperties) {
LOGGER.info("Disabling transaction log snapshot creation");
cwClient.disableRule(request -> request.name(instanceProperties.get(TRANSACTION_LOG_SNAPSHOT_CREATION_RULE)));
clients.getCloudWatchEvents().disableRule(request -> request
.name(instanceProperties.get(TRANSACTION_LOG_SNAPSHOT_CREATION_RULE)));
}

@Override
public Optional<AllReferencesToAllFiles> loadLatestFilesSnapshot(InstanceProperties instanceProperties, TableProperties tableProperties) {
return snapshotStore(instanceProperties, tableProperties)
.loadLatestFilesSnapshotIfAtMinimumTransaction(0)
.map(AwsSnapshotsDriver::readFiles);
}

@Override
public Optional<PartitionTree> loadLatestPartitionsSnapshot(InstanceProperties instanceProperties, TableProperties tableProperties) {
return snapshotStore(instanceProperties, tableProperties)
.loadLatestPartitionsSnapshotIfAtMinimumTransaction(0)
.map(AwsSnapshotsDriver::readPartitions);
}

private DynamoDBTransactionLogSnapshotStore snapshotStore(InstanceProperties instanceProperties, TableProperties tableProperties) {
return new DynamoDBTransactionLogSnapshotStore(instanceProperties, tableProperties,
clients.getDynamoDB(), clients.createHadoopConf(instanceProperties, tableProperties));
}

private static AllReferencesToAllFiles readFiles(TransactionLogSnapshot snapshot) {
StateStoreFiles state = snapshot.getState();
return new AllReferencesToAllFiles(state.referencedAndUnreferenced(), false);
}

private static PartitionTree readPartitions(TransactionLogSnapshot snapshot) {
StateStorePartitions state = snapshot.getState();
return new PartitionTree(state.all().stream().collect(toUnmodifiableList()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ public PurgeQueueDriver purgeQueues(SystemTestContext context) {

@Override
public SnapshotsDriver snapshots() {
return new AwsSnapshotsDriver(clients.getCloudWatchEvents());
return new AwsSnapshotsDriver(clients);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void addTablesAndSetCurrent(SleeperTablesDriver driver, List<TablePropert
}
}

public void addTables(SleeperTablesDriver driver, List<TableProperties> tables) {
private void addTables(SleeperTablesDriver driver, List<TableProperties> tables) {
LOGGER.info("Adding {} tables with instance ID: {}", tables.size(), instanceProperties.get(ID));
tables.stream().parallel().forEach(tableProperties -> driver.addTable(instanceProperties, tableProperties));
tables.forEach(tableProperties -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,11 @@ public void createTables(int numberOfTables, Schema schema, Map<TableProperty, S
.collect(toUnmodifiableList()));
}

public void createTable(String name, Schema schema) {
public void createTable(String name, Schema schema, Map<TableProperty, String> setProperties) {
TableProperties tableProperties = parameters.createTableProperties(getInstanceProperties(), schema);
tableProperties.set(TABLE_NAME, name + "-" + UUID.randomUUID());
currentTables().addTables(tablesDriver(), List.of(tableProperties));
setProperties.forEach(tableProperties::set);
currentTables().addTablesAndSetCurrent(tablesDriver(), List.of(tableProperties));
tablesByTestName.put(name, tableProperties);
testNameByTableId.put(tableProperties.get(TABLE_ID), name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ public List<FileReference> references() {
}
}

public Map<String, Long> recordsByFilename() {
return all().recordsByFilename();
}

public Map<String, AllReferencesToAllFiles> filesByTable() {
return instance.streamTableProperties().parallel()
.map(this::getFiles)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,17 @@ public void createMany(int numberOfTables, Schema schema) {
}

public SystemTestTables create(String name, Schema schema) {
instance.createTable(name, schema);
instance.createTable(name, schema, Map.of());
return this;
}

public SystemTestTables createWithProperties(String name, Schema schema, Map<TableProperty, String> setProperties) {
instance.createTable(name, schema, setProperties);
return this;
}

public SystemTestTables create(List<String> names, Schema schema) {
names.forEach(name -> instance.createTable(name, schema));
names.forEach(name -> instance.createTable(name, schema, Map.of()));
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,29 @@
*/
package sleeper.systemtest.dsl.snapshot;

import sleeper.core.partition.PartitionTree;
import sleeper.core.properties.instance.InstanceProperties;
import sleeper.core.properties.table.TableProperties;
import sleeper.core.statestore.AllReferencesToAllFiles;

import java.util.Optional;

import static sleeper.core.properties.instance.CommonProperty.ID;

public interface SnapshotsDriver {
void enableCreation(InstanceProperties instanceProperties);

void disableCreation(InstanceProperties instanceProperties);

default Optional<AllReferencesToAllFiles> loadLatestFilesSnapshot(InstanceProperties instanceProperties, TableProperties tableProperties) {
throw new UnsupportedOperationException(
"Requested loading files snapshot for instance " + instanceProperties.get(ID) +
", table " + tableProperties.getStatus() + ", not currently implemented for this driver");
}

default Optional<PartitionTree> loadLatestPartitionsSnapshot(InstanceProperties instanceProperties, TableProperties tableProperties) {
throw new UnsupportedOperationException(
"Requested loading partitions snapshot for instance " + instanceProperties.get(ID) +
", table " + tableProperties.getStatus() + ", not currently implemented for this driver");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright 2022-2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sleeper.systemtest.dsl.snapshot;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import sleeper.core.partition.PartitionTree;
import sleeper.core.statestore.AllReferencesToAllFiles;
import sleeper.core.util.PollWithRetries;
import sleeper.systemtest.dsl.instance.SystemTestInstanceContext;

import java.util.Optional;
import java.util.function.Predicate;

public class WaitForSnapshot {
public static final Logger LOGGER = LoggerFactory.getLogger(WaitForSnapshot.class);

private final SystemTestInstanceContext instance;
private final SnapshotsDriver driver;

public WaitForSnapshot(SystemTestInstanceContext instance, SnapshotsDriver driver) {
this.instance = instance;
this.driver = driver;
}

public AllReferencesToAllFiles waitForFilesSnapshot(PollWithRetries poll, Predicate<AllReferencesToAllFiles> condition) throws InterruptedException {
LOGGER.info("Waiting for files snapshot");
return poll.queryUntil("files snapshot is present", this::loadLatestFilesSnapshot, matches(condition))
.orElseThrow();
}

public PartitionTree waitForPartitionsSnapshot(PollWithRetries poll, Predicate<PartitionTree> condition) throws InterruptedException {
LOGGER.info("Waiting for partitions snapshot");
return poll.queryUntil("partitions snapshot is present", this::loadLatestPartitionsSnapshot, matches(condition))
.orElseThrow();
}

private Optional<AllReferencesToAllFiles> loadLatestFilesSnapshot() {
Optional<AllReferencesToAllFiles> snapshotOpt = driver.loadLatestFilesSnapshot(
instance.getInstanceProperties(), instance.getTableProperties());
if (!snapshotOpt.isPresent()) {
LOGGER.info("Found no files snapshot");
} else {
LOGGER.info("Found {} files in snapshot", snapshotOpt.get().getFiles().size());
}
return snapshotOpt;
}

private Optional<PartitionTree> loadLatestPartitionsSnapshot() {
Optional<PartitionTree> snapshotOpt = driver.loadLatestPartitionsSnapshot(
instance.getInstanceProperties(), instance.getTableProperties());
if (!snapshotOpt.isPresent()) {
LOGGER.info("Found no partitions snapshot");
} else {
LOGGER.info("Found {} partitions in snapshot", snapshotOpt.get().getAllPartitions().size());
}
return snapshotOpt;
}

private static <T> Predicate<Optional<T>> matches(Predicate<T> condition) {
return opt -> opt.isPresent() && condition.test(opt.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,19 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import sleeper.core.partition.PartitionTree;
import sleeper.core.statestore.AllReferencesToAllFiles;
import sleeper.core.table.TableStatus;
import sleeper.core.util.PollWithRetries;
import sleeper.systemtest.dsl.SystemTestContext;
import sleeper.systemtest.dsl.SystemTestDrivers;
import sleeper.systemtest.dsl.instance.SystemTestInstanceContext;
import sleeper.systemtest.dsl.snapshot.SnapshotsDriver;
import sleeper.systemtest.dsl.snapshot.WaitForSnapshot;

import java.time.Instant;
import java.util.Map;
import java.util.function.Predicate;

import static java.util.stream.Collectors.toMap;
import static java.util.stream.Collectors.toSet;
Expand All @@ -36,12 +42,14 @@ public class SystemTestStateStore {
private final SystemTestContext context;
private final StateStoreCommitterDriver driver;
private final StateStoreCommitterLogsDriver logsDriver;
private final SnapshotsDriver snapshotsDriver;

public SystemTestStateStore(SystemTestContext context) {
this.context = context;
SystemTestDrivers adminDrivers = context.instance().adminDrivers();
driver = adminDrivers.stateStoreCommitter(context);
logsDriver = adminDrivers.stateStoreCommitterLogs(context);
snapshotsDriver = adminDrivers.snapshots();
}

public SystemTestStateStoreFakeCommits fakeCommits() {
Expand Down Expand Up @@ -77,4 +85,14 @@ private Map<String, Double> commitsPerSecondByTableId() {
.collect(toSet()));
}

public AllReferencesToAllFiles waitForFilesSnapshot(PollWithRetries intervalAndPollingTimeout, Predicate<AllReferencesToAllFiles> condition) throws InterruptedException {
return new WaitForSnapshot(context.instance(), snapshotsDriver)
.waitForFilesSnapshot(intervalAndPollingTimeout, condition);
}

public PartitionTree waitForPartitionsSnapshot(PollWithRetries intervalAndPollingTimeout, Predicate<PartitionTree> condition) throws InterruptedException {
return new WaitForSnapshot(context.instance(), snapshotsDriver)
.waitForPartitionsSnapshot(intervalAndPollingTimeout, condition);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,13 @@

import sleeper.core.partition.PartitionTree;
import sleeper.core.partition.PartitionsBuilder;
import sleeper.core.statestore.FileReference;
import sleeper.core.statestore.FileReferenceFactory;
import sleeper.core.util.PollWithRetries;
import sleeper.systemtest.dsl.SleeperSystemTest;
import sleeper.systemtest.dsl.statestore.StateStoreCommitMessage;
import sleeper.systemtest.suite.testutil.SystemTest;

import java.time.Duration;
import java.util.Map;
import java.util.stream.IntStream;
import java.util.stream.LongStream;

Expand Down Expand Up @@ -60,9 +58,7 @@ void shouldAddManyFiles(SleeperSystemTest sleeper) throws Exception {
.waitForCommitLogs(PollWithRetries.intervalAndPollingTimeout(Duration.ofSeconds(20), Duration.ofMinutes(3)));

// Then
Map<String, Long> recordsByFilename = sleeper.tableFiles().references().stream()
.collect(toMap(FileReference::getFilename, FileReference::getNumberOfRecords));
assertThat(recordsByFilename).isEqualTo(
assertThat(sleeper.tableFiles().recordsByFilename()).isEqualTo(
LongStream.rangeClosed(1, 1000).mapToObj(i -> i)
.collect(toMap(i -> "file-" + i + ".parquet", i -> i)));
}
Expand Down
Loading
Loading