Skip to content

Commit

Permalink
Add lastSuccessfulFetchOfPinnedTimestamps to RepositoriesStats
Browse files Browse the repository at this point in the history
Signed-off-by: Lakshya Taragi <[email protected]>
  • Loading branch information
ltaragi committed Sep 10, 2024
1 parent 7cb2bd0 commit b238922
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

import org.opensearch.OpenSearchException;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
Expand Down Expand Up @@ -37,6 +39,7 @@
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand All @@ -58,6 +61,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.REPOSITORIES;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS;
Expand Down Expand Up @@ -1011,4 +1015,36 @@ public void testAsyncTranslogDurabilityRestrictionsThroughIdxTemplates() throws
.get()
);
}

public void testLastSuccessfulFetchOfPinnedTimestampsPresentInNodeStats() throws Exception {
logger.info("Starting up cluster manager");
logger.info("cluster.remote_store.pinned_timestamps.enabled set to true");
logger.info("cluster.remote_store.pinned_timestamps.scheduler_interval set to minimum value of 1minute");
Settings pinnedTimestampEnabledSettings = Settings.builder()
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true)
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_SCHEDULER_INTERVAL.getKey(), "1m")
.build();
internalCluster().startClusterManagerOnlyNode(pinnedTimestampEnabledSettings);
internalCluster().startDataOnlyNode(pinnedTimestampEnabledSettings);
ensureStableCluster(2);

logger.info("Sleeping for 70 seconds to wait for fetching of pinned timestamps");
Thread.sleep(70000);

long lastSuccessfulFetchOfPinnedTimestamps = RemoteStorePinnedTimestampService.getPinnedTimestamps().v1();
assertTrue(lastSuccessfulFetchOfPinnedTimestamps > 0L);
assertBusy(() -> {
NodesStatsResponse nodesStatsResponse = internalCluster().client()
.admin()
.cluster()
.prepareNodesStats()
.addMetric(REPOSITORIES.metricName())
.execute()
.actionGet();
for (NodeStats nodeStats : nodesStatsResponse.getNodes()) {
long lastRecordedFetch = nodeStats.getRepositoriesStats().getLastSuccessfulFetchOfPinnedTimestamps();
assertTrue(lastRecordedFetch >= lastSuccessfulFetchOfPinnedTimestamps);
}
}, 1, TimeUnit.MINUTES);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.repositories.blobstore.MeteredBlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
Expand Down Expand Up @@ -578,7 +580,11 @@ public List<RepositoryStatsSnapshot> repositoriesStats() {
}

public RepositoriesStats getRepositoriesStats() {
return new RepositoriesStats(repositoriesStats());
long lastSuccessfulFetchOfPinnedTimestamps = 0;
if (RemoteStoreSettings.isPinnedTimestampsEnabled()) {
lastSuccessfulFetchOfPinnedTimestamps = RemoteStorePinnedTimestampService.getPinnedTimestamps().v1();
}
return new RepositoriesStats(repositoriesStats(), lastSuccessfulFetchOfPinnedTimestamps);
}

private List<RepositoryStatsSnapshot> getRepositoryStatsForActiveRepositories() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@

package org.opensearch.repositories;

import org.opensearch.Version;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.common.util.CollectionUtils;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.indices.RemoteStoreSettings;

import java.io.IOException;
import java.util.List;
Expand All @@ -29,17 +31,30 @@ public class RepositoriesStats implements Writeable, ToXContentObject {

List<RepositoryStatsSnapshot> repositoryStatsSnapshots;

public RepositoriesStats(List<RepositoryStatsSnapshot> repositoryStatsSnapshots) {
private long lastSuccessfulFetchOfPinnedTimestamps;

public RepositoriesStats(List<RepositoryStatsSnapshot> repositoryStatsSnapshots, long lastSuccessfulFetchOfPinnedTimestamps) {
this.repositoryStatsSnapshots = repositoryStatsSnapshots;
this.lastSuccessfulFetchOfPinnedTimestamps = lastSuccessfulFetchOfPinnedTimestamps;
}

public long getLastSuccessfulFetchOfPinnedTimestamps() {
return this.lastSuccessfulFetchOfPinnedTimestamps;
}

public RepositoriesStats(StreamInput in) throws IOException {
this.repositoryStatsSnapshots = in.readList(RepositoryStatsSnapshot::new);
if (in.getVersion().onOrAfter(Version.CURRENT) && RemoteStoreSettings.isPinnedTimestampsEnabled()) {
this.lastSuccessfulFetchOfPinnedTimestamps = in.readOptionalLong();
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeList(repositoryStatsSnapshots);
if (out.getVersion().onOrAfter(Version.CURRENT) && RemoteStoreSettings.isPinnedTimestampsEnabled()) {
out.writeOptionalLong(lastSuccessfulFetchOfPinnedTimestamps);
}
}

@Override
Expand All @@ -51,6 +66,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
}
builder.endArray();
if (RemoteStoreSettings.isPinnedTimestampsEnabled()) {
builder.field("last_successful_fetch_of_pinned_timestamps", lastSuccessfulFetchOfPinnedTimestamps);
}
return builder;
}
}

0 comments on commit b238922

Please sign in to comment.