diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteStoreMigrationTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteStoreMigrationTestCase.java index 4d37b2a1feb88..cb701987a2cdf 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteStoreMigrationTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteStoreMigrationTestCase.java @@ -11,6 +11,8 @@ import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest; import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.action.admin.cluster.snapshots.status.SnapshotStatus; +import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse; import org.opensearch.client.Client; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; @@ -26,6 +28,7 @@ import java.nio.file.Path; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import static org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING; import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING; @@ -98,7 +101,7 @@ public void testMigrationDirections() { assertThrows(IllegalArgumentException.class, () -> client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); } - public void testNoShallowSnapshotInMixedMode() throws Exception { + public void testSnapshotStatusAPIForRemoteStoreEnabledCluster() throws Exception { logger.info("Initialize remote cluster"); addRemote = true; internalCluster().setBootstrapClusterManagerNodeIndex(0); @@ -110,8 +113,18 @@ public void testNoShallowSnapshotInMixedMode() throws Exception { internalCluster().validateClusterFormed(); logger.info("Create remote backed index"); - RemoteStoreMigrationShardAllocationBaseTestCase.createIndex("test", 0); - RemoteStoreMigrationShardAllocationBaseTestCase.assertRemoteStoreBackedIndex("test"); + String index1 = "index1"; + String index2 = "index2"; + String index3 = "index3"; + RemoteStoreMigrationShardAllocationBaseTestCase.createIndex(index1, 0); + RemoteStoreMigrationShardAllocationBaseTestCase.createIndex(index2, 0); + // RemoteStoreMigrationShardAllocationBaseTestCase.createIndex(index3, 0); + + logger.info("--> indexing some data"); + for (int i = 0; i < 10; i++) { + index(index1, "_doc", Integer.toString(i), "foo", "bar" + i); + } + refresh(); logger.info("Create shallow snapshot setting enabled repo"); String shallowSnapshotRepoName = "shallow-snapshot-repo-name"; @@ -131,19 +144,23 @@ public void testNoShallowSnapshotInMixedMode() throws Exception { SnapshotInfo snapshotInfo1 = RemoteStoreMigrationShardAllocationBaseTestCase.createSnapshot( shallowSnapshotRepoName, snapshot1, - "test" + index1, + index2 ); - assertEquals(snapshotInfo1.isRemoteStoreIndexShallowCopyEnabled(), true); - logger.info("Set MIXED compatibility mode"); - ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); - updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")); - assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + assertBusy(() -> { + SnapshotsStatusResponse 
snapshotsStatusResponse = client().admin() + .cluster() + .prepareSnapshotStatus(shallowSnapshotRepoName) + .setSnapshots(snapshot1) + .setIndices(index1, index2, index3) + .setIgnoreUnavailable(false) + .get(); + SnapshotStatus snapshotStatus1 = snapshotsStatusResponse.getSnapshots().get(0); + logger.info("*** current snapshot status - totalSize [{}]", snapshotStatus1.getStats().getTotalSize()); + assertNotEquals(0L, snapshotStatus1.getStats().getTotalSize()); + }, 1, TimeUnit.MINUTES); - logger.info("Verify that new snapshot is not shallow"); - final String snapshot2 = "snapshot2"; - SnapshotInfo snapshotInfo2 = RemoteStoreMigrationShardAllocationBaseTestCase.createSnapshot(shallowSnapshotRepoName, snapshot2); - assertEquals(snapshotInfo2.isRemoteStoreIndexShallowCopyEnabled(), false); } /* diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/RemoteIndexSnapshotStatusApiIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/RemoteIndexSnapshotStatusApiIT.java index 8e2580aba1745..c063ce2c8b6c4 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/RemoteIndexSnapshotStatusApiIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/RemoteIndexSnapshotStatusApiIT.java @@ -32,10 +32,12 @@ package org.opensearch.snapshots; +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.opensearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStage; import org.opensearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStatus; import org.opensearch.action.admin.cluster.snapshots.status.SnapshotStatus; +import org.opensearch.action.admin.cluster.stats.ClusterStatsIndices; import org.opensearch.cluster.SnapshotsInProgress; import org.opensearch.common.action.ActionFuture; import org.opensearch.common.settings.Settings; @@ -46,6 +48,9 @@ import java.nio.file.Path; import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings; +import static org.opensearch.snapshots.SnapshotsService.MAX_SHARDS_ALLOWED_IN_STATUS_API; +import static org.opensearch.snapshots.SnapshotsService.SHALLOW_SNAPSHOT_V2; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; @@ -78,14 +83,17 @@ public void testStatusAPICallForShallowCopySnapshot() throws Exception { final String snapshotRepoName = "snapshot-repo-name"; createRepository(snapshotRepoName, "fs", snapshotRepoSettingsForShallowCopy()); - final String remoteStoreEnabledIndexName = "remote-index-1"; + final String remoteStoreEnabledIndexName = "remote-index"; + final String remoteStoreEnabledIndexName1 = "remote-index-1"; final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(); createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings); + createIndex(remoteStoreEnabledIndexName1, remoteStoreEnabledIndexSettings); ensureGreen(); logger.info("--> indexing some data"); for (int i = 0; i < 100; i++) { index(remoteStoreEnabledIndexName, "_doc", Integer.toString(i), "foo", "bar" + i); + index(remoteStoreEnabledIndexName1, "_doc", Integer.toString(i), "foo", "bar" + i); } refresh(); @@ -93,16 +101,19 @@ public void testStatusAPICallForShallowCopySnapshot() throws Exception { createFullSnapshot(snapshotRepoName, snapshot); assert 
(getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == 1); - final SnapshotStatus snapshotStatus = getSnapshotStatus(snapshotRepoName, snapshot); - assertThat(snapshotStatus.getState(), is(SnapshotsInProgress.State.SUCCESS)); - - // Validating that the incremental file count and incremental file size is zero for shallow copy - final SnapshotIndexShardStatus shallowSnapshotShardState = stateFirstShard(snapshotStatus, remoteStoreEnabledIndexName); - assertThat(shallowSnapshotShardState.getStage(), is(SnapshotIndexShardStage.DONE)); - assertThat(shallowSnapshotShardState.getStats().getTotalFileCount(), greaterThan(0)); - assertThat(shallowSnapshotShardState.getStats().getTotalSize(), greaterThan(0L)); - assertThat(shallowSnapshotShardState.getStats().getIncrementalFileCount(), is(0)); - assertThat(shallowSnapshotShardState.getStats().getIncrementalSize(), is(0L)); + assertBusy(() -> { + // although no. of shards in snapshot (3) is greater than the max value allowed in a status api call, the request does not fail + // TODO: currently this gives snapshot not found at repository issue --> discuss what to do here + SnapshotStatus snapshotsStatus = client().admin() + .cluster() + .prepareSnapshotStatus(snapshotRepoName) + .setSnapshots(snapshot) + .setIndices(remoteStoreEnabledIndexName1) + .get() + .getSnapshots() + .get(0); + logger.info("*** snapshotsStatus = {}", snapshotsStatus); + }); } public void testStatusAPIStatsForBackToBackShallowSnapshot() throws Exception { @@ -192,6 +203,77 @@ public void testStatusAPICallInProgressShallowSnapshot() throws Exception { createSnapshotResponseActionFuture.actionGet(); } + public void testStatusAPICallForShallowV2Snapshot() throws Exception { + disableRepoConsistencyCheck("Remote store repository is being used for the test"); + internalCluster().startClusterManagerOnlyNode(); + internalCluster().startDataOnlyNodes(2); + + final String snapshotRepoName = "snapshot-repo-name"; + createRepository(snapshotRepoName, "fs", snapshotRepoSettingsForShallowCopy()); + + final String index1 = "remote-index-1"; + final String index2 = "remote-index-2"; + final String index3 = "remote-index-3"; + final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(); + createIndex(index1, remoteStoreEnabledIndexSettings); + createIndex(index2, remoteStoreEnabledIndexSettings); + createIndex(index3, remoteStoreEnabledIndexSettings); + ensureGreen(); + + logger.info("Indexing some data"); + for (int i = 0; i < 50; i++) { + index(index1, "_doc", Integer.toString(i), "foo", "bar" + i); + index(index2, "_doc", Integer.toString(i), "foo", "bar" + i); + index(index3, "_doc", Integer.toString(i), "foo", "bar" + i); + } + logger.info("*** triggering refresh from "); + refresh(); + + logger.info("Set SNAPSHOT_V2 as true and MAX_SHARDS_ALLOWED_IN_STATUS_API to a low value"); + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings( + Settings.builder().put(MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey(), 2).put(SHALLOW_SNAPSHOT_V2.getKey(), true) + ); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + final String snapshot = "snapshot"; + SnapshotInfo snapshotInfo = createFullSnapshot(snapshotRepoName, snapshot); + assertTrue(snapshotInfo.getPinnedTimestamp() > 0); // to assert creation of a shallow v2 snapshot + logger.info( + "*** primarySize = {}", + 
client().admin().indices().prepareStats().execute().actionGet().getPrimaries().getStore().size() + ); + ClusterStatsIndices clusterStatsIndices = client().admin().cluster().prepareClusterStats().get().getIndicesStats(); + logger.info("*** clusterStatsIndices = {}", clusterStatsIndices.toString()); + logger.info("*** clusterStatsIndices.getStore().sizeInBytes() = {}", clusterStatsIndices.getStore().sizeInBytes()); + + logger.info("Indexing some more data"); + for (int i = 0; i < 50; i++) { + index(index1, "_doc", Integer.toString(i), "foo", "bar" + i); + index(index2, "_doc", Integer.toString(i), "foo", "bar" + i); + index(index3, "_doc", Integer.toString(i), "foo", "bar" + i); + } + logger.info("*** triggering refresh"); + refresh(); + + final String snapshot1 = "snapshot1"; + SnapshotInfo snapshotInfo1 = createFullSnapshot(snapshotRepoName, snapshot1); + assertTrue(snapshotInfo1.getPinnedTimestamp() > 0); // to assert creation of a shallow v2 snapshot + logger.info( + "*** primarySize1 = {}", + client().admin().indices().prepareStats().execute().actionGet().getPrimaries().getStore().size() + ); + ClusterStatsIndices clusterStatsIndices1 = client().admin().cluster().prepareClusterStats().get().getIndicesStats(); + logger.info("*** clusterStatsIndices1 = {}", clusterStatsIndices1.toString()); + logger.info("*** clusterStatsIndices1.getStore().sizeInBytes() = {}", clusterStatsIndices1.getStore().sizeInBytes()); + + logger.info("Reset SNAPSHOT_V2 and MAX_SHARDS_ALLOWED_IN_STATUS_API to default values"); + updateSettingsRequest.persistentSettings( + Settings.builder().putNull(MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey()).putNull(SHALLOW_SNAPSHOT_V2.getKey()) + ); + assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + } + private static SnapshotIndexShardStatus stateFirstShard(SnapshotStatus snapshotStatus, String indexName) { return snapshotStatus.getIndices().get(indexName).getShards().get(0); } diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/SnapshotStatusApisIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/SnapshotStatusApisIT.java index fb69209f7adda..e0295b89a596a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/SnapshotStatusApisIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SnapshotStatusApisIT.java @@ -33,11 +33,14 @@ package org.opensearch.snapshots; import org.opensearch.Version; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest; import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; import org.opensearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStage; import org.opensearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStatus; +import org.opensearch.action.admin.cluster.snapshots.status.SnapshotIndexStatus; import org.opensearch.action.admin.cluster.snapshots.status.SnapshotStats; import org.opensearch.action.admin.cluster.snapshots.status.SnapshotStatus; import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse; @@ -49,6 +52,7 @@ import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.common.Strings; import org.opensearch.core.common.unit.ByteSizeUnit; +import 
org.opensearch.core.rest.RestStatus; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.threadpool.ThreadPool; @@ -59,9 +63,12 @@ import java.util.Collections; import java.util.List; import java.util.Locale; +import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static org.opensearch.snapshots.SnapshotsService.MAX_SHARDS_ALLOWED_IN_STATUS_API; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -564,6 +571,193 @@ public void testGetSnapshotsRequest() throws Exception { waitForCompletion(repositoryName, inProgressSnapshot, TimeValue.timeValueSeconds(60)); } + public void testSnapshotStatusApiFailureForTooManyShardsAcrossSnapshots() throws Exception { + String repositoryName = "test-repo"; + String index1 = "test-idx-1"; + String index2 = "test-idx-2"; + String index3 = "test-idx-3"; + createRepository(repositoryName, "fs"); + + logger.info("Create indices"); + createIndex(index1, index2, index3); + ensureGreen(); + + logger.info("Indexing some data"); + for (int i = 0; i < 10; i++) { + index(index1, "_doc", Integer.toString(i), "foo", "bar" + i); + index(index2, "_doc", Integer.toString(i), "foo", "baz" + i); + index(index3, "_doc", Integer.toString(i), "foo", "baz" + i); + } + refresh(); + String snapshot1 = "test-snap-1"; + String snapshot2 = "test-snap-2"; + createSnapshot(repositoryName, snapshot1, List.of(index1, index2, index3)); + createSnapshot(repositoryName, snapshot2, List.of(index1, index2)); + + logger.info("Set MAX_SHARDS_ALLOWED_IN_STATUS_API to a low value"); + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings(Settings.builder().put(MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey(), 2)); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + // across a single snapshot + assertBusy(() -> { + SnapshotException snapshotException = expectThrows( + SnapshotException.class, + () -> client().admin().cluster().prepareSnapshotStatus(repositoryName).setSnapshots(snapshot1).execute().actionGet() + ); + assertEquals(snapshotException.status(), RestStatus.REQUEST_ENTITY_TOO_LARGE); + assertTrue( + snapshotException.getMessage() + .endsWith(" is more than the maximum allowed value of shard count [2] for snapshot status request") + ); + }, 1, TimeUnit.MINUTES); + + // across multiple snapshots + assertBusy(() -> { + SnapshotException snapshotException = expectThrows( + SnapshotException.class, + () -> client().admin() + .cluster() + .prepareSnapshotStatus(repositoryName) + .setSnapshots(snapshot1, snapshot2) + .execute() + .actionGet() + ); + assertEquals(snapshotException.status(), RestStatus.REQUEST_ENTITY_TOO_LARGE); + assertTrue( + snapshotException.getMessage() + .endsWith(" is more than the maximum allowed value of shard count [2] for snapshot status request") + ); + }, 1, TimeUnit.MINUTES); + + logger.info("Reset MAX_SHARDS_ALLOWED_IN_STATUS_API to default value"); + updateSettingsRequest.persistentSettings(Settings.builder().putNull(MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey())); + assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + } + + public void testSnapshotStatusForIndexFilter() throws Exception { + String repositoryName = "test-repo"; + String index1 = "test-idx-1"; + String 
index2 = "test-idx-2"; + String index3 = "test-idx-3"; + createRepository(repositoryName, "fs"); + + logger.info("Create indices"); + createIndex(index1, index2, index3); + ensureGreen(); + + logger.info("Indexing some data"); + for (int i = 0; i < 10; i++) { + index(index1, "_doc", Integer.toString(i), "foo", "bar" + i); + index(index2, "_doc", Integer.toString(i), "foo", "baz" + i); + index(index3, "_doc", Integer.toString(i), "foo", "baz" + i); + } + refresh(); + String snapshot = "test-snap-1"; + createSnapshot(repositoryName, snapshot, List.of(index1, index2, index3)); + + assertBusy(() -> { + SnapshotStatus snapshotsStatus = client().admin() + .cluster() + .prepareSnapshotStatus(repositoryName) + .setSnapshots(snapshot) + .setIndices(index1, index2) + .get() + .getSnapshots() + .get(0); + Map snapshotIndexStatusMap = snapshotsStatus.getIndices(); + // Although the snapshot contains 3 indices, the response of status api call only contains results for 2 + assertEquals(snapshotIndexStatusMap.size(), 2); + assertEquals(snapshotIndexStatusMap.keySet(), Set.of(index1, index2)); + }, 1, TimeUnit.MINUTES); + } + + public void testSnapshotStatusFailuresWithIndexFilter() throws Exception { + String repositoryName = "test-repo"; + String index1 = "test-idx-1"; + String index2 = "test-idx-2"; + String index3 = "test-idx-3"; + createRepository(repositoryName, "fs"); + + logger.info("Create indices"); + createIndex(index1, index2, index3); + ensureGreen(); + + logger.info("Indexing some data"); + for (int i = 0; i < 10; i++) { + index(index1, "_doc", Integer.toString(i), "foo", "bar" + i); + index(index2, "_doc", Integer.toString(i), "foo", "baz" + i); + index(index3, "_doc", Integer.toString(i), "foo", "baz" + i); + } + refresh(); + String snapshot1 = "test-snap-1"; + String snapshot2 = "test-snap-2"; + createSnapshot(repositoryName, snapshot1, List.of(index1, index2, index3)); + createSnapshot(repositoryName, snapshot2, List.of(index1)); + + assertBusy(() -> { + // failure due to passing index filter for multiple snapshots + ActionRequestValidationException ex1 = expectThrows( + ActionRequestValidationException.class, + () -> client().admin() + .cluster() + .prepareSnapshotStatus(repositoryName) + .setSnapshots(snapshot1, snapshot2) + .setIndices(index1, index2, index3) + .execute() + .actionGet() + ); + String cause = "index list filter is supported only for a single snapshot"; + assertTrue(ex1.getMessage().contains(cause)); + + // failure due to index not found in snapshot + SnapshotException ex2 = expectThrows( + SnapshotException.class, + () -> client().admin() + .cluster() + .prepareSnapshotStatus(repositoryName) + .setSnapshots(snapshot2) + .setIndices(index1, index2, index3) + .execute() + .actionGet() + ); + assertEquals(ex2.status(), RestStatus.NOT_FOUND); + cause = String.format( + "[%s:%s] indices [%s] missing in snapshot [%s]", + repositoryName, + snapshot2, + String.join(", ", List.of(index2, index3)), + snapshot2 + ); + assertEquals(cause, ex2.getMessage()); + + // failure due to too many shards requested + logger.info("Set MAX_SHARDS_ALLOWED_IN_STATUS_API to a low value"); + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings(Settings.builder().put(MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey(), 2)); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + SnapshotException ex3 = expectThrows( + SnapshotException.class, + () -> client().admin() + .cluster() + 
.prepareSnapshotStatus(repositoryName) + .setSnapshots(snapshot1) + .setIndices(index1, index2, index3) + .execute() + .actionGet() + ); + logger.info("*** ex3 = {}", ex3.getMessage()); + assertEquals(ex3.status(), RestStatus.REQUEST_ENTITY_TOO_LARGE); + assertTrue(ex3.getMessage().endsWith(" is more than the maximum allowed value of shard count [2] for snapshot status request")); + + logger.info("Reset MAX_SHARDS_ALLOWED_IN_STATUS_API to default value"); + updateSettingsRequest.persistentSettings(Settings.builder().putNull(MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey())); + assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + }, 1, TimeUnit.MINUTES); + } + private static SnapshotIndexShardStatus stateFirstShard(SnapshotStatus snapshotStatus, String indexName) { return snapshotStatus.getIndices().get(indexName).getShards().get(0); } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatus.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatus.java index e0f380b3ebbe6..e9de019085659 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatus.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatus.java @@ -96,7 +96,7 @@ public class SnapshotStatus implements ToXContentObject, Writeable { includeGlobalState = in.readOptionalBoolean(); final long startTime = in.readLong(); final long time = in.readLong(); - updateShardStats(startTime, time); + updateShardStats(startTime, time, 0L); } SnapshotStatus( @@ -105,7 +105,8 @@ public class SnapshotStatus implements ToXContentObject, Writeable { List shards, Boolean includeGlobalState, long startTime, - long time + long time, + long initialTotalSize ) { this.snapshot = Objects.requireNonNull(snapshot); this.state = Objects.requireNonNull(state); @@ -113,7 +114,7 @@ public class SnapshotStatus implements ToXContentObject, Writeable { this.includeGlobalState = includeGlobalState; shardsStats = new SnapshotShardsStats(shards); assert time >= 0 : "time must be >= 0 but received [" + time + "]"; - updateShardStats(startTime, time); + updateShardStats(startTime, time, initialTotalSize); } private SnapshotStatus( @@ -299,8 +300,8 @@ public static SnapshotStatus fromXContent(XContentParser parser) throws IOExcept return PARSER.parse(parser, null); } - private void updateShardStats(long startTime, long time) { - stats = new SnapshotStats(startTime, time, 0, 0, 0, 0, 0, 0); + private void updateShardStats(long startTime, long time, long initialTotalSize) { + stats = new SnapshotStats(startTime, time, 0, 0, 0, 0, initialTotalSize, 0); shardsStats = new SnapshotShardsStats(shards); for (SnapshotIndexShardStatus shard : shards) { // BWC: only update timestamps when we did not get a start time from an old node diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotsStatusRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotsStatusRequest.java index 061e73f1094b5..029a64f9b03b5 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotsStatusRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotsStatusRequest.java @@ -54,6 +54,7 @@ public class SnapshotsStatusRequest extends ClusterManagerNodeRequesttrue to ignore unavailable snapshots, instead of throwing an exception. 
- * Defaults to false, which means unavailable snapshots cause an exception to be thrown. + * Returns the names of the indices. + * + * @return the names of the indices + */ + public String[] indices() { + return this.indices; + } + + /** + * Sets the list of indices to be returned + * + * @param indices the indices whose status should be returned + * @return this request + */ + public SnapshotsStatusRequest indices(String[] indices) { + this.indices = indices; + return this; + } + + /** + * Set to true to ignore unavailable snapshots and indices, instead of throwing an exception. + * Defaults to false, which means unavailable snapshots and indices cause an exception to be thrown. * - * @param ignoreUnavailable whether to ignore unavailable snapshots + * @param ignoreUnavailable whether to ignore unavailable snapshots and indices * @return this request */ public SnapshotsStatusRequest ignoreUnavailable(boolean ignoreUnavailable) { @@ -158,9 +184,9 @@ public SnapshotsStatusRequest ignoreUnavailable(boolean ignoreUnavailable) { } /** - * Returns whether the request permits unavailable snapshots to be ignored. + * Returns whether the request permits unavailable snapshots and indices to be ignored. * - * @return true if the request will ignore unavailable snapshots, false if it will throw an exception on unavailable snapshots + * @return true if the request will ignore unavailable snapshots and indices, false if it will throw an exception on unavailable snapshots and indices */ public boolean ignoreUnavailable() { return ignoreUnavailable; diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotsStatusRequestBuilder.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotsStatusRequestBuilder.java index 9377eca60e353..6f0ac278d01c4 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotsStatusRequestBuilder.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotsStatusRequestBuilder.java @@ -96,10 +96,32 @@ public SnapshotsStatusRequestBuilder addSnapshots(String... snapshots) { } /** - * Set to true to ignore unavailable snapshots, instead of throwing an exception. - * Defaults to false, which means unavailable snapshots cause an exception to be thrown. + * Sets the list of indices to return * - * @param ignoreUnavailable whether to ignore unavailable snapshots. + * @param indices list of indices + * @return this builder + */ + public SnapshotsStatusRequestBuilder setIndices(String... indices) { + request.indices(indices); + return this; + } + + /** + * Adds additional indices to the list of indices to return + * + * @param indices additional indices + * @return this builder + */ + public SnapshotsStatusRequestBuilder addIndices(String... indices) { + request.indices(ArrayUtils.concat(request.indices(), indices)); + return this; + } + + /** + * Set to true to ignore unavailable snapshots and indices, instead of throwing an exception. + * Defaults to false, which means unavailable snapshots and indices cause an exception to be thrown. + * + * @param ignoreUnavailable whether to ignore unavailable snapshots and indices. 
* @return this builder */ public SnapshotsStatusRequestBuilder setIgnoreUnavailable(boolean ignoreUnavailable) { diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java index 8cd9198686154..cc31d5d0b2178 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java @@ -52,12 +52,14 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.util.CollectionUtils; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.core.rest.RestStatus; import org.opensearch.index.snapshots.IndexShardSnapshotStatus; import org.opensearch.repositories.IndexId; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.repositories.RepositoryData; import org.opensearch.snapshots.Snapshot; +import org.opensearch.snapshots.SnapshotException; import org.opensearch.snapshots.SnapshotId; import org.opensearch.snapshots.SnapshotInfo; import org.opensearch.snapshots.SnapshotMissingException; @@ -81,6 +83,7 @@ import java.util.stream.Collectors; import static java.util.Collections.unmodifiableMap; +import static org.opensearch.snapshots.SnapshotsService.MAX_SHARDS_ALLOWED_IN_STATUS_API; /** * Transport action for accessing snapshot status @@ -95,6 +98,8 @@ public class TransportSnapshotsStatusAction extends TransportClusterManagerNodeA private final TransportNodesSnapshotsStatus transportNodesSnapshotsStatus; + private long maximumAllowedShardCount; + @Inject public TransportSnapshotsStatusAction( TransportService transportService, @@ -285,6 +290,7 @@ private void buildResponse( } shardStatusBuilder.add(shardStatus); } + // TODO: this extra parameter might not be required builder.add( new SnapshotStatus( entry.snapshot(), @@ -292,7 +298,8 @@ private void buildResponse( Collections.unmodifiableList(shardStatusBuilder), entry.includeGlobalState(), entry.startTime(), - Math.max(threadPool.absoluteTimeInMillis() - entry.startTime(), 0L) + Math.max(threadPool.absoluteTimeInMillis() - entry.startTime(), 0L), + 0L ) ); } @@ -314,38 +321,38 @@ private void loadRepositoryData( String repositoryName, ActionListener listener ) { - final Set requestedSnapshotNames = Sets.newHashSet(request.snapshots()); + maximumAllowedShardCount = clusterService.getClusterSettings().get(MAX_SHARDS_ALLOWED_IN_STATUS_API); final StepListener repositoryDataListener = new StepListener<>(); repositoriesService.getRepositoryData(repositoryName, repositoryDataListener); repositoryDataListener.whenComplete(repositoryData -> { - final Map matchedSnapshotIds = repositoryData.getSnapshotIds() - .stream() - .filter(s -> requestedSnapshotNames.contains(s.getName())) - .collect(Collectors.toMap(SnapshotId::getName, Function.identity())); - for (final String snapshotName : request.snapshots()) { - if (currentSnapshotNames.contains(snapshotName)) { - // we've already found this snapshot in the current snapshot entries, so skip over - continue; - } - SnapshotId snapshotId = matchedSnapshotIds.get(snapshotName); - if (snapshotId == null) { - // neither in the current snapshot entries nor found in the repository - if (request.ignoreUnavailable()) { - // ignoring unavailable snapshots, so skip over 
- logger.debug( - "snapshot status request ignoring snapshot [{}], not found in repository [{}]", - snapshotName, - repositoryName - ); - continue; - } else { - throw new SnapshotMissingException(repositoryName, snapshotName); - } - } - SnapshotInfo snapshotInfo = snapshot(snapshotsInProgress, repositoryName, snapshotId); + Map snapshotsInfoMap = snapshotsInfo( + request, + repositoryName, + repositoryData, + snapshotsInProgress, + currentSnapshotNames + ); + for (Map.Entry entry : snapshotsInfoMap.entrySet()) { + SnapshotId snapshotId = entry.getKey(); + SnapshotInfo snapshotInfo = entry.getValue(); List shardStatusBuilder = new ArrayList<>(); if (snapshotInfo.state().completed()) { - Map shardStatuses = snapshotShards(repositoryName, repositoryData, snapshotInfo); + Map shardStatuses = snapshotShards( + request, + repositoryName, + repositoryData, + snapshotInfo + ); + boolean isShallowV2Snapshot = snapshotInfo.getPinnedTimestamp() > 0; + long initialSnapshotTotalSize = 0; + if (isShallowV2Snapshot) { + if (request.indices().length == 0) { + initialSnapshotTotalSize = snapshotInfo.getSnapshotSizeInBytes(); + } else { + // TODO: add info from index level map? + } + } + for (Map.Entry shardStatus : shardStatuses.entrySet()) { IndexShardSnapshotStatus.Copy lastSnapshotStatus = shardStatus.getValue().asCopy(); shardStatusBuilder.add(new SnapshotIndexShardStatus(shardStatus.getKey(), lastSnapshotStatus)); @@ -376,7 +383,8 @@ private void loadRepositoryData( snapshotInfo.includeGlobalState(), startTime, // Use current time to calculate overall runtime for in-progress snapshots that have endTime == 0 - (endTime == 0 ? threadPool.absoluteTimeInMillis() : endTime) - startTime + (endTime == 0 ? threadPool.absoluteTimeInMillis() : endTime) - startTime, + initialSnapshotTotalSize ) ); } @@ -406,6 +414,72 @@ private SnapshotInfo snapshot(SnapshotsInProgress snapshotsInProgress, String re return repositoriesService.repository(repositoryName).getSnapshotInfo(snapshotId); } + /** + * Returns snapshot info for finished snapshots + * @param request snapshot status request + * @param repositoryName repository name + * @param repositoryData repository data + * @param snapshotsInProgress currently running snapshots + * @param currentSnapshotNames list of names of currently running snapshots + * @return map of snapshot id to snapshot info + */ + private Map snapshotsInfo( + SnapshotsStatusRequest request, + String repositoryName, + RepositoryData repositoryData, + SnapshotsInProgress snapshotsInProgress, + Set currentSnapshotNames + ) { + final Set requestedSnapshotNames = Sets.newHashSet(request.snapshots()); + final Map snapshotsInfoMap = new HashMap<>(); + final Map matchedSnapshotIds = repositoryData.getSnapshotIds() + .stream() + .filter(s -> requestedSnapshotNames.contains(s.getName())) + .collect(Collectors.toMap(SnapshotId::getName, Function.identity())); + int totalShardsAcrossSnapshots = 0; + for (final String snapshotName : request.snapshots()) { + if (currentSnapshotNames.contains(snapshotName)) { + // we've already found this snapshot in the current snapshot entries, so skip over + continue; + } + SnapshotId snapshotId = matchedSnapshotIds.get(snapshotName); + if (snapshotId == null) { + // neither in the current snapshot entries nor found in the repository + if (request.ignoreUnavailable()) { + // ignoring unavailable snapshots, so skip over + logger.debug( + "snapshot status request ignoring snapshot [{}], not found in repository [{}]", + snapshotName, + repositoryName + ); + continue; + } else 
{ + throw new SnapshotMissingException(repositoryName, snapshotName); + } + } + SnapshotInfo snapshotInfo = snapshot(snapshotsInProgress, repositoryName, snapshotId); + boolean isV2Snapshot = snapshotInfo.getPinnedTimestamp() > 0; + if (isV2Snapshot == false && request.indices().length == 0) { + totalShardsAcrossSnapshots += snapshotInfo.totalShards(); + } + snapshotsInfoMap.put(snapshotId, snapshotInfo); + } + if (totalShardsAcrossSnapshots > maximumAllowedShardCount && request.indices().length == 0) { + String cause = "Total shard count [" + + totalShardsAcrossSnapshots + + "] is more than the maximum allowed value of shard count [" + + maximumAllowedShardCount + + "] for snapshot status request"; + throw new SnapshotException(repositoryName, String.join(", ", requestedSnapshotNames), cause) { + @Override + public RestStatus status() { + return RestStatus.REQUEST_ENTITY_TOO_LARGE; + } + }; + } + return unmodifiableMap(snapshotsInfoMap); + } + /** * Returns status of shards currently finished snapshots *

@@ -413,21 +487,69 @@ private SnapshotInfo snapshot(SnapshotsInProgress snapshotsInProgress, String re * {@link SnapshotShardsService#currentSnapshotShards(Snapshot)} because it * returns similar information but for already finished snapshots. *

- * + * @param request snapshot status request * @param repositoryName repository name * @param snapshotInfo snapshot info * @return map of shard id to snapshot status */ private Map snapshotShards( + final SnapshotsStatusRequest request, final String repositoryName, final RepositoryData repositoryData, final SnapshotInfo snapshotInfo ) throws IOException { + final Set requestedIndexNames = Sets.newHashSet(request.indices()); + String snapshotName = snapshotInfo.snapshotId().getName(); + Set indices = Sets.newHashSet(snapshotInfo.indices()); + if (requestedIndexNames.isEmpty() == false) { + Set finalIndices = indices; + List indicesNotFound = requestedIndexNames.stream() + .filter(i -> finalIndices.contains(i) == false) + .collect(Collectors.toList()); + if (indicesNotFound.isEmpty() == false) { + handleIndexNotFound(String.join(", ", indicesNotFound), request, snapshotName, repositoryName); + } + indices = requestedIndexNames; + } + final Repository repository = repositoriesService.repository(repositoryName); - final Map shardStatus = new HashMap<>(); - for (String index : snapshotInfo.indices()) { + boolean isV2Snapshot = snapshotInfo.getPinnedTimestamp() > 0; + int totalShardsAcrossIndices = 0; + final Map indexMetadataMap = new HashMap<>(); + + for (String index : indices) { IndexId indexId = repositoryData.resolveIndexId(index); IndexMetadata indexMetadata = repository.getSnapshotIndexMetaData(repositoryData, snapshotInfo.snapshotId(), indexId); + if (indexMetadata != null) { + if (requestedIndexNames.isEmpty() == false && isV2Snapshot == false) { + totalShardsAcrossIndices += indexMetadata.getNumberOfShards(); + } + indexMetadataMap.put(indexId, indexMetadata); + } else if (requestedIndexNames.isEmpty() == false) { + handleIndexNotFound(index, request, snapshotName, repositoryName); + } + } + + if (totalShardsAcrossIndices > maximumAllowedShardCount && requestedIndexNames.isEmpty() == false && isV2Snapshot == false) { + String cause = "Total shard count [" + + totalShardsAcrossIndices + + "] across the requested indices [" + + requestedIndexNames.stream().collect(Collectors.joining(", ")) + + "] is more than the maximum allowed value of shard count [" + + maximumAllowedShardCount + + "] for snapshot status request"; + throw new SnapshotException(repositoryName, snapshotName, cause) { + @Override + public RestStatus status() { + return RestStatus.REQUEST_ENTITY_TOO_LARGE; + } + }; + } + + final Map shardStatus = new HashMap<>(); + for (Map.Entry entry : indexMetadataMap.entrySet()) { + IndexId indexId = entry.getKey(); + IndexMetadata indexMetadata = entry.getValue(); if (indexMetadata != null) { int numberOfShards = indexMetadata.getNumberOfShards(); for (int i = 0; i < numberOfShards; i++) { @@ -457,6 +579,26 @@ private Map snapshotShards( return unmodifiableMap(shardStatus); } + private void handleIndexNotFound(String index, SnapshotsStatusRequest request, String snapshotName, String repositoryName) { + if (request.ignoreUnavailable()) { + // ignoring unavailable index + logger.debug( + "snapshot status request ignoring indices [{}], not found in snapshot[{}] in repository [{}]", + index, + snapshotName, + repositoryName + ); + } else { + String cause = "indices [" + index + "] missing in snapshot [" + snapshotName + "]"; + throw new SnapshotException(repositoryName, snapshotName, cause) { + @Override + public RestStatus status() { + return RestStatus.NOT_FOUND; + } + }; + } + } + private static SnapshotShardFailure findShardFailure(List shardFailures, ShardId shardId) { for 
(SnapshotShardFailure shardFailure : shardFailures) { if (shardId.getIndexName().equals(shardFailure.index()) && shardId.getId() == shardFailure.shardId()) { diff --git a/server/src/main/java/org/opensearch/cluster/ClusterInfo.java b/server/src/main/java/org/opensearch/cluster/ClusterInfo.java index 7216c447acc3e..80b5fd71c579e 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterInfo.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterInfo.java @@ -72,8 +72,10 @@ public class ClusterInfo implements ToXContentFragment, Writeable { private long avgTotalBytes; private long avgFreeByte; + private final long primaryStoreSize; + protected ClusterInfo() { - this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); + this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), 0L); } /** @@ -84,6 +86,7 @@ protected ClusterInfo() { * @param shardSizes a shardkey to size in bytes mapping per shard. * @param routingToDataPath the shard routing to datapath mapping * @param reservedSpace reserved space per shard broken down by node and data path + * @param primaryStoreSize total size in bytes for all the primary shards * @see #shardIdentifierFromRouting */ public ClusterInfo( @@ -92,7 +95,8 @@ public ClusterInfo( final Map shardSizes, final Map routingToDataPath, final Map reservedSpace, - final Map nodeFileCacheStats + final Map nodeFileCacheStats, + final long primaryStoreSize ) { this.leastAvailableSpaceUsage = leastAvailableSpaceUsage; this.shardSizes = shardSizes; @@ -100,6 +104,7 @@ public ClusterInfo( this.routingToDataPath = routingToDataPath; this.reservedSpace = reservedSpace; this.nodeFileCacheStats = nodeFileCacheStats; + this.primaryStoreSize = primaryStoreSize; calculateAvgFreeAndTotalBytes(mostAvailableSpaceUsage); } @@ -110,7 +115,7 @@ public ClusterInfo(StreamInput in) throws IOException { Map routingMap = in.readMap(ShardRouting::new, StreamInput::readString); Map reservedSpaceMap; reservedSpaceMap = in.readMap(NodeAndPath::new, ReservedSpace::new); - + this.primaryStoreSize = in.readLong(); this.leastAvailableSpaceUsage = Collections.unmodifiableMap(leastMap); this.mostAvailableSpaceUsage = Collections.unmodifiableMap(mostMap); this.shardSizes = Collections.unmodifiableMap(sizeMap); @@ -156,6 +161,10 @@ public long getAvgTotalBytes() { return avgTotalBytes; } + public long getPrimaryStoreSize() { + return primaryStoreSize; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeMap(this.leastAvailableSpaceUsage, StreamOutput::writeString, (o, v) -> v.writeTo(o)); diff --git a/server/src/main/java/org/opensearch/cluster/InternalClusterInfoService.java b/server/src/main/java/org/opensearch/cluster/InternalClusterInfoService.java index e381b8f244bf3..28f0d4cef53dc 100644 --- a/server/src/main/java/org/opensearch/cluster/InternalClusterInfoService.java +++ b/server/src/main/java/org/opensearch/cluster/InternalClusterInfoService.java @@ -40,6 +40,7 @@ import org.opensearch.action.admin.cluster.node.stats.NodeStats; import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.admin.indices.stats.IndexStats; import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; import org.opensearch.action.admin.indices.stats.ShardStats; @@ -115,6 +116,9 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt private 
volatile Map nodeFileCacheStats; private volatile IndicesStatsSummary indicesStatsSummary; // null if this node is not currently the cluster-manager + + private volatile long primaryStoreSize; + private final AtomicReference refreshAndRescheduleRunnable = new AtomicReference<>(); private volatile boolean enabled; private volatile TimeValue fetchTimeout; @@ -127,6 +131,7 @@ public InternalClusterInfoService(Settings settings, ClusterService clusterServi this.mostAvailableSpaceUsages = Map.of(); this.nodeFileCacheStats = Map.of(); this.indicesStatsSummary = IndicesStatsSummary.EMPTY; + this.primaryStoreSize = 0L; this.threadPool = threadPool; this.client = client; this.updateFrequency = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings); @@ -213,7 +218,8 @@ public ClusterInfo getClusterInfo() { indicesStatsSummary.shardSizes, indicesStatsSummary.shardRoutingToDataPath, indicesStatsSummary.reservedSpace, - nodeFileCacheStats + nodeFileCacheStats, + primaryStoreSize ); } @@ -301,15 +307,29 @@ public void onFailure(Exception e) { final CountDownLatch indicesLatch = updateIndicesStats(new ActionListener<>() { @Override public void onResponse(IndicesStatsResponse indicesStatsResponse) { + long currentPrimaryStoreSize = indicesStatsResponse.getPrimaries().getStore().sizeInBytes(); + Map indexStatsMap = indicesStatsResponse.getIndices(); + logger.info("### currentPrimaryStoreSize = {}", currentPrimaryStoreSize); + logger.info("### indexStatsMap = "); + for (Map.Entry entry : indexStatsMap.entrySet()) { + logger.info(" ### {} : {}", entry.getKey(), entry.getValue().getPrimaries().getStore().sizeInBytes()); + } final ShardStats[] stats = indicesStatsResponse.getShards(); final Map shardSizeByIdentifierBuilder = new HashMap<>(); + final Map indexSizeByIdentifierBuilder = new HashMap<>(); final Map dataPathByShardRoutingBuilder = new HashMap<>(); final Map reservedSpaceBuilders = new HashMap<>(); - buildShardLevelInfo(logger, stats, shardSizeByIdentifierBuilder, dataPathByShardRoutingBuilder, reservedSpaceBuilders); - + currentPrimaryStoreSize = buildShardLevelInfo( + logger, + stats, + shardSizeByIdentifierBuilder, + dataPathByShardRoutingBuilder, + reservedSpaceBuilders + ); + primaryStoreSize = currentPrimaryStoreSize; final Map rsrvdSpace = new HashMap<>(); reservedSpaceBuilders.forEach((nodeAndPath, builder) -> rsrvdSpace.put(nodeAndPath, builder.build())); - + logger.info("### after buildShardLevelInfo - currentPrimaryStoreSize = {}", currentPrimaryStoreSize); indicesStatsSummary = new IndicesStatsSummary(shardSizeByIdentifierBuilder, dataPathByShardRoutingBuilder, rsrvdSpace); } @@ -366,13 +386,14 @@ public void addListener(Consumer clusterInfoConsumer) { listeners.add(clusterInfoConsumer); } - static void buildShardLevelInfo( + static long buildShardLevelInfo( Logger logger, ShardStats[] stats, final Map shardSizes, final Map newShardRoutingToDataPath, final Map reservedSpaceByShard ) { + long currentPrimaryStoreSize = 0L; for (ShardStats s : stats) { final ShardRouting shardRouting = s.getShardRouting(); newShardRoutingToDataPath.put(shardRouting, s.getDataPath()); @@ -381,7 +402,11 @@ static void buildShardLevelInfo( if (storeStats == null) { continue; } + // TODO: put entries for index name, size final long size = storeStats.sizeInBytes(); + if (shardRouting.primary()) { + currentPrimaryStoreSize += size; + } final long reserved = storeStats.getReservedSize().getBytes(); final String shardIdentifier = ClusterInfo.shardIdentifierFromRouting(shardRouting); @@ -396,6 +421,7 @@ static 
void buildShardLevelInfo( reservedSpaceBuilder.add(shardRouting.shardId(), reserved); } } + return currentPrimaryStoreSize; } static void fillDiskUsagePerNode( diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index fe6cff25243e2..5468e8d9f08d8 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -632,6 +632,7 @@ public void apply(Settings value, Settings current, Settings previous) { HandshakingTransportAddressConnector.PROBE_CONNECT_TIMEOUT_SETTING, HandshakingTransportAddressConnector.PROBE_HANDSHAKE_TIMEOUT_SETTING, SnapshotsService.MAX_CONCURRENT_SNAPSHOT_OPERATIONS_SETTING, + SnapshotsService.MAX_SHARDS_ALLOWED_IN_STATUS_API, SnapshotsService.SHALLOW_SNAPSHOT_V2, FsHealthService.ENABLED_SETTING, FsHealthService.REFRESH_INTERVAL_SETTING, diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 1a9b233b387b2..72f234351aef1 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -1186,6 +1186,7 @@ protected Node( clusterModule.getIndexNameExpressionResolver(), repositoryService, transportService, + clusterInfoService, actionModule.getActionFilters(), remoteStorePinnedTimestampService ); diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestSnapshotsStatusAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestSnapshotsStatusAction.java index 2b4b188ab0acd..502be16f2fa8e 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestSnapshotsStatusAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestSnapshotsStatusAction.java @@ -61,6 +61,7 @@ public class RestSnapshotsStatusAction extends BaseRestHandler { public List routes() { return unmodifiableList( asList( + new Route(GET, "/_snapshot/{repository}/{snapshot}/{index}/_status"), new Route(GET, "/_snapshot/{repository}/{snapshot}/_status"), new Route(GET, "/_snapshot/{repository}/_status"), new Route(GET, "/_snapshot/_status") @@ -80,7 +81,8 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC if (snapshots.length == 1 && "_all".equalsIgnoreCase(snapshots[0])) { snapshots = Strings.EMPTY_ARRAY; } - SnapshotsStatusRequest snapshotsStatusRequest = snapshotsStatusRequest(repository).snapshots(snapshots); + String[] indices = request.paramAsStringArray("index", Strings.EMPTY_ARRAY); + SnapshotsStatusRequest snapshotsStatusRequest = snapshotsStatusRequest(repository).snapshots(snapshots).indices(indices); snapshotsStatusRequest.ignoreUnavailable(request.paramAsBoolean("ignore_unavailable", snapshotsStatusRequest.ignoreUnavailable())); snapshotsStatusRequest.clusterManagerNodeTimeout( diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java b/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java index 3325396a8b8c3..b40a59d007dcc 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java @@ -100,6 +100,8 @@ public final class SnapshotInfo implements Comparable, ToXContent, private static final String REMOTE_STORE_INDEX_SHALLOW_COPY = "remote_store_index_shallow_copy"; private static final String PINNED_TIMESTAMP = "pinned_timestamp"; + + private 
static final String SNAPSHOT_SIZE_IN_BYTES = "snapshot_size_in_bytes"; private static final String USER_METADATA = "metadata"; private static final Comparator COMPARATOR = Comparator.comparing(SnapshotInfo::startTime) @@ -126,6 +128,9 @@ public static final class SnapshotInfoBuilder { private Boolean remoteStoreIndexShallowCopy = null; private long pinnedTimestamp = 0L; + + private long snapshotSizeInBytes = 0L; + private Map userMetadata = null; private int version = -1; private List shardFailures = null; @@ -186,6 +191,10 @@ private void setPinnedTimestamp(long pinnedTimestamp) { this.pinnedTimestamp = pinnedTimestamp; } + private void setSnapshotSizeInBytes(long snapshotSizeInBytes) { + this.snapshotSizeInBytes = snapshotSizeInBytes; + } + private void setShardFailures(List shardFailures) { this.shardFailures = shardFailures; } @@ -226,7 +235,8 @@ public SnapshotInfo build() { includeGlobalState, userMetadata, remoteStoreIndexShallowCopy, - pinnedTimestamp + pinnedTimestamp, + snapshotSizeInBytes ); } } @@ -282,6 +292,7 @@ int getSuccessfulShards() { new ParseField(REMOTE_STORE_INDEX_SHALLOW_COPY) ); SNAPSHOT_INFO_PARSER.declareLong(SnapshotInfoBuilder::setPinnedTimestamp, new ParseField(PINNED_TIMESTAMP)); + SNAPSHOT_INFO_PARSER.declareLong(SnapshotInfoBuilder::setSnapshotSizeInBytes, new ParseField(SNAPSHOT_SIZE_IN_BYTES)); SNAPSHOT_INFO_PARSER.declareObjectArray( SnapshotInfoBuilder::setShardFailures, SnapshotShardFailure.SNAPSHOT_SHARD_FAILURE_PARSER, @@ -319,6 +330,9 @@ int getSuccessfulShards() { private Boolean remoteStoreIndexShallowCopy; private long pinnedTimestamp; + + private long snapshotSizeInBytes; + @Nullable private final Map userMetadata; @@ -328,11 +342,11 @@ int getSuccessfulShards() { private final List shardFailures; public SnapshotInfo(SnapshotId snapshotId, List indices, List dataStreams, SnapshotState state) { - this(snapshotId, indices, dataStreams, state, null, null, 0L, 0L, 0, 0, Collections.emptyList(), null, null, null, 0); + this(snapshotId, indices, dataStreams, state, null, null, 0L, 0L, 0, 0, Collections.emptyList(), null, null, null, 0, 0); } public SnapshotInfo(SnapshotId snapshotId, List indices, List dataStreams, SnapshotState state, Version version) { - this(snapshotId, indices, dataStreams, state, null, version, 0L, 0L, 0, 0, Collections.emptyList(), null, null, null, 0); + this(snapshotId, indices, dataStreams, state, null, version, 0L, 0L, 0, 0, Collections.emptyList(), null, null, null, 0, 0); } public SnapshotInfo(SnapshotsInProgress.Entry entry) { @@ -351,6 +365,7 @@ public SnapshotInfo(SnapshotsInProgress.Entry entry) { entry.includeGlobalState(), entry.userMetadata(), entry.remoteStoreIndexShallowCopy(), + 0L, 0L ); } @@ -367,7 +382,8 @@ public SnapshotInfo( Boolean includeGlobalState, Map userMetadata, Boolean remoteStoreIndexShallowCopy, - long pinnedTimestamp + long pinnedTimestamp, + long snapshotSizeInBytes ) { this( snapshotId, @@ -384,7 +400,8 @@ public SnapshotInfo( includeGlobalState, userMetadata, remoteStoreIndexShallowCopy, - pinnedTimestamp + pinnedTimestamp, + snapshotSizeInBytes ); } @@ -403,7 +420,8 @@ public SnapshotInfo( Boolean includeGlobalState, Map userMetadata, Boolean remoteStoreIndexShallowCopy, - long pinnedTimestamp + long pinnedTimestamp, + long snapshotSizeInBytes ) { this.snapshotId = Objects.requireNonNull(snapshotId); this.indices = Collections.unmodifiableList(Objects.requireNonNull(indices)); @@ -420,6 +438,7 @@ public SnapshotInfo( this.userMetadata = userMetadata; this.remoteStoreIndexShallowCopy = 
remoteStoreIndexShallowCopy; this.pinnedTimestamp = pinnedTimestamp; + this.snapshotSizeInBytes = snapshotSizeInBytes; } /** @@ -444,6 +463,7 @@ public SnapshotInfo(final StreamInput in) throws IOException { } if (in.getVersion().onOrAfter(Version.V_2_17_0)) { pinnedTimestamp = in.readVLong(); + snapshotSizeInBytes = in.readVLong(); } } @@ -563,6 +583,10 @@ public long getPinnedTimestamp() { return pinnedTimestamp; } + public long getSnapshotSizeInBytes() { + return snapshotSizeInBytes; + } + /** * Returns shard failures; an empty list will be returned if there were no shard * failures, or if {@link #state()} returns {@code null}. @@ -632,6 +656,8 @@ public String toString() { + remoteStoreIndexShallowCopy + ", pinnedTimestamp=" + pinnedTimestamp + + ", snapshotSizeInBytes=" + + snapshotSizeInBytes + '}'; } @@ -668,6 +694,7 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa builder.field(REMOTE_STORE_INDEX_SHALLOW_COPY, remoteStoreIndexShallowCopy); } builder.field(PINNED_TIMESTAMP, pinnedTimestamp); + builder.field(SNAPSHOT_SIZE_IN_BYTES, snapshotSizeInBytes); builder.startArray(INDICES); for (String index : indices) { @@ -730,6 +757,9 @@ private XContentBuilder toXContentInternal(final XContentBuilder builder, final if (pinnedTimestamp != 0) { builder.field(PINNED_TIMESTAMP, pinnedTimestamp); } + if (snapshotSizeInBytes != 0) { + builder.field(SNAPSHOT_SIZE_IN_BYTES, snapshotSizeInBytes); + } builder.startArray(INDICES); for (String index : indices) { builder.value(index); @@ -779,6 +809,7 @@ public static SnapshotInfo fromXContentInternal(final XContentParser parser) thr int totalShards = 0; int successfulShards = 0; long pinnedTimestamp = 0; + long snapshotSizeInBytes = 0; Boolean includeGlobalState = null; Boolean remoteStoreIndexShallowCopy = null; Map userMetadata = null; @@ -822,6 +853,8 @@ public static SnapshotInfo fromXContentInternal(final XContentParser parser) thr remoteStoreIndexShallowCopy = parser.booleanValue(); } else if (PINNED_TIMESTAMP.equals(currentFieldName)) { pinnedTimestamp = parser.longValue(); + } else if (SNAPSHOT_SIZE_IN_BYTES.equals(currentFieldName)) { + snapshotSizeInBytes = parser.longValue(); } } else if (token == XContentParser.Token.START_ARRAY) { if (DATA_STREAMS.equals(currentFieldName)) { @@ -875,7 +908,8 @@ public static SnapshotInfo fromXContentInternal(final XContentParser parser) thr includeGlobalState, userMetadata, remoteStoreIndexShallowCopy, - pinnedTimestamp + pinnedTimestamp, + snapshotSizeInBytes ); } @@ -909,6 +943,7 @@ public void writeTo(final StreamOutput out) throws IOException { } if (out.getVersion().onOrAfter(Version.V_2_17_0)) { out.writeVLong(pinnedTimestamp); + out.writeVLong(snapshotSizeInBytes); } } @@ -943,7 +978,8 @@ public boolean equals(Object o) { && Objects.equals(shardFailures, that.shardFailures) && Objects.equals(userMetadata, that.userMetadata) && Objects.equals(remoteStoreIndexShallowCopy, that.remoteStoreIndexShallowCopy) - && Objects.equals(pinnedTimestamp, that.pinnedTimestamp); + && Objects.equals(pinnedTimestamp, that.pinnedTimestamp) + && Objects.equals(snapshotSizeInBytes, that.snapshotSizeInBytes); } @Override @@ -964,7 +1000,8 @@ public int hashCode() { shardFailures, userMetadata, remoteStoreIndexShallowCopy, - pinnedTimestamp + pinnedTimestamp, + snapshotSizeInBytes ); } } diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java index ea6cd93e5a53b..c4611e8bc6a23 100644 --- 
a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java @@ -46,6 +46,7 @@ import org.opensearch.action.support.GroupedActionListener; import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction; import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterInfoService; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateApplier; import org.opensearch.cluster.ClusterStateTaskConfig; @@ -158,6 +159,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus private final RepositoriesService repositoriesService; + private final ClusterInfoService clusterInfoService; + private final RemoteStoreLockManagerFactory remoteStoreLockManagerFactory; private final ThreadPool threadPool; @@ -203,6 +206,15 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus Setting.Property.Dynamic ); + // Maximum number of shards allowed to be processed by the snapshot status API + public static final Setting MAX_SHARDS_ALLOWED_IN_STATUS_API = Setting.intSetting( + "snapshot.max_shards_allowed_in_status_api", + 1000, + 1, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + public static final Setting SHALLOW_SNAPSHOT_V2 = Setting.boolSetting( "snapshot.shallow_snapshot_v2", false, @@ -220,6 +233,7 @@ public SnapshotsService( IndexNameExpressionResolver indexNameExpressionResolver, RepositoriesService repositoriesService, TransportService transportService, + ClusterInfoService clusterInfoService, ActionFilters actionFilters, @Nullable RemoteStorePinnedTimestampService remoteStorePinnedTimestampService ) { @@ -229,6 +243,7 @@ public SnapshotsService( this.remoteStoreLockManagerFactory = new RemoteStoreLockManagerFactory(() -> repositoriesService); this.threadPool = transportService.getThreadPool(); this.transportService = transportService; + this.clusterInfoService = clusterInfoService; this.remoteStorePinnedTimestampService = remoteStorePinnedTimestampService; // The constructor of UpdateSnapshotStatusAction will register itself to the TransportService.
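(Illustrative sketch, not part of the patch: the fragment below shows roughly how the size wired in here is expected to flow end to end, using the ClusterInfoService accessor injected above and the trailing SnapshotInfo constructor argument added in this change. The local names snapshotId, indices, dataStreams, startTime, endTime, totalShards, successfulShards and pinnedTimestamp are placeholders, not values from the patch.)

    // Aggregate primary store size as reported by the injected ClusterInfoService
    long snapshotSizeInBytes = clusterInfoService.getClusterInfo().getPrimaryStoreSize();

    // The size rides on SnapshotInfo via the new trailing constructor argument...
    SnapshotInfo snapshotInfo = new SnapshotInfo(
        snapshotId,                  // placeholder SnapshotId
        indices,                     // placeholder index names
        dataStreams,                 // placeholder data stream names
        SnapshotState.SUCCESS,
        null,                        // failure reason
        Version.CURRENT,
        startTime,
        endTime,
        totalShards,
        successfulShards,
        Collections.emptyList(),     // shard failures
        true,                        // includeGlobalState
        Collections.emptyMap(),      // user metadata
        false,                       // remoteStoreIndexShallowCopy
        pinnedTimestamp,
        snapshotSizeInBytes
    );

    // ...and is read back by get/status consumers through the new getter
    assert snapshotInfo.getSnapshotSizeInBytes() == snapshotSizeInBytes;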
@@ -447,6 +462,8 @@ public TimeValue timeout() { } public void createShallowSnapshotV2(final CreateSnapshotRequest request, final ActionListener listener) { + long snapshotSizeInBytes = clusterInfoService.getClusterInfo().getPrimaryStoreSize(); + logger.debug("primary store size reported by ClusterInfoService = {}", snapshotSizeInBytes); long pinnedTimestamp = System.currentTimeMillis(); final String repositoryName = request.repository(); final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot()); @@ -530,7 +547,8 @@ public void createShallowSnapshotV2(final CreateSnapshotRequest request, final A request.includeGlobalState(), userMeta, remoteStoreIndexShallowCopy, - pinnedTimestamp + pinnedTimestamp, + snapshotSizeInBytes ); if (!clusterService.state().nodes().isLocalNodeElectedClusterManager()) { throw new SnapshotException(repositoryName, snapshotName, "Aborting Snapshot, no longer cluster manager"); } @@ -1710,6 +1728,7 @@ private void finalizeSnapshotEntry(SnapshotsInProgress.Entry entry, Metadata met entry.includeGlobalState(), entry.userMetadata(), entry.remoteStoreIndexShallowCopy(), + 0, 0 ); final StepListener metadataListener = new StepListener<>(); diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/create/CreateSnapshotResponseTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/create/CreateSnapshotResponseTests.java index 2feb0d3ba9405..d267cbc2e162a 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/create/CreateSnapshotResponseTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/create/CreateSnapshotResponseTests.java @@ -96,6 +96,7 @@ protected CreateSnapshotResponse createTestInstance() { globalState, SnapshotInfoTests.randomUserMetadata(), false, + 0, 0 ) ); diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/get/GetSnapshotsResponseTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/get/GetSnapshotsResponseTests.java index 58af390d194d3..1868e9f37d610 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/get/GetSnapshotsResponseTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/get/GetSnapshotsResponseTests.java @@ -78,6 +78,7 @@ protected GetSnapshotsResponse createTestInstance() { randomBoolean(), SnapshotInfoTests.randomUserMetadata(), false, + 0, 0 ) ); diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatusTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatusTests.java index 3918e5d9b235c..372dfb5c49528 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatusTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatusTests.java @@ -61,7 +61,7 @@ public void testToString() throws Exception { List snapshotIndexShardStatuses = new ArrayList<>(); snapshotIndexShardStatuses.add(snapshotIndexShardStatus); boolean includeGlobalState = randomBoolean(); - SnapshotStatus status = new SnapshotStatus(snapshot, state, snapshotIndexShardStatuses, includeGlobalState, 0L, 0L); + SnapshotStatus status = new SnapshotStatus(snapshot, state, snapshotIndexShardStatuses, includeGlobalState, 0L, 0L, 0L); int initializingShards = 0; int startedShards = 0; @@ -213,7 +213,7 @@ protected SnapshotStatus
createTestInstance() { snapshotIndexShardStatuses.add(snapshotIndexShardStatus); } boolean includeGlobalState = randomBoolean(); - return new SnapshotStatus(snapshot, state, snapshotIndexShardStatuses, includeGlobalState, 0L, 0L); + return new SnapshotStatus(snapshot, state, snapshotIndexShardStatuses, includeGlobalState, 0L, 0L, 0L); } @Override diff --git a/server/src/test/java/org/opensearch/cluster/ClusterInfoTests.java b/server/src/test/java/org/opensearch/cluster/ClusterInfoTests.java index 4ec7db2f3d552..f652c65a20a8d 100644 --- a/server/src/test/java/org/opensearch/cluster/ClusterInfoTests.java +++ b/server/src/test/java/org/opensearch/cluster/ClusterInfoTests.java @@ -51,7 +51,8 @@ public void testSerialization() throws Exception { randomShardSizes(), randomRoutingToDataPath(), randomReservedSpace(), - randomFileCacheStats() + randomFileCacheStats(), + randomLong() ); BytesStreamOutput output = new BytesStreamOutput(); clusterInfo.writeTo(output); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java index 6ab57d10b05c1..298a04eccc064 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java @@ -902,7 +902,7 @@ private static ClusterInfo clusterInfo( final Map diskUsages, final Map reservedSpace ) { - return new ClusterInfo(diskUsages, null, null, null, reservedSpace, Map.of()); + return new ClusterInfo(diskUsages, null, null, null, reservedSpace, Map.of(), 0L); } } diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/IndexShardConstraintDeciderOverlapTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/IndexShardConstraintDeciderOverlapTests.java index 7f2f048485318..0d9f6e63e31d7 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/IndexShardConstraintDeciderOverlapTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/IndexShardConstraintDeciderOverlapTests.java @@ -176,7 +176,7 @@ public DevNullClusterInfo( final Map shardSizes, final Map reservedSpace ) { - super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null, reservedSpace, Map.of()); + super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null, reservedSpace, Map.of(), 0L); } @Override diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java index 6a03a1f79bcde..e359ebeee1bdc 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java @@ -275,9 +275,10 @@ public static class DevNullClusterInfo extends ClusterInfo { public DevNullClusterInfo( final Map leastAvailableSpaceUsage, final Map mostAvailableSpaceUsage, - final Map shardSizes + final Map shardSizes, + final long primaryStoreSize ) { - super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null, Map.of(), Map.of()); + super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null, Map.of(), Map.of(), primaryStoreSize); } @Override diff --git 
a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java index 2e24640fe858d..27568e8b84db1 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java @@ -1649,7 +1649,7 @@ static class DevNullClusterInfo extends ClusterInfo { Map reservedSpace, final Map nodeFileCacheStats ) { - super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null, reservedSpace, nodeFileCacheStats); + super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null, reservedSpace, nodeFileCacheStats, 0L); } @Override diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java index 622d3a2c920b5..78b9e910840b2 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java @@ -127,7 +127,15 @@ public void testCanAllocateUsesMaxAvailableSpace() { final Map shardSizes = new HashMap<>(); shardSizes.put("[test][0][p]", 10L); // 10 bytes - final ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages, mostAvailableUsage, shardSizes, Map.of(), Map.of(), Map.of()); + final ClusterInfo clusterInfo = new ClusterInfo( + leastAvailableUsages, + mostAvailableUsage, + shardSizes, + Map.of(), + Map.of(), + Map.of(), + 0L + ); RoutingAllocation allocation = new RoutingAllocation( new AllocationDeciders(Collections.singleton(decider)), clusterState.getRoutingNodes(), @@ -203,7 +211,7 @@ public void testCannotAllocateDueToLackOfDiskResources() { // way bigger than available space final long shardSize = randomIntBetween(110, 1000); shardSizes.put("[test][0][p]", shardSize); - ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages, mostAvailableUsage, shardSizes, Map.of(), Map.of(), Map.of()); + ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages, mostAvailableUsage, shardSizes, Map.of(), Map.of(), Map.of(), 0L); RoutingAllocation allocation = new RoutingAllocation( new AllocationDeciders(Collections.singleton(decider)), clusterState.getRoutingNodes(), @@ -326,7 +334,8 @@ public void testCanRemainUsesLeastAvailableSpace() { shardSizes, shardRoutingMap, Map.of(), - Map.of() + Map.of(), + 0L ); RoutingAllocation allocation = new RoutingAllocation( new AllocationDeciders(Collections.singleton(decider)), diff --git a/server/src/test/java/org/opensearch/indices/tiering/TieringRequestValidatorTests.java b/server/src/test/java/org/opensearch/indices/tiering/TieringRequestValidatorTests.java index 6b6f74353812b..4f3ca999583f5 100644 --- a/server/src/test/java/org/opensearch/indices/tiering/TieringRequestValidatorTests.java +++ b/server/src/test/java/org/opensearch/indices/tiering/TieringRequestValidatorTests.java @@ -171,7 +171,7 @@ public void testGetTotalIndexSize() { Map diskUsages = diskUsages(1, 100, 50); final Map shardSizes = new HashMap<>(); shardSizes.put("[test_index][0][p]", 10L); // 10 bytes - ClusterInfo clusterInfo = new ClusterInfo(diskUsages, null, shardSizes, null, Map.of(), Map.of()); + ClusterInfo clusterInfo = new 
ClusterInfo(diskUsages, null, shardSizes, null, Map.of(), Map.of(), 10L); assertEquals(10, getIndexPrimaryStoreSize(clusterState, clusterInfo, indexName)); } @@ -185,7 +185,7 @@ public void testValidateEligibleNodesCapacityWithAllAccepted() { Map diskUsages = diskUsages(1, 100, 50); final Map shardSizes = new HashMap<>(); shardSizes.put("[test_index][0][p]", 10L); // 10 bytes - ClusterInfo clusterInfo = new ClusterInfo(diskUsages, null, shardSizes, null, Map.of(), Map.of()); + ClusterInfo clusterInfo = new ClusterInfo(diskUsages, null, shardSizes, null, Map.of(), Map.of(), 0L); TieringValidationResult tieringValidationResult = new TieringValidationResult(indices); validateEligibleNodesCapacity(clusterInfo, clusterState, tieringValidationResult); assertEquals(indices, tieringValidationResult.getAcceptedIndices()); @@ -202,7 +202,7 @@ public void testValidateEligibleNodesCapacityWithAllRejected() { Map diskUsages = diskUsages(1, 100, 10); final Map shardSizes = new HashMap<>(); shardSizes.put("[test_index][0][p]", 20L); // 20 bytes - ClusterInfo clusterInfo = new ClusterInfo(diskUsages, null, shardSizes, null, Map.of(), Map.of()); + ClusterInfo clusterInfo = new ClusterInfo(diskUsages, null, shardSizes, null, Map.of(), Map.of(), 0L); TieringValidationResult tieringValidationResult = new TieringValidationResult(indices); validateEligibleNodesCapacity(clusterInfo, clusterState, tieringValidationResult); assertEquals(indices.size(), tieringValidationResult.getRejectedIndices().size()); @@ -305,7 +305,7 @@ private static DiskThresholdSettings diskThresholdSettings(String low, String hi private static ClusterInfo clusterInfo(int noOfNodes, long totalBytes, long freeBytes) { final Map diskUsages = diskUsages(noOfNodes, totalBytes, freeBytes); - return new ClusterInfo(diskUsages, null, null, null, Map.of(), Map.of()); + return new ClusterInfo(diskUsages, null, null, null, Map.of(), Map.of(), 0L); } private static Map diskUsages(int noOfSearchNodes, long totalBytes, long freeBytes) { diff --git a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java index ef2c64f89d3a0..f99d5c0c1e915 100644 --- a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java +++ b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java @@ -214,6 +214,7 @@ public void testSnapshotWithConflictingName() throws Exception { true, Collections.emptyMap(), false, + 0, 0 ), Version.CURRENT, diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotInfoTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotInfoTests.java index 684a8dd36fccc..5406f85b11614 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotInfoTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotInfoTests.java @@ -87,6 +87,7 @@ protected SnapshotInfo createTestInstance() { includeGlobalState, userMetadata, remoteStoreIndexShallowCopy, + 0, 0 ); } @@ -116,6 +117,7 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.includeGlobalState(), instance.userMetadata(), instance.isRemoteStoreIndexShallowCopyEnabled(), + 0, 0 ); case 1: @@ -135,6 +137,7 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.includeGlobalState(), instance.userMetadata(), instance.isRemoteStoreIndexShallowCopyEnabled(), + 0, 0 ); case 2: @@ -150,6 +153,7 @@ protected SnapshotInfo 
mutateInstance(SnapshotInfo instance) { instance.includeGlobalState(), instance.userMetadata(), instance.isRemoteStoreIndexShallowCopyEnabled(), + 0, 0 ); case 3: @@ -165,6 +169,7 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.includeGlobalState(), instance.userMetadata(), instance.isRemoteStoreIndexShallowCopyEnabled(), + 0, 0 ); case 4: @@ -180,6 +185,7 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.includeGlobalState(), instance.userMetadata(), instance.isRemoteStoreIndexShallowCopyEnabled(), + 0, 0 ); case 5: @@ -207,6 +213,7 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.includeGlobalState(), instance.userMetadata(), instance.isRemoteStoreIndexShallowCopyEnabled(), + 0, 0 ); case 6: @@ -222,6 +229,7 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { Boolean.FALSE.equals(instance.includeGlobalState()), instance.userMetadata(), instance.isRemoteStoreIndexShallowCopyEnabled(), + 0, 0 ); case 7: @@ -237,6 +245,7 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.includeGlobalState(), randomValueOtherThan(instance.userMetadata(), SnapshotInfoTests::randomUserMetadata), instance.isRemoteStoreIndexShallowCopyEnabled(), + 0, 0 ); case 8: @@ -256,7 +265,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.includeGlobalState(), instance.userMetadata(), instance.isRemoteStoreIndexShallowCopyEnabled(), - 123456 + 123456, + 0 ); case 9: return new SnapshotInfo( @@ -271,6 +281,7 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.includeGlobalState(), instance.userMetadata(), Boolean.FALSE.equals(instance.isRemoteStoreIndexShallowCopyEnabled()), + 123456, 123456 ); default: diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 769dfeb37ff8d..ed2424cab23e1 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -455,7 +455,7 @@ public void testSearchableSnapshotOverSubscription() { for (TestClusterNodes.TestClusterNode node : testClusterNodes.nodes.values()) { nodeFileCacheStats.put(node.node.getId(), new FileCacheStats(0, 1, 0, 0, 0, 0, 0)); } - ClusterInfo clusterInfo = new ClusterInfo(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), nodeFileCacheStats); + ClusterInfo clusterInfo = new ClusterInfo(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), nodeFileCacheStats, 0L); testClusterNodes.nodes.values().forEach(node -> when(node.getMockClusterInfoService().getClusterInfo()).thenReturn(clusterInfo)); final StepListener createSnapshotResponseListener = new StepListener<>(); @@ -2009,20 +2009,21 @@ public void onFailure(final Exception e) { ); remoteStoreNodeService = new RemoteStoreNodeService(new SetOnce<>(repositoriesService)::get, threadPool); final ActionFilters actionFilters = new ActionFilters(emptySet()); + nodeEnv = new NodeEnvironment(settings, environment); + final NamedXContentRegistry namedXContentRegistry = new NamedXContentRegistry(Collections.emptyList()); + final ScriptService scriptService = new ScriptService(settings, emptyMap(), emptyMap()); + client = new NodeClient(settings, threadPool); + clusterInfoService = Mockito.mock(ClusterInfoService.class); snapshotsService = new SnapshotsService( settings, clusterService, indexNameExpressionResolver, repositoriesService, transportService, + 
clusterInfoService, actionFilters, null ); - nodeEnv = new NodeEnvironment(settings, environment); - final NamedXContentRegistry namedXContentRegistry = new NamedXContentRegistry(Collections.emptyList()); - final ScriptService scriptService = new ScriptService(settings, emptyMap(), emptyMap()); - client = new NodeClient(settings, threadPool); - clusterInfoService = Mockito.mock(ClusterInfoService.class); final SetOnce rerouteServiceSetOnce = new SetOnce<>(); final SnapshotsInfoService snapshotsInfoService = new InternalSnapshotsInfoService( settings, diff --git a/server/src/test/java/org/opensearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java b/server/src/test/java/org/opensearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java index c3577885e9cb1..2e1f1d45c9ec9 100644 --- a/server/src/test/java/org/opensearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java +++ b/server/src/test/java/org/opensearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java @@ -234,6 +234,7 @@ public void testOverwriteSnapshotInfoBlob() throws Exception { true, Collections.emptyMap(), false, + 0, 0 ), Version.CURRENT, @@ -261,6 +262,7 @@ public void testOverwriteSnapshotInfoBlob() throws Exception { true, Collections.emptyMap(), false, + 0, 0 ), Version.CURRENT, @@ -290,6 +292,7 @@ public void testOverwriteSnapshotInfoBlob() throws Exception { true, Collections.emptyMap(), false, + 0, 0 ), Version.CURRENT, diff --git a/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java b/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java index 35ca5d80aeb4e..40f3f864efd60 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java +++ b/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java @@ -138,7 +138,8 @@ class SizeFakingClusterInfo extends ClusterInfo { delegate.shardSizes, delegate.routingToDataPath, delegate.reservedSpace, - delegate.nodeFileCacheStats + delegate.nodeFileCacheStats, + delegate.getPrimaryStoreSize() ); } diff --git a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java index b76de4e0b00cd..3eb81c2f9d4b0 100644 --- a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -613,6 +613,7 @@ protected void addBwCFailedSnapshot(String repoName, String snapshotName, Mapget(