Skip to content

Commit

Permalink
Add lastSuccessfulFetchOfPinnedTimestamps to NodeStats via RemoteStor…
Browse files Browse the repository at this point in the history
…eNodeStats

Signed-off-by: Lakshya Taragi <[email protected]>
  • Loading branch information
ltaragi committed Sep 11, 2024
1 parent 7cb2bd0 commit 7f81828
Show file tree
Hide file tree
Showing 14 changed files with 160 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

import org.opensearch.OpenSearchException;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
Expand Down Expand Up @@ -37,6 +39,7 @@
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand All @@ -58,6 +61,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.REMOTE_STORE_NODE_STATS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS;
Expand Down Expand Up @@ -1011,4 +1015,36 @@ public void testAsyncTranslogDurabilityRestrictionsThroughIdxTemplates() throws
.get()
);
}

public void testLastSuccessfulFetchOfPinnedTimestampsPresentInNodeStats() throws Exception {
logger.info("Starting up cluster manager");
logger.info("cluster.remote_store.pinned_timestamps.enabled set to true");
logger.info("cluster.remote_store.pinned_timestamps.scheduler_interval set to minimum value of 1minute");
Settings pinnedTimestampEnabledSettings = Settings.builder()
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true)
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_SCHEDULER_INTERVAL.getKey(), "1m")
.build();
internalCluster().startClusterManagerOnlyNode(pinnedTimestampEnabledSettings);
internalCluster().startDataOnlyNode(pinnedTimestampEnabledSettings);
ensureStableCluster(2);

logger.info("Sleeping for 70 seconds to wait for fetching of pinned timestamps");
Thread.sleep(70000);

long lastSuccessfulFetchOfPinnedTimestamps = RemoteStorePinnedTimestampService.getPinnedTimestamps().v1();
assertTrue(lastSuccessfulFetchOfPinnedTimestamps > 0L);
assertBusy(() -> {
NodesStatsResponse nodesStatsResponse = internalCluster().client()
.admin()
.cluster()
.prepareNodesStats()
.addMetric(REMOTE_STORE_NODE_STATS.metricName())
.execute()
.actionGet();
for (NodeStats nodeStats : nodesStatsResponse.getNodes()) {
long lastRecordedFetch = nodeStats.getRemoteStoreNodeStats().getLastSuccessfulFetchOfPinnedTimestamps();
assertTrue(lastRecordedFetch >= lastSuccessfulFetchOfPinnedTimestamps);
}
}, 1, TimeUnit.MINUTES);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.opensearch.monitor.process.ProcessStats;
import org.opensearch.node.AdaptiveSelectionStats;
import org.opensearch.node.NodesResourceUsageStats;
import org.opensearch.node.remotestore.RemoteStoreNodeStats;
import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControlStats;
import org.opensearch.repositories.RepositoriesStats;
import org.opensearch.script.ScriptCacheStats;
Expand Down Expand Up @@ -162,6 +163,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
@Nullable
private NodeCacheStats nodeCacheStats;

@Nullable
private RemoteStoreNodeStats remoteStoreNodeStats;

public NodeStats(StreamInput in) throws IOException {
super(in);
timestamp = in.readVLong();
Expand Down Expand Up @@ -243,6 +247,11 @@ public NodeStats(StreamInput in) throws IOException {
} else {
nodeCacheStats = null;
}
if (in.getVersion().onOrAfter(Version.CURRENT)) {
remoteStoreNodeStats = in.readOptionalWriteable(RemoteStoreNodeStats::new);
} else {
remoteStoreNodeStats = null;
}
}

public NodeStats(
Expand Down Expand Up @@ -274,7 +283,8 @@ public NodeStats(
@Nullable SegmentReplicationRejectionStats segmentReplicationRejectionStats,
@Nullable RepositoriesStats repositoriesStats,
@Nullable AdmissionControlStats admissionControlStats,
@Nullable NodeCacheStats nodeCacheStats
@Nullable NodeCacheStats nodeCacheStats,
@Nullable RemoteStoreNodeStats remoteStoreNodeStats
) {
super(node);
this.timestamp = timestamp;
Expand Down Expand Up @@ -305,6 +315,7 @@ public NodeStats(
this.repositoriesStats = repositoriesStats;
this.admissionControlStats = admissionControlStats;
this.nodeCacheStats = nodeCacheStats;
this.remoteStoreNodeStats = remoteStoreNodeStats;
}

public long getTimestamp() {
Expand Down Expand Up @@ -467,6 +478,11 @@ public NodeCacheStats getNodeCacheStats() {
return nodeCacheStats;
}

@Nullable
public RemoteStoreNodeStats getRemoteStoreNodeStats() {
return remoteStoreNodeStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down Expand Up @@ -525,6 +541,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_14_0)) {
out.writeOptionalWriteable(nodeCacheStats);
}
if (out.getVersion().onOrAfter(Version.CURRENT)) {
out.writeOptionalWriteable(remoteStoreNodeStats);
}
}

@Override
Expand Down Expand Up @@ -631,6 +650,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (getNodeCacheStats() != null) {
getNodeCacheStats().toXContent(builder, params);
}
if (getRemoteStoreNodeStats() != null) {
getRemoteStoreNodeStats().toXContent(builder, params);
}
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ public enum Metric {
SEGMENT_REPLICATION_BACKPRESSURE("segment_replication_backpressure"),
REPOSITORIES("repositories"),
ADMISSION_CONTROL("admission_control"),
CACHE_STATS("caches");
CACHE_STATS("caches"),
REMOTE_STORE_NODE_STATS("remote_store_node_stats");

private String metricName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
NodesStatsRequest.Metric.SEGMENT_REPLICATION_BACKPRESSURE.containedIn(metrics),
NodesStatsRequest.Metric.REPOSITORIES.containedIn(metrics),
NodesStatsRequest.Metric.ADMISSION_CONTROL.containedIn(metrics),
NodesStatsRequest.Metric.CACHE_STATS.containedIn(metrics)
NodesStatsRequest.Metric.CACHE_STATS.containedIn(metrics),
NodesStatsRequest.Metric.REMOTE_STORE_NODE_STATS.containedIn(metrics)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
false,
false,
false,
false,
false
);
List<ShardStats> shardsStats = new ArrayList<>();
Expand Down
3 changes: 2 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -1329,7 +1329,8 @@ protected Node(
segmentReplicationStatsTracker,
repositoryService,
admissionControlService,
cacheService
cacheService,
remoteStoreNodeService
);

final SearchService searchService = newSearchService(
Expand Down
12 changes: 9 additions & 3 deletions server/src/main/java/org/opensearch/node/NodeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.ingest.IngestService;
import org.opensearch.monitor.MonitorService;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.plugins.PluginsService;
import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService;
import org.opensearch.repositories.RepositoriesService;
Expand Down Expand Up @@ -101,6 +102,7 @@ public class NodeService implements Closeable {
private final AdmissionControlService admissionControlService;
private final SegmentReplicationStatsTracker segmentReplicationStatsTracker;
private final CacheService cacheService;
private final RemoteStoreNodeService remoteStoreNodeService;

NodeService(
Settings settings,
Expand Down Expand Up @@ -128,7 +130,8 @@ public class NodeService implements Closeable {
SegmentReplicationStatsTracker segmentReplicationStatsTracker,
RepositoriesService repositoriesService,
AdmissionControlService admissionControlService,
CacheService cacheService
CacheService cacheService,
RemoteStoreNodeService remoteStoreNodeService
) {
this.settings = settings;
this.threadPool = threadPool;
Expand Down Expand Up @@ -158,6 +161,7 @@ public class NodeService implements Closeable {
clusterService.addStateApplier(searchPipelineService);
this.segmentReplicationStatsTracker = segmentReplicationStatsTracker;
this.cacheService = cacheService;
this.remoteStoreNodeService = remoteStoreNodeService;
}

public NodeInfo info(
Expand Down Expand Up @@ -241,7 +245,8 @@ public NodeStats stats(
boolean segmentReplicationTrackerStats,
boolean repositoriesStats,
boolean admissionControl,
boolean cacheService
boolean cacheService,
boolean remoteStoreNodeService
) {
// for indices stats we want to include previous allocated shards stats as well (it will
// only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
Expand Down Expand Up @@ -274,7 +279,8 @@ public NodeStats stats(
segmentReplicationTrackerStats ? this.segmentReplicationStatsTracker.getTotalRejectionStats() : null,
repositoriesStats ? this.repositoriesService.getRepositoriesStats() : null,
admissionControl ? this.admissionControlService.stats() : null,
cacheService ? this.cacheService.stats(indices) : null
cacheService ? this.cacheService.stats(indices) : null,
remoteStoreNodeService ? this.remoteStoreNodeService.getRemoteStoreNodeStats() : null
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryException;
Expand Down Expand Up @@ -249,4 +250,12 @@ public static boolean isMigratingToRemoteStore(Metadata metadata) {

return (isMixedMode && isRemoteStoreMigrationDirection);
}

public RemoteStoreNodeStats getRemoteStoreNodeStats() {
long lastSuccessfulFetchOfPinnedTimestamps = 0;
if (RemoteStoreSettings.isPinnedTimestampsEnabled()) {
lastSuccessfulFetchOfPinnedTimestamps = RemoteStorePinnedTimestampService.getPinnedTimestamps().v1();
}
return new RemoteStoreNodeStats(lastSuccessfulFetchOfPinnedTimestamps);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.node.remotestore;

import org.opensearch.Version;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.indices.RemoteStoreSettings;

import java.io.IOException;

/**
* Remote store node level stats
* @opensearch.internal
*/
public class RemoteStoreNodeStats implements Writeable, ToXContentFragment {

/**
* Time stamp for the last successful fetch of pinned timestamps by the RemoteStorePinnedTimestampService
*/
private long lastSuccessfulFetchOfPinnedTimestamps;

public RemoteStoreNodeStats(final long lastSuccessfulFetchOfPinnedTimestamps) {
this.lastSuccessfulFetchOfPinnedTimestamps = lastSuccessfulFetchOfPinnedTimestamps;
}

public long getLastSuccessfulFetchOfPinnedTimestamps() {
return this.lastSuccessfulFetchOfPinnedTimestamps;
}

public RemoteStoreNodeStats(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.CURRENT) && RemoteStoreSettings.isPinnedTimestampsEnabled()) {
this.lastSuccessfulFetchOfPinnedTimestamps = in.readOptionalLong();
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.CURRENT) && RemoteStoreSettings.isPinnedTimestampsEnabled()) {
out.writeOptionalLong(this.lastSuccessfulFetchOfPinnedTimestamps);
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("remote_store_node_stats");
builder.field("last_successful_fetch_of_pinned_timestamps", this.lastSuccessfulFetchOfPinnedTimestamps);
return builder.endObject();
}

@Override
public String toString() {
return "RemoteStoreNodeStats{ lastSuccessfulFetchOfPinnedTimestamps=" + lastSuccessfulFetchOfPinnedTimestamps + "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1027,7 +1027,8 @@ public void apply(String action, AdmissionControlActionType admissionControlActi
segmentReplicationRejectionStats,
null,
admissionControlStats,
nodeCacheStats
nodeCacheStats,
null
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ private ClusterStatsNodeResponse createClusterStatsNodeResponse(
null,
null,
null,
null,
null
);
if (defaultBehavior) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ public void testFillDiskUsage() {
null,
null,
null,
null,
null
),
new NodeStats(
Expand Down Expand Up @@ -226,6 +227,7 @@ public void testFillDiskUsage() {
null,
null,
null,
null,
null
),
new NodeStats(
Expand Down Expand Up @@ -257,6 +259,7 @@ public void testFillDiskUsage() {
null,
null,
null,
null,
null
)
);
Expand Down Expand Up @@ -319,6 +322,7 @@ public void testFillDiskUsageSomeInvalidValues() {
null,
null,
null,
null,
null
),
new NodeStats(
Expand Down Expand Up @@ -350,6 +354,7 @@ public void testFillDiskUsageSomeInvalidValues() {
null,
null,
null,
null,
null
),
new NodeStats(
Expand Down Expand Up @@ -381,6 +386,7 @@ public void testFillDiskUsageSomeInvalidValues() {
null,
null,
null,
null,
null
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ List<NodeStats> adjustNodesStats(List<NodeStats> nodesStats) {
nodeStats.getSegmentReplicationRejectionStats(),
nodeStats.getRepositoriesStats(),
nodeStats.getAdmissionControlStats(),
nodeStats.getNodeCacheStats()
nodeStats.getNodeCacheStats(),
nodeStats.getRemoteStoreNodeStats()
);
}).collect(Collectors.toList());
}
Expand Down
Loading

0 comments on commit 7f81828

Please sign in to comment.