diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index 0904f37e39743..7a5cd297320ba 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -41,6 +41,7 @@ import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotsService; +import org.elasticsearch.snapshots.SnapshotsServiceUtils; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.NamedXContentRegistry; @@ -385,7 +386,7 @@ private static ByteSizeValue objectSizeLimit(ByteSizeValue chunkSize, ByteSizeVa @Override public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotContext) { final FinalizeSnapshotContext wrappedFinalizeContext; - if (SnapshotsService.useShardGenerations(finalizeSnapshotContext.repositoryMetaVersion()) == false) { + if (SnapshotsServiceUtils.useShardGenerations(finalizeSnapshotContext.repositoryMetaVersion()) == false) { final ListenableFuture metadataDone = new ListenableFuture<>(); wrappedFinalizeContext = new FinalizeSnapshotContext( finalizeSnapshotContext.updatedShardGenerations(), diff --git a/qa/repository-multi-version/src/test/java/org/elasticsearch/upgrades/MultiVersionRepositoryAccessIT.java b/qa/repository-multi-version/src/test/java/org/elasticsearch/upgrades/MultiVersionRepositoryAccessIT.java index dc002ea1a44c1..1b2fc46a7ff9d 100644 --- a/qa/repository-multi-version/src/test/java/org/elasticsearch/upgrades/MultiVersionRepositoryAccessIT.java +++ b/qa/repository-multi-version/src/test/java/org/elasticsearch/upgrades/MultiVersionRepositoryAccessIT.java @@ -18,7 +18,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.IndexVersions; -import org.elasticsearch.snapshots.SnapshotsService; +import org.elasticsearch.snapshots.SnapshotsServiceUtils; import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.test.rest.ObjectPath; import org.elasticsearch.xcontent.XContentParser; @@ -183,7 +183,7 @@ public void testUpgradeMovesRepoToNewMetaVersion() throws IOException { // incompatibility in the downgrade test step. We verify that it is impossible here and then create the repo using verify=false // to check behavior on other operations below. final boolean verify = TEST_STEP != TestStep.STEP3_OLD_CLUSTER - || SnapshotsService.includesUUIDs(minNodeVersion) + || SnapshotsServiceUtils.includesUUIDs(minNodeVersion) || minNodeVersion.before(IndexVersions.V_7_12_0); if (verify == false) { expectThrowsAnyOf(EXPECTED_BWC_EXCEPTIONS, () -> createRepository(repoName, false, true)); @@ -208,7 +208,7 @@ public void testUpgradeMovesRepoToNewMetaVersion() throws IOException { ensureSnapshotRestoreWorks(repoName, "snapshot-2", shards, index); } } else { - if (SnapshotsService.includesUUIDs(minNodeVersion) == false) { + if (SnapshotsServiceUtils.includesUUIDs(minNodeVersion) == false) { assertThat(TEST_STEP, is(TestStep.STEP3_OLD_CLUSTER)); expectThrowsAnyOf(EXPECTED_BWC_EXCEPTIONS, () -> listSnapshots(repoName)); expectThrowsAnyOf(EXPECTED_BWC_EXCEPTIONS, () -> deleteSnapshot(repoName, "snapshot-1")); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java index badc7f47b1a0a..c567ffedfe29f 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java @@ -327,7 +327,7 @@ public void testHandlingMissingRootLevelSnapshotMetadata() throws Exception { logger.info("--> verify that repo is assumed in old metadata format"); assertThat( - SnapshotsService.minCompatibleVersion(IndexVersion.current(), getRepositoryData(repoName), null), + SnapshotsServiceUtils.minCompatibleVersion(IndexVersion.current(), getRepositoryData(repoName), null), is(SnapshotsService.OLD_SNAPSHOT_FORMAT) ); @@ -336,7 +336,7 @@ public void testHandlingMissingRootLevelSnapshotMetadata() throws Exception { logger.info("--> verify that repository is assumed in new metadata format after removing corrupted snapshot"); assertThat( - SnapshotsService.minCompatibleVersion(IndexVersion.current(), getRepositoryData(repoName), null), + SnapshotsServiceUtils.minCompatibleVersion(IndexVersion.current(), getRepositoryData(repoName), null), is(IndexVersion.current()) ); final RepositoryData finalRepositoryData = getRepositoryData(repoName); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java index 11a6a6967cb73..a0d4f7b870b5c 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java @@ -34,7 +34,7 @@ import org.elasticsearch.repositories.RepositoryCleanupResult; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; -import org.elasticsearch.snapshots.SnapshotsService; +import org.elasticsearch.snapshots.SnapshotsServiceUtils; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -170,7 +170,7 @@ private void cleanupRepo(String repositoryName, ActionListener currentSnapshots = SnapshotsService.currentSnapshots( + List currentSnapshots = SnapshotsServiceUtils.currentSnapshots( snapshotsInProgress, request.repository(), Arrays.asList(request.snapshots()) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java index 9082dad757f8c..b4eb1fe8ed1bb 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java @@ -50,7 +50,7 @@ import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.snapshots.SnapshotInProgressException; -import org.elasticsearch.snapshots.SnapshotsService; +import org.elasticsearch.snapshots.SnapshotsServiceUtils; import org.elasticsearch.telemetry.TelemetryProvider; import org.elasticsearch.telemetry.metric.MeterRegistry; import org.elasticsearch.threadpool.ThreadPool; @@ -305,7 +305,7 @@ private RolloverResult rolloverDataStream( boolean isFailureStoreRollover ) throws Exception { final ProjectMetadata metadata = projectState.metadata(); - Set snapshottingDataStreams = SnapshotsService.snapshottingDataStreams( + Set snapshottingDataStreams = SnapshotsServiceUtils.snapshottingDataStreams( projectState, Collections.singleton(dataStream.getName()) ); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java index 62417cbdd5863..27eac8d2763e9 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java @@ -35,7 +35,7 @@ import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; import org.elasticsearch.snapshots.SnapshotInProgressException; -import org.elasticsearch.snapshots.SnapshotsService; +import org.elasticsearch.snapshots.SnapshotsServiceUtils; import java.io.IOException; import java.util.HashSet; @@ -464,7 +464,7 @@ public static ClusterState deleteDataStreams(ProjectState projectState, Set dataStreamNames = dataStreams.stream().map(DataStream::getName).collect(Collectors.toSet()); - Set snapshottingDataStreams = SnapshotsService.snapshottingDataStreams(projectState, dataStreamNames); + Set snapshottingDataStreams = SnapshotsServiceUtils.snapshottingDataStreams(projectState, dataStreamNames); if (snapshottingDataStreams.isEmpty() == false) { throw new SnapshotInProgressException( "Cannot delete data streams that are being snapshotted: [" diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexService.java index 3ba4ad1cd2454..1e6882f54b361 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexService.java @@ -35,7 +35,7 @@ import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.snapshots.SnapshotInProgressException; -import org.elasticsearch.snapshots.SnapshotsService; +import org.elasticsearch.snapshots.SnapshotsServiceUtils; import java.util.HashMap; import java.util.HashSet; @@ -186,7 +186,7 @@ public static ClusterState deleteIndices(ProjectState projectState, Set i } // Check if index deletion conflicts with any running snapshots - Set snapshottingIndices = SnapshotsService.snapshottingIndices(projectState, indicesToDelete); + Set snapshottingIndices = SnapshotsServiceUtils.snapshottingIndices(projectState, indicesToDelete); if (snapshottingIndices.isEmpty() == false) { throw new SnapshotInProgressException( "Cannot delete indices that are being snapshotted: " diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java index e26244440fcad..6c480a97ce52d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java @@ -74,7 +74,7 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.snapshots.SnapshotInProgressException; -import org.elasticsearch.snapshots.SnapshotsService; +import org.elasticsearch.snapshots.SnapshotsServiceUtils; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; @@ -343,7 +343,7 @@ static ClusterState addIndexClosedBlocks( } // Check if index closing conflicts with any running snapshots - Set snapshottingIndices = SnapshotsService.snapshottingIndices(currentProjectState, indicesToClose); + Set snapshottingIndices = SnapshotsServiceUtils.snapshottingIndices(currentProjectState, indicesToClose); if (snapshottingIndices.isEmpty() == false) { throw new SnapshotInProgressException( "Cannot close indices that are being snapshotted: " @@ -925,7 +925,7 @@ static Tuple> closeRoutingTable( } // Check if index closing conflicts with any running snapshots - Set snapshottingIndices = SnapshotsService.snapshottingIndices(currentProjectState, Set.of(index)); + Set snapshottingIndices = SnapshotsServiceUtils.snapshottingIndices(currentProjectState, Set.of(index)); if (snapshottingIndices.isEmpty() == false) { closingResults.put( result.getKey(), diff --git a/server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java b/server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java index c7b051136eab8..f6d7fa65f6638 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java +++ b/server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java @@ -17,7 +17,7 @@ import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.snapshots.SnapshotInfo; -import org.elasticsearch.snapshots.SnapshotsService; +import org.elasticsearch.snapshots.SnapshotsServiceUtils; import java.util.Map; import java.util.Set; @@ -103,7 +103,11 @@ public Map> obsoleteShardGenerations() { * Returns a new {@link ClusterState}, based on the given {@code state} with the create-snapshot entry removed. */ public ClusterState updatedClusterState(ClusterState state) { - final ClusterState updatedState = SnapshotsService.stateWithoutSnapshot(state, snapshotInfo.snapshot(), updatedShardGenerations); + final ClusterState updatedState = SnapshotsServiceUtils.stateWithoutSnapshot( + state, + snapshotInfo.snapshot(), + updatedShardGenerations + ); // Now that the updated cluster state may have changed in-progress shard snapshots' shard generations to the latest shard // generation, let's mark any now unreferenced shard generations as obsolete and ready to be deleted. obsoleteGenerations.set( diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java index e89998dc919e6..fce6b92ad1945 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java @@ -29,6 +29,7 @@ import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.snapshots.SnapshotsService; +import org.elasticsearch.snapshots.SnapshotsServiceUtils; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentParser; @@ -700,9 +701,9 @@ public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final IndexVersion repoMetaVersion, boolean permitMissingUuid) throws IOException { - final boolean shouldWriteUUIDS = SnapshotsService.includesUUIDs(repoMetaVersion); - final boolean shouldWriteIndexGens = SnapshotsService.useIndexGenerations(repoMetaVersion); - final boolean shouldWriteShardGens = SnapshotsService.useShardGenerations(repoMetaVersion); + final boolean shouldWriteUUIDS = SnapshotsServiceUtils.includesUUIDs(repoMetaVersion); + final boolean shouldWriteIndexGens = SnapshotsServiceUtils.useIndexGenerations(repoMetaVersion); + final boolean shouldWriteShardGens = SnapshotsServiceUtils.useShardGenerations(repoMetaVersion); assert Boolean.compare(shouldWriteUUIDS, shouldWriteIndexGens) <= 0; assert Boolean.compare(shouldWriteIndexGens, shouldWriteShardGens) <= 0; @@ -909,7 +910,7 @@ public static RepositoryData snapshotsFromXContent(XContentParser parser, long g this snapshot repository format requires Elasticsearch version [%s] or later""", versionString)); }; - assert SnapshotsService.useShardGenerations(version); + assert SnapshotsServiceUtils.useShardGenerations(version); } case UUID -> { XContentParserUtils.ensureExpectedToken(XContentParser.Token.VALUE_STRING, parser.nextToken(), parser); diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 85039f1b61792..b1e1eb3faf252 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -128,6 +128,7 @@ import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotMissingException; import org.elasticsearch.snapshots.SnapshotsService; +import org.elasticsearch.snapshots.SnapshotsServiceUtils; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.LeakTracker; @@ -974,7 +975,7 @@ private void createSnapshotsDeletion( return new SnapshotsDeletion( snapshotIds, repositoryDataGeneration, - SnapshotsService.minCompatibleVersion(minimumNodeVersion, originalRepositoryData, snapshotIds), + SnapshotsServiceUtils.minCompatibleVersion(minimumNodeVersion, originalRepositoryData, snapshotIds), originalRootBlobs, blobStore().blobContainer(indicesPath()).children(OperationPurpose.SNAPSHOT_DATA), originalRepositoryData @@ -1080,7 +1081,7 @@ class SnapshotsDeletion { this.snapshotIds = snapshotIds; this.originalRepositoryDataGeneration = originalRepositoryDataGeneration; this.repositoryFormatIndexVersion = repositoryFormatIndexVersion; - this.useShardGenerations = SnapshotsService.useShardGenerations(repositoryFormatIndexVersion); + this.useShardGenerations = SnapshotsServiceUtils.useShardGenerations(repositoryFormatIndexVersion); this.originalRootBlobs = originalRootBlobs; this.originalIndexContainers = originalIndexContainers; this.originalRepositoryData = originalRepositoryData; @@ -1750,11 +1751,11 @@ public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotConte // If there are older version nodes in the cluster, we don't need to run this cleanup as it will have already happened // when writing the index-${N} to each shard directory. final IndexVersion repositoryMetaVersion = finalizeSnapshotContext.repositoryMetaVersion(); - final boolean writeShardGens = SnapshotsService.useShardGenerations(repositoryMetaVersion); + final boolean writeShardGens = SnapshotsServiceUtils.useShardGenerations(repositoryMetaVersion); final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); - final boolean writeIndexGens = SnapshotsService.useIndexGenerations(repositoryMetaVersion); + final boolean writeIndexGens = SnapshotsServiceUtils.useIndexGenerations(repositoryMetaVersion); record MetadataWriteResult( RepositoryData existingRepositoryData, @@ -2522,7 +2523,7 @@ private void cacheRepositoryData(RepositoryData repositoryData, IndexVersion ver return; } final RepositoryData toCache; - if (SnapshotsService.useShardGenerations(version)) { + if (SnapshotsServiceUtils.useShardGenerations(version)) { toCache = repositoryData; } else { // don't cache shard generations here as they may be unreliable @@ -2871,7 +2872,7 @@ public void onFailure(Exception e) { }, true); maybeWriteIndexLatest(newGen); - if (filteredRepositoryData.getUuid().equals(RepositoryData.MISSING_UUID) && SnapshotsService.includesUUIDs(version)) { + if (filteredRepositoryData.getUuid().equals(RepositoryData.MISSING_UUID) && SnapshotsServiceUtils.includesUUIDs(version)) { assert newRepositoryData.getUuid().equals(RepositoryData.MISSING_UUID) == false; logger.info( Strings.format( @@ -2954,7 +2955,7 @@ public String toString() { } private RepositoryData updateRepositoryData(RepositoryData repositoryData, IndexVersion repositoryMetaversion, long newGen) { - if (SnapshotsService.includesUUIDs(repositoryMetaversion)) { + if (SnapshotsServiceUtils.includesUUIDs(repositoryMetaversion)) { final String clusterUUID = clusterService.state().metadata().clusterUUID(); if (repositoryData.getClusterUUID().equals(clusterUUID) == false) { repositoryData = repositoryData.withClusterUuid(clusterUUID); @@ -3089,7 +3090,7 @@ private ClusterState updateRepositoryGenerationsIfNecessary(ClusterState state, } } updatedDeletionsInProgress = changedDeletions ? SnapshotDeletionsInProgress.of(deletionEntries) : null; - return SnapshotsService.updateWithSnapshots(state, updatedSnapshotsInProgress, updatedDeletionsInProgress); + return SnapshotsServiceUtils.updateWithSnapshots(state, updatedSnapshotsInProgress, updatedDeletionsInProgress); } private RepositoryMetadata getRepoMetadata(ClusterState state) { @@ -3328,8 +3329,8 @@ private void doSnapshotShard(SnapshotShardContext context) { ); final ShardGeneration indexGeneration; - final boolean writeShardGens = SnapshotsService.useShardGenerations(context.getRepositoryMetaVersion()); - final boolean writeFileInfoWriterUUID = SnapshotsService.includeFileInfoWriterUUID(context.getRepositoryMetaVersion()); + final boolean writeShardGens = SnapshotsServiceUtils.useShardGenerations(context.getRepositoryMetaVersion()); + final boolean writeFileInfoWriterUUID = SnapshotsServiceUtils.includeFileInfoWriterUUID(context.getRepositoryMetaVersion()); // build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones final BlobStoreIndexShardSnapshots updatedBlobStoreIndexShardSnapshots = snapshots.withAddedSnapshot( new SnapshotFiles(snapshotId.getName(), indexCommitPointFiles, context.stateIdentifier()) diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index e840acd049b71..0a445f7c999a6 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -398,7 +398,7 @@ private void startNewShardSnapshots(String localNodeId, SnapshotsInProgress.Entr newSnapshotShards.put(shardId, snapshotStatus); final IndexId indexId = entry.indices().get(shardId.getIndexName()); assert indexId != null; - assert SnapshotsService.useShardGenerations(entry.version()) + assert SnapshotsServiceUtils.useShardGenerations(entry.version()) || ShardGenerations.fixShardGeneration(snapshotStatus.generation()) == null : "Found non-null, non-numeric shard generation [" + snapshotStatus.generation() diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index a801a7067ccfe..9ba9d8f11b02a 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -13,7 +13,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionRunnable; @@ -33,8 +32,6 @@ import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.NotMasterException; -import org.elasticsearch.cluster.ProjectState; -import org.elasticsearch.cluster.RepositoryCleanupInProgress; import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.SnapshotDeletionsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress; @@ -52,7 +49,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexRoutingTable; -import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; @@ -75,13 +71,10 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.ListenableFuture; -import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.Nullable; -import org.elasticsearch.core.Predicates; import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.Index; -import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.IndexVersions; import org.elasticsearch.index.shard.ShardId; @@ -93,7 +86,6 @@ import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.RepositoryException; -import org.elasticsearch.repositories.RepositoryMissingException; import org.elasticsearch.repositories.RepositoryShardId; import org.elasticsearch.repositories.ShardGeneration; import org.elasticsearch.repositories.ShardGenerations; @@ -113,7 +105,6 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -129,7 +120,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static java.util.Collections.unmodifiableList; import static org.elasticsearch.cluster.SnapshotsInProgress.completed; import static org.elasticsearch.common.Strings.arrayToCommaDelimitedString; import static org.elasticsearch.core.Strings.format; @@ -137,6 +127,7 @@ /** * Service responsible for creating snapshots. This service runs all the steps executed on the master node during snapshot creation and * deletion. + * * See package level documentation of {@link org.elasticsearch.snapshots} for details. * See {@link SnapshotShardsService} for the data node snapshotting steps. */ @@ -265,7 +256,7 @@ public void executeSnapshot(final CreateSnapshotRequest request, final ActionLis /** * Initializes the snapshotting process. *

- * This method is used by clients to start snapshot. It makes sure that there is no snapshots are currently running and + * This method is used by clients to start a snapshot. It makes sure that there are no snapshots currently running and * creates a snapshot record in cluster state metadata. * * @param request snapshot request @@ -274,7 +265,7 @@ public void executeSnapshot(final CreateSnapshotRequest request, final ActionLis public void createSnapshot(final CreateSnapshotRequest request, final ActionListener listener) { final String repositoryName = request.repository(); final String snapshotName = IndexNameExpressionResolver.resolveDateMathExpression(request.snapshot()); - validate(repositoryName, snapshotName); + SnapshotsServiceUtils.validate(repositoryName, snapshotName); final SnapshotId snapshotId = new SnapshotId(snapshotName, request.uuid()); Repository repository = repositoriesService.repository(request.repository()); if (repository.isReadOnly()) { @@ -306,12 +297,6 @@ private void submitCreateSnapshotRequest( ); } - private static void ensureSnapshotNameNotRunning(SnapshotsInProgress runningSnapshots, String repositoryName, String snapshotName) { - if (runningSnapshots.forRepo(repositoryName).stream().anyMatch(s -> s.snapshot().getSnapshotId().getName().equals(snapshotName))) { - throw new SnapshotNameAlreadyInUseException(repositoryName, snapshotName, "snapshot with the same name is already in-progress"); - } - } - // TODO: It is worth revisiting the design choice of creating a placeholder entry in snapshots-in-progress here once we have a cache // for repository metadata and loading it has predictable performance public void cloneSnapshot(CloneSnapshotRequest request, ActionListener listener) { @@ -322,7 +307,7 @@ public void cloneSnapshot(CloneSnapshotRequest request, ActionListener lis return; } final String snapshotName = IndexNameExpressionResolver.resolveDateMathExpression(request.target()); - validate(repositoryName, snapshotName); + SnapshotsServiceUtils.validate(repositoryName, snapshotName); // TODO: create snapshot UUID in CloneSnapshotRequest and make this operation idempotent to cleanly deal with transport layer // retries final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); @@ -334,12 +319,12 @@ public void cloneSnapshot(CloneSnapshotRequest request, ActionListener lis @Override public ClusterState execute(ClusterState currentState) { - ensureRepositoryExists(repositoryName, currentState); - ensureSnapshotNameAvailableInRepo(repositoryData, snapshotName, repository); - ensureNoCleanupInProgress(currentState, repositoryName, snapshotName, "clone snapshot"); + SnapshotsServiceUtils.ensureRepositoryExists(repositoryName, currentState); + SnapshotsServiceUtils.ensureSnapshotNameAvailableInRepo(repositoryData, snapshotName, repository); + SnapshotsServiceUtils.ensureNoCleanupInProgress(currentState, repositoryName, snapshotName, "clone snapshot"); final SnapshotsInProgress snapshots = SnapshotsInProgress.get(currentState); - ensureSnapshotNameNotRunning(snapshots, repositoryName, snapshotName); - validate(repositoryName, snapshotName, currentState); + SnapshotsServiceUtils.ensureSnapshotNameNotRunning(snapshots, repositoryName, snapshotName); + SnapshotsServiceUtils.validate(repositoryName, snapshotName, currentState); final SnapshotId sourceSnapshotId = repositoryData.getSnapshotIds() .stream() @@ -382,7 +367,11 @@ public ClusterState execute(ClusterState currentState) { repositoryData.resolveIndices(matchingIndices), threadPool.absoluteTimeInMillis(), repositoryData.getGenId(), - minCompatibleVersion(currentState.nodes().getMaxDataNodeCompatibleIndexVersion(), repositoryData, null) + SnapshotsServiceUtils.minCompatibleVersion( + currentState.nodes().getMaxDataNodeCompatibleIndexVersion(), + repositoryData, + null + ) // NB minCompatibleVersion iterates over all the snapshots in the current repositoryData, which probably should happen on a // different thread. Also is the _current_ repositoryData the right thing to consider? The minimum repository format version // can only advance during a snapshot delete which today is never concurrent to other writes, but a future version may allow @@ -407,42 +396,6 @@ public void clusterStateProcessed(ClusterState oldState, final ClusterState newS }, "clone_snapshot [" + request.source() + "][" + snapshotName + ']', listener::onFailure); } - /** - * Checks the cluster state for any in-progress repository cleanup tasks ({@link RepositoryCleanupInProgress}). - */ - private static void ensureNoCleanupInProgress( - final ClusterState currentState, - final String repositoryName, - final String snapshotName, - final String reason - ) { - final RepositoryCleanupInProgress repositoryCleanupInProgress = RepositoryCleanupInProgress.get(currentState); - if (repositoryCleanupInProgress.hasCleanupInProgress()) { - throw new ConcurrentSnapshotExecutionException( - repositoryName, - snapshotName, - "cannot " - + reason - + " while a repository cleanup is in-progress in " - + repositoryCleanupInProgress.entries() - .stream() - .map(RepositoryCleanupInProgress.Entry::repository) - .collect(Collectors.toSet()) - ); - } - } - - private static void ensureSnapshotNameAvailableInRepo(RepositoryData repositoryData, String snapshotName, Repository repository) { - // check if the snapshot name already exists in the repository - if (repositoryData.getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) { - throw new SnapshotNameAlreadyInUseException( - repository.getMetadata().name(), - snapshotName, - "snapshot with the same name already exists" - ); - } - } - /** * Determine the number of shards in each index of a clone operation and update the cluster state accordingly. * @@ -550,7 +503,7 @@ public ClusterState execute(ClusterState currentState) { // shard snapshot state was based on all previous existing operations in progress // TODO: If we could eventually drop the snapshot clone init phase we don't need this any longer updatedEntries.add(updatedEntry); - return updateWithSnapshots( + return SnapshotsServiceUtils.updateWithSnapshots( currentState, snapshotsInProgress.withUpdatedEntriesForRepo(repoName, updatedEntries), null @@ -688,146 +641,6 @@ private void ensureBelowConcurrencyLimit( } } - /** - * Throws {@link RepositoryMissingException} if no repository by the given name is found in the given cluster state. - */ - public static void ensureRepositoryExists(String repoName, ClusterState state) { - if (RepositoriesMetadata.get(state).repository(repoName) == null) { - throw new RepositoryMissingException(repoName); - } - } - - /** - * Validates snapshot request - * - * @param repositoryName repository name - * @param snapshotName snapshot name - * @param state current cluster state - */ - private static void validate(String repositoryName, String snapshotName, ClusterState state) { - if (RepositoriesMetadata.get(state).repository(repositoryName) == null) { - throw new RepositoryMissingException(repositoryName); - } - validate(repositoryName, snapshotName); - } - - private static void validate(final String repositoryName, final String snapshotName) { - if (Strings.hasLength(snapshotName) == false) { - throw new InvalidSnapshotNameException(repositoryName, snapshotName, "cannot be empty"); - } - if (snapshotName.contains(" ")) { - throw new InvalidSnapshotNameException(repositoryName, snapshotName, "must not contain whitespace"); - } - if (snapshotName.contains(",")) { - throw new InvalidSnapshotNameException(repositoryName, snapshotName, "must not contain ','"); - } - if (snapshotName.contains("#")) { - throw new InvalidSnapshotNameException(repositoryName, snapshotName, "must not contain '#'"); - } - if (snapshotName.charAt(0) == '_') { - throw new InvalidSnapshotNameException(repositoryName, snapshotName, "must not start with '_'"); - } - if (snapshotName.toLowerCase(Locale.ROOT).equals(snapshotName) == false) { - throw new InvalidSnapshotNameException(repositoryName, snapshotName, "must be lowercase"); - } - if (Strings.validFileName(snapshotName) == false) { - throw new InvalidSnapshotNameException( - repositoryName, - snapshotName, - "must not contain the following characters " + Strings.INVALID_FILENAME_CHARS - ); - } - } - - private static ShardGenerations buildGenerations(SnapshotsInProgress.Entry snapshot, Metadata metadata) { - ShardGenerations.Builder builder = ShardGenerations.builder(); - if (snapshot.isClone()) { - snapshot.shardSnapshotStatusByRepoShardId().forEach((key, value) -> builder.put(key.index(), key.shardId(), value)); - } else { - snapshot.shardSnapshotStatusByRepoShardId().forEach((key, value) -> { - final Index index = snapshot.indexByName(key.indexName()); - if (metadata.findIndex(index).isEmpty()) { - assert snapshot.partial() : "Index [" + index + "] was deleted during a snapshot but snapshot was not partial."; - return; - } - builder.put(key.index(), key.shardId(), value); - }); - } - return builder.build(); - } - - private static Metadata metadataForSnapshot(SnapshotsInProgress.Entry snapshot, Metadata metadata) { - final Metadata.Builder builder; - if (snapshot.includeGlobalState() == false) { - // Remove global state from the cluster state - builder = Metadata.builder(); - for (IndexId index : snapshot.indices().values()) { - final IndexMetadata indexMetadata = metadata.getProject().index(index.getName()); - if (indexMetadata == null) { - assert snapshot.partial() : "Index [" + index + "] was deleted during a snapshot but snapshot was not partial."; - } else { - builder.put(indexMetadata, false); - } - } - } else { - builder = Metadata.builder(metadata); - } - // Only keep those data streams in the metadata that were actually requested by the initial snapshot create operation and that have - // all their indices contained in the snapshot - final Map dataStreams = new HashMap<>(); - final Set indicesInSnapshot = snapshot.indices().keySet(); - for (String dataStreamName : snapshot.dataStreams()) { - DataStream dataStream = metadata.getProject().dataStreams().get(dataStreamName); - if (dataStream == null) { - assert snapshot.partial() - : "Data stream [" + dataStreamName + "] was deleted during a snapshot but snapshot was not partial."; - } else { - final DataStream reconciled = dataStream.snapshot(indicesInSnapshot, builder); - if (reconciled != null) { - dataStreams.put(dataStreamName, reconciled); - } - } - } - return builder.dataStreams(dataStreams, filterDataStreamAliases(dataStreams, metadata.getProject().dataStreamAliases())).build(); - } - - /** - * Returns status of the currently running snapshots - *

- * This method is executed on master node - *

- * - * @param snapshotsInProgress snapshots in progress in the cluster state - * @param repository repository id - * @param snapshots list of snapshots that will be used as a filter, empty list means no snapshots are filtered - * @return list of metadata for currently running snapshots - */ - public static List currentSnapshots( - @Nullable SnapshotsInProgress snapshotsInProgress, - String repository, - List snapshots - ) { - if (snapshotsInProgress == null || snapshotsInProgress.isEmpty()) { - return Collections.emptyList(); - } - if ("_all".equals(repository)) { - return snapshotsInProgress.asStream().toList(); - } - if (snapshots.isEmpty()) { - return snapshotsInProgress.forRepo(repository); - } - List builder = new ArrayList<>(); - for (SnapshotsInProgress.Entry entry : snapshotsInProgress.forRepo(repository)) { - for (String snapshot : snapshots) { - if (entry.snapshot().getSnapshotId().getName().equals(snapshot)) { - builder.add(entry); - break; - } - } - } - return unmodifiableList(builder); - } - @Override public void applyClusterState(ClusterChangedEvent event) { try { @@ -836,14 +649,16 @@ public void applyClusterState(ClusterChangedEvent event) { SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.get(event.state()); final boolean newMaster = event.previousState().nodes().isLocalNodeElectedMaster() == false; processExternalChanges( - newMaster || removedNodesCleanupNeeded(snapshotsInProgress, event.nodesDelta().removedNodes()), + newMaster || SnapshotsServiceUtils.removedNodesCleanupNeeded(snapshotsInProgress, event.nodesDelta().removedNodes()), snapshotsInProgress.nodeIdsForRemovalChanged(SnapshotsInProgress.get(event.previousState())) - || (event.routingTableChanged() && waitingShardsStartedOrUnassigned(snapshotsInProgress, event)) + || (event.routingTableChanged() + && SnapshotsServiceUtils.waitingShardsStartedOrUnassigned(snapshotsInProgress, event)) ); if (newMaster || event.state().metadata().nodeShutdowns().equals(event.previousState().metadata().nodeShutdowns()) == false - || supportsNodeRemovalTracking(event.state()) != supportsNodeRemovalTracking(event.previousState())) { + || SnapshotsServiceUtils.supportsNodeRemovalTracking(event.state()) != SnapshotsServiceUtils + .supportsNodeRemovalTracking(event.previousState())) { updateNodeIdsToRemoveQueue.submitTask( "SnapshotsService#updateNodeIdsToRemove", new UpdateNodeIdsForRemovalTask(), @@ -872,7 +687,9 @@ public void applyClusterState(ClusterChangedEvent event) { final Exception cause = new NotMasterException("no longer master"); for (final Iterator>> it = snapshotDeletionListeners.values().iterator(); it.hasNext();) { final List> listeners = it.next(); - readyToResolveListeners.add(() -> failListenersIgnoringException(listeners, cause)); + readyToResolveListeners.add( + () -> SnapshotsServiceUtils.failListenersIgnoringException(listeners, cause, logger) + ); it.remove(); } } @@ -885,7 +702,7 @@ public void applyClusterState(ClusterChangedEvent event) { logger.warn("Failed to update snapshot state ", e); } assert assertConsistentWithClusterState(event.state()); - assert assertNoDanglingSnapshots(event.state()); + assert SnapshotsServiceUtils.assertNoDanglingSnapshots(event.state()); } private boolean assertConsistentWithClusterState(ClusterState state) { @@ -919,35 +736,6 @@ private boolean assertConsistentWithClusterState(ClusterState state) { return true; } - // Assert that there are no snapshots that have a shard that is waiting to be assigned even though the cluster state would allow for it - // to be assigned - private static boolean assertNoDanglingSnapshots(ClusterState state) { - final SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.get(state); - final SnapshotDeletionsInProgress snapshotDeletionsInProgress = SnapshotDeletionsInProgress.get(state); - final Set reposWithRunningDelete = snapshotDeletionsInProgress.getEntries() - .stream() - .filter(entry -> entry.state() == SnapshotDeletionsInProgress.State.STARTED) - .map(SnapshotDeletionsInProgress.Entry::repository) - .collect(Collectors.toSet()); - for (List repoEntry : snapshotsInProgress.entriesByRepo()) { - final SnapshotsInProgress.Entry entry = repoEntry.get(0); - for (ShardSnapshotStatus value : entry.shardSnapshotStatusByRepoShardId().values()) { - if (value.equals(ShardSnapshotStatus.UNASSIGNED_QUEUED)) { - assert reposWithRunningDelete.contains(entry.repository()) - : "Found shard snapshot waiting to be assigned in [" + entry + "] but it is not blocked by any running delete"; - } else if (value.isActive()) { - assert reposWithRunningDelete.contains(entry.repository()) == false - : "Found shard snapshot actively executing in [" - + entry - + "] when it should be blocked by a running delete [" - + Strings.toString(snapshotDeletionsInProgress) - + "]"; - } - } - } - return true; - } - /** * Updates the state of in-progress snapshots in reaction to a change in the configuration of the cluster nodes (master fail-over or * disconnect of a data node that was executing a snapshot) or a routing change that started shards whose snapshot state is @@ -1065,13 +853,15 @@ public ClusterState execute(ClusterState currentState) { } else { // Not a clone, and the snapshot is in STARTED or ABORTED state. - ImmutableOpenMap shards = processWaitingShardsAndRemovedNodes( - snapshotEntry, - routingTable, - nodes, - snapshotsInProgress::isNodeIdForRemoval, - knownFailures - ); + ImmutableOpenMap shards = SnapshotsServiceUtils + .processWaitingShardsAndRemovedNodes( + snapshotEntry, + routingTable, + nodes, + snapshotsInProgress::isNodeIdForRemoval, + knownFailures, + logger + ); if (shards != null) { final SnapshotsInProgress.Entry updatedSnapshot = snapshotEntry.withShardStates(shards); changed = true; @@ -1102,7 +892,7 @@ public ClusterState execute(ClusterState currentState) { updatedSnapshots = updatedSnapshots.withUpdatedEntriesForRepo(repositoryName, updatedEntriesForRepo); } } - final ClusterState res = readyDeletions( + final ClusterState res = SnapshotsServiceUtils.readyDeletions( updatedSnapshots != snapshotsInProgress ? ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, updatedSnapshots).build() : currentState @@ -1152,176 +942,6 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState) }); } - /** - * Walks through the snapshot entries' shard snapshots and creates applies updates from looking at removed nodes or indexes and known - * failed shard snapshots on the same shard IDs. - * - * @param nodeIdRemovalPredicate identify any nodes that are marked for removal / in shutdown mode - * @param knownFailures already known failed shard snapshots, but more may be found in this method - * @return an updated map of shard statuses - */ - private static ImmutableOpenMap processWaitingShardsAndRemovedNodes( - SnapshotsInProgress.Entry snapshotEntry, - RoutingTable routingTable, - DiscoveryNodes nodes, - Predicate nodeIdRemovalPredicate, - Map knownFailures - ) { - assert snapshotEntry.isClone() == false : "clones take a different path"; - boolean snapshotChanged = false; - ImmutableOpenMap.Builder shards = ImmutableOpenMap.builder(); - for (Map.Entry shardSnapshotEntry : snapshotEntry.shardSnapshotStatusByRepoShardId() - .entrySet()) { - ShardSnapshotStatus shardStatus = shardSnapshotEntry.getValue(); - ShardId shardId = snapshotEntry.shardId(shardSnapshotEntry.getKey()); - if (shardStatus.equals(ShardSnapshotStatus.UNASSIGNED_QUEUED)) { - // this shard snapshot is waiting for a previous snapshot to finish execution for this shard - final ShardSnapshotStatus knownFailure = knownFailures.get(shardSnapshotEntry.getKey()); - if (knownFailure == null) { - final IndexRoutingTable indexShardRoutingTable = routingTable.index(shardId.getIndex()); - if (indexShardRoutingTable == null) { - // shard became unassigned while queued after a delete or clone operation so we can fail as missing here - assert snapshotEntry.partial(); - snapshotChanged = true; - logger.debug("failing snapshot of shard [{}] because index got deleted", shardId); - shards.put(shardId, ShardSnapshotStatus.MISSING); - knownFailures.put(shardSnapshotEntry.getKey(), ShardSnapshotStatus.MISSING); - } else { - // if no failure is known for the shard we keep waiting - shards.put(shardId, shardStatus); - } - } else { - // If a failure is known for an execution we waited on for this shard then we fail with the same exception here - // as well - snapshotChanged = true; - shards.put(shardId, knownFailure); - } - } else if (shardStatus.state() == ShardState.WAITING || shardStatus.state() == ShardState.PAUSED_FOR_NODE_REMOVAL) { - // The shard primary wasn't assigned, or the shard snapshot was paused because the node was shutting down. - IndexRoutingTable indexShardRoutingTable = routingTable.index(shardId.getIndex()); - if (indexShardRoutingTable != null) { - IndexShardRoutingTable shardRouting = indexShardRoutingTable.shard(shardId.id()); - if (shardRouting != null) { - final var primaryNodeId = shardRouting.primaryShard().currentNodeId(); - if (nodeIdRemovalPredicate.test(primaryNodeId)) { - if (shardStatus.state() == ShardState.PAUSED_FOR_NODE_REMOVAL) { - // Shard that we are waiting for is on a node marked for removal, keep it as PAUSED_FOR_REMOVAL - shards.put(shardId, shardStatus); - } else { - // Shard that we are waiting for is on a node marked for removal, move it to PAUSED_FOR_REMOVAL - snapshotChanged = true; - shards.put( - shardId, - new ShardSnapshotStatus(primaryNodeId, ShardState.PAUSED_FOR_NODE_REMOVAL, shardStatus.generation()) - ); - } - continue; - } else if (shardRouting.primaryShard().started()) { - // Shard that we were waiting for has started on a node, let's process it - snapshotChanged = true; - logger.debug(""" - Starting shard [{}] with shard generation [{}] that we were waiting to start on node [{}]. Previous \ - shard state [{}] - """, shardId, shardStatus.generation(), shardStatus.nodeId(), shardStatus.state()); - shards.put(shardId, new ShardSnapshotStatus(primaryNodeId, shardStatus.generation())); - continue; - } else if (shardRouting.primaryShard().initializing() || shardRouting.primaryShard().relocating()) { - // Shard that we were waiting for hasn't started yet or still relocating - will continue to wait - shards.put(shardId, shardStatus); - continue; - } - } - } - // Shard that we were waiting for went into unassigned state or disappeared (index or shard is gone) - giving up - snapshotChanged = true; - logger.warn("failing snapshot of shard [{}] on node [{}] because shard is unassigned", shardId, shardStatus.nodeId()); - final ShardSnapshotStatus failedState = new ShardSnapshotStatus( - shardStatus.nodeId(), - ShardState.FAILED, - shardStatus.generation(), - "shard is unassigned" - ); - shards.put(shardId, failedState); - knownFailures.put(shardSnapshotEntry.getKey(), failedState); - } else if (shardStatus.state().completed() == false && shardStatus.nodeId() != null) { - if (nodes.nodeExists(shardStatus.nodeId())) { - shards.put(shardId, shardStatus); - } else { - // TODO: Restart snapshot on another node? - snapshotChanged = true; - logger.warn("failing snapshot of shard [{}] on departed node [{}]", shardId, shardStatus.nodeId()); - final ShardSnapshotStatus failedState = new ShardSnapshotStatus( - shardStatus.nodeId(), - ShardState.FAILED, - shardStatus.generation(), - "node left the cluster during snapshot" - ); - shards.put(shardId, failedState); - knownFailures.put(shardSnapshotEntry.getKey(), failedState); - } - } else { - shards.put(shardId, shardStatus); - } - } - if (snapshotChanged) { - return shards.build(); - } else { - return null; - } - } - - private static boolean waitingShardsStartedOrUnassigned(SnapshotsInProgress snapshotsInProgress, ClusterChangedEvent event) { - for (List entries : snapshotsInProgress.entriesByRepo()) { - for (SnapshotsInProgress.Entry entry : entries) { - if (entry.state() == SnapshotsInProgress.State.STARTED && entry.isClone() == false) { - for (Map.Entry shardStatus : entry.shardSnapshotStatusByRepoShardId() - .entrySet()) { - final ShardState state = shardStatus.getValue().state(); - if (state != ShardState.WAITING && state != ShardState.QUEUED && state != ShardState.PAUSED_FOR_NODE_REMOVAL) { - continue; - } - final RepositoryShardId shardId = shardStatus.getKey(); - final Index index = entry.indexByName(shardId.indexName()); - if (event.indexRoutingTableChanged(index)) { - IndexRoutingTable indexShardRoutingTable = event.state().getRoutingTable().index(index); - if (indexShardRoutingTable == null) { - // index got removed concurrently and we have to fail WAITING, QUEUED and PAUSED_FOR_REMOVAL state shards - return true; - } - ShardRouting shardRouting = indexShardRoutingTable.shard(shardId.shardId()).primaryShard(); - if (shardRouting.started() && snapshotsInProgress.isNodeIdForRemoval(shardRouting.currentNodeId()) == false - || shardRouting.unassigned()) { - return true; - } - } - } - } - } - } - return false; - } - - private static boolean removedNodesCleanupNeeded(SnapshotsInProgress snapshotsInProgress, List removedNodes) { - if (removedNodes.isEmpty()) { - // Nothing to do, no nodes removed - return false; - } - final Set removedNodeIds = removedNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toSet()); - return snapshotsInProgress.asStream().anyMatch(snapshot -> { - if (snapshot.state().completed() || snapshot.isClone()) { - // nothing to do for already completed snapshots or clones that run on master anyways - return false; - } - for (ShardSnapshotStatus shardSnapshotStatus : snapshot.shardSnapshotStatusByRepoShardId().values()) { - if (shardSnapshotStatus.state().completed() == false && removedNodeIds.contains(shardSnapshotStatus.nodeId())) { - // Snapshot had an incomplete shard running on a removed node so we need to adjust that shard's snapshot status - return true; - } - } - return false; - }); - } - /** * Finalizes the snapshot in the repository. * @@ -1431,7 +1051,7 @@ protected void doRun() { SnapshotsInProgress.Entry entry = SnapshotsInProgress.get(clusterService.state()).snapshot(snapshot); final String failure = entry.failure(); logger.trace("[{}] finalizing snapshot in repository, state: [{}], failure[{}]", snapshot, entry.state(), failure); - final ShardGenerations shardGenerations = buildGenerations(entry, metadata); + final ShardGenerations shardGenerations = SnapshotsServiceUtils.buildGenerations(entry, metadata); final List finalIndices = shardGenerations.indices().stream().map(IndexId::getName).toList(); final Set indexNames = new HashSet<>(finalIndices); ArrayList shardFailures = new ArrayList<>(); @@ -1473,7 +1093,7 @@ protected void doRun() { dataStreamsToCopy.put(dataStreamEntry.getKey(), dataStreamEntry.getValue()); } } - Map dataStreamAliasesToCopy = filterDataStreamAliases( + Map dataStreamAliasesToCopy = SnapshotsServiceUtils.filterDataStreamAliases( dataStreamsToCopy, existing.getProject().dataStreamAliases() ); @@ -1485,7 +1105,7 @@ protected void doRun() { } metadataListener.addListener(ActionListener.wrap(meta -> { assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT); - final Metadata metaForSnapshot = metadataForSnapshot(entry, meta); + final Metadata metaForSnapshot = SnapshotsServiceUtils.metadataForSnapshot(entry, meta); final Map indexSnapshotDetails = Maps.newMapWithExpectedSize( finalIndices.size() @@ -1526,7 +1146,7 @@ protected void doRun() { snapshot, finalIndices, entry.dataStreams().stream().filter(metaForSnapshot.getProject().dataStreams()::containsKey).toList(), - entry.partial() ? onlySuccessfulFeatureStates(entry, finalIndices) : entry.featureStates(), + entry.partial() ? SnapshotsServiceUtils.onlySuccessfulFeatureStates(entry, finalIndices) : entry.featureStates(), failure, threadPool.absoluteTimeInMillis(), entry.partial() ? shardGenerations.totalShards() : entry.shardSnapshotStatusByRepoShardId().size(), @@ -1562,7 +1182,7 @@ protected void doRun() { () -> snapshotListeners.addListener(new ActionListener<>() { @Override public void onResponse(List> actionListeners) { - completeListenersIgnoringException(actionListeners, snapshotInfo); + SnapshotsServiceUtils.completeListenersIgnoringException(actionListeners, snapshotInfo, logger); logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state()); } @@ -1604,32 +1224,6 @@ public void onFailure(Exception e) { } } - /** - * Removes all feature states which have missing or failed shards, as they are no longer safely restorable. - * @param entry The "in progress" entry with a list of feature states and one or more failed shards. - * @param finalIndices The final list of indices in the snapshot, after any indices that were concurrently deleted are removed. - * @return The list of feature states which were completed successfully in the given entry. - */ - private static List onlySuccessfulFeatureStates(SnapshotsInProgress.Entry entry, List finalIndices) { - assert entry.partial() : "should not try to filter feature states from a non-partial entry"; - - // Figure out which indices have unsuccessful shards - Set indicesWithUnsuccessfulShards = new HashSet<>(); - entry.shardSnapshotStatusByRepoShardId().forEach((key, value) -> { - final ShardState shardState = value.state(); - if (shardState.failed() || shardState.completed() == false) { - indicesWithUnsuccessfulShards.add(key.indexName()); - } - }); - - // Now remove any feature states which contain any of those indices, as the feature state is not intact and not safely restorable - return entry.featureStates() - .stream() - .filter(stateInfo -> finalIndices.containsAll(stateInfo.getIndices())) - .filter(stateInfo -> stateInfo.getIndices().stream().anyMatch(indicesWithUnsuccessfulShards::contains) == false) - .toList(); - } - /** * Remove a snapshot from {@link #endingSnapshots} set and return its completion listeners that must be resolved. */ @@ -1709,7 +1303,7 @@ private void runReadyDeletions(RepositoryData repositoryData, String repository) @Override public ClusterState execute(ClusterState currentState) { - assert readyDeletions(currentState).v1() == currentState + assert SnapshotsServiceUtils.readyDeletions(currentState).v1() == currentState : "Deletes should have been set to ready by finished snapshot deletes and finalizations"; for (SnapshotDeletionsInProgress.Entry entry : SnapshotDeletionsInProgress.get(currentState).getEntries()) { if (entry.repository().equals(repository) && entry.state() == SnapshotDeletionsInProgress.State.STARTED) { @@ -1737,224 +1331,6 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState) }); } - /** - * Finds snapshot delete operations that are ready to execute in the given {@link ClusterState} and computes a new cluster state that - * has all executable deletes marked as executing. Returns a {@link Tuple} of the updated cluster state and all executable deletes. - * This can either be {@link SnapshotDeletionsInProgress.Entry} that were already in state - * {@link SnapshotDeletionsInProgress.State#STARTED} or waiting entries in state {@link SnapshotDeletionsInProgress.State#WAITING} - * that were moved to {@link SnapshotDeletionsInProgress.State#STARTED} in the returned updated cluster state. - * - * @param currentState current cluster state - * @return tuple of an updated cluster state and currently executable snapshot delete operations - */ - private static Tuple> readyDeletions(ClusterState currentState) { - final SnapshotDeletionsInProgress deletions = SnapshotDeletionsInProgress.get(currentState); - if (deletions.hasDeletionsInProgress() == false) { - return Tuple.tuple(currentState, List.of()); - } - final SnapshotsInProgress snapshotsInProgress = currentState.custom(SnapshotsInProgress.TYPE); - assert snapshotsInProgress != null; - final Set repositoriesSeen = new HashSet<>(); - boolean changed = false; - final ArrayList readyDeletions = new ArrayList<>(); - final List newDeletes = new ArrayList<>(); - for (SnapshotDeletionsInProgress.Entry entry : deletions.getEntries()) { - final String repo = entry.repository(); - if (repositoriesSeen.add(entry.repository()) - && entry.state() == SnapshotDeletionsInProgress.State.WAITING - && snapshotsInProgress.forRepo(repo).stream().noneMatch(SnapshotsService::isWritingToRepository)) { - changed = true; - final SnapshotDeletionsInProgress.Entry newEntry = entry.started(); - readyDeletions.add(newEntry); - newDeletes.add(newEntry); - } else { - newDeletes.add(entry); - } - } - return Tuple.tuple( - changed - ? ClusterState.builder(currentState) - .putCustom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.of(newDeletes)) - .build() - : currentState, - readyDeletions - ); - } - - /** - * Computes the cluster state resulting from removing a given snapshot create operation from the given state. This method will update - * the shard generations of snapshots that the given snapshot depended on so that finalizing them will not cause rolling back to an - * outdated shard generation. - *

- * For example, shard snapshot X can be taken, but not finalized yet. Shard snapshot Y can then depend upon shard snapshot X. Then shard - * snapshot Y may finalize before shard snapshot X, but including X. However, X does not include Y. Therefore we update X to use Y's - * shard generation file (list of snapshots and dependencies) to avoid overwriting with X's file that is missing Y. - * - * @param state current cluster state - * @param snapshot snapshot for which to remove the snapshot operation - * @return updated cluster state - */ - public static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot snapshot, ShardGenerations shardGenerations) { - final SnapshotsInProgress inProgressSnapshots = SnapshotsInProgress.get(state); - ClusterState result = state; - int indexOfEntry = -1; - // Find the in-progress snapshot entry that matches {@code snapshot}. - final List entryList = inProgressSnapshots.forRepo(snapshot.getRepository()); - for (int i = 0; i < entryList.size(); i++) { - SnapshotsInProgress.Entry entry = entryList.get(i); - if (entry.snapshot().equals(snapshot)) { - indexOfEntry = i; - break; - } - } - if (indexOfEntry >= 0) { - final List updatedEntries = new ArrayList<>(entryList.size() - 1); - final SnapshotsInProgress.Entry removedEntry = entryList.get(indexOfEntry); - for (int i = 0; i < indexOfEntry; i++) { - final SnapshotsInProgress.Entry previousEntry = entryList.get(i); - if (removedEntry.isClone()) { - if (previousEntry.isClone()) { - ImmutableOpenMap.Builder updatedShardAssignments = null; - for (Map.Entry finishedShardEntry : removedEntry - .shardSnapshotStatusByRepoShardId() - .entrySet()) { - final ShardSnapshotStatus shardState = finishedShardEntry.getValue(); - if (shardState.state() == ShardState.SUCCESS) { - updatedShardAssignments = maybeAddUpdatedAssignment( - updatedShardAssignments, - shardState, - finishedShardEntry.getKey(), - previousEntry.shardSnapshotStatusByRepoShardId() - ); - } - } - addCloneEntry(updatedEntries, previousEntry, updatedShardAssignments); - } else { - ImmutableOpenMap.Builder updatedShardAssignments = null; - for (Map.Entry finishedShardEntry : removedEntry - .shardSnapshotStatusByRepoShardId() - .entrySet()) { - final ShardSnapshotStatus shardState = finishedShardEntry.getValue(); - final RepositoryShardId repositoryShardId = finishedShardEntry.getKey(); - if (shardState.state() != ShardState.SUCCESS - || previousEntry.shardSnapshotStatusByRepoShardId().containsKey(repositoryShardId) == false) { - continue; - } - updatedShardAssignments = maybeAddUpdatedAssignment( - updatedShardAssignments, - shardState, - previousEntry.shardId(repositoryShardId), - previousEntry.shards() - ); - - } - addSnapshotEntry(updatedEntries, previousEntry, updatedShardAssignments); - } - } else { - if (previousEntry.isClone()) { - ImmutableOpenMap.Builder updatedShardAssignments = null; - for (Map.Entry finishedShardEntry : removedEntry - .shardSnapshotStatusByRepoShardId() - .entrySet()) { - final ShardSnapshotStatus shardState = finishedShardEntry.getValue(); - final RepositoryShardId repositoryShardId = finishedShardEntry.getKey(); - if (shardState.state() != ShardState.SUCCESS - || previousEntry.shardSnapshotStatusByRepoShardId().containsKey(repositoryShardId) == false - || shardGenerations.hasShardGen(finishedShardEntry.getKey()) == false) { - continue; - } - updatedShardAssignments = maybeAddUpdatedAssignment( - updatedShardAssignments, - shardState, - repositoryShardId, - previousEntry.shardSnapshotStatusByRepoShardId() - ); - } - addCloneEntry(updatedEntries, previousEntry, updatedShardAssignments); - } else { - ImmutableOpenMap.Builder updatedShardAssignments = null; - for (Map.Entry finishedShardEntry : removedEntry - .shardSnapshotStatusByRepoShardId() - .entrySet()) { - final ShardSnapshotStatus shardState = finishedShardEntry.getValue(); - if (shardState.state() == ShardState.SUCCESS - && previousEntry.shardSnapshotStatusByRepoShardId().containsKey(finishedShardEntry.getKey()) - && shardGenerations.hasShardGen(finishedShardEntry.getKey())) { - updatedShardAssignments = maybeAddUpdatedAssignment( - updatedShardAssignments, - shardState, - previousEntry.shardId(finishedShardEntry.getKey()), - previousEntry.shards() - ); - } - } - addSnapshotEntry(updatedEntries, previousEntry, updatedShardAssignments); - } - } - } - for (int i = indexOfEntry + 1; i < entryList.size(); i++) { - updatedEntries.add(entryList.get(i)); - } - result = ClusterState.builder(state) - .putCustom( - SnapshotsInProgress.TYPE, - inProgressSnapshots.withUpdatedEntriesForRepo(snapshot.getRepository(), updatedEntries) - ) - .build(); - } - return readyDeletions(result).v1(); - } - - private static void addSnapshotEntry( - List entries, - SnapshotsInProgress.Entry entryToUpdate, - @Nullable ImmutableOpenMap.Builder updatedShardAssignments - ) { - if (updatedShardAssignments == null) { - entries.add(entryToUpdate); - } else { - final ImmutableOpenMap.Builder updatedStatus = ImmutableOpenMap.builder(entryToUpdate.shards()); - updatedStatus.putAllFromMap(updatedShardAssignments.build()); - entries.add(entryToUpdate.withShardStates(updatedStatus.build())); - } - } - - private static void addCloneEntry( - List entries, - SnapshotsInProgress.Entry entryToUpdate, - @Nullable ImmutableOpenMap.Builder updatedShardAssignments - ) { - if (updatedShardAssignments == null) { - entries.add(entryToUpdate); - } else { - final ImmutableOpenMap.Builder updatedStatus = ImmutableOpenMap.builder( - entryToUpdate.shardSnapshotStatusByRepoShardId() - ); - updatedStatus.putAllFromMap(updatedShardAssignments.build()); - entries.add(entryToUpdate.withClones(updatedStatus.build())); - } - } - - @Nullable - private static ImmutableOpenMap.Builder maybeAddUpdatedAssignment( - @Nullable ImmutableOpenMap.Builder updatedShardAssignments, - ShardSnapshotStatus finishedShardState, - T shardId, - Map statesToUpdate - ) { - final ShardGeneration newGeneration = finishedShardState.generation(); - final ShardSnapshotStatus stateToUpdate = statesToUpdate.get(shardId); - if (stateToUpdate != null - && stateToUpdate.state() == ShardState.SUCCESS - && Objects.equals(newGeneration, stateToUpdate.generation()) == false) { - if (updatedShardAssignments == null) { - updatedShardAssignments = ImmutableOpenMap.builder(); - } - updatedShardAssignments.put(shardId, stateToUpdate.withUpdatedGeneration(newGeneration)); - } - return updatedShardAssignments; - } - /** * Removes record of running snapshot from cluster state and notifies the listener when this action is complete. This method is only * used when the snapshot fails for some reason. During normal operation the snapshot repository will remove the @@ -1976,14 +1352,14 @@ private void removeFailedSnapshotFromClusterState( @Override public ClusterState execute(ClusterState currentState) { - final ClusterState updatedState = stateWithoutSnapshot(currentState, snapshot, shardGenerations); + final ClusterState updatedState = SnapshotsServiceUtils.stateWithoutSnapshot(currentState, snapshot, shardGenerations); assert updatedState == currentState || endingSnapshots.contains(snapshot) : "did not track [" + snapshot + "] in ending snapshots while removing it from the cluster state"; // now check if there are any delete operations that refer to the just failed snapshot and remove the snapshot from them - return updateWithSnapshots( + return SnapshotsServiceUtils.updateWithSnapshots( updatedState, null, - deletionsWithoutSnapshots( + SnapshotsServiceUtils.deletionsWithoutSnapshots( SnapshotDeletionsInProgress.get(updatedState), Collections.singletonList(snapshot.getSnapshotId()), snapshot.getRepository() @@ -2021,42 +1397,9 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState) private static final String REMOVE_SNAPSHOT_METADATA_TASK_SOURCE = "remove snapshot metadata"; - /** - * Remove the given {@link SnapshotId}s for the given {@code repository} from an instance of {@link SnapshotDeletionsInProgress}. - * If no deletion contained any of the snapshot ids to remove then return {@code null}. - * - * @param deletions snapshot deletions to update - * @param snapshotIds snapshot ids to remove - * @param repository repository that the snapshot ids belong to - * @return updated {@link SnapshotDeletionsInProgress} or {@code null} if unchanged - */ - @Nullable - private static SnapshotDeletionsInProgress deletionsWithoutSnapshots( - SnapshotDeletionsInProgress deletions, - Collection snapshotIds, - String repository - ) { - boolean changed = false; - List updatedEntries = new ArrayList<>(deletions.getEntries().size()); - for (SnapshotDeletionsInProgress.Entry entry : deletions.getEntries()) { - if (entry.repository().equals(repository)) { - final List updatedSnapshotIds = new ArrayList<>(entry.snapshots()); - if (updatedSnapshotIds.removeAll(snapshotIds)) { - changed = true; - updatedEntries.add(entry.withSnapshots(updatedSnapshotIds)); - } else { - updatedEntries.add(entry); - } - } else { - updatedEntries.add(entry); - } - } - return changed ? SnapshotDeletionsInProgress.of(updatedEntries) : null; - } - private void failSnapshotCompletionListeners(Snapshot snapshot, Exception e, Consumer failingListenersConsumer) { final List> listeners = endAndGetListenersToResolve(snapshot); - failingListenersConsumer.accept(() -> failListenersIgnoringException(listeners, e)); + failingListenersConsumer.accept(() -> SnapshotsServiceUtils.failListenersIgnoringException(listeners, e, logger)); assert repositoryOperations.assertNotQueued(snapshot); } @@ -2090,7 +1433,7 @@ public void deleteSnapshots(final DeleteSnapshotRequest request, final ActionLis @Override public ClusterState execute(ClusterState currentState) { - ensureRepositoryExists(repositoryName, currentState); + SnapshotsServiceUtils.ensureRepositoryExists(repositoryName, currentState); final Set snapshotIds = new HashSet<>(); // find in-progress snapshots to delete in cluster state @@ -2144,7 +1487,7 @@ public ClusterState execute(ClusterState currentState) { } } - ensureNoCleanupInProgress( + SnapshotsServiceUtils.ensureNoCleanupInProgress( currentState, repositoryName, snapshotIds.stream().findFirst().get().getName(), @@ -2194,7 +1537,7 @@ public ClusterState execute(ClusterState currentState) { ); if (snapshotIdsRequiringCleanup.isEmpty()) { // We only saw snapshots that could be removed from the cluster state right away, no need to update the deletions - return updateWithSnapshots(currentState, updatedSnapshots, null); + return SnapshotsServiceUtils.updateWithSnapshots(currentState, updatedSnapshots, null); } // add the snapshot deletion to the cluster state final SnapshotDeletionsInProgress.Entry replacedEntry = deletionsInProgress.getEntries() @@ -2222,7 +1565,7 @@ public ClusterState execute(ClusterState currentState) { List.copyOf(snapshotIdsRequiringCleanup), threadPool.absoluteTimeInMillis(), repositoryData.getGenId(), - updatedSnapshots.forRepo(repositoryName).stream().noneMatch(SnapshotsService::isWritingToRepository) + updatedSnapshots.forRepo(repositoryName).stream().noneMatch(SnapshotsServiceUtils::isWritingToRepository) && deletionsInProgress.hasExecutingDeletion(repositoryName) == false ? SnapshotDeletionsInProgress.State.STARTED : SnapshotDeletionsInProgress.State.WAITING @@ -2230,7 +1573,7 @@ public ClusterState execute(ClusterState currentState) { } else { newDelete = replacedEntry.withAddedSnapshots(snapshotIdsRequiringCleanup); } - return updateWithSnapshots( + return SnapshotsServiceUtils.updateWithSnapshots( currentState, updatedSnapshots, (replacedEntry == null ? deletionsInProgress : deletionsInProgress.withRemovedEntry(replacedEntry.uuid())) @@ -2294,104 +1637,11 @@ public String toString() { }, "delete snapshot [" + repository + "]" + Arrays.toString(snapshotNames), listener::onFailure); } - /** - * Checks if the given {@link SnapshotsInProgress.Entry} is currently writing to the repository. - * - * @param entry snapshot entry - * @return true if entry is currently writing to the repository - */ - private static boolean isWritingToRepository(SnapshotsInProgress.Entry entry) { - if (entry.state().completed()) { - // Entry is writing to the repo because it's finalizing on master - return true; - } - for (ShardSnapshotStatus value : entry.shardSnapshotStatusByRepoShardId().values()) { - if (value.isActive()) { - // Entry is writing to the repo because it's writing to a shard on a data node or waiting to do so for a concrete shard - return true; - } - } - return false; - } - private void addDeleteListener(String deleteUUID, ActionListener listener) { snapshotDeletionListeners.computeIfAbsent(deleteUUID, k -> new CopyOnWriteArrayList<>()) .add(ContextPreservingActionListener.wrapPreservingContext(listener, threadPool.getThreadContext())); } - /** - * Determines the minimum {@link IndexVersion} that the snapshot repository must be compatible with - * from the current nodes in the cluster and the contents of the repository. - * The minimum version is determined as the lowest version found across all snapshots in the - * repository and all nodes in the cluster. - * - * @param minNodeVersion minimum node version in the cluster - * @param repositoryData current {@link RepositoryData} of that repository - * @param excluded snapshot id to ignore when computing the minimum version - * (used to use newer metadata version after a snapshot delete) - * @return minimum node version that must still be able to read the repository metadata - */ - public static IndexVersion minCompatibleVersion( - IndexVersion minNodeVersion, - RepositoryData repositoryData, - @Nullable Collection excluded - ) { - IndexVersion minCompatVersion = minNodeVersion; - final Collection snapshotIds = repositoryData.getSnapshotIds(); - for (SnapshotId snapshotId : snapshotIds.stream() - .filter(excluded == null ? Predicates.always() : Predicate.not(excluded::contains)) - .toList()) { - final IndexVersion known = repositoryData.getVersion(snapshotId); - // If we don't have the version cached in the repository data yet we load it from the snapshot info blobs - if (known == null) { - assert repositoryData.shardGenerations().totalShards() == 0 - : "Saw shard generations [" - + repositoryData.shardGenerations() - + "] but did not have versions tracked for snapshot [" - + snapshotId - + "]"; - return OLD_SNAPSHOT_FORMAT; - } else { - minCompatVersion = IndexVersion.min(minCompatVersion, known); - } - } - return minCompatVersion; - } - - /** - * Checks whether the metadata version supports writing {@link ShardGenerations} to the repository. - * - * @param repositoryMetaVersion version to check - * @return true if version supports {@link ShardGenerations} - */ - public static boolean useShardGenerations(IndexVersion repositoryMetaVersion) { - return repositoryMetaVersion.onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION); - } - - /** - * Checks whether the metadata version supports writing {@link ShardGenerations} to the repository. - * - * @param repositoryMetaVersion version to check - * @return true if version supports {@link ShardGenerations} - */ - public static boolean useIndexGenerations(IndexVersion repositoryMetaVersion) { - return repositoryMetaVersion.onOrAfter(INDEX_GEN_IN_REPO_DATA_VERSION); - } - - /** - * Checks whether the metadata version supports writing the cluster- and repository-uuid to the repository. - * - * @param repositoryMetaVersion version to check - * @return true if version supports writing cluster- and repository-uuid to the repository - */ - public static boolean includesUUIDs(IndexVersion repositoryMetaVersion) { - return repositoryMetaVersion.onOrAfter(UUIDS_IN_REPO_DATA_VERSION); - } - - public static boolean includeFileInfoWriterUUID(IndexVersion repositoryMetaVersion) { - return repositoryMetaVersion.onOrAfter(FILE_INFO_WRITER_UUIDS_IN_SHARD_DATA_VERSION); - } - /** Deletes snapshot from repository * * @param deleteEntry delete entry in cluster state @@ -2517,7 +1767,7 @@ private void deleteSnapshotsFromRepository( removeSnapshotDeletionFromClusterState( deleteEntry, repositoryData, - listeners -> completeListenersIgnoringException(listeners, null) + listeners -> SnapshotsServiceUtils.completeListenersIgnoringException(listeners, null, logger) ); return; } @@ -2532,7 +1782,7 @@ public void onResponse(RepositoryData updatedRepoData) { listeners -> doneFuture.addListener(new ActionListener<>() { @Override public void onResponse(Void unused) { - completeListenersIgnoringException(listeners, null); + SnapshotsServiceUtils.completeListenersIgnoringException(listeners, null, logger); } @Override @@ -2561,7 +1811,7 @@ public void onFailure(Exception e) { new RemoveSnapshotDeletionAndContinueTask(deleteEntry, repositoryData) { @Override protected void handleListeners(List> deleteListeners) { - failListenersIgnoringException(deleteListeners, e); + SnapshotsServiceUtils.failListenersIgnoringException(deleteListeners, e, logger); } } ); @@ -2600,7 +1850,7 @@ private void removeSnapshotDeletionFromClusterState( submitUnbatchedTask("remove snapshot deletion metadata", new RemoveSnapshotDeletionAndContinueTask(deleteEntry, repositoryData) { @Override protected SnapshotDeletionsInProgress filterDeletions(SnapshotDeletionsInProgress deletions) { - final SnapshotDeletionsInProgress updatedDeletions = deletionsWithoutSnapshots( + final SnapshotDeletionsInProgress updatedDeletions = SnapshotsServiceUtils.deletionsWithoutSnapshots( deletions, deleteEntry.snapshots(), deleteEntry.repository() @@ -2644,7 +1894,7 @@ private void failAllListenersOnMasterFailOver(Exception e) { final Exception wrapped = new RepositoryException("_all", "Failed to update cluster state during repository operation", e); for (final Iterator>> it = snapshotDeletionListeners.values().iterator(); it.hasNext();) { final List> listeners = it.next(); - readyToResolveListeners.add(() -> failListenersIgnoringException(listeners, wrapped)); + readyToResolveListeners.add(() -> SnapshotsServiceUtils.failListenersIgnoringException(listeners, wrapped, logger)); it.remove(); } assert snapshotDeletionListeners.isEmpty() : "No new listeners should have been added but saw " + snapshotDeletionListeners; @@ -2689,8 +1939,12 @@ public ClusterState execute(ClusterState currentState) { return currentState; } final SnapshotDeletionsInProgress newDeletions = filterDeletions(updatedDeletions); - final Tuple> res = readyDeletions( - updateWithSnapshots(currentState, updatedSnapshotsInProgress(currentState, newDeletions), newDeletions) + final Tuple> res = SnapshotsServiceUtils.readyDeletions( + SnapshotsServiceUtils.updateWithSnapshots( + currentState, + updatedSnapshotsInProgress(currentState, newDeletions), + newDeletions + ) ); readyDeletions = res.v2(); return res.v1(); @@ -2846,7 +2100,7 @@ private SnapshotsInProgress updatedSnapshotsInProgress(ClusterState currentState // No shards can be updated in this snapshot so we just add it as is again snapshotEntries.add(entry); } else { - final ImmutableOpenMap shardAssignments = shards( + final ImmutableOpenMap shardAssignments = SnapshotsServiceUtils.shards( snapshotsInProgress, updatedDeletions, currentState, @@ -2908,219 +2162,6 @@ public String toString() { } } - /** - * Shortcut to build new {@link ClusterState} from the current state and updated values of {@link SnapshotsInProgress} and - * {@link SnapshotDeletionsInProgress}. - * - * @param state current cluster state - * @param snapshotsInProgress new value for {@link SnapshotsInProgress} or {@code null} if it's unchanged - * @param snapshotDeletionsInProgress new value for {@link SnapshotDeletionsInProgress} or {@code null} if it's unchanged - * @return updated cluster state - */ - public static ClusterState updateWithSnapshots( - ClusterState state, - @Nullable SnapshotsInProgress snapshotsInProgress, - @Nullable SnapshotDeletionsInProgress snapshotDeletionsInProgress - ) { - if (snapshotsInProgress == null && snapshotDeletionsInProgress == null) { - return state; - } - ClusterState.Builder builder = ClusterState.builder(state); - if (snapshotsInProgress != null) { - builder.putCustom(SnapshotsInProgress.TYPE, snapshotsInProgress); - } - if (snapshotDeletionsInProgress != null) { - builder.putCustom(SnapshotDeletionsInProgress.TYPE, snapshotDeletionsInProgress); - } - return builder.build(); - } - - private static void failListenersIgnoringException(@Nullable List> listeners, Exception failure) { - if (listeners != null) { - try { - ActionListener.onFailure(listeners, failure); - } catch (Exception ex) { - assert false : new AssertionError(ex); - logger.warn("Failed to notify listeners", ex); - } - } - } - - private static void completeListenersIgnoringException(@Nullable List> listeners, T result) { - if (listeners != null) { - try { - ActionListener.onResponse(listeners, result); - } catch (Exception ex) { - assert false : new AssertionError(ex); - logger.warn("Failed to notify listeners", ex); - } - } - } - - /** - * Calculates the assignment of shards to data nodes for a new snapshot based on the given cluster state and the - * indices that should be included in the snapshot. - * - * @param indices Indices to snapshot - * @param useShardGenerations whether to write {@link ShardGenerations} during the snapshot - * @return map of shard-id to snapshot-status of all shards included into current snapshot - */ - private static ImmutableOpenMap shards( - SnapshotsInProgress snapshotsInProgress, - SnapshotDeletionsInProgress deletionsInProgress, - ClusterState currentState, - Collection indices, - boolean useShardGenerations, - RepositoryData repositoryData, - String repoName - ) { - ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(); - final ShardGenerations shardGenerations = repositoryData.shardGenerations(); - final InFlightShardSnapshotStates inFlightShardStates = InFlightShardSnapshotStates.forEntries( - snapshotsInProgress.forRepo(repoName) - ); - final boolean readyToExecute = deletionsInProgress.hasExecutingDeletion(repoName) == false; - for (IndexId index : indices) { - final String indexName = index.getName(); - final boolean isNewIndex = repositoryData.getIndices().containsKey(indexName) == false; - IndexMetadata indexMetadata = currentState.metadata().getProject().index(indexName); - if (indexMetadata == null) { - // The index was deleted before we managed to start the snapshot - mark it as missing. - builder.put(new ShardId(indexName, IndexMetadata.INDEX_UUID_NA_VALUE, 0), ShardSnapshotStatus.MISSING); - } else { - final IndexRoutingTable indexRoutingTable = currentState.routingTable().index(indexName); - assert indexRoutingTable != null; - for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) { - final ShardId shardId = indexRoutingTable.shard(i).shardId(); - final ShardGeneration shardRepoGeneration; - if (useShardGenerations) { - final ShardGeneration inFlightGeneration = inFlightShardStates.generationForShard( - index, - shardId.id(), - shardGenerations - ); - if (inFlightGeneration == null && isNewIndex) { - assert shardGenerations.getShardGen(index, shardId.getId()) == null - : "Found shard generation for new index [" + index + "]"; - shardRepoGeneration = ShardGenerations.NEW_SHARD_GEN; - } else { - shardRepoGeneration = inFlightGeneration; - } - } else { - shardRepoGeneration = null; - } - final ShardSnapshotStatus shardSnapshotStatus; - if (readyToExecute == false || inFlightShardStates.isActive(shardId.getIndexName(), shardId.id())) { - shardSnapshotStatus = ShardSnapshotStatus.UNASSIGNED_QUEUED; - } else { - shardSnapshotStatus = initShardSnapshotStatus( - shardRepoGeneration, - indexRoutingTable.shard(i).primaryShard(), - snapshotsInProgress::isNodeIdForRemoval - ); - } - builder.put(shardId, shardSnapshotStatus); - } - } - } - - return builder.build(); - } - - /** - * Compute the snapshot status for a given shard based on the current primary routing entry for the shard. - * - * @param shardRepoGeneration repository generation of the shard in the repository - * @param primary primary routing entry for the shard - * @param nodeIdRemovalPredicate tests whether a node ID is currently marked for removal from the cluster - * @return shard snapshot status - */ - private static ShardSnapshotStatus initShardSnapshotStatus( - ShardGeneration shardRepoGeneration, - ShardRouting primary, - Predicate nodeIdRemovalPredicate - ) { - ShardSnapshotStatus shardSnapshotStatus; - if (primary == null || primary.assignedToNode() == false) { - shardSnapshotStatus = new ShardSnapshotStatus(null, ShardState.MISSING, shardRepoGeneration, "primary shard is not allocated"); - } else if (primary.relocating() || primary.initializing()) { - shardSnapshotStatus = new ShardSnapshotStatus(primary.currentNodeId(), ShardState.WAITING, shardRepoGeneration); - } else if (nodeIdRemovalPredicate.test(primary.currentNodeId())) { - shardSnapshotStatus = new ShardSnapshotStatus(primary.currentNodeId(), ShardState.PAUSED_FOR_NODE_REMOVAL, shardRepoGeneration); - } else if (primary.started() == false) { - shardSnapshotStatus = new ShardSnapshotStatus( - primary.currentNodeId(), - ShardState.MISSING, - shardRepoGeneration, - "primary shard hasn't been started yet" - ); - } else { - shardSnapshotStatus = new ShardSnapshotStatus(primary.currentNodeId(), shardRepoGeneration); - } - return shardSnapshotStatus; - } - - /** - * Returns the data streams that are currently being snapshotted (with partial == false) and that are contained in the - * indices-to-check set. - */ - @FixForMultiProject - public static Set snapshottingDataStreams(final ProjectState projectState, final Set dataStreamsToCheck) { - // TODO multi-project: this will behave incorrectly when there are data streams with equal names in different projects that are - // being snapshotted at the same time. - Map dataStreams = projectState.metadata().dataStreams(); - return SnapshotsInProgress.get(projectState.cluster()) - .asStream() - .filter(e -> e.partial() == false) - .flatMap(e -> e.dataStreams().stream()) - .filter(ds -> dataStreams.containsKey(ds) && dataStreamsToCheck.contains(ds)) - .collect(Collectors.toSet()); - } - - /** - * Returns the indices that are currently being snapshotted (with partial == false) and that are contained in the indices-to-check set. - */ - public static Set snapshottingIndices(final ProjectState projectState, final Set indicesToCheck) { - final Set indices = new HashSet<>(); - for (List snapshotsInRepo : SnapshotsInProgress.get(projectState.cluster()).entriesByRepo()) { - for (final SnapshotsInProgress.Entry entry : snapshotsInRepo) { - if (entry.partial() == false && entry.isClone() == false) { - for (String indexName : entry.indices().keySet()) { - IndexMetadata indexMetadata = projectState.metadata().index(indexName); - if (indexMetadata != null && indicesToCheck.contains(indexMetadata.getIndex())) { - indices.add(indexMetadata.getIndex()); - } - } - } - } - } - return indices; - } - - /** - * Filters out the aliases that refer to data streams to do not exist in the provided data streams. - * Also rewrites the list of data streams an alias point to to only contain data streams that exist in the provided data streams. - * - * The purpose of this method is to capture the relevant data stream aliases based on the data streams - * that will be included in a snapshot. - * - * @param dataStreams The provided data streams, which will be included in a snapshot. - * @param dataStreamAliases The data streams aliases that may contain aliases that refer to data streams - * that don't exist in the provided data streams. - * @return The filtered data streams aliases only referring to data streams in the provided data streams. - */ - static Map filterDataStreamAliases( - Map dataStreams, - Map dataStreamAliases - ) { - - return dataStreamAliases.values() - .stream() - .filter(alias -> alias.getDataStreams().stream().anyMatch(dataStreams::containsKey)) - .map(alias -> alias.intersect(dataStreams::containsKey)) - .collect(Collectors.toMap(DataStreamAlias::getName, Function.identity())); - } - /** * Adds snapshot completion listener * @@ -3163,10 +2204,6 @@ public boolean assertAllListenersResolved() { return true; } - private static boolean isQueued(@Nullable ShardSnapshotStatus status) { - return status != null && status.state() == ShardState.QUEUED; - } - /** * State machine for updating existing {@link SnapshotsInProgress.Entry} by applying a given list of {@link ShardSnapshotUpdate} to * them. The algorithm implemented below works as described @@ -3258,7 +2295,9 @@ SnapshotsInProgress computeUpdatedState() { changedCount, startedCount ); - return supportsNodeRemovalTracking(initialState) ? updated.withUpdatedNodeIdsForRemoval(initialState) : updated; + return SnapshotsServiceUtils.supportsNodeRemovalTracking(initialState) + ? updated.withUpdatedNodeIdsForRemoval(initialState) + : updated; } return existing; } @@ -3483,7 +2522,7 @@ private void tryStartNextTaskAfterCloneUpdated(RepositoryShardId repoShardId, Sh // start a shard snapshot or clone operation on the current entry if (entry.isClone() == false) { tryStartSnapshotAfterCloneFinish(repoShardId, updatedState.generation()); - } else if (isQueued(entry.shardSnapshotStatusByRepoShardId().get(repoShardId))) { + } else if (SnapshotsServiceUtils.isQueued(entry.shardSnapshotStatusByRepoShardId().get(repoShardId))) { final String localNodeId = initialState.nodes().getLocalNodeId(); assert updatedState.nodeId().equals(localNodeId) : "Clone updated with node id [" + updatedState.nodeId() + "] but local node id is [" + localNodeId + "]"; @@ -3497,7 +2536,7 @@ private void tryStartNextTaskAfterSnapshotUpdated(ShardId shardId, ShardSnapshot final IndexId indexId = entry.indices().get(shardId.getIndexName()); if (indexId != null) { final RepositoryShardId repoShardId = new RepositoryShardId(indexId, shardId.id()); - if (isQueued(entry.shardSnapshotStatusByRepoShardId().get(repoShardId))) { + if (SnapshotsServiceUtils.isQueued(entry.shardSnapshotStatusByRepoShardId().get(repoShardId))) { if (entry.isClone()) { // shard snapshot was completed, we check if we can start a clone operation for the same repo shard startShardOperation( @@ -3516,7 +2555,7 @@ private void tryStartNextTaskAfterSnapshotUpdated(ShardId shardId, ShardSnapshot private void tryStartSnapshotAfterCloneFinish(RepositoryShardId repoShardId, ShardGeneration generation) { assert entry.source() == null; // current entry is a snapshot operation so we must translate the repository shard id to a routing shard id - if (isQueued(entry.shardSnapshotStatusByRepoShardId().get(repoShardId))) { + if (SnapshotsServiceUtils.isQueued(entry.shardSnapshotStatusByRepoShardId().get(repoShardId))) { startShardSnapshot(repoShardId, generation); } } @@ -3538,7 +2577,11 @@ private void startShardSnapshot(RepositoryShardId repoShardId, ShardGeneration g } else { shardRouting = indexRouting.shard(repoShardId.shardId()).primaryShard(); } - final ShardSnapshotStatus shardSnapshotStatus = initShardSnapshotStatus(generation, shardRouting, nodeIdRemovalPredicate); + final ShardSnapshotStatus shardSnapshotStatus = SnapshotsServiceUtils.initShardSnapshotStatus( + generation, + shardRouting, + nodeIdRemovalPredicate + ); final ShardId routingShardId = shardRouting != null ? shardRouting.shardId() : new ShardId(index, repoShardId.shardId()); if (shardSnapshotStatus.isActive()) { startShardOperation(shardsBuilder(), routingShardId, shardSnapshotStatus); @@ -3815,7 +2858,7 @@ public ClusterState execute(ClusterState currentState) { final SnapshotsInProgress updatedSnapshotsInProgress = changedSnapshots ? snapshotsInProgress.withUpdatedEntriesForRepo(repository, List.of()) : null; - return updateWithSnapshots(currentState, updatedSnapshotsInProgress, updatedDeletions); + return SnapshotsServiceUtils.updateWithSnapshots(currentState, updatedSnapshotsInProgress, updatedDeletions); } @Override @@ -3843,7 +2886,7 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState) } for (String delete : deletionsToFail) { final List> listeners = snapshotDeletionListeners.remove(delete); - readyToResolveListeners.add(() -> failListenersIgnoringException(listeners, failure)); + readyToResolveListeners.add(() -> SnapshotsServiceUtils.failListenersIgnoringException(listeners, failure, logger)); repositoryOperations.finishDeletion(delete); } } @@ -3963,7 +3006,7 @@ public String toString() { } private static void logSnapshotFailure(String operation, Snapshot snapshot, Exception e) { - final var logLevel = snapshotFailureLogLevel(e); + final var logLevel = SnapshotsServiceUtils.snapshotFailureLogLevel(e); if (logLevel == Level.INFO && logger.isDebugEnabled() == false) { // suppress stack trace at INFO unless extra verbosity is configured logger.info( @@ -3984,29 +3027,6 @@ private static void logSnapshotFailure(String operation, Snapshot snapshot, Exce } } - private static Level snapshotFailureLogLevel(Exception e) { - if (MasterService.isPublishFailureException(e)) { - // no action needed, the new master will take things from here - return Level.INFO; - } else if (e instanceof InvalidSnapshotNameException) { - // no action needed, typically ILM-related, or a user error - return Level.INFO; - } else if (e instanceof IndexNotFoundException) { - // not worrying, most likely a user error - return Level.INFO; - } else if (e instanceof SnapshotException) { - if (e.getMessage().contains(ReferenceDocs.UNASSIGNED_SHARDS.toString())) { - // non-partial snapshot requested but cluster health is not yellow or green; the health is tracked elsewhere so no need to - // make more noise here - return Level.INFO; - } - } else if (e instanceof IllegalArgumentException) { - // some other user error - return Level.INFO; - } - return Level.WARN; - } - private class SnapshotTaskExecutor implements ClusterStateTaskExecutor { @Override public ClusterState execute(BatchExecutionContext batchExecutionContext) throws Exception { @@ -4066,13 +3086,13 @@ private SnapshotsInProgress createSnapshot( final Snapshot snapshot = createSnapshotTask.snapshot; final String repositoryName = snapshot.getRepository(); final String snapshotName = snapshot.getSnapshotId().getName(); - ensureRepositoryExists(repositoryName, currentState); + SnapshotsServiceUtils.ensureRepositoryExists(repositoryName, currentState); final Repository repository = createSnapshotTask.repository; - ensureSnapshotNameAvailableInRepo(repositoryData, snapshotName, repository); - ensureSnapshotNameNotRunning(snapshotsInProgress, repositoryName, snapshotName); - validate(repositoryName, snapshotName, currentState); + SnapshotsServiceUtils.ensureSnapshotNameAvailableInRepo(repositoryData, snapshotName, repository); + SnapshotsServiceUtils.ensureSnapshotNameNotRunning(snapshotsInProgress, repositoryName, snapshotName); + SnapshotsServiceUtils.validate(repositoryName, snapshotName, currentState); final SnapshotDeletionsInProgress deletionsInProgress = SnapshotDeletionsInProgress.get(currentState); - ensureNoCleanupInProgress(currentState, repositoryName, snapshotName, "create snapshot"); + SnapshotsServiceUtils.ensureNoCleanupInProgress(currentState, repositoryName, snapshotName, "create snapshot"); ensureBelowConcurrencyLimit(repositoryName, snapshotName, snapshotsInProgress, deletionsInProgress); final CreateSnapshotRequest request = createSnapshotTask.createSnapshotRequest; @@ -4181,7 +3201,7 @@ private SnapshotsInProgress createSnapshot( allIndices.putAll(runningSnapshot.indices()); } final Map indexIds = repositoryData.resolveNewIndices(indices, allIndices); - final IndexVersion version = minCompatibleVersion( + final IndexVersion version = SnapshotsServiceUtils.minCompatibleVersion( // NB minCompatibleVersion iterates over all the snapshots in the current repositoryData, which probably should happen on a // different thread. Also is the _current_ repositoryData the right thing to consider? The minimum repository format version // can only advance during a snapshot delete which today is never concurrent to other writes, but a future version may allow @@ -4190,12 +3210,12 @@ private SnapshotsInProgress createSnapshot( repositoryData, null ); - ImmutableOpenMap shards = shards( + ImmutableOpenMap shards = SnapshotsServiceUtils.shards( snapshotsInProgress, deletionsInProgress, currentState, indexIds.values(), - useShardGenerations(version), + SnapshotsServiceUtils.useShardGenerations(version), repositoryData, repositoryName ); @@ -4257,7 +3277,7 @@ static ClusterState executeBatch( } final var clusterState = batchExecutionContext.initialState(); - if (supportsNodeRemovalTracking(clusterState)) { + if (SnapshotsServiceUtils.supportsNodeRemovalTracking(clusterState)) { final var snapshotsInProgress = SnapshotsInProgress.get(clusterState); final var newSnapshotsInProgress = snapshotsInProgress.withUpdatedNodeIdsForRemoval(clusterState); if (newSnapshotsInProgress != snapshotsInProgress) { @@ -4268,9 +3288,5 @@ static ClusterState executeBatch( } } - private static boolean supportsNodeRemovalTracking(ClusterState clusterState) { - return clusterState.getMinTransportVersion().onOrAfter(TransportVersions.V_8_13_0); - } - private final MasterServiceTaskQueue updateNodeIdsToRemoveQueue; } diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsServiceUtils.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsServiceUtils.java new file mode 100644 index 0000000000000..2355f37726491 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsServiceUtils.java @@ -0,0 +1,1094 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.snapshots; + +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.TransportVersions; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ProjectState; +import org.elasticsearch.cluster.RepositoryCleanupInProgress; +import org.elasticsearch.cluster.SnapshotDeletionsInProgress; +import org.elasticsearch.cluster.SnapshotsInProgress; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.DataStreamAlias; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.RepositoriesMetadata; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.service.MasterService; +import org.elasticsearch.common.ReferenceDocs; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.core.FixForMultiProject; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Predicates; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.RepositoryData; +import org.elasticsearch.repositories.RepositoryMissingException; +import org.elasticsearch.repositories.RepositoryShardId; +import org.elasticsearch.repositories.ShardGeneration; +import org.elasticsearch.repositories.ShardGenerations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import static java.util.Collections.unmodifiableList; + +/** + * A utility class for static snapshotting methods. + */ +public final class SnapshotsServiceUtils { + + static void ensureSnapshotNameNotRunning(SnapshotsInProgress runningSnapshots, String repositoryName, String snapshotName) { + if (runningSnapshots.forRepo(repositoryName).stream().anyMatch(s -> s.snapshot().getSnapshotId().getName().equals(snapshotName))) { + throw new SnapshotNameAlreadyInUseException(repositoryName, snapshotName, "snapshot with the same name is already in-progress"); + } + } + + /** + * Checks the cluster state for any in-progress repository cleanup tasks ({@link RepositoryCleanupInProgress}). + */ + static void ensureNoCleanupInProgress( + final ClusterState currentState, + final String repositoryName, + final String snapshotName, + final String reason + ) { + final RepositoryCleanupInProgress repositoryCleanupInProgress = RepositoryCleanupInProgress.get(currentState); + if (repositoryCleanupInProgress.hasCleanupInProgress()) { + throw new ConcurrentSnapshotExecutionException( + repositoryName, + snapshotName, + "cannot " + + reason + + " while a repository cleanup is in-progress in " + + repositoryCleanupInProgress.entries() + .stream() + .map(RepositoryCleanupInProgress.Entry::repository) + .collect(Collectors.toSet()) + ); + } + } + + static void ensureSnapshotNameAvailableInRepo(RepositoryData repositoryData, String snapshotName, Repository repository) { + // check if the snapshot name already exists in the repository + if (repositoryData.getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) { + throw new SnapshotNameAlreadyInUseException( + repository.getMetadata().name(), + snapshotName, + "snapshot with the same name already exists" + ); + } + } + + /** + * Throws {@link RepositoryMissingException} if no repository by the given name is found in the given cluster state. + */ + public static void ensureRepositoryExists(String repoName, ClusterState state) { + if (RepositoriesMetadata.get(state).repository(repoName) == null) { + throw new RepositoryMissingException(repoName); + } + } + + /** + * Validates snapshot request + * + * @param repositoryName repository name + * @param snapshotName snapshot name + * @param state current cluster state + */ + static void validate(String repositoryName, String snapshotName, ClusterState state) { + if (RepositoriesMetadata.get(state).repository(repositoryName) == null) { + throw new RepositoryMissingException(repositoryName); + } + validate(repositoryName, snapshotName); + } + + static void validate(final String repositoryName, final String snapshotName) { + if (Strings.hasLength(snapshotName) == false) { + throw new InvalidSnapshotNameException(repositoryName, snapshotName, "cannot be empty"); + } + if (snapshotName.contains(" ")) { + throw new InvalidSnapshotNameException(repositoryName, snapshotName, "must not contain whitespace"); + } + if (snapshotName.contains(",")) { + throw new InvalidSnapshotNameException(repositoryName, snapshotName, "must not contain ','"); + } + if (snapshotName.contains("#")) { + throw new InvalidSnapshotNameException(repositoryName, snapshotName, "must not contain '#'"); + } + if (snapshotName.charAt(0) == '_') { + throw new InvalidSnapshotNameException(repositoryName, snapshotName, "must not start with '_'"); + } + if (snapshotName.toLowerCase(Locale.ROOT).equals(snapshotName) == false) { + throw new InvalidSnapshotNameException(repositoryName, snapshotName, "must be lowercase"); + } + if (Strings.validFileName(snapshotName) == false) { + throw new InvalidSnapshotNameException( + repositoryName, + snapshotName, + "must not contain the following characters " + Strings.INVALID_FILENAME_CHARS + ); + } + } + + /** + * Assert that there are no snapshots that have a shard that is waiting to be assigned even though the cluster state would allow for it + * to be assigned + */ + static boolean assertNoDanglingSnapshots(ClusterState state) { + final SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.get(state); + final SnapshotDeletionsInProgress snapshotDeletionsInProgress = SnapshotDeletionsInProgress.get(state); + final Set reposWithRunningDelete = snapshotDeletionsInProgress.getEntries() + .stream() + .filter(entry -> entry.state() == SnapshotDeletionsInProgress.State.STARTED) + .map(SnapshotDeletionsInProgress.Entry::repository) + .collect(Collectors.toSet()); + for (List repoEntry : snapshotsInProgress.entriesByRepo()) { + final SnapshotsInProgress.Entry entry = repoEntry.get(0); + for (SnapshotsInProgress.ShardSnapshotStatus value : entry.shardSnapshotStatusByRepoShardId().values()) { + if (value.equals(SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED)) { + assert reposWithRunningDelete.contains(entry.repository()) + : "Found shard snapshot waiting to be assigned in [" + entry + "] but it is not blocked by any running delete"; + } else if (value.isActive()) { + assert reposWithRunningDelete.contains(entry.repository()) == false + : "Found shard snapshot actively executing in [" + + entry + + "] when it should be blocked by a running delete [" + + Strings.toString(snapshotDeletionsInProgress) + + "]"; + } + } + } + return true; + } + + /** + * Checks whether the metadata version supports writing {@link ShardGenerations} to the repository. + * + * @param repositoryMetaVersion version to check + * @return true if version supports {@link ShardGenerations} + */ + public static boolean useShardGenerations(IndexVersion repositoryMetaVersion) { + return repositoryMetaVersion.onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION); + } + + /** + * Checks whether the metadata version supports writing {@link ShardGenerations} to the repository. + * + * @param repositoryMetaVersion version to check + * @return true if version supports {@link ShardGenerations} + */ + public static boolean useIndexGenerations(IndexVersion repositoryMetaVersion) { + return repositoryMetaVersion.onOrAfter(SnapshotsService.INDEX_GEN_IN_REPO_DATA_VERSION); + } + + /** + * Checks whether the metadata version supports writing the cluster- and repository-uuid to the repository. + * + * @param repositoryMetaVersion version to check + * @return true if version supports writing cluster- and repository-uuid to the repository + */ + public static boolean includesUUIDs(IndexVersion repositoryMetaVersion) { + return repositoryMetaVersion.onOrAfter(SnapshotsService.UUIDS_IN_REPO_DATA_VERSION); + } + + public static boolean includeFileInfoWriterUUID(IndexVersion repositoryMetaVersion) { + return repositoryMetaVersion.onOrAfter(SnapshotsService.FILE_INFO_WRITER_UUIDS_IN_SHARD_DATA_VERSION); + } + + static boolean supportsNodeRemovalTracking(ClusterState clusterState) { + return clusterState.getMinTransportVersion().onOrAfter(TransportVersions.V_8_13_0); + } + + /** + * Checks if the given {@link SnapshotsInProgress.Entry} is currently writing to the repository. + * + * @param entry snapshot entry + * @return true if entry is currently writing to the repository + */ + static boolean isWritingToRepository(SnapshotsInProgress.Entry entry) { + if (entry.state().completed()) { + // Entry is writing to the repo because it's finalizing on master + return true; + } + for (SnapshotsInProgress.ShardSnapshotStatus value : entry.shardSnapshotStatusByRepoShardId().values()) { + if (value.isActive()) { + // Entry is writing to the repo because it's writing to a shard on a data node or waiting to do so for a concrete shard + return true; + } + } + return false; + } + + static boolean isQueued(@Nullable SnapshotsInProgress.ShardSnapshotStatus status) { + return status != null && status.state() == SnapshotsInProgress.ShardState.QUEUED; + } + + static boolean removedNodesCleanupNeeded(SnapshotsInProgress snapshotsInProgress, List removedNodes) { + if (removedNodes.isEmpty()) { + // Nothing to do, no nodes removed + return false; + } + final Set removedNodeIds = removedNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toSet()); + return snapshotsInProgress.asStream().anyMatch(snapshot -> { + if (snapshot.state().completed() || snapshot.isClone()) { + // nothing to do for already completed snapshots or clones that run on master anyways + return false; + } + for (SnapshotsInProgress.ShardSnapshotStatus shardSnapshotStatus : snapshot.shardSnapshotStatusByRepoShardId().values()) { + if (shardSnapshotStatus.state().completed() == false && removedNodeIds.contains(shardSnapshotStatus.nodeId())) { + // Snapshot had an incomplete shard running on a removed node so we need to adjust that shard's snapshot status + return true; + } + } + return false; + }); + } + + static boolean waitingShardsStartedOrUnassigned(SnapshotsInProgress snapshotsInProgress, ClusterChangedEvent event) { + for (List entries : snapshotsInProgress.entriesByRepo()) { + for (SnapshotsInProgress.Entry entry : entries) { + if (entry.state() == SnapshotsInProgress.State.STARTED && entry.isClone() == false) { + for (Map.Entry shardStatus : entry + .shardSnapshotStatusByRepoShardId() + .entrySet()) { + final SnapshotsInProgress.ShardState state = shardStatus.getValue().state(); + if (state != SnapshotsInProgress.ShardState.WAITING + && state != SnapshotsInProgress.ShardState.QUEUED + && state != SnapshotsInProgress.ShardState.PAUSED_FOR_NODE_REMOVAL) { + continue; + } + final RepositoryShardId shardId = shardStatus.getKey(); + final Index index = entry.indexByName(shardId.indexName()); + if (event.indexRoutingTableChanged(index)) { + IndexRoutingTable indexShardRoutingTable = event.state().getRoutingTable().index(index); + if (indexShardRoutingTable == null) { + // index got removed concurrently and we have to fail WAITING, QUEUED and PAUSED_FOR_REMOVAL state shards + return true; + } + ShardRouting shardRouting = indexShardRoutingTable.shard(shardId.shardId()).primaryShard(); + if (shardRouting.started() && snapshotsInProgress.isNodeIdForRemoval(shardRouting.currentNodeId()) == false + || shardRouting.unassigned()) { + return true; + } + } + } + } + } + } + return false; + } + + static ShardGenerations buildGenerations(SnapshotsInProgress.Entry snapshot, Metadata metadata) { + ShardGenerations.Builder builder = ShardGenerations.builder(); + if (snapshot.isClone()) { + snapshot.shardSnapshotStatusByRepoShardId().forEach((key, value) -> builder.put(key.index(), key.shardId(), value)); + } else { + snapshot.shardSnapshotStatusByRepoShardId().forEach((key, value) -> { + final Index index = snapshot.indexByName(key.indexName()); + if (metadata.findIndex(index).isEmpty()) { + assert snapshot.partial() : "Index [" + index + "] was deleted during a snapshot but snapshot was not partial."; + return; + } + builder.put(key.index(), key.shardId(), value); + }); + } + return builder.build(); + } + + static Metadata metadataForSnapshot(SnapshotsInProgress.Entry snapshot, Metadata metadata) { + final Metadata.Builder builder; + if (snapshot.includeGlobalState() == false) { + // Remove global state from the cluster state + builder = Metadata.builder(); + for (IndexId index : snapshot.indices().values()) { + final IndexMetadata indexMetadata = metadata.getProject().index(index.getName()); + if (indexMetadata == null) { + assert snapshot.partial() : "Index [" + index + "] was deleted during a snapshot but snapshot was not partial."; + } else { + builder.put(indexMetadata, false); + } + } + } else { + builder = Metadata.builder(metadata); + } + // Only keep those data streams in the metadata that were actually requested by the initial snapshot create operation and that have + // all their indices contained in the snapshot + final Map dataStreams = new HashMap<>(); + final Set indicesInSnapshot = snapshot.indices().keySet(); + for (String dataStreamName : snapshot.dataStreams()) { + DataStream dataStream = metadata.getProject().dataStreams().get(dataStreamName); + if (dataStream == null) { + assert snapshot.partial() + : "Data stream [" + dataStreamName + "] was deleted during a snapshot but snapshot was not partial."; + } else { + final DataStream reconciled = dataStream.snapshot(indicesInSnapshot, builder); + if (reconciled != null) { + dataStreams.put(dataStreamName, reconciled); + } + } + } + return builder.dataStreams(dataStreams, filterDataStreamAliases(dataStreams, metadata.getProject().dataStreamAliases())).build(); + } + + /** + * Returns status of the currently running snapshots + *

+ * This method is executed on master node + *

+ * + * @param snapshotsInProgress snapshots in progress in the cluster state + * @param repository repository id + * @param snapshots list of snapshots that will be used as a filter, empty list means no snapshots are filtered + * @return list of metadata for currently running snapshots + */ + public static List currentSnapshots( + @Nullable SnapshotsInProgress snapshotsInProgress, + String repository, + List snapshots + ) { + if (snapshotsInProgress == null || snapshotsInProgress.isEmpty()) { + return Collections.emptyList(); + } + if ("_all".equals(repository)) { + return snapshotsInProgress.asStream().toList(); + } + if (snapshots.isEmpty()) { + return snapshotsInProgress.forRepo(repository); + } + List builder = new ArrayList<>(); + for (SnapshotsInProgress.Entry entry : snapshotsInProgress.forRepo(repository)) { + for (String snapshot : snapshots) { + if (entry.snapshot().getSnapshotId().getName().equals(snapshot)) { + builder.add(entry); + break; + } + } + } + return unmodifiableList(builder); + } + + /** + * Removes all feature states which have missing or failed shards, as they are no longer safely restorable. + * @param entry The "in progress" entry with a list of feature states and one or more failed shards. + * @param finalIndices The final list of indices in the snapshot, after any indices that were concurrently deleted are removed. + * @return The list of feature states which were completed successfully in the given entry. + */ + static List onlySuccessfulFeatureStates(SnapshotsInProgress.Entry entry, List finalIndices) { + assert entry.partial() : "should not try to filter feature states from a non-partial entry"; + + // Figure out which indices have unsuccessful shards + Set indicesWithUnsuccessfulShards = new HashSet<>(); + entry.shardSnapshotStatusByRepoShardId().forEach((key, value) -> { + final SnapshotsInProgress.ShardState shardState = value.state(); + if (shardState.failed() || shardState.completed() == false) { + indicesWithUnsuccessfulShards.add(key.indexName()); + } + }); + + // Now remove any feature states which contain any of those indices, as the feature state is not intact and not safely restorable + return entry.featureStates() + .stream() + .filter(stateInfo -> finalIndices.containsAll(stateInfo.getIndices())) + .filter(stateInfo -> stateInfo.getIndices().stream().anyMatch(indicesWithUnsuccessfulShards::contains) == false) + .toList(); + } + + /** + * Determines the minimum {@link IndexVersion} that the snapshot repository must be compatible with + * from the current nodes in the cluster and the contents of the repository. + * The minimum version is determined as the lowest version found across all snapshots in the + * repository and all nodes in the cluster. + * + * @param minNodeVersion minimum node version in the cluster + * @param repositoryData current {@link RepositoryData} of that repository + * @param excluded snapshot id to ignore when computing the minimum version + * (used to use newer metadata version after a snapshot delete) + * @return minimum node version that must still be able to read the repository metadata + */ + public static IndexVersion minCompatibleVersion( + IndexVersion minNodeVersion, + RepositoryData repositoryData, + @Nullable Collection excluded + ) { + IndexVersion minCompatVersion = minNodeVersion; + final Collection snapshotIds = repositoryData.getSnapshotIds(); + for (SnapshotId snapshotId : snapshotIds.stream() + .filter(excluded == null ? Predicates.always() : Predicate.not(excluded::contains)) + .toList()) { + final IndexVersion known = repositoryData.getVersion(snapshotId); + // If we don't have the version cached in the repository data yet we load it from the snapshot info blobs + if (known == null) { + assert repositoryData.shardGenerations().totalShards() == 0 + : "Saw shard generations [" + + repositoryData.shardGenerations() + + "] but did not have versions tracked for snapshot [" + + snapshotId + + "]"; + return SnapshotsService.OLD_SNAPSHOT_FORMAT; + } else { + minCompatVersion = IndexVersion.min(minCompatVersion, known); + } + } + return minCompatVersion; + } + + /** + * Shortcut to build new {@link ClusterState} from the current state and updated values of {@link SnapshotsInProgress} and + * {@link SnapshotDeletionsInProgress}. + * + * @param state current cluster state + * @param snapshotsInProgress new value for {@link SnapshotsInProgress} or {@code null} if it's unchanged + * @param snapshotDeletionsInProgress new value for {@link SnapshotDeletionsInProgress} or {@code null} if it's unchanged + * @return updated cluster state + */ + public static ClusterState updateWithSnapshots( + ClusterState state, + @Nullable SnapshotsInProgress snapshotsInProgress, + @Nullable SnapshotDeletionsInProgress snapshotDeletionsInProgress + ) { + if (snapshotsInProgress == null && snapshotDeletionsInProgress == null) { + return state; + } + ClusterState.Builder builder = ClusterState.builder(state); + if (snapshotsInProgress != null) { + builder.putCustom(SnapshotsInProgress.TYPE, snapshotsInProgress); + } + if (snapshotDeletionsInProgress != null) { + builder.putCustom(SnapshotDeletionsInProgress.TYPE, snapshotDeletionsInProgress); + } + return builder.build(); + } + + /** + * Compute the snapshot status for a given shard based on the current primary routing entry for the shard. + * + * @param shardRepoGeneration repository generation of the shard in the repository + * @param primary primary routing entry for the shard + * @param nodeIdRemovalPredicate tests whether a node ID is currently marked for removal from the cluster + * @return shard snapshot status + */ + static SnapshotsInProgress.ShardSnapshotStatus initShardSnapshotStatus( + ShardGeneration shardRepoGeneration, + ShardRouting primary, + Predicate nodeIdRemovalPredicate + ) { + SnapshotsInProgress.ShardSnapshotStatus shardSnapshotStatus; + if (primary == null || primary.assignedToNode() == false) { + shardSnapshotStatus = new SnapshotsInProgress.ShardSnapshotStatus( + null, + SnapshotsInProgress.ShardState.MISSING, + shardRepoGeneration, + "primary shard is not allocated" + ); + } else if (primary.relocating() || primary.initializing()) { + shardSnapshotStatus = new SnapshotsInProgress.ShardSnapshotStatus( + primary.currentNodeId(), + SnapshotsInProgress.ShardState.WAITING, + shardRepoGeneration + ); + } else if (nodeIdRemovalPredicate.test(primary.currentNodeId())) { + shardSnapshotStatus = new SnapshotsInProgress.ShardSnapshotStatus( + primary.currentNodeId(), + SnapshotsInProgress.ShardState.PAUSED_FOR_NODE_REMOVAL, + shardRepoGeneration + ); + } else if (primary.started() == false) { + shardSnapshotStatus = new SnapshotsInProgress.ShardSnapshotStatus( + primary.currentNodeId(), + SnapshotsInProgress.ShardState.MISSING, + shardRepoGeneration, + "primary shard hasn't been started yet" + ); + } else { + shardSnapshotStatus = new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId(), shardRepoGeneration); + } + return shardSnapshotStatus; + } + + /** + * Calculates the assignment of shards to data nodes for a new snapshot based on the given cluster state and the + * indices that should be included in the snapshot. + * + * @param indices Indices to snapshot + * @param useShardGenerations whether to write {@link ShardGenerations} during the snapshot + * @return map of shard-id to snapshot-status of all shards included into current snapshot + */ + static ImmutableOpenMap shards( + SnapshotsInProgress snapshotsInProgress, + SnapshotDeletionsInProgress deletionsInProgress, + ClusterState currentState, + Collection indices, + boolean useShardGenerations, + RepositoryData repositoryData, + String repoName + ) { + ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(); + final ShardGenerations shardGenerations = repositoryData.shardGenerations(); + final InFlightShardSnapshotStates inFlightShardStates = InFlightShardSnapshotStates.forEntries( + snapshotsInProgress.forRepo(repoName) + ); + final boolean readyToExecute = deletionsInProgress.hasExecutingDeletion(repoName) == false; + for (IndexId index : indices) { + final String indexName = index.getName(); + final boolean isNewIndex = repositoryData.getIndices().containsKey(indexName) == false; + IndexMetadata indexMetadata = currentState.metadata().getProject().index(indexName); + if (indexMetadata == null) { + // The index was deleted before we managed to start the snapshot - mark it as missing. + builder.put(new ShardId(indexName, IndexMetadata.INDEX_UUID_NA_VALUE, 0), SnapshotsInProgress.ShardSnapshotStatus.MISSING); + } else { + final IndexRoutingTable indexRoutingTable = currentState.routingTable().index(indexName); + assert indexRoutingTable != null; + for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) { + final ShardId shardId = indexRoutingTable.shard(i).shardId(); + final ShardGeneration shardRepoGeneration; + if (useShardGenerations) { + final ShardGeneration inFlightGeneration = inFlightShardStates.generationForShard( + index, + shardId.id(), + shardGenerations + ); + if (inFlightGeneration == null && isNewIndex) { + assert shardGenerations.getShardGen(index, shardId.getId()) == null + : "Found shard generation for new index [" + index + "]"; + shardRepoGeneration = ShardGenerations.NEW_SHARD_GEN; + } else { + shardRepoGeneration = inFlightGeneration; + } + } else { + shardRepoGeneration = null; + } + final SnapshotsInProgress.ShardSnapshotStatus shardSnapshotStatus; + if (readyToExecute == false || inFlightShardStates.isActive(shardId.getIndexName(), shardId.id())) { + shardSnapshotStatus = SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED; + } else { + shardSnapshotStatus = initShardSnapshotStatus( + shardRepoGeneration, + indexRoutingTable.shard(i).primaryShard(), + snapshotsInProgress::isNodeIdForRemoval + ); + } + builder.put(shardId, shardSnapshotStatus); + } + } + } + + return builder.build(); + } + + /** + * Finds snapshot delete operations that are ready to execute in the given {@link ClusterState} and computes a new cluster state that + * has all executable deletes marked as executing. Returns a {@link Tuple} of the updated cluster state and all executable deletes. + * This can either be {@link SnapshotDeletionsInProgress.Entry} that were already in state + * {@link SnapshotDeletionsInProgress.State#STARTED} or waiting entries in state {@link SnapshotDeletionsInProgress.State#WAITING} + * that were moved to {@link SnapshotDeletionsInProgress.State#STARTED} in the returned updated cluster state. + * + * @param currentState current cluster state + * @return tuple of an updated cluster state and currently executable snapshot delete operations + */ + public static Tuple> readyDeletions(ClusterState currentState) { + final SnapshotDeletionsInProgress deletions = SnapshotDeletionsInProgress.get(currentState); + if (deletions.hasDeletionsInProgress() == false) { + return Tuple.tuple(currentState, List.of()); + } + final SnapshotsInProgress snapshotsInProgress = currentState.custom(SnapshotsInProgress.TYPE); + assert snapshotsInProgress != null; + final Set repositoriesSeen = new HashSet<>(); + boolean changed = false; + final ArrayList readyDeletions = new ArrayList<>(); + final List newDeletes = new ArrayList<>(); + for (SnapshotDeletionsInProgress.Entry entry : deletions.getEntries()) { + final String repo = entry.repository(); + if (repositoriesSeen.add(entry.repository()) + && entry.state() == SnapshotDeletionsInProgress.State.WAITING + && snapshotsInProgress.forRepo(repo).stream().noneMatch(SnapshotsServiceUtils::isWritingToRepository)) { + changed = true; + final SnapshotDeletionsInProgress.Entry newEntry = entry.started(); + readyDeletions.add(newEntry); + newDeletes.add(newEntry); + } else { + newDeletes.add(entry); + } + } + return Tuple.tuple( + changed + ? ClusterState.builder(currentState) + .putCustom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.of(newDeletes)) + .build() + : currentState, + readyDeletions + ); + } + + static void addSnapshotEntry( + List entries, + SnapshotsInProgress.Entry entryToUpdate, + @Nullable ImmutableOpenMap.Builder updatedShardAssignments + ) { + if (updatedShardAssignments == null) { + entries.add(entryToUpdate); + } else { + final ImmutableOpenMap.Builder updatedStatus = ImmutableOpenMap.builder( + entryToUpdate.shards() + ); + updatedStatus.putAllFromMap(updatedShardAssignments.build()); + entries.add(entryToUpdate.withShardStates(updatedStatus.build())); + } + } + + static void addCloneEntry( + List entries, + SnapshotsInProgress.Entry entryToUpdate, + @Nullable ImmutableOpenMap.Builder updatedShardAssignments + ) { + if (updatedShardAssignments == null) { + entries.add(entryToUpdate); + } else { + final ImmutableOpenMap.Builder updatedStatus = ImmutableOpenMap + .builder(entryToUpdate.shardSnapshotStatusByRepoShardId()); + updatedStatus.putAllFromMap(updatedShardAssignments.build()); + entries.add(entryToUpdate.withClones(updatedStatus.build())); + } + } + + @Nullable + static ImmutableOpenMap.Builder maybeAddUpdatedAssignment( + @Nullable ImmutableOpenMap.Builder updatedShardAssignments, + SnapshotsInProgress.ShardSnapshotStatus finishedShardState, + T shardId, + Map statesToUpdate + ) { + final ShardGeneration newGeneration = finishedShardState.generation(); + final SnapshotsInProgress.ShardSnapshotStatus stateToUpdate = statesToUpdate.get(shardId); + if (stateToUpdate != null + && stateToUpdate.state() == SnapshotsInProgress.ShardState.SUCCESS + && Objects.equals(newGeneration, stateToUpdate.generation()) == false) { + if (updatedShardAssignments == null) { + updatedShardAssignments = ImmutableOpenMap.builder(); + } + updatedShardAssignments.put(shardId, stateToUpdate.withUpdatedGeneration(newGeneration)); + } + return updatedShardAssignments; + } + + /** + * Computes the cluster state resulting from removing a given snapshot create operation from the given state. This method will update + * the shard generations of snapshots that the given snapshot depended on so that finalizing them will not cause rolling back to an + * outdated shard generation. + *

+ * For example, shard snapshot X can be taken, but not finalized yet. Shard snapshot Y can then depend upon shard snapshot X. Then shard + * snapshot Y may finalize before shard snapshot X, but including X. However, X does not include Y. Therefore we update X to use Y's + * shard generation file (list of snapshots and dependencies) to avoid overwriting with X's file that is missing Y. + * + * @param state current cluster state + * @param snapshot snapshot for which to remove the snapshot operation + * @return updated cluster state + */ + public static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot snapshot, ShardGenerations shardGenerations) { + final SnapshotsInProgress inProgressSnapshots = SnapshotsInProgress.get(state); + ClusterState result = state; + int indexOfEntry = -1; + // Find the in-progress snapshot entry that matches {@code snapshot}. + final List entryList = inProgressSnapshots.forRepo(snapshot.getRepository()); + for (int i = 0; i < entryList.size(); i++) { + SnapshotsInProgress.Entry entry = entryList.get(i); + if (entry.snapshot().equals(snapshot)) { + indexOfEntry = i; + break; + } + } + if (indexOfEntry >= 0) { + final List updatedEntries = new ArrayList<>(entryList.size() - 1); + final SnapshotsInProgress.Entry removedEntry = entryList.get(indexOfEntry); + for (int i = 0; i < indexOfEntry; i++) { + final SnapshotsInProgress.Entry previousEntry = entryList.get(i); + if (removedEntry.isClone()) { + if (previousEntry.isClone()) { + ImmutableOpenMap.Builder updatedShardAssignments = null; + for (Map.Entry finishedShardEntry : removedEntry + .shardSnapshotStatusByRepoShardId() + .entrySet()) { + final SnapshotsInProgress.ShardSnapshotStatus shardState = finishedShardEntry.getValue(); + if (shardState.state() == SnapshotsInProgress.ShardState.SUCCESS) { + updatedShardAssignments = maybeAddUpdatedAssignment( + updatedShardAssignments, + shardState, + finishedShardEntry.getKey(), + previousEntry.shardSnapshotStatusByRepoShardId() + ); + } + } + addCloneEntry(updatedEntries, previousEntry, updatedShardAssignments); + } else { + ImmutableOpenMap.Builder updatedShardAssignments = null; + for (Map.Entry finishedShardEntry : removedEntry + .shardSnapshotStatusByRepoShardId() + .entrySet()) { + final SnapshotsInProgress.ShardSnapshotStatus shardState = finishedShardEntry.getValue(); + final RepositoryShardId repositoryShardId = finishedShardEntry.getKey(); + if (shardState.state() != SnapshotsInProgress.ShardState.SUCCESS + || previousEntry.shardSnapshotStatusByRepoShardId().containsKey(repositoryShardId) == false) { + continue; + } + updatedShardAssignments = maybeAddUpdatedAssignment( + updatedShardAssignments, + shardState, + previousEntry.shardId(repositoryShardId), + previousEntry.shards() + ); + + } + addSnapshotEntry(updatedEntries, previousEntry, updatedShardAssignments); + } + } else { + if (previousEntry.isClone()) { + ImmutableOpenMap.Builder updatedShardAssignments = null; + for (Map.Entry finishedShardEntry : removedEntry + .shardSnapshotStatusByRepoShardId() + .entrySet()) { + final SnapshotsInProgress.ShardSnapshotStatus shardState = finishedShardEntry.getValue(); + final RepositoryShardId repositoryShardId = finishedShardEntry.getKey(); + if (shardState.state() != SnapshotsInProgress.ShardState.SUCCESS + || previousEntry.shardSnapshotStatusByRepoShardId().containsKey(repositoryShardId) == false + || shardGenerations.hasShardGen(finishedShardEntry.getKey()) == false) { + continue; + } + updatedShardAssignments = maybeAddUpdatedAssignment( + updatedShardAssignments, + shardState, + repositoryShardId, + previousEntry.shardSnapshotStatusByRepoShardId() + ); + } + addCloneEntry(updatedEntries, previousEntry, updatedShardAssignments); + } else { + ImmutableOpenMap.Builder updatedShardAssignments = null; + for (Map.Entry finishedShardEntry : removedEntry + .shardSnapshotStatusByRepoShardId() + .entrySet()) { + final SnapshotsInProgress.ShardSnapshotStatus shardState = finishedShardEntry.getValue(); + if (shardState.state() == SnapshotsInProgress.ShardState.SUCCESS + && previousEntry.shardSnapshotStatusByRepoShardId().containsKey(finishedShardEntry.getKey()) + && shardGenerations.hasShardGen(finishedShardEntry.getKey())) { + updatedShardAssignments = maybeAddUpdatedAssignment( + updatedShardAssignments, + shardState, + previousEntry.shardId(finishedShardEntry.getKey()), + previousEntry.shards() + ); + } + } + addSnapshotEntry(updatedEntries, previousEntry, updatedShardAssignments); + } + } + } + for (int i = indexOfEntry + 1; i < entryList.size(); i++) { + updatedEntries.add(entryList.get(i)); + } + result = ClusterState.builder(state) + .putCustom( + SnapshotsInProgress.TYPE, + inProgressSnapshots.withUpdatedEntriesForRepo(snapshot.getRepository(), updatedEntries) + ) + .build(); + } + return readyDeletions(result).v1(); + } + + /** + * Remove the given {@link SnapshotId}s for the given {@code repository} from an instance of {@link SnapshotDeletionsInProgress}. + * If no deletion contained any of the snapshot ids to remove then return {@code null}. + * + * @param deletions snapshot deletions to update + * @param snapshotIds snapshot ids to remove + * @param repository repository that the snapshot ids belong to + * @return updated {@link SnapshotDeletionsInProgress} or {@code null} if unchanged + */ + @Nullable + static SnapshotDeletionsInProgress deletionsWithoutSnapshots( + SnapshotDeletionsInProgress deletions, + Collection snapshotIds, + String repository + ) { + boolean changed = false; + List updatedEntries = new ArrayList<>(deletions.getEntries().size()); + for (SnapshotDeletionsInProgress.Entry entry : deletions.getEntries()) { + if (entry.repository().equals(repository)) { + final List updatedSnapshotIds = new ArrayList<>(entry.snapshots()); + if (updatedSnapshotIds.removeAll(snapshotIds)) { + changed = true; + updatedEntries.add(entry.withSnapshots(updatedSnapshotIds)); + } else { + updatedEntries.add(entry); + } + } else { + updatedEntries.add(entry); + } + } + return changed ? SnapshotDeletionsInProgress.of(updatedEntries) : null; + } + + /** + * Returns the data streams that are currently being snapshotted (with partial == false) and that are contained in the + * indices-to-check set. + */ + @FixForMultiProject + public static Set snapshottingDataStreams(final ProjectState projectState, final Set dataStreamsToCheck) { + // TODO multi-project: this will behave incorrectly when there are data streams with equal names in different projects that are + // being snapshotted at the same time. + Map dataStreams = projectState.metadata().dataStreams(); + return SnapshotsInProgress.get(projectState.cluster()) + .asStream() + .filter(e -> e.partial() == false) + .flatMap(e -> e.dataStreams().stream()) + .filter(ds -> dataStreams.containsKey(ds) && dataStreamsToCheck.contains(ds)) + .collect(Collectors.toSet()); + } + + /** + * Returns the indices that are currently being snapshotted (with partial == false) and that are contained in the indices-to-check set. + */ + public static Set snapshottingIndices(final ProjectState projectState, final Set indicesToCheck) { + final Set indices = new HashSet<>(); + for (List snapshotsInRepo : SnapshotsInProgress.get(projectState.cluster()).entriesByRepo()) { + for (final SnapshotsInProgress.Entry entry : snapshotsInRepo) { + if (entry.partial() == false && entry.isClone() == false) { + for (String indexName : entry.indices().keySet()) { + IndexMetadata indexMetadata = projectState.metadata().index(indexName); + if (indexMetadata != null && indicesToCheck.contains(indexMetadata.getIndex())) { + indices.add(indexMetadata.getIndex()); + } + } + } + } + } + return indices; + } + + /** + * Filters out the aliases that refer to data streams to do not exist in the provided data streams. + * Also rewrites the list of data streams an alias point to to only contain data streams that exist in the provided data streams. + * + * The purpose of this method is to capture the relevant data stream aliases based on the data streams + * that will be included in a snapshot. + * + * @param dataStreams The provided data streams, which will be included in a snapshot. + * @param dataStreamAliases The data streams aliases that may contain aliases that refer to data streams + * that don't exist in the provided data streams. + * @return The filtered data streams aliases only referring to data streams in the provided data streams. + */ + static Map filterDataStreamAliases( + Map dataStreams, + Map dataStreamAliases + ) { + + return dataStreamAliases.values() + .stream() + .filter(alias -> alias.getDataStreams().stream().anyMatch(dataStreams::containsKey)) + .map(alias -> alias.intersect(dataStreams::containsKey)) + .collect(Collectors.toMap(DataStreamAlias::getName, Function.identity())); + } + + /** + * Walks through the snapshot entries' shard snapshots and creates applies updates from looking at removed nodes or indexes and known + * failed shard snapshots on the same shard IDs. + * + * @param nodeIdRemovalPredicate identify any nodes that are marked for removal / in shutdown mode + * @param knownFailures already known failed shard snapshots, but more may be found in this method + * @return an updated map of shard statuses + */ + public static ImmutableOpenMap processWaitingShardsAndRemovedNodes( + SnapshotsInProgress.Entry snapshotEntry, + RoutingTable routingTable, + DiscoveryNodes nodes, + Predicate nodeIdRemovalPredicate, + Map knownFailures, + Logger logger + ) { + assert snapshotEntry.isClone() == false : "clones take a different path"; + boolean snapshotChanged = false; + ImmutableOpenMap.Builder shards = ImmutableOpenMap.builder(); + for (Map.Entry shardSnapshotEntry : snapshotEntry + .shardSnapshotStatusByRepoShardId() + .entrySet()) { + SnapshotsInProgress.ShardSnapshotStatus shardStatus = shardSnapshotEntry.getValue(); + ShardId shardId = snapshotEntry.shardId(shardSnapshotEntry.getKey()); + if (shardStatus.equals(SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED)) { + // this shard snapshot is waiting for a previous snapshot to finish execution for this shard + final SnapshotsInProgress.ShardSnapshotStatus knownFailure = knownFailures.get(shardSnapshotEntry.getKey()); + if (knownFailure == null) { + final IndexRoutingTable indexShardRoutingTable = routingTable.index(shardId.getIndex()); + if (indexShardRoutingTable == null) { + // shard became unassigned while queued after a delete or clone operation so we can fail as missing here + assert snapshotEntry.partial(); + snapshotChanged = true; + logger.debug("failing snapshot of shard [{}] because index got deleted", shardId); + shards.put(shardId, SnapshotsInProgress.ShardSnapshotStatus.MISSING); + knownFailures.put(shardSnapshotEntry.getKey(), SnapshotsInProgress.ShardSnapshotStatus.MISSING); + } else { + // if no failure is known for the shard we keep waiting + shards.put(shardId, shardStatus); + } + } else { + // If a failure is known for an execution we waited on for this shard then we fail with the same exception here + // as well + snapshotChanged = true; + shards.put(shardId, knownFailure); + } + } else if (shardStatus.state() == SnapshotsInProgress.ShardState.WAITING + || shardStatus.state() == SnapshotsInProgress.ShardState.PAUSED_FOR_NODE_REMOVAL) { + // The shard primary wasn't assigned, or the shard snapshot was paused because the node was shutting down. + IndexRoutingTable indexShardRoutingTable = routingTable.index(shardId.getIndex()); + if (indexShardRoutingTable != null) { + IndexShardRoutingTable shardRouting = indexShardRoutingTable.shard(shardId.id()); + if (shardRouting != null) { + final var primaryNodeId = shardRouting.primaryShard().currentNodeId(); + if (nodeIdRemovalPredicate.test(primaryNodeId)) { + if (shardStatus.state() == SnapshotsInProgress.ShardState.PAUSED_FOR_NODE_REMOVAL) { + // Shard that we are waiting for is on a node marked for removal, keep it as PAUSED_FOR_REMOVAL + shards.put(shardId, shardStatus); + } else { + // Shard that we are waiting for is on a node marked for removal, move it to PAUSED_FOR_REMOVAL + snapshotChanged = true; + shards.put( + shardId, + new SnapshotsInProgress.ShardSnapshotStatus( + primaryNodeId, + SnapshotsInProgress.ShardState.PAUSED_FOR_NODE_REMOVAL, + shardStatus.generation() + ) + ); + } + continue; + } else if (shardRouting.primaryShard().started()) { + // Shard that we were waiting for has started on a node, let's process it + snapshotChanged = true; + logger.debug(""" + Starting shard [{}] with shard generation [{}] that we were waiting to start on node [{}]. Previous \ + shard state [{}] + """, shardId, shardStatus.generation(), shardStatus.nodeId(), shardStatus.state()); + shards.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(primaryNodeId, shardStatus.generation())); + continue; + } else if (shardRouting.primaryShard().initializing() || shardRouting.primaryShard().relocating()) { + // Shard that we were waiting for hasn't started yet or still relocating - will continue to wait + shards.put(shardId, shardStatus); + continue; + } + } + } + // Shard that we were waiting for went into unassigned state or disappeared (index or shard is gone) - giving up + snapshotChanged = true; + logger.warn("failing snapshot of shard [{}] on node [{}] because shard is unassigned", shardId, shardStatus.nodeId()); + final SnapshotsInProgress.ShardSnapshotStatus failedState = new SnapshotsInProgress.ShardSnapshotStatus( + shardStatus.nodeId(), + SnapshotsInProgress.ShardState.FAILED, + shardStatus.generation(), + "shard is unassigned" + ); + shards.put(shardId, failedState); + knownFailures.put(shardSnapshotEntry.getKey(), failedState); + } else if (shardStatus.state().completed() == false && shardStatus.nodeId() != null) { + if (nodes.nodeExists(shardStatus.nodeId())) { + shards.put(shardId, shardStatus); + } else { + // TODO: Restart snapshot on another node? + snapshotChanged = true; + logger.warn("failing snapshot of shard [{}] on departed node [{}]", shardId, shardStatus.nodeId()); + final SnapshotsInProgress.ShardSnapshotStatus failedState = new SnapshotsInProgress.ShardSnapshotStatus( + shardStatus.nodeId(), + SnapshotsInProgress.ShardState.FAILED, + shardStatus.generation(), + "node left the cluster during snapshot" + ); + shards.put(shardId, failedState); + knownFailures.put(shardSnapshotEntry.getKey(), failedState); + } + } else { + shards.put(shardId, shardStatus); + } + } + if (snapshotChanged) { + return shards.build(); + } else { + return null; + } + } + + static void failListenersIgnoringException(@Nullable List> listeners, Exception failure, Logger logger) { + if (listeners != null) { + try { + ActionListener.onFailure(listeners, failure); + } catch (Exception ex) { + assert false : new AssertionError(ex); + logger.warn("Failed to notify listeners", ex); + } + } + } + + static void completeListenersIgnoringException(@Nullable List> listeners, T result, Logger logger) { + if (listeners != null) { + try { + ActionListener.onResponse(listeners, result); + } catch (Exception ex) { + assert false : new AssertionError(ex); + logger.warn("Failed to notify listeners", ex); + } + } + } + + static Level snapshotFailureLogLevel(Exception e) { + if (MasterService.isPublishFailureException(e)) { + // no action needed, the new master will take things from here + return Level.INFO; + } else if (e instanceof InvalidSnapshotNameException) { + // no action needed, typically ILM-related, or a user error + return Level.INFO; + } else if (e instanceof IndexNotFoundException) { + // not worrying, most likely a user error + return Level.INFO; + } else if (e instanceof SnapshotException) { + if (e.getMessage().contains(ReferenceDocs.UNASSIGNED_SHARDS.toString())) { + // non-partial snapshot requested but cluster health is not yellow or green; the health is tracked elsewhere so no need to + // make more noise here + return Level.INFO; + } + } else if (e instanceof IllegalArgumentException) { + // some other user error + return Level.INFO; + } + return Level.WARN; + } +} diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java index e51df3521ee9b..76908952f3f3d 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java @@ -640,7 +640,7 @@ public void testSnapshottingIndicesExcludesClones() { ); assertThat( - SnapshotsService.snapshottingIndices( + SnapshotsServiceUtils.snapshottingIndices( clusterState.projectState(), singleton(clusterState.metadata().getProject().index(indexName).getIndex()) ), diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java index dc500220850e3..41ec63e4310ac 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -165,7 +165,7 @@ protected void disableRepoConsistencyCheck(String reason) { protected RepositoryData getRepositoryData(String repoName, IndexVersion version) { final RepositoryData repositoryData = getRepositoryData(repoName); - if (SnapshotsService.includesUUIDs(version) == false) { + if (SnapshotsServiceUtils.includesUUIDs(version) == false) { return repositoryData.withoutUUIDs(); } else { return repositoryData;