Skip to content

Commit

Permalink
Use primary store size
Browse files Browse the repository at this point in the history
  • Loading branch information
ltaragi committed Aug 20, 2024
1 parent 40e23fb commit bcbc7f1
Show file tree
Hide file tree
Showing 17 changed files with 165 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStage;
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStatus;
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotIndexStatus;
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotStatus;
import org.opensearch.action.admin.cluster.stats.ClusterStatsIndices;
import org.opensearch.cluster.SnapshotsInProgress;
Expand All @@ -47,15 +46,12 @@
import org.junit.Before;

import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings;
import static org.opensearch.snapshots.SnapshotsService.DUMMY_MAX_SHARDS_ALLOWED_IN_STATUS_API;
import static org.opensearch.snapshots.SnapshotsService.MAX_SHARDS_ALLOWED_IN_STATUS_API;
import static org.opensearch.snapshots.SnapshotsService.SNAPSHOT_V2;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
Expand Down Expand Up @@ -107,11 +103,14 @@ public void testStatusAPICallForShallowCopySnapshot() throws Exception {
assertBusy(() -> {
// although no. of shards in snapshot (3) is greater than the max value allowed in a status api call, the request does not fail
// TODO: currently this gives snapshot not found at repository issue --> discuss what to do here
SnapshotStatus snapshotsStatus = client().admin().cluster().prepareSnapshotStatus(snapshotRepoName)
SnapshotStatus snapshotsStatus = client().admin()
.cluster()
.prepareSnapshotStatus(snapshotRepoName)
.setSnapshots(snapshot)
.setIndices(remoteStoreEnabledIndexName1)
.get()
.getSnapshots().get(0);
.getSnapshots()
.get(0);
logger.info("*** snapshotsStatus = {}", snapshotsStatus);
});
}
Expand Down Expand Up @@ -202,7 +201,7 @@ public void testStatusAPICallInProgressShallowSnapshot() throws Exception {
logger.info("--> wait for snapshot to finish");
createSnapshotResponseActionFuture.actionGet();
}
//./gradlew :server:internalClusterTest --tests "org.opensearch.snapshots.RemoteIndexSnapshotStatusApiIT.testStatusAPICallForShallowV2Snapshot"

public void testStatusAPICallForShallowV2Snapshot() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used for the test");
internalCluster().startClusterManagerOnlyNode();
Expand All @@ -229,23 +228,24 @@ public void testStatusAPICallForShallowV2Snapshot() throws Exception {
logger.info("*** triggering refresh from ");
refresh();

logger.info("Set SNAPSHOT_V2 as true and DUMMY_MAX_SHARDS_ALLOWED_IN_STATUS_API to a low value");
logger.info("Set SNAPSHOT_V2 as true and MAX_SHARDS_ALLOWED_IN_STATUS_API to a low value");
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(
Settings.builder().put(DUMMY_MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey(), 2).put(SNAPSHOT_V2.getKey(), true)
Settings.builder().put(MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey(), 2).put(SNAPSHOT_V2.getKey(), true)
);
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

final String snapshot = "snapshot";
SnapshotInfo snapshotInfo = createFullSnapshot(snapshotRepoName, snapshot);
assertTrue(snapshotInfo.getPinnedTimestamp() > 0); // to assert creation of a shallow v2 snapshot
logger.info("*** primarySize = {}", client().admin().indices().prepareStats().execute().actionGet().getPrimaries().getStore().size());
logger.info(
"*** primarySize = {}",
client().admin().indices().prepareStats().execute().actionGet().getPrimaries().getStore().size()
);
ClusterStatsIndices clusterStatsIndices = client().admin().cluster().prepareClusterStats().get().getIndicesStats();
logger.info("*** clusterStatsIndices = {}", clusterStatsIndices.toString());
logger.info("*** clusterStatsIndices.getStore().sizeInBytes() = {}", clusterStatsIndices.getStore().sizeInBytes());



logger.info("Indexing some data");
for (int i = 0; i < 50; i++) {
index(index1, "_doc", Integer.toString(i), "foo", "bar" + i);
Expand All @@ -258,16 +258,17 @@ public void testStatusAPICallForShallowV2Snapshot() throws Exception {
final String snapshot1 = "snapshot1";
SnapshotInfo snapshotInfo1 = createFullSnapshot(snapshotRepoName, snapshot1);
assertTrue(snapshotInfo.getPinnedTimestamp() > 0); // to assert creation of a shallow v2 snapshot
logger.info("*** primarySize1 = {}", client().admin().indices().prepareStats().execute().actionGet().getPrimaries().getStore().size());
logger.info(
"*** primarySize1 = {}",
client().admin().indices().prepareStats().execute().actionGet().getPrimaries().getStore().size()
);
ClusterStatsIndices clusterStatsIndices1 = client().admin().cluster().prepareClusterStats().get().getIndicesStats();
logger.info("*** clusterStatsIndices1 = {}", clusterStatsIndices.toString());
logger.info("*** clusterStatsIndices.getStore().sizeInBytes()1 = {}", clusterStatsIndices.getStore().sizeInBytes());



logger.info("Reset SNAPSHOT_V2 and DUMMY_MAX_SHARDS_ALLOWED_IN_STATUS_API to default values");
logger.info("Reset SNAPSHOT_V2 and MAX_SHARDS_ALLOWED_IN_STATUS_API to default values");
updateSettingsRequest.persistentSettings(
Settings.builder().putNull(DUMMY_MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey()).putNull(SNAPSHOT_V2.getKey())
Settings.builder().putNull(MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey()).putNull(SNAPSHOT_V2.getKey())
);
assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.opensearch.snapshots.SnapshotsService.DUMMY_MAX_SHARDS_ALLOWED_IN_STATUS_API;
import static org.opensearch.snapshots.SnapshotsService.MAX_SHARDS_ALLOWED_IN_STATUS_API;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
Expand Down Expand Up @@ -594,9 +594,9 @@ public void testSnapshotStatusApiFailureForTooManyShardsAcrossSnapshots() throws
createSnapshot(repositoryName, snapshot1, List.of(index1, index2, index3));
createSnapshot(repositoryName, snapshot2, List.of(index1, index2));

logger.info("Set DUMMY_MAX_SHARDS_ALLOWED_IN_STATUS_API to a low value");
logger.info("Set MAX_SHARDS_ALLOWED_IN_STATUS_API to a low value");
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(Settings.builder().put(DUMMY_MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey(), 2));
updateSettingsRequest.persistentSettings(Settings.builder().put(MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey(), 2));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

// across a single snapshot
Expand Down Expand Up @@ -630,8 +630,8 @@ public void testSnapshotStatusApiFailureForTooManyShardsAcrossSnapshots() throws
);
}, 1, TimeUnit.MINUTES);

logger.info("Reset DUMMY_MAX_SHARDS_ALLOWED_IN_STATUS_API to default value");
updateSettingsRequest.persistentSettings(Settings.builder().putNull(DUMMY_MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey()));
logger.info("Reset MAX_SHARDS_ALLOWED_IN_STATUS_API to default value");
updateSettingsRequest.persistentSettings(Settings.builder().putNull(MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey()));
assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
}

Expand Down Expand Up @@ -732,9 +732,9 @@ public void testSnapshotStatusFailuresWithIndexFilter() throws Exception {
assertEquals(cause, ex2.getMessage());

// failure due to too many shards requested
logger.info("Set DUMMY_MAX_SHARDS_ALLOWED_IN_STATUS_API to a low value");
logger.info("Set MAX_SHARDS_ALLOWED_IN_STATUS_API to a low value");
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(Settings.builder().put(DUMMY_MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey(), 2));
updateSettingsRequest.persistentSettings(Settings.builder().put(MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey(), 2));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

SnapshotException ex3 = expectThrows(
Expand All @@ -751,8 +751,8 @@ public void testSnapshotStatusFailuresWithIndexFilter() throws Exception {
assertEquals(ex3.status(), RestStatus.REQUEST_ENTITY_TOO_LARGE);
assertTrue(ex3.getMessage().endsWith(" is more than the maximum allowed value of shard count [2] for snapshot status request"));

logger.info("Reset DUMMY_MAX_SHARDS_ALLOWED_IN_STATUS_API to default value");
updateSettingsRequest.persistentSettings(Settings.builder().putNull(DUMMY_MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey()));
logger.info("Reset MAX_SHARDS_ALLOWED_IN_STATUS_API to default value");
updateSettingsRequest.persistentSettings(Settings.builder().putNull(MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey()));
assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

}, 1, TimeUnit.MINUTES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,9 @@
package org.opensearch.action.admin.cluster.snapshots.create;

import org.apache.logging.log4j.LogManager;
import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction;
import org.opensearch.client.Client;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.block.ClusterBlockLevel;
Expand All @@ -48,7 +47,6 @@
import org.opensearch.snapshots.SnapshotsService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
import org.apache.logging.log4j.Logger;

import java.io.IOException;

Expand All @@ -59,7 +57,6 @@
*/
public class TransportCreateSnapshotAction extends TransportClusterManagerNodeAction<CreateSnapshotRequest, CreateSnapshotResponse> {
private final SnapshotsService snapshotsService;
private final Client client;
private static final Logger logger = LogManager.getLogger(TransportCreateSnapshotAction.class);

@Inject
Expand All @@ -69,8 +66,7 @@ public TransportCreateSnapshotAction(
ThreadPool threadPool,
SnapshotsService snapshotsService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
Client client
IndexNameExpressionResolver indexNameExpressionResolver
) {
super(
CreateSnapshotAction.NAME,
Expand All @@ -81,7 +77,6 @@ public TransportCreateSnapshotAction(
CreateSnapshotRequest::new,
indexNameExpressionResolver
);
this.client = client;
this.snapshotsService = snapshotsService;
}

Expand Down Expand Up @@ -111,15 +106,10 @@ protected void clusterManagerOperation(
ClusterState state,
final ActionListener<CreateSnapshotResponse> listener
) {
ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequest();
client.admin().cluster().clusterStats(clusterStatsRequest, ActionListener.wrap(clusterStats -> {
final long storeSize = clusterStats.getIndicesStats().getStore().sizeInBytes();
logger.info("*** storeSize inside TransportCreateSnapshotAction = {}", storeSize);
if (request.waitForCompletion()) {
snapshotsService.executeSnapshot(request, ActionListener.map(listener, CreateSnapshotResponse::new), -1);
} else {
snapshotsService.startCreateSnapshot(request, ActionListener.map(listener, snapshot -> new CreateSnapshotResponse()), -1);
}
}, listener::onFailure));
if (request.waitForCompletion()) {
snapshotsService.executeSnapshot(request, ActionListener.map(listener, CreateSnapshotResponse::new));
} else {
snapshotsService.startCreateSnapshot(request, ActionListener.map(listener, snapshot -> new CreateSnapshotResponse()));
}
}
}
Loading

0 comments on commit bcbc7f1

Please sign in to comment.