Create a utility class for static SnapshotsService methods #127419

Open · wants to merge 1 commit into base: main
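This PR looks like a mechanical refactor: the static helpers that call sites currently reach through SnapshotsService (useShardGenerations, useIndexGenerations, includesUUIDs, includeFileInfoWriterUUID, minCompatibleVersion, currentSnapshots, ensureRepositoryExists, snapshottingDataStreams, snapshottingIndices, stateWithoutSnapshot, updateWithSnapshots) move to a new SnapshotsServiceUtils class, while constants such as SnapshotsService.OLD_SNAPSHOT_FORMAT appear to stay where they are (see the unchanged reference in the assertions below). As rough orientation, a static-only holder of this kind would usually take the shape sketched here; the class shape, parameter types, and method list are inferred from the call sites in this diff, and the bodies are placeholders rather than the code the PR actually moves.

    package org.elasticsearch.snapshots;

    import org.elasticsearch.cluster.ClusterState;
    import org.elasticsearch.index.IndexVersion;

    // Hypothetical sketch only; the real signatures and bodies are defined by the PR itself.
    public final class SnapshotsServiceUtils {

        private SnapshotsServiceUtils() {
            // static-only holder: no instances
        }

        // Repository-format gates; callers pass an IndexVersion such as
        // FinalizeSnapshotContext#repositoryMetaVersion() (see the hunks below).
        public static boolean useShardGenerations(IndexVersion repositoryMetaVersion) {
            throw new UnsupportedOperationException("placeholder; real body is moved over from SnapshotsService");
        }

        public static boolean includesUUIDs(IndexVersion repositoryMetaVersion) {
            throw new UnsupportedOperationException("placeholder; real body is moved over from SnapshotsService");
        }

        // Cluster-state helpers such as ensureRepositoryExists, snapshottingIndices,
        // snapshottingDataStreams, stateWithoutSnapshot and updateWithSnapshots would
        // move across in the same way; only one is sketched here.
        public static void ensureRepositoryExists(String repositoryName, ClusterState currentState) {
            throw new UnsupportedOperationException("placeholder; real body is moved over from SnapshotsService");
        }
    }

A final class with a private constructor is the usual Java idiom for a class that exists only to group static methods; callers then change nothing except the import and the owning class name, which is exactly what the hunks below show.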
@@ -41,6 +41,7 @@
import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.snapshots.SnapshotsServiceUtils;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.NamedXContentRegistry;
@@ -385,7 +386,7 @@ private static ByteSizeValue objectSizeLimit(ByteSizeValue chunkSize, ByteSizeVa
@Override
public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotContext) {
final FinalizeSnapshotContext wrappedFinalizeContext;
if (SnapshotsService.useShardGenerations(finalizeSnapshotContext.repositoryMetaVersion()) == false) {
if (SnapshotsServiceUtils.useShardGenerations(finalizeSnapshotContext.repositoryMetaVersion()) == false) {
final ListenableFuture<Void> metadataDone = new ListenableFuture<>();
wrappedFinalizeContext = new FinalizeSnapshotContext(
finalizeSnapshotContext.updatedShardGenerations(),
(next changed file)
@@ -18,7 +18,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.snapshots.SnapshotsServiceUtils;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.test.rest.ObjectPath;
import org.elasticsearch.xcontent.XContentParser;
@@ -183,7 +183,7 @@ public void testUpgradeMovesRepoToNewMetaVersion() throws IOException {
// incompatibility in the downgrade test step. We verify that it is impossible here and then create the repo using verify=false
// to check behavior on other operations below.
final boolean verify = TEST_STEP != TestStep.STEP3_OLD_CLUSTER
|| SnapshotsService.includesUUIDs(minNodeVersion)
|| SnapshotsServiceUtils.includesUUIDs(minNodeVersion)
|| minNodeVersion.before(IndexVersions.V_7_12_0);
if (verify == false) {
expectThrowsAnyOf(EXPECTED_BWC_EXCEPTIONS, () -> createRepository(repoName, false, true));
@@ -208,7 +208,7 @@ public void testUpgradeMovesRepoToNewMetaVersion() throws IOException {
ensureSnapshotRestoreWorks(repoName, "snapshot-2", shards, index);
}
} else {
if (SnapshotsService.includesUUIDs(minNodeVersion) == false) {
if (SnapshotsServiceUtils.includesUUIDs(minNodeVersion) == false) {
assertThat(TEST_STEP, is(TestStep.STEP3_OLD_CLUSTER));
expectThrowsAnyOf(EXPECTED_BWC_EXCEPTIONS, () -> listSnapshots(repoName));
expectThrowsAnyOf(EXPECTED_BWC_EXCEPTIONS, () -> deleteSnapshot(repoName, "snapshot-1"));
(next changed file)
@@ -327,7 +327,7 @@ public void testHandlingMissingRootLevelSnapshotMetadata() throws Exception {

logger.info("--> verify that repo is assumed in old metadata format");
assertThat(
SnapshotsService.minCompatibleVersion(IndexVersion.current(), getRepositoryData(repoName), null),
SnapshotsServiceUtils.minCompatibleVersion(IndexVersion.current(), getRepositoryData(repoName), null),
is(SnapshotsService.OLD_SNAPSHOT_FORMAT)
);

@@ -336,7 +336,7 @@ public void testHandlingMissingRootLevelSnapshotMetadata() throws Exception {

logger.info("--> verify that repository is assumed in new metadata format after removing corrupted snapshot");
assertThat(
SnapshotsService.minCompatibleVersion(IndexVersion.current(), getRepositoryData(repoName), null),
SnapshotsServiceUtils.minCompatibleVersion(IndexVersion.current(), getRepositoryData(repoName), null),
is(IndexVersion.current())
);
final RepositoryData finalRepositoryData = getRepositoryData(repoName);
(next changed file)
@@ -34,7 +34,7 @@
import org.elasticsearch.repositories.RepositoryCleanupResult;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.snapshots.SnapshotsServiceUtils;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@@ -170,7 +170,7 @@ private void cleanupRepo(String repositoryName, ActionListener<RepositoryCleanup

@Override
public ClusterState execute(ClusterState currentState) {
SnapshotsService.ensureRepositoryExists(repositoryName, currentState);
SnapshotsServiceUtils.ensureRepositoryExists(repositoryName, currentState);
final RepositoryCleanupInProgress repositoryCleanupInProgress = RepositoryCleanupInProgress.get(currentState);
if (repositoryCleanupInProgress.hasCleanupInProgress()) {
throw new IllegalStateException(
(next changed file)
@@ -42,7 +42,7 @@
import org.elasticsearch.snapshots.SnapshotShardFailure;
import org.elasticsearch.snapshots.SnapshotShardsService;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.snapshots.SnapshotsServiceUtils;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
@@ -113,7 +113,7 @@ protected void masterOperation(
final CancellableTask cancellableTask = (CancellableTask) task;

final SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.get(state);
List<SnapshotsInProgress.Entry> currentSnapshots = SnapshotsService.currentSnapshots(
List<SnapshotsInProgress.Entry> currentSnapshots = SnapshotsServiceUtils.currentSnapshots(
snapshotsInProgress,
request.repository(),
Arrays.asList(request.snapshots())
(next changed file)
@@ -50,7 +50,7 @@
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.snapshots.SnapshotInProgressException;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.snapshots.SnapshotsServiceUtils;
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.telemetry.metric.MeterRegistry;
import org.elasticsearch.threadpool.ThreadPool;
@@ -305,7 +305,7 @@ private RolloverResult rolloverDataStream(
boolean isFailureStoreRollover
) throws Exception {
final ProjectMetadata metadata = projectState.metadata();
Set<String> snapshottingDataStreams = SnapshotsService.snapshottingDataStreams(
Set<String> snapshottingDataStreams = SnapshotsServiceUtils.snapshottingDataStreams(
projectState,
Collections.singleton(dataStream.getName())
);
(next changed file)
@@ -35,7 +35,7 @@
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.snapshots.SnapshotInProgressException;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.snapshots.SnapshotsServiceUtils;

import java.io.IOException;
import java.util.HashSet;
@@ -464,7 +464,7 @@ public static ClusterState deleteDataStreams(ProjectState projectState, Set<Data
}

Set<String> dataStreamNames = dataStreams.stream().map(DataStream::getName).collect(Collectors.toSet());
Set<String> snapshottingDataStreams = SnapshotsService.snapshottingDataStreams(projectState, dataStreamNames);
Set<String> snapshottingDataStreams = SnapshotsServiceUtils.snapshottingDataStreams(projectState, dataStreamNames);
if (snapshottingDataStreams.isEmpty() == false) {
throw new SnapshotInProgressException(
"Cannot delete data streams that are being snapshotted: ["
(next changed file)
@@ -35,7 +35,7 @@
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.SnapshotInProgressException;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.snapshots.SnapshotsServiceUtils;

import java.util.HashMap;
import java.util.HashSet;
@@ -186,7 +186,7 @@ public static ClusterState deleteIndices(ProjectState projectState, Set<Index> i
}

// Check if index deletion conflicts with any running snapshots
Set<Index> snapshottingIndices = SnapshotsService.snapshottingIndices(projectState, indicesToDelete);
Set<Index> snapshottingIndices = SnapshotsServiceUtils.snapshottingIndices(projectState, indicesToDelete);
if (snapshottingIndices.isEmpty() == false) {
throw new SnapshotInProgressException(
"Cannot delete indices that are being snapshotted: "
(next changed file)
@@ -74,7 +74,7 @@
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.SnapshotInProgressException;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.snapshots.SnapshotsServiceUtils;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;

@@ -343,7 +343,7 @@ static ClusterState addIndexClosedBlocks(
}

// Check if index closing conflicts with any running snapshots
Set<Index> snapshottingIndices = SnapshotsService.snapshottingIndices(currentProjectState, indicesToClose);
Set<Index> snapshottingIndices = SnapshotsServiceUtils.snapshottingIndices(currentProjectState, indicesToClose);
if (snapshottingIndices.isEmpty() == false) {
throw new SnapshotInProgressException(
"Cannot close indices that are being snapshotted: "
@@ -925,7 +925,7 @@ static Tuple<ClusterState, List<IndexResult>> closeRoutingTable(
}

// Check if index closing conflicts with any running snapshots
Set<Index> snapshottingIndices = SnapshotsService.snapshottingIndices(currentProjectState, Set.of(index));
Set<Index> snapshottingIndices = SnapshotsServiceUtils.snapshottingIndices(currentProjectState, Set.of(index));
if (snapshottingIndices.isEmpty() == false) {
closingResults.put(
result.getKey(),
(next changed file)
@@ -17,7 +17,7 @@
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.snapshots.SnapshotsServiceUtils;

import java.util.Map;
import java.util.Set;
@@ -103,7 +103,11 @@ public Map<RepositoryShardId, Set<ShardGeneration>> obsoleteShardGenerations() {
* Returns a new {@link ClusterState}, based on the given {@code state} with the create-snapshot entry removed.
*/
public ClusterState updatedClusterState(ClusterState state) {
final ClusterState updatedState = SnapshotsService.stateWithoutSnapshot(state, snapshotInfo.snapshot(), updatedShardGenerations);
final ClusterState updatedState = SnapshotsServiceUtils.stateWithoutSnapshot(
state,
snapshotInfo.snapshot(),
updatedShardGenerations
);
// Now that the updated cluster state may have changed in-progress shard snapshots' shard generations to the latest shard
// generation, let's mark any now unreferenced shard generations as obsolete and ready to be deleted.
obsoleteGenerations.set(
(next changed file)
@@ -29,6 +29,7 @@
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.snapshots.SnapshotsServiceUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;
@@ -700,9 +701,9 @@ public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final
public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final IndexVersion repoMetaVersion, boolean permitMissingUuid)
throws IOException {

final boolean shouldWriteUUIDS = SnapshotsService.includesUUIDs(repoMetaVersion);
final boolean shouldWriteIndexGens = SnapshotsService.useIndexGenerations(repoMetaVersion);
final boolean shouldWriteShardGens = SnapshotsService.useShardGenerations(repoMetaVersion);
final boolean shouldWriteUUIDS = SnapshotsServiceUtils.includesUUIDs(repoMetaVersion);
final boolean shouldWriteIndexGens = SnapshotsServiceUtils.useIndexGenerations(repoMetaVersion);
final boolean shouldWriteShardGens = SnapshotsServiceUtils.useShardGenerations(repoMetaVersion);

assert Boolean.compare(shouldWriteUUIDS, shouldWriteIndexGens) <= 0;
assert Boolean.compare(shouldWriteIndexGens, shouldWriteShardGens) <= 0;
@@ -909,7 +910,7 @@ public static RepositoryData snapshotsFromXContent(XContentParser parser, long g
this snapshot repository format requires Elasticsearch version [%s] or later""", versionString));
};

assert SnapshotsService.useShardGenerations(version);
assert SnapshotsServiceUtils.useShardGenerations(version);
}
case UUID -> {
XContentParserUtils.ensureExpectedToken(XContentParser.Token.VALUE_STRING, parser.nextToken(), parser);
(next changed file)
@@ -128,6 +128,7 @@
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotMissingException;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.snapshots.SnapshotsServiceUtils;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.LeakTracker;
@@ -974,7 +975,7 @@ private void createSnapshotsDeletion(
return new SnapshotsDeletion(
snapshotIds,
repositoryDataGeneration,
SnapshotsService.minCompatibleVersion(minimumNodeVersion, originalRepositoryData, snapshotIds),
SnapshotsServiceUtils.minCompatibleVersion(minimumNodeVersion, originalRepositoryData, snapshotIds),
originalRootBlobs,
blobStore().blobContainer(indicesPath()).children(OperationPurpose.SNAPSHOT_DATA),
originalRepositoryData
@@ -1080,7 +1081,7 @@ class SnapshotsDeletion {
this.snapshotIds = snapshotIds;
this.originalRepositoryDataGeneration = originalRepositoryDataGeneration;
this.repositoryFormatIndexVersion = repositoryFormatIndexVersion;
this.useShardGenerations = SnapshotsService.useShardGenerations(repositoryFormatIndexVersion);
this.useShardGenerations = SnapshotsServiceUtils.useShardGenerations(repositoryFormatIndexVersion);
this.originalRootBlobs = originalRootBlobs;
this.originalIndexContainers = originalIndexContainers;
this.originalRepositoryData = originalRepositoryData;
@@ -1750,11 +1751,11 @@ public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotConte
// If there are older version nodes in the cluster, we don't need to run this cleanup as it will have already happened
// when writing the index-${N} to each shard directory.
final IndexVersion repositoryMetaVersion = finalizeSnapshotContext.repositoryMetaVersion();
final boolean writeShardGens = SnapshotsService.useShardGenerations(repositoryMetaVersion);
final boolean writeShardGens = SnapshotsServiceUtils.useShardGenerations(repositoryMetaVersion);

final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);

final boolean writeIndexGens = SnapshotsService.useIndexGenerations(repositoryMetaVersion);
final boolean writeIndexGens = SnapshotsServiceUtils.useIndexGenerations(repositoryMetaVersion);

record MetadataWriteResult(
RepositoryData existingRepositoryData,
@@ -2522,7 +2523,7 @@ private void cacheRepositoryData(RepositoryData repositoryData, IndexVersion ver
return;
}
final RepositoryData toCache;
if (SnapshotsService.useShardGenerations(version)) {
if (SnapshotsServiceUtils.useShardGenerations(version)) {
toCache = repositoryData;
} else {
// don't cache shard generations here as they may be unreliable
@@ -2871,7 +2872,7 @@ public void onFailure(Exception e) {
}, true);
maybeWriteIndexLatest(newGen);

if (filteredRepositoryData.getUuid().equals(RepositoryData.MISSING_UUID) && SnapshotsService.includesUUIDs(version)) {
if (filteredRepositoryData.getUuid().equals(RepositoryData.MISSING_UUID) && SnapshotsServiceUtils.includesUUIDs(version)) {
assert newRepositoryData.getUuid().equals(RepositoryData.MISSING_UUID) == false;
logger.info(
Strings.format(
@@ -2954,7 +2955,7 @@ public String toString() {
}

private RepositoryData updateRepositoryData(RepositoryData repositoryData, IndexVersion repositoryMetaversion, long newGen) {
if (SnapshotsService.includesUUIDs(repositoryMetaversion)) {
if (SnapshotsServiceUtils.includesUUIDs(repositoryMetaversion)) {
final String clusterUUID = clusterService.state().metadata().clusterUUID();
if (repositoryData.getClusterUUID().equals(clusterUUID) == false) {
repositoryData = repositoryData.withClusterUuid(clusterUUID);
@@ -3089,7 +3090,7 @@ private ClusterState updateRepositoryGenerationsIfNecessary(ClusterState state,
}
}
updatedDeletionsInProgress = changedDeletions ? SnapshotDeletionsInProgress.of(deletionEntries) : null;
return SnapshotsService.updateWithSnapshots(state, updatedSnapshotsInProgress, updatedDeletionsInProgress);
return SnapshotsServiceUtils.updateWithSnapshots(state, updatedSnapshotsInProgress, updatedDeletionsInProgress);
}

private RepositoryMetadata getRepoMetadata(ClusterState state) {
@@ -3328,8 +3329,8 @@ private void doSnapshotShard(SnapshotShardContext context) {
);

final ShardGeneration indexGeneration;
final boolean writeShardGens = SnapshotsService.useShardGenerations(context.getRepositoryMetaVersion());
final boolean writeFileInfoWriterUUID = SnapshotsService.includeFileInfoWriterUUID(context.getRepositoryMetaVersion());
final boolean writeShardGens = SnapshotsServiceUtils.useShardGenerations(context.getRepositoryMetaVersion());
final boolean writeFileInfoWriterUUID = SnapshotsServiceUtils.includeFileInfoWriterUUID(context.getRepositoryMetaVersion());
// build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones
final BlobStoreIndexShardSnapshots updatedBlobStoreIndexShardSnapshots = snapshots.withAddedSnapshot(
new SnapshotFiles(snapshotId.getName(), indexCommitPointFiles, context.stateIdentifier())
(next changed file)
@@ -398,7 +398,7 @@ private void startNewShardSnapshots(String localNodeId, SnapshotsInProgress.Entr
newSnapshotShards.put(shardId, snapshotStatus);
final IndexId indexId = entry.indices().get(shardId.getIndexName());
assert indexId != null;
assert SnapshotsService.useShardGenerations(entry.version())
assert SnapshotsServiceUtils.useShardGenerations(entry.version())
|| ShardGenerations.fixShardGeneration(snapshotStatus.generation()) == null
: "Found non-null, non-numeric shard generation ["
+ snapshotStatus.generation()