diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java index 4eb9d8f4dc66e..e584ef6ce6c1a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java @@ -424,7 +424,7 @@ public void testFileCacheStats() throws Exception { final Client client = client(); final int numNodes = 2; - internalCluster().ensureAtLeastNumSearchNodes(numNodes); + internalCluster().ensureAtLeastNumDataNodes(numNodes); createIndexWithDocsAndEnsureGreen(1, 100, indexName1); createRepositoryWithSettings(null, repoName); @@ -432,6 +432,7 @@ public void testFileCacheStats() throws Exception { deleteIndicesAndEnsureGreen(client, indexName1); assertAllNodesFileCacheEmpty(); + internalCluster().ensureAtLeastNumSearchNodes(numNodes); restoreSnapshotAndEnsureGreen(client, snapshotName, repoName); assertNodesFileCacheNonEmpty(numNodes); } @@ -440,8 +441,7 @@ private void assertAllNodesFileCacheEmpty() { NodesStatsResponse response = client().admin().cluster().nodesStats(new NodesStatsRequest().all()).actionGet(); for (NodeStats stats : response.getNodes()) { FileCacheStats fcstats = stats.getFileCacheStats(); - assertNotNull(fcstats); - assertTrue(isFileCacheEmpty(fcstats)); + assertNull(fcstats); } } @@ -449,11 +449,15 @@ private void assertNodesFileCacheNonEmpty(int numNodes) { NodesStatsResponse response = client().admin().cluster().nodesStats(new NodesStatsRequest().all()).actionGet(); int nonEmptyFileCacheNodes = 0; for (NodeStats stats : response.getNodes()) { - FileCacheStats fcstats = stats.getFileCacheStats(); - assertNotNull(fcstats); - if (!isFileCacheEmpty(fcstats)) { - nonEmptyFileCacheNodes++; + FileCacheStats fcStats = stats.getFileCacheStats(); + if (stats.getNode().isSearchNode()) { + if (!isFileCacheEmpty(fcStats)) { + nonEmptyFileCacheNodes++; + } + } else { + assertNull(fcStats); } + } assertEquals(numNodes, nonEmptyFileCacheNodes); } diff --git a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java index a3135c544c7d4..770cb2ef3e3cb 100644 --- a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java +++ b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java @@ -121,6 +121,10 @@ public static boolean isRemoteClusterClient(final Settings settings) { return hasRole(settings, DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE); } + public static boolean isSearchNode(Settings settings) { + return hasRole(settings, DiscoveryNodeRole.SEARCH_ROLE); + } + private final String nodeName; private final String nodeId; private final String ephemeralId; diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 475b73a55cc51..c616d272acc98 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -35,6 +35,7 @@ import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance; import org.opensearch.action.search.CreatePitController; import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.IndexModule; import 
org.opensearch.index.IndexSettings; import org.opensearch.index.IndexingPressure; @@ -152,6 +153,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.function.Predicate; @@ -629,4 +631,14 @@ public void apply(Settings value, Settings current, Settings previous) { public static List> BUILT_IN_SETTING_UPGRADERS = Collections.emptyList(); + /** + * Map of feature flag name to feature-flagged cluster settings. Once each feature + * is ready for production release, the feature flag can be removed, and the + * setting should be moved to {@link #BUILT_IN_CLUSTER_SETTINGS}. + */ + public static final Map> FEATURE_FLAGGED_CLUSTER_SETTINGS = Map.of( + FeatureFlags.SEARCHABLE_SNAPSHOT, + List.of(Node.NODE_SEARCH_CACHE_SIZE_SETTING) + ); + } diff --git a/server/src/main/java/org/opensearch/common/settings/SettingsModule.java b/server/src/main/java/org/opensearch/common/settings/SettingsModule.java index 056397158d9b6..ab91e2ef185b3 100644 --- a/server/src/main/java/org/opensearch/common/settings/SettingsModule.java +++ b/server/src/main/java/org/opensearch/common/settings/SettingsModule.java @@ -91,9 +91,15 @@ public SettingsModule( registerSetting(setting); } + for (Map.Entry> featureFlaggedSetting : ClusterSettings.FEATURE_FLAGGED_CLUSTER_SETTINGS.entrySet()) { + if (FeatureFlags.isEnabled(featureFlaggedSetting.getKey())) { + featureFlaggedSetting.getValue().forEach(this::registerSetting); + } + } + for (Map.Entry> featureFlaggedSetting : IndexScopedSettings.FEATURE_FLAGGED_INDEX_SETTINGS.entrySet()) { if (FeatureFlags.isEnabled(featureFlaggedSetting.getKey())) { - featureFlaggedSetting.getValue().forEach(feature -> registerSetting(feature)); + featureFlaggedSetting.getValue().forEach(this::registerSetting); } } diff --git a/server/src/main/java/org/opensearch/env/NodeEnvironment.java b/server/src/main/java/org/opensearch/env/NodeEnvironment.java index 79d0800817cb3..e57662bd1c343 100644 --- a/server/src/main/java/org/opensearch/env/NodeEnvironment.java +++ b/server/src/main/java/org/opensearch/env/NodeEnvironment.java @@ -44,6 +44,7 @@ import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.store.NIOFSDirectory; import org.apache.lucene.store.NativeFSLockFactory; +import org.opensearch.ExceptionsHelper; import org.opensearch.OpenSearchException; import org.opensearch.Version; import org.opensearch.cluster.metadata.IndexMetadata; @@ -59,6 +60,8 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; +import org.opensearch.common.settings.SettingsException; +import org.opensearch.common.unit.ByteSizeUnit; import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.xcontent.NamedXContentRegistry; @@ -70,9 +73,15 @@ import org.opensearch.index.shard.ShardId; import org.opensearch.index.shard.ShardPath; import org.opensearch.index.store.FsDirectoryFactory; +import org.opensearch.index.store.remote.filecache.FileCache; +import org.opensearch.index.store.remote.filecache.FileCacheFactory; +import org.opensearch.index.store.remote.filecache.FileCacheStats; +import org.opensearch.index.store.remote.utils.cache.CacheUsage; +import org.opensearch.index.store.remote.utils.cache.stats.CacheStats; import org.opensearch.monitor.fs.FsInfo; import org.opensearch.monitor.fs.FsProbe; import org.opensearch.monitor.jvm.JvmInfo; +import 
org.opensearch.node.Node; import java.io.Closeable; import java.io.IOException; @@ -104,6 +113,7 @@ import java.util.stream.Stream; import static java.util.Collections.unmodifiableSet; +import static org.opensearch.node.Node.NODE_SEARCH_CACHE_SIZE_SETTING; /** * A component that holds all data paths for a single node. @@ -123,14 +133,20 @@ public static class NodePath { public final Path indicesPath; /** Cached FileStore from path */ public final FileStore fileStore; - + public final Path fileCachePath; + /* + Cache reserved size can default to a different value depending on configuration + */ + public ByteSizeValue fileCacheReservedSize; public final int majorDeviceNumber; public final int minorDeviceNumber; public NodePath(Path path) throws IOException { this.path = path; this.indicesPath = path.resolve(INDICES_FOLDER); + this.fileCachePath = path.resolve(CACHE_FOLDER); this.fileStore = Environment.getFileStore(path); + this.fileCacheReservedSize = ByteSizeValue.ZERO; if (fileStore.supportsFileAttributeView("lucene")) { this.majorDeviceNumber = (int) fileStore.getAttribute("lucene:major_device_number"); this.minorDeviceNumber = (int) fileStore.getAttribute("lucene:minor_device_number"); @@ -180,6 +196,7 @@ public String toString() { private final Logger logger = LogManager.getLogger(NodeEnvironment.class); private final NodePath[] nodePaths; + private final NodePath fileCacheNodePath; private final Path sharedDataPath; private final Lock[] locks; @@ -189,6 +206,8 @@ public String toString() { private final NodeMetadata nodeMetadata; + private FileCache fileCache; + /** * Maximum number of data nodes that should run in an environment. */ @@ -217,6 +236,7 @@ public String toString() { public static final String NODES_FOLDER = "nodes"; public static final String INDICES_FOLDER = "indices"; + public static final String CACHE_FOLDER = "cache"; public static final String NODE_LOCK_FILENAME = "node.lock"; /** @@ -291,6 +311,7 @@ public void close() { public NodeEnvironment(Settings settings, Environment environment) throws IOException { if (!DiscoveryNode.nodeRequiresLocalStorage(settings)) { nodePaths = null; + fileCacheNodePath = null; sharedDataPath = null; locks = null; nodeLockId = -1; @@ -342,6 +363,10 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce } this.locks = nodeLock.locks; this.nodePaths = nodeLock.nodePaths; + this.fileCacheNodePath = nodePaths[0]; + + initializeFileCache(settings); + this.nodeLockId = nodeLock.nodeId; if (logger.isDebugEnabled()) { @@ -366,6 +391,10 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce ensureNoShardData(nodePaths); } + if (DiscoveryNode.isSearchNode(settings) == false) { + ensureNoFileCacheData(fileCacheNodePath); + } + this.nodeMetadata = loadNodeMetadata(settings, logger, nodePaths); success = true; } finally { @@ -375,6 +404,40 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce } } + /** + * Initializes the search cache with a defined capacity. + * The capacity of the cache is based on user configuration for {@link Node#NODE_SEARCH_CACHE_SIZE_SETTING}. + * If the user doesn't configure the cache size, it fails if the node is a data + search node. + * Else it configures the size to 80% of available capacity for a dedicated search node, if not explicitly defined. 
+ */ + private void initializeFileCache(Settings settings) { + if (DiscoveryNode.isSearchNode(settings)) { + long capacity = NODE_SEARCH_CACHE_SIZE_SETTING.get(settings).getBytes(); + FsInfo.Path info = ExceptionsHelper.catchAsRuntimeException(() -> FsProbe.getFSInfo(this.fileCacheNodePath)); + long availableCapacity = info.getAvailable().getBytes(); + + // Initialize default values for cache if NODE_SEARCH_CACHE_SIZE_SETTING is not set. + if (capacity == 0) { + // If node is not a dedicated search node without configuration, prevent cache initialization + if (DiscoveryNode.getRolesFromSettings(settings).stream().anyMatch(role -> !DiscoveryNodeRole.SEARCH_ROLE.equals(role))) { + throw new SettingsException( + "Unable to initialize the " + + DiscoveryNodeRole.SEARCH_ROLE.roleName() + + "-" + + DiscoveryNodeRole.DATA_ROLE.roleName() + + " node: Missing value for configuration " + + NODE_SEARCH_CACHE_SIZE_SETTING.getKey() + ); + } else { + capacity = 80 * availableCapacity / 100; + } + } + capacity = Math.min(capacity, availableCapacity); + fileCacheNodePath.fileCacheReservedSize = new ByteSizeValue(capacity, ByteSizeUnit.BYTES); + this.fileCache = FileCacheFactory.createConcurrentLRUFileCache(capacity); + } + } + /** * Resolve a specific nodes/{node.id} path for the specified path and node lock id. * @@ -888,6 +951,17 @@ public NodePath[] nodePaths() { return nodePaths; } + /** + * Returns the {@link NodePath} used for file caching. + */ + public NodePath fileCacheNodePath() { + assertEnvIsLocked(); + if (nodePaths == null || locks == null) { + throw new IllegalStateException("node is not configured to store local location"); + } + return fileCacheNodePath; + } + public int getNodeLockId() { assertEnvIsLocked(); if (nodePaths == null || locks == null) { @@ -1143,6 +1217,22 @@ private void ensureNoShardData(final NodePath[] nodePaths) throws IOException { } } + /** + * Throws an exception if cache exists on a non-search node. + */ + private void ensureNoFileCacheData(final NodePath fileCacheNodePath) throws IOException { + List cacheDataPaths = collectFileCacheDataPath(fileCacheNodePath); + if (cacheDataPaths.isEmpty() == false) { + final String message = String.format( + Locale.ROOT, + "node does not have the %s role but has data within node search cache: %s. Use 'opensearch-node repurpose' tool to clean up", + DiscoveryNodeRole.SEARCH_ROLE.roleName(), + cacheDataPaths + ); + throw new IllegalStateException(message); + } + } + private void ensureNoIndexMetadata(final NodePath[] nodePaths) throws IOException { List indexMetadataPaths = collectIndexMetadataPaths(nodePaths); if (indexMetadataPaths.isEmpty() == false) { @@ -1200,6 +1290,34 @@ private static boolean isIndexMetadataPath(Path path) { return Files.isDirectory(path) && path.getFileName().toString().equals(MetadataStateFormat.STATE_DIR_NAME); } + /** + * Collect the path containing cache data in the indicated cache node path. + * The returned paths will point to the shard data folder. 
+ */ + static List collectFileCacheDataPath(NodePath fileCacheNodePath) throws IOException { + List indexSubPaths = new ArrayList<>(); + Path fileCachePath = fileCacheNodePath.fileCachePath; + if (Files.isDirectory(fileCachePath)) { + try (DirectoryStream nodeStream = Files.newDirectoryStream(fileCachePath)) { + for (Path nodePath : nodeStream) { + if (Files.isDirectory(nodePath)) { + try (DirectoryStream indexStream = Files.newDirectoryStream(nodePath)) { + for (Path indexPath : indexStream) { + if (Files.isDirectory(indexPath)) { + try (Stream shardStream = Files.list(indexPath)) { + shardStream.map(Path::toAbsolutePath).forEach(indexSubPaths::add); + } + } + } + } + } + } + } + } + + return indexSubPaths; + } + /** * Resolve the custom path for a index's shard. */ @@ -1306,4 +1424,34 @@ private static void tryWriteTempFile(Path path) throws IOException { } } } + + /** + * Returns the {@link FileCache} instance for remote search node + */ + public FileCache fileCache() { + return this.fileCache; + } + + /** + * Returns the current {@link FileCacheStats} for remote search node + */ + public FileCacheStats fileCacheStats() { + if (fileCache == null) { + return null; + } + + CacheStats stats = fileCache.stats(); + CacheUsage usage = fileCache.usage(); + return new FileCacheStats( + System.currentTimeMillis(), + usage.activeUsage(), + fileCache.capacity(), + usage.usage(), + stats.evictionWeight(), + stats.removeWeight(), + stats.replaceCount(), + stats.hitCount(), + stats.missCount() + ); + } } diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index fa2729224d52f..2ced9f56d7a35 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -458,53 +458,9 @@ public synchronized IndexShard createShard( try { lock = nodeEnv.shardLock(shardId, "starting shard", TimeUnit.SECONDS.toMillis(5)); eventListener.beforeIndexShardCreated(shardId, indexSettings); - ShardPath path; - try { - path = ShardPath.loadShardPath(logger, nodeEnv, shardId, this.indexSettings.customDataPath()); - } catch (IllegalStateException ex) { - logger.warn("{} failed to load shard path, trying to remove leftover", shardId); - try { - ShardPath.deleteLeftoverShardDirectory(logger, nodeEnv, lock, this.indexSettings); - path = ShardPath.loadShardPath(logger, nodeEnv, shardId, this.indexSettings.customDataPath()); - } catch (Exception inner) { - ex.addSuppressed(inner); - throw ex; - } - } - - if (path == null) { - // TODO: we should, instead, hold a "bytes reserved" of how large we anticipate this shard will be, e.g. for a shard - // that's being relocated/replicated we know how large it will become once it's done copying: - // Count up how many shards are currently on each data path: - Map dataPathToShardCount = new HashMap<>(); - for (IndexShard shard : this) { - Path dataPath = shard.shardPath().getRootStatePath(); - Integer curCount = dataPathToShardCount.get(dataPath); - if (curCount == null) { - curCount = 0; - } - dataPathToShardCount.put(dataPath, curCount + 1); - } - path = ShardPath.selectNewPathForShard( - nodeEnv, - shardId, - this.indexSettings, - routing.getExpectedShardSize() == ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE - ? 
getAvgShardSizeInBytes() - : routing.getExpectedShardSize(), - dataPathToShardCount - ); - logger.debug("{} creating using a new path [{}]", shardId, path); - } else { - logger.debug("{} creating using an existing path [{}]", shardId, path); - } - - if (shards.containsKey(shardId.id())) { - throw new IllegalStateException(shardId + " already exists"); - } - + ShardPath path = getShardPath(routing, shardId, lock); logger.debug("creating shard_id {}", shardId); - // if we are on a shared FS we only own the shard (ie. we can safely delete it) if we are the primary. + // if we are on a shared FS we only own the shard (i.e. we can safely delete it) if we are the primary. final Engine.Warmer engineWarmer = (reader) -> { IndexShard shard = getShardOrNull(shardId.getId()); if (shard != null) { @@ -573,6 +529,63 @@ public synchronized IndexShard createShard( } } + /* + Fetches the shard path based on the index type - + For a remote snapshot index, the cache path is used to initialize the shards. + For a local index, a local shard path is loaded or a new path is calculated. + */ + private ShardPath getShardPath(ShardRouting routing, ShardId shardId, ShardLock lock) throws IOException { + ShardPath path; + if (this.indexSettings.isRemoteSnapshot()) { + path = ShardPath.loadFileCachePath(nodeEnv, shardId); + } else { + try { + path = ShardPath.loadShardPath(logger, nodeEnv, shardId, this.indexSettings.customDataPath()); + } catch (IllegalStateException ex) { + logger.warn("{} failed to load shard path, trying to remove leftover", shardId); + try { + ShardPath.deleteLeftoverShardDirectory(logger, nodeEnv, lock, this.indexSettings); + path = ShardPath.loadShardPath(logger, nodeEnv, shardId, this.indexSettings.customDataPath()); + } catch (Exception inner) { + ex.addSuppressed(inner); + throw ex; + } + } + + if (path == null) { + // TODO: we should, instead, hold a "bytes reserved" of how large we anticipate this shard will be, e.g. for a shard + // that's being relocated/replicated we know how large it will become once it's done copying: + // Count up how many shards are currently on each data path: + Map dataPathToShardCount = new HashMap<>(); + for (IndexShard shard : this) { + Path dataPath = shard.shardPath().getRootStatePath(); + Integer curCount = dataPathToShardCount.get(dataPath); + if (curCount == null) { + curCount = 0; + } + dataPathToShardCount.put(dataPath, curCount + 1); + } + path = ShardPath.selectNewPathForShard( + nodeEnv, + shardId, + this.indexSettings, + routing.getExpectedShardSize() == ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE + ? 
getAvgShardSizeInBytes() + : routing.getExpectedShardSize(), + dataPathToShardCount + ); + logger.debug("{} creating using a new path [{}]", shardId, path); + } else { + logger.debug("{} creating using an existing path [{}]", shardId, path); + } + } + + if (shards.containsKey(shardId.id())) { + throw new IllegalStateException(shardId + " already exists"); + } + return path; + } + @Override public synchronized void removeShard(int shardId, String reason) { final ShardId sId = new ShardId(index(), shardId); diff --git a/server/src/main/java/org/opensearch/index/shard/ShardPath.java b/server/src/main/java/org/opensearch/index/shard/ShardPath.java index 93fd99fff4527..6980dbb7ef355 100644 --- a/server/src/main/java/org/opensearch/index/shard/ShardPath.java +++ b/server/src/main/java/org/opensearch/index/shard/ShardPath.java @@ -130,6 +130,16 @@ public boolean isCustomDataPath() { return isCustomDataPath; } + /** + * Returns the shard path to be stored within the cache on the search capable node. + */ + public static ShardPath loadFileCachePath(NodeEnvironment env, ShardId shardId) { + NodeEnvironment.NodePath path = env.fileCacheNodePath(); + final Path dataPath = env.resolveCustomLocation(path.fileCachePath.toString(), shardId); + final Path statePath = path.resolve(shardId); + return new ShardPath(true, dataPath, statePath, shardId); + } + /** * This method walks through the nodes shard paths to find the data and state path for the given shard. If multiple * directories with a valid shard state exist the one with the highest version will be used. diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index bedd81b609751..0c6007db2f74e 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -132,10 +132,6 @@ import org.opensearch.index.shard.IndexingOperationListener; import org.opensearch.index.shard.IndexingStats; import org.opensearch.index.shard.ShardId; -import org.opensearch.index.store.remote.filecache.FileCache; -import org.opensearch.index.store.remote.filecache.FileCacheStats; -import org.opensearch.index.store.remote.utils.cache.CacheUsage; -import org.opensearch.index.store.remote.utils.cache.stats.CacheStats; import org.opensearch.index.translog.InternalTranslogFactory; import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory; import org.opensearch.index.translog.TranslogFactory; @@ -263,8 +259,6 @@ public class IndicesService extends AbstractLifecycleComponent private final TimeValue cleanInterval; final IndicesRequestCache indicesRequestCache; // pkg-private for testing private final IndicesQueryCache indicesQueryCache; - - private final FileCache remoteStoreFileCache; private final MetaStateService metaStateService; private final Collection>> engineFactoryProviders; private final Map directoryFactories; @@ -310,8 +304,7 @@ public IndicesService( ValuesSourceRegistry valuesSourceRegistry, Map recoveryStateFactories, IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory, - Supplier repositoriesServiceSupplier, - FileCache remoteStoreFileCache + Supplier repositoriesServiceSupplier ) { this.settings = settings; this.threadPool = threadPool; @@ -325,7 +318,6 @@ public IndicesService( this.indexNameExpressionResolver = indexNameExpressionResolver; this.indicesRequestCache = new IndicesRequestCache(settings); this.indicesQueryCache = new IndicesQueryCache(settings); - 
this.remoteStoreFileCache = remoteStoreFileCache; this.mapperRegistry = mapperRegistry; this.namedWriteableRegistry = namedWriteableRegistry; indexingMemoryController = new IndexingMemoryController( @@ -427,8 +419,7 @@ public IndicesService( ValuesSourceRegistry valuesSourceRegistry, Map recoveryStateFactories, IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory, - Supplier repositoriesServiceSupplier, - FileCache remoteStoreFileCache + Supplier repositoriesServiceSupplier ) { this.settings = settings; this.threadPool = threadPool; @@ -436,7 +427,6 @@ public IndicesService( this.extensionsManager = extensionsManager; this.nodeEnv = nodeEnv; this.xContentRegistry = xContentRegistry; - this.remoteStoreFileCache = remoteStoreFileCache; this.valuesSourceRegistry = valuesSourceRegistry; this.shardsClosedTimeout = settings.getAsTime(INDICES_SHARDS_CLOSED_TIMEOUT, new TimeValue(1, TimeUnit.DAYS)); this.analysisRegistry = analysisRegistry; @@ -1933,20 +1923,4 @@ public boolean allPendingDanglingIndicesWritten() { return nodeWriteDanglingIndicesInfo == false || (danglingIndicesToWrite.isEmpty() && danglingIndicesThreadPoolExecutor.getActiveCount() == 0); } - - public FileCacheStats getFileCacheStats() { - CacheStats stats = remoteStoreFileCache.stats(); - CacheUsage usage = remoteStoreFileCache.usage(); - return new FileCacheStats( - System.currentTimeMillis(), - usage.activeUsage(), - remoteStoreFileCache.capacity(), - usage.usage(), - stats.evictionWeight(), - stats.removeWeight(), - stats.replaceCount(), - stats.hitCount(), - stats.missCount() - ); - } } diff --git a/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java b/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java index 8016e5243cc38..29feb541f4b5f 100644 --- a/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java +++ b/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java @@ -32,11 +32,13 @@ package org.opensearch.monitor.fs; +import org.opensearch.Version; import org.opensearch.common.Nullable; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.io.stream.Writeable; import org.opensearch.common.unit.ByteSizeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.core.xcontent.XContentBuilder; @@ -70,6 +72,8 @@ public static class Path implements Writeable, ToXContentObject { long total = -1; long free = -1; long available = -1; + long fileCacheReserved = -1; + long fileCacheUtilized = 0; public Path() {} @@ -91,6 +95,10 @@ public Path(StreamInput in) throws IOException { total = in.readLong(); free = in.readLong(); available = in.readLong(); + if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT) && in.getVersion().onOrAfter(Version.V_3_0_0)) { + fileCacheReserved = in.readLong(); + fileCacheUtilized = in.readLong(); + } } @Override @@ -101,6 +109,10 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(total); out.writeLong(free); out.writeLong(available); + if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT) && out.getVersion().onOrAfter(Version.V_3_0_0)) { + out.writeLong(fileCacheReserved); + out.writeLong(fileCacheUtilized); + } } public String getPath() { @@ -127,6 +139,14 @@ public ByteSizeValue getAvailable() { return new ByteSizeValue(available); } + public ByteSizeValue getFileCacheReserved() { + return new ByteSizeValue(fileCacheReserved); + } 
+ + public ByteSizeValue getFileCacheUtilized() { + return new ByteSizeValue(fileCacheUtilized); + } + private long addLong(long current, long other) { if (current == -1 && other == -1) { return 0; @@ -143,6 +163,8 @@ private long addLong(long current, long other) { public void add(Path path) { total = FsProbe.adjustForHugeFilesystems(addLong(total, path.total)); free = FsProbe.adjustForHugeFilesystems(addLong(free, path.free)); + fileCacheReserved = FsProbe.adjustForHugeFilesystems(addLong(fileCacheReserved, path.fileCacheReserved)); + fileCacheUtilized = FsProbe.adjustForHugeFilesystems(addLong(fileCacheUtilized, path.fileCacheUtilized)); available = FsProbe.adjustForHugeFilesystems(addLong(available, path.available)); } @@ -156,6 +178,10 @@ static final class Fields { static final String FREE_IN_BYTES = "free_in_bytes"; static final String AVAILABLE = "available"; static final String AVAILABLE_IN_BYTES = "available_in_bytes"; + static final String CACHE_RESERVED = "cache_reserved"; + static final String CACHE_RESERVED_IN_BYTES = "cache_reserved_in_bytes"; + static final String CACHE_UTILIZED = "cache_utilized"; + static final String CACHE_UTILIZED_IN_BYTES = "cache_utilized_in_bytes"; } @Override @@ -180,6 +206,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (available != -1) { builder.humanReadableField(Fields.AVAILABLE_IN_BYTES, Fields.AVAILABLE, getAvailable()); } + if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT) && fileCacheReserved != -1) { + builder.humanReadableField(Fields.CACHE_RESERVED_IN_BYTES, Fields.CACHE_RESERVED, getFileCacheReserved()); + } + if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT) && fileCacheReserved != 0) { + builder.humanReadableField(Fields.CACHE_UTILIZED, Fields.CACHE_UTILIZED_IN_BYTES, getFileCacheUtilized()); + } builder.endObject(); return builder; diff --git a/server/src/main/java/org/opensearch/monitor/fs/FsProbe.java b/server/src/main/java/org/opensearch/monitor/fs/FsProbe.java index a196a449fa10a..b1deef0c22f99 100644 --- a/server/src/main/java/org/opensearch/monitor/fs/FsProbe.java +++ b/server/src/main/java/org/opensearch/monitor/fs/FsProbe.java @@ -36,9 +36,12 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.util.Constants; +import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.SuppressForbidden; import org.opensearch.common.collect.Tuple; import org.opensearch.common.io.PathUtils; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.env.NodeEnvironment; import org.opensearch.env.NodeEnvironment.NodePath; @@ -62,8 +65,11 @@ public class FsProbe { private final NodeEnvironment nodeEnv; - public FsProbe(NodeEnvironment nodeEnv) { + private final Settings settings; + + public FsProbe(NodeEnvironment nodeEnv, Settings settings) { this.nodeEnv = nodeEnv; + this.settings = settings; } public FsInfo stats(FsInfo previous) throws IOException { @@ -74,13 +80,18 @@ public FsInfo stats(FsInfo previous) throws IOException { FsInfo.Path[] paths = new FsInfo.Path[dataLocations.length]; for (int i = 0; i < dataLocations.length; i++) { paths[i] = getFSInfo(dataLocations[i]); + if (settings != null && DiscoveryNode.isSearchNode(settings) && dataLocations[i].fileCacheReservedSize != ByteSizeValue.ZERO) { + paths[i].fileCacheReserved = adjustForHugeFilesystems(dataLocations[i].fileCacheReservedSize.getBytes()); + 
paths[i].fileCacheUtilized = adjustForHugeFilesystems(nodeEnv.fileCacheStats().getUsed().getBytes()); + paths[i].available -= (paths[i].fileCacheReserved - paths[i].fileCacheUtilized); + } } FsInfo.IoStats ioStats = null; if (Constants.LINUX) { Set> devicesNumbers = new HashSet<>(); - for (int i = 0; i < dataLocations.length; i++) { - if (dataLocations[i].majorDeviceNumber != -1 && dataLocations[i].minorDeviceNumber != -1) { - devicesNumbers.add(Tuple.tuple(dataLocations[i].majorDeviceNumber, dataLocations[i].minorDeviceNumber)); + for (NodePath dataLocation : dataLocations) { + if (dataLocation.majorDeviceNumber != -1 && dataLocation.minorDeviceNumber != -1) { + devicesNumbers.add(Tuple.tuple(dataLocation.majorDeviceNumber, dataLocation.minorDeviceNumber)); } } ioStats = ioStats(devicesNumbers, previous); @@ -167,6 +178,7 @@ public static FsInfo.Path getFSInfo(NodePath nodePath) throws IOException { fsPath.total = adjustForHugeFilesystems(nodePath.fileStore.getTotalSpace()); fsPath.free = adjustForHugeFilesystems(nodePath.fileStore.getUnallocatedSpace()); fsPath.available = adjustForHugeFilesystems(nodePath.fileStore.getUsableSpace()); + fsPath.fileCacheReserved = adjustForHugeFilesystems(nodePath.fileCacheReservedSize.getBytes()); fsPath.type = nodePath.fileStore.type(); fsPath.mount = nodePath.fileStore.toString(); return fsPath; diff --git a/server/src/main/java/org/opensearch/monitor/fs/FsService.java b/server/src/main/java/org/opensearch/monitor/fs/FsService.java index 728a6d7f0b36d..f0cd1eb94c73b 100644 --- a/server/src/main/java/org/opensearch/monitor/fs/FsService.java +++ b/server/src/main/java/org/opensearch/monitor/fs/FsService.java @@ -70,7 +70,7 @@ public class FsService { ); public FsService(final Settings settings, final NodeEnvironment nodeEnvironment) { - final FsProbe probe = new FsProbe(nodeEnvironment); + final FsProbe probe = new FsProbe(nodeEnvironment, settings); final FsInfo initialValue = stats(probe, null); if (ALWAYS_REFRESH_SETTING.get(settings)) { assert REFRESH_INTERVAL_SETTING.exists(settings) == false; diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 41b24c01aef33..6b7d810e9f0d9 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -36,19 +36,16 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.util.Constants; import org.opensearch.common.SetOnce; +import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.util.FeatureFlags; import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance; import org.opensearch.index.IndexModule; import org.opensearch.index.IndexingPressureService; -import org.opensearch.index.store.remote.filecache.FileCache; -import org.opensearch.index.store.remote.filecache.FileCacheFactory; import org.opensearch.indices.replication.SegmentReplicationSourceFactory; import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.indices.replication.SegmentReplicationSourceService; import org.opensearch.extensions.ExtensionsManager; import org.opensearch.extensions.NoopExtensionsManager; -import org.opensearch.monitor.fs.FsInfo; -import org.opensearch.monitor.fs.FsProbe; import org.opensearch.search.backpressure.SearchBackpressureService; import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; import org.opensearch.tasks.TaskResourceTrackingService; @@ -60,7 +57,6 @@ import 
org.opensearch.OpenSearchException; import org.opensearch.OpenSearchTimeoutException; import org.opensearch.Version; -import org.opensearch.ExceptionsHelper; import org.opensearch.action.ActionModule; import org.opensearch.action.ActionType; import org.opensearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus; @@ -316,6 +312,12 @@ public class Node implements Closeable { } }, Setting.Property.NodeScope); + public static final Setting NODE_SEARCH_CACHE_SIZE_SETTING = Setting.byteSizeSetting( + "node.search.cache.size", + ByteSizeValue.ZERO, + Property.NodeScope + ); + private static final String CLIENT_TYPE = "node"; /** @@ -625,12 +627,11 @@ protected Node( final Collection>> engineFactoryProviders = enginePlugins.stream() .map(plugin -> (Function>) plugin::getEngineFactory) .collect(Collectors.toList()); - // TODO: for now this is a single cache, later, this should read node and index settings - final FileCache remoteStoreFileCache = createRemoteStoreFileCache(); + final Map builtInDirectoryFactories = IndexModule.createBuiltInDirectoryFactories( repositoriesServiceReference::get, threadPool, - remoteStoreFileCache + nodeEnvironment.fileCache() ); final Map directoryFactories = new HashMap<>(); @@ -698,8 +699,7 @@ protected Node( searchModule.getValuesSourceRegistry(), recoveryStateFactories, remoteDirectoryFactory, - repositoriesServiceReference::get, - remoteStoreFileCache + repositoriesServiceReference::get ); } else { indicesService = new IndicesService( @@ -724,8 +724,7 @@ protected Node( searchModule.getValuesSourceRegistry(), recoveryStateFactories, remoteDirectoryFactory, - repositoriesServiceReference::get, - remoteStoreFileCache + repositoriesServiceReference::get ); } @@ -969,7 +968,8 @@ protected Node( searchTransportService, indexingPressureService, searchModule.getValuesSourceRegistry().getUsageService(), - searchBackpressureService + searchBackpressureService, + nodeEnvironment ); final SearchService searchService = newSearchService( @@ -1130,16 +1130,6 @@ protected Node( } } - private FileCache createRemoteStoreFileCache() { - // TODO: implement more custom logic to create named caches, using multiple node paths, more capacity computation options and - // capacity reservation logic - FsInfo.Path info = ExceptionsHelper.catchAsRuntimeException(() -> FsProbe.getFSInfo(nodeEnvironment.nodePaths()[0])); - long diskCapacity = info.getTotal().getBytes(); - // hard coded as 50% for now - long capacity = (long) (diskCapacity * 0.50); - return FileCacheFactory.createConcurrentLRUFileCache(capacity); - } - protected TransportService newTransportService( Settings settings, Transport transport, diff --git a/server/src/main/java/org/opensearch/node/NodeService.java b/server/src/main/java/org/opensearch/node/NodeService.java index 8d15c69d2c428..b4446085243df 100644 --- a/server/src/main/java/org/opensearch/node/NodeService.java +++ b/server/src/main/java/org/opensearch/node/NodeService.java @@ -45,6 +45,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.SettingsFilter; import org.opensearch.discovery.Discovery; +import org.opensearch.env.NodeEnvironment; import org.opensearch.http.HttpServerTransport; import org.opensearch.index.IndexingPressureService; import org.opensearch.indices.IndicesService; @@ -85,8 +86,8 @@ public class NodeService implements Closeable { private final AggregationUsageService aggregationUsageService; private final SearchBackpressureService searchBackpressureService; private final ClusterService 
clusterService; - private final Discovery discovery; + private final NodeEnvironment nodeEnvironment; NodeService( Settings settings, @@ -106,7 +107,8 @@ public class NodeService implements Closeable { SearchTransportService searchTransportService, IndexingPressureService indexingPressureService, AggregationUsageService aggregationUsageService, - SearchBackpressureService searchBackpressureService + SearchBackpressureService searchBackpressureService, + NodeEnvironment nodeEnvironment ) { this.settings = settings; this.threadPool = threadPool; @@ -126,6 +128,7 @@ public class NodeService implements Closeable { this.aggregationUsageService = aggregationUsageService; this.searchBackpressureService = searchBackpressureService; this.clusterService = clusterService; + this.nodeEnvironment = nodeEnvironment; clusterService.addStateApplier(ingestService); } @@ -206,7 +209,7 @@ public NodeStats stats( searchBackpressure ? this.searchBackpressureService.nodeStats() : null, clusterManagerThrottling ? this.clusterService.getClusterManagerService().getThrottlingStats() : null, weightedRoutingStats ? WeightedRoutingStats.getInstance() : null, - fileCacheStats ? indicesService.getFileCacheStats() : null + fileCacheStats ? nodeEnvironment.fileCacheStats() : null ); } diff --git a/server/src/test/java/org/opensearch/cluster/node/DiscoveryNodeTests.java b/server/src/test/java/org/opensearch/cluster/node/DiscoveryNodeTests.java index 4f16e7526c9ea..0232f671f86de 100644 --- a/server/src/test/java/org/opensearch/cluster/node/DiscoveryNodeTests.java +++ b/server/src/test/java/org/opensearch/cluster/node/DiscoveryNodeTests.java @@ -50,12 +50,14 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; -import static org.opensearch.test.NodeRoles.nonRemoteClusterClientNode; -import static org.opensearch.test.NodeRoles.remoteClusterClientNode; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.not; +import static org.opensearch.test.NodeRoles.nonRemoteClusterClientNode; +import static org.opensearch.test.NodeRoles.remoteClusterClientNode; +import static org.opensearch.test.NodeRoles.searchNode; +import static org.opensearch.test.NodeRoles.nonSearchNode; public class DiscoveryNodeTests extends OpenSearchTestCase { @@ -175,6 +177,14 @@ public void testDiscoveryNodeIsRemoteClusterClientUnset() { runTestDiscoveryNodeIsRemoteClusterClient(nonRemoteClusterClientNode(), false); } + public void testDiscoveryNodeIsSearchSet() { + runTestDiscoveryNodeIsSearch(searchNode(), true); + } + + public void testDiscoveryNodeIsSearchUnset() { + runTestDiscoveryNodeIsSearch(nonSearchNode(), false); + } + // Added in 2.0 temporarily, validate the MASTER_ROLE is in the list of known roles. // MASTER_ROLE was removed from BUILT_IN_ROLES and is imported by setDeprecatedMasterRole(), // as a workaround for making the new CLUSTER_MANAGER_ROLE has got the same abbreviation 'm'. 
@@ -194,6 +204,16 @@ private void runTestDiscoveryNodeIsRemoteClusterClient(final Settings settings, } } + private void runTestDiscoveryNodeIsSearch(final Settings settings, final boolean expected) { + final DiscoveryNode node = DiscoveryNode.createLocal(settings, new TransportAddress(TransportAddress.META_ADDRESS, 9200), "node"); + assertThat(node.isSearchNode(), equalTo(expected)); + if (expected) { + assertThat(node.getRoles(), hasItem(DiscoveryNodeRole.SEARCH_ROLE)); + } else { + assertThat(node.getRoles(), not(hasItem(DiscoveryNodeRole.SEARCH_ROLE))); + } + } + public void testGetRoleFromRoleNameIsCaseInsensitive() { String dataRoleName = "DATA"; DiscoveryNodeRole dataNodeRole = DiscoveryNode.getRoleFromRoleName(dataRoleName); diff --git a/server/src/test/java/org/opensearch/env/NodeEnvironmentTests.java b/server/src/test/java/org/opensearch/env/NodeEnvironmentTests.java index 9c2ba140cdc09..0a0e4f0d1fcdb 100644 --- a/server/src/test/java/org/opensearch/env/NodeEnvironmentTests.java +++ b/server/src/test/java/org/opensearch/env/NodeEnvironmentTests.java @@ -38,6 +38,9 @@ import org.opensearch.common.io.PathUtils; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; +import org.opensearch.common.settings.SettingsException; +import org.opensearch.common.unit.ByteSizeUnit; +import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.common.util.set.Sets; import org.opensearch.core.internal.io.IOUtils; @@ -45,6 +48,8 @@ import org.opensearch.index.Index; import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.ShardId; +import org.opensearch.monitor.fs.FsInfo; +import org.opensearch.monitor.fs.FsProbe; import org.opensearch.node.Node; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.IndexSettingsModule; @@ -65,6 +70,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import static org.opensearch.test.NodeRoles.addRoles; import static org.opensearch.test.NodeRoles.nonDataNode; import static org.opensearch.test.NodeRoles.nonClusterManagerNode; import static org.hamcrest.CoreMatchers.equalTo; @@ -583,6 +589,49 @@ public void testEnsureNoShardDataOrIndexMetadata() throws IOException { verifyFailsOnShardData(noDataNoClusterManagerSettings, indexPath, shardDataDirName); } + public void testSearchFileCacheConfiguration() throws IOException { + Settings searchRoleSettings = addRoles(buildEnvSettings(Settings.EMPTY), Set.of(DiscoveryNodeRole.SEARCH_ROLE)); + ByteSizeValue cacheSize = new ByteSizeValue(100, ByteSizeUnit.MB); + Settings searchRoleSettingsWithConfig = Settings.builder() + .put(searchRoleSettings) + .put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), cacheSize) + .build(); + + Settings onlySearchRoleSettings = Settings.builder() + .put(searchRoleSettings) + .put( + NodeRoles.removeRoles( + searchRoleSettings, + Set.of( + DiscoveryNodeRole.DATA_ROLE, + DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, + DiscoveryNodeRole.INGEST_ROLE, + DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE + ) + ) + ) + .build(); + + // Test exception thrown with configuration missing + assertThrows(SettingsException.class, () -> newNodeEnvironment(searchRoleSettings)); + + // Test data + search node with defined cache size + try (NodeEnvironment env = newNodeEnvironment(searchRoleSettingsWithConfig)) { + NodeEnvironment.NodePath fileCacheNodePath = env.fileCacheNodePath(); + assertEquals(cacheSize.getBytes(), 
fileCacheNodePath.fileCacheReservedSize.getBytes()); + } + + // Test dedicated search node with no configuration + try (NodeEnvironment env = newNodeEnvironment(onlySearchRoleSettings)) { + NodeEnvironment.NodePath fileCacheNodePath = env.fileCacheNodePath(); + assertTrue(fileCacheNodePath.fileCacheReservedSize.getBytes() > 0); + FsProbe fsProbe = new FsProbe(env, onlySearchRoleSettings); + FsInfo fsInfo = fsProbe.stats(null); + FsInfo.Path cachePathInfo = fsInfo.iterator().next(); + assertEquals(cachePathInfo.getFileCacheReserved().getBytes(), fileCacheNodePath.fileCacheReservedSize.getBytes()); + } + } + private void verifyFailsOnShardData(Settings settings, Path indexPath, String shardDataDirName) { IllegalStateException ex = expectThrows( IllegalStateException.class, diff --git a/server/src/test/java/org/opensearch/index/shard/ShardPathTests.java b/server/src/test/java/org/opensearch/index/shard/ShardPathTests.java index 25ec7c7987855..443325b8716c0 100644 --- a/server/src/test/java/org/opensearch/index/shard/ShardPathTests.java +++ b/server/src/test/java/org/opensearch/index/shard/ShardPathTests.java @@ -33,7 +33,6 @@ import org.opensearch.cluster.routing.AllocationId; import org.opensearch.common.settings.Settings; -import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; import org.opensearch.gateway.WriteStateException; import org.opensearch.index.Index; @@ -44,6 +43,7 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.is; +import static org.opensearch.env.Environment.PATH_SHARED_DATA_SETTING; public class ShardPathTests extends OpenSearchTestCase { public void testLoadShardPath() throws IOException { @@ -109,9 +109,7 @@ public void testGetRootPaths() throws IOException { if (useCustomDataPath) { final Path path = createTempDir(); customDataPath = "custom"; - nodeSettings = Settings.builder() - .put(Environment.PATH_SHARED_DATA_SETTING.getKey(), path.toAbsolutePath().toAbsolutePath()) - .build(); + nodeSettings = Settings.builder().put(PATH_SHARED_DATA_SETTING.getKey(), path.toAbsolutePath().toAbsolutePath()).build(); customPath = path.resolve("custom").resolve("0"); } else { customPath = null; @@ -149,6 +147,23 @@ public void testGetRootPaths() throws IOException { } } + public void testLoadFileCachePath() throws IOException { + Settings searchNodeSettings = Settings.builder().put("node.roles", "search").put(PATH_SHARED_DATA_SETTING.getKey(), "").build(); + + try (NodeEnvironment env = newNodeEnvironment(searchNodeSettings)) { + ShardId shardId = new ShardId("foo", "0xDEADBEEF", 0); + Path fileCachePath = env.fileCacheNodePath().fileCachePath; + writeShardStateMetadata("0xDEADBEEF", fileCachePath); + ShardPath shardPath = ShardPath.loadFileCachePath(env, shardId); + + assertTrue(shardPath.getDataPath().startsWith(fileCachePath)); + assertFalse(shardPath.getShardStatePath().startsWith(fileCachePath)); + + assertEquals("0xDEADBEEF", shardPath.getShardId().getIndex().getUUID()); + assertEquals("foo", shardPath.getShardId().getIndexName()); + } + } + private static void writeShardStateMetadata(String indexUUID, Path... 
paths) throws WriteStateException { ShardStateMetadata.FORMAT.writeAndCleanup( new ShardStateMetadata(true, indexUUID, AllocationId.newInitializing(), ShardStateMetadata.IndexDataLocation.LOCAL), diff --git a/server/src/test/java/org/opensearch/monitor/fs/FsProbeTests.java b/server/src/test/java/org/opensearch/monitor/fs/FsProbeTests.java index b70b76c23fb96..25da974a9f1dc 100644 --- a/server/src/test/java/org/opensearch/monitor/fs/FsProbeTests.java +++ b/server/src/test/java/org/opensearch/monitor/fs/FsProbeTests.java @@ -34,6 +34,7 @@ import org.apache.lucene.util.Constants; import org.opensearch.common.collect.Tuple; +import org.opensearch.common.settings.Settings; import org.opensearch.env.NodeEnvironment; import org.opensearch.env.NodeEnvironment.NodePath; import org.opensearch.test.OpenSearchTestCase; @@ -64,7 +65,7 @@ public class FsProbeTests extends OpenSearchTestCase { public void testFsInfo() throws IOException { try (NodeEnvironment env = newNodeEnvironment()) { - FsProbe probe = new FsProbe(env); + FsProbe probe = new FsProbe(env, null); FsInfo stats = probe.stats(null); assertNotNull(stats); @@ -103,6 +104,39 @@ public void testFsInfo() throws IOException { assertThat(path.total, greaterThan(0L)); assertThat(path.free, greaterThan(0L)); assertThat(path.available, greaterThan(0L)); + assertTrue(path.fileCacheReserved == 0); + assertTrue(path.fileCacheUtilized == 0); + } + } + } + + public void testFsCacheInfo() throws IOException { + Settings settings = Settings.builder().put("node.roles", "search").build(); + try (NodeEnvironment env = newNodeEnvironment(settings)) { + FsProbe probe = new FsProbe(env, settings); + FsInfo stats = probe.stats(null); + assertNotNull(stats); + assertTrue(stats.getTimestamp() > 0L); + FsInfo.Path total = stats.getTotal(); + assertNotNull(total); + assertTrue(total.total > 0L); + assertTrue(total.free > 0L); + assertTrue(total.available > 0L); + assertTrue(total.fileCacheReserved > 0L); + assertTrue((total.free - total.available) >= total.fileCacheReserved); + + for (FsInfo.Path path : stats) { + assertNotNull(path); + assertFalse(path.getPath().isEmpty()); + assertFalse(path.getMount().isEmpty()); + assertFalse(path.getType().isEmpty()); + assertTrue(path.total > 0L); + assertTrue(path.free > 0L); + assertTrue(path.available > 0L); + + if (path.fileCacheReserved > -1L) { + assertTrue(path.free - path.available >= path.fileCacheReserved); + } } } } @@ -173,7 +207,7 @@ public void testIoStats() { ) ); - final FsProbe probe = new FsProbe(null) { + final FsProbe probe = new FsProbe(null, null) { @Override List readProcDiskStats() throws IOException { return diskStats.get(); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 5939ca0919342..33b7e74ad7b51 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -1834,8 +1834,7 @@ public void onFailure(final Exception e) { null, emptyMap(), new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService), - repositoriesServiceReference::get, - null + repositoriesServiceReference::get ); } else { indicesService = new IndicesService( @@ -1871,8 +1870,7 @@ public void onFailure(final Exception e) { null, emptyMap(), new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService), - repositoriesServiceReference::get, - null + repositoriesServiceReference::get ); } final 
RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings); diff --git a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java index 126e9357a15ca..0cbffdb38f05a 100644 --- a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java @@ -218,6 +218,8 @@ public final class InternalTestCluster extends TestCluster { nodeAndClient.node.settings() ); + private static final ByteSizeValue DEFAULT_SEARCH_CACHE_SIZE = new ByteSizeValue(100, ByteSizeUnit.MB); + public static final int DEFAULT_LOW_NUM_CLUSTER_MANAGER_NODES = 1; public static final int DEFAULT_HIGH_NUM_CLUSTER_MANAGER_NODES = 3; @@ -684,11 +686,7 @@ public synchronized void ensureAtLeastNumSearchNodes(int n) { int size = numSearchNodes(); if (size < n) { logger.info("increasing cluster size from {} to {}", size, n); - if (numSharedDedicatedClusterManagerNodes > 0) { - startSearchOnlyNodes(n - size); - } else { - startNodes(n - size, Settings.builder().put(onlyRole(Settings.EMPTY, DiscoveryNodeRole.SEARCH_ROLE)).build()); - } + startNodes(n - size, Settings.builder().put(onlyRole(Settings.EMPTY, DiscoveryNodeRole.SEARCH_ROLE)).build()); validateClusterFormed(); } } @@ -702,14 +700,12 @@ public synchronized void ensureAtLeastNumSearchAndDataNodes(int n) { int size = numSearchAndDataNodes(); if (size < n) { logger.info("increasing cluster size from {} to {}", size, n); - if (numSharedDedicatedClusterManagerNodes > 0) { - startDataAndSearchNodes(n - size); - } else { - Set searchAndDataRoles = new HashSet<>(); - searchAndDataRoles.add(DiscoveryNodeRole.DATA_ROLE); - searchAndDataRoles.add(DiscoveryNodeRole.SEARCH_ROLE); - startNodes(n - size, Settings.builder().put(onlyRoles(Settings.EMPTY, searchAndDataRoles)).build()); - } + Set searchAndDataRoles = Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.SEARCH_ROLE); + Settings settings = Settings.builder() + .put(Settings.EMPTY) + .put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), DEFAULT_SEARCH_CACHE_SIZE) + .build(); + startNodes(n - size, Settings.builder().put(onlyRoles(settings, searchAndDataRoles)).build()); validateClusterFormed(); } } diff --git a/test/framework/src/main/java/org/opensearch/test/NodeRoles.java b/test/framework/src/main/java/org/opensearch/test/NodeRoles.java index 958b6c81def34..4285ac76fc4d4 100644 --- a/test/framework/src/main/java/org/opensearch/test/NodeRoles.java +++ b/test/framework/src/main/java/org/opensearch/test/NodeRoles.java @@ -244,4 +244,20 @@ public static Settings nonRemoteClusterClientNode(final Settings settings) { return removeRoles(settings, Collections.singleton(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE)); } + public static Settings searchNode() { + return searchNode(Settings.EMPTY); + } + + public static Settings searchNode(final Settings settings) { + return addRoles(settings, Collections.singleton(DiscoveryNodeRole.SEARCH_ROLE)); + } + + public static Settings nonSearchNode() { + return nonSearchNode(Settings.EMPTY); + } + + public static Settings nonSearchNode(final Settings settings) { + return removeRoles(settings, Collections.singleton(DiscoveryNodeRole.SEARCH_ROLE)); + } + }
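
Reviewer note: below is a minimal, standalone sketch (not part of this change) of the capacity-resolution rule applied by NodeEnvironment#initializeFileCache; the class, method, and parameter names are illustrative only. An explicit node.search.cache.size wins but is capped at the space available on the cache node path; a dedicated search node with no explicit value falls back to 80% of the available space; a node that mixes the search role with other roles must configure the value or startup fails.

/**
 * Illustrative sketch (not part of this diff) of the file cache capacity
 * resolution performed in NodeEnvironment#initializeFileCache.
 */
final class FileCacheCapacityExample {

    /**
     * @param configuredBytes     value of node.search.cache.size (0 when unset)
     * @param availableBytes      usable space reported for the cache node path
     * @param dedicatedSearchNode true when the node carries only the search role
     * @return the capacity, in bytes, used to build the LRU file cache
     */
    static long resolveCapacity(long configuredBytes, long availableBytes, boolean dedicatedSearchNode) {
        long capacity = configuredBytes;
        if (capacity == 0) {
            if (dedicatedSearchNode == false) {
                // Mirrors the SettingsException thrown for a data + search node without configuration.
                throw new IllegalArgumentException("Missing value for configuration node.search.cache.size");
            }
            // Dedicated search nodes default to 80% of the available space on the cache path.
            capacity = 80 * availableBytes / 100;
        }
        // Never reserve more than what is actually available.
        return Math.min(capacity, availableBytes);
    }

    public static void main(String[] args) {
        // Dedicated search node, nothing configured: 80% of 100 GiB.
        System.out.println(resolveCapacity(0, 100L << 30, true));            // 85899345920
        // Data + search node with an explicit 10 GiB cache.
        System.out.println(resolveCapacity(10L << 30, 100L << 30, false));   // 10737418240
    }
}

Because NODE_SEARCH_CACHE_SIZE_SETTING is registered through ClusterSettings.FEATURE_FLAGGED_CLUSTER_SETTINGS, the setting only exists when the searchable-snapshot feature flag is enabled. FsProbe then treats the reserved-but-unused portion of the cache (fileCacheReserved - fileCacheUtilized) as consumed by subtracting it from the reported available bytes, so disk-based accounting sees the cache reservation up front.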