Skip to content

Commit

Permalink
support list of prefixes for repository attribute keys
Browse files Browse the repository at this point in the history
  • Loading branch information
rajiv-kv committed Oct 18, 2024
1 parent 0bded88 commit a98a5de
Show file tree
Hide file tree
Showing 13 changed files with 320 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.persistent.PersistentTasksCustomMetadata;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -67,7 +68,8 @@

import static org.opensearch.cluster.decommission.DecommissionHelper.nodeCommissioned;
import static org.opensearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_CLUSTER_PUBLICATION_REPO_NAME_ATTRIBUTES;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.getClusterStateRepoName;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.getRoutingTableRepoName;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode.MIXED;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode.STRICT;
Expand Down Expand Up @@ -517,7 +519,11 @@ private static void ensureRemoteClusterStateNodesCompatibility(DiscoveryNode joi
.findFirst();

if (remotePublicationNode.isPresent() && joiningNode.isRemoteStatePublicationEnabled()) {
ensureRepositoryCompatibility(joiningNode, remotePublicationNode.get(), REMOTE_CLUSTER_PUBLICATION_REPO_NAME_ATTRIBUTES);
List<String> repos = Arrays.asList(
getClusterStateRepoName(remotePublicationNode.get().getAttributes()),
getRoutingTableRepoName(remotePublicationNode.get().getAttributes())
);
ensureRepositoryCompatibility(joiningNode, remotePublicationNode.get(), repos);
}
}

Expand Down Expand Up @@ -546,16 +552,12 @@ private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNod
List<String> reposToSkip = new ArrayList<>(1);
// find a remote node which has routing table configured
Optional<DiscoveryNode> remoteRoutingTableNode = existingNodes.stream()
.filter(
node -> node.isRemoteStoreNode()
&& node.getAttributes().get(RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY) != null
)
.filter(node -> node.isRemoteStoreNode() && RemoteStoreNodeAttribute.getRoutingTableRepoName(node.getAttributes()) != null)
.findFirst();
// If none of the existing nodes have routing table repo, then we skip this repo check if present in joining node.
// This ensures a new node with remote routing table repo is able to join the cluster.
if (remoteRoutingTableNode.isEmpty()) {
String joiningNodeRepoName = joiningNode.getAttributes()
.get(RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY);
String joiningNodeRepoName = getRoutingTableRepoName(joiningNode.getAttributes());
if (joiningNodeRepoName != null) {
reposToSkip.add(joiningNodeRepoName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1166,12 +1166,8 @@ public static void updateRemoteStoreSettings(
.findFirst();

if (remoteNode.isPresent()) {
translogRepo = remoteNode.get()
.getAttributes()
.get(RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY);
segmentRepo = remoteNode.get()
.getAttributes()
.get(RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY);
translogRepo = RemoteStoreNodeAttribute.getTranslogRepoName(remoteNode.get().getAttributes());
segmentRepo = RemoteStoreNodeAttribute.getSegmentRepoName(remoteNode.get().getAttributes());
if (segmentRepo != null && translogRepo != null) {
settingsBuilder.put(SETTING_REMOTE_STORE_ENABLED, true)
.put(SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, segmentRepo)
Expand Down
32 changes: 15 additions & 17 deletions server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -62,10 +63,9 @@
import java.util.stream.Stream;

import static org.opensearch.node.NodeRoleSettings.NODE_ROLES_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isClusterStateRepoConfigured;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRoutingTableRepoConfigured;

/**
* A discovery node represents a node that is part of the cluster.
Expand Down Expand Up @@ -510,20 +510,15 @@ public boolean isSearchNode() {
* @return true if the node contains remote store node attributes, false otherwise
*/
public boolean isRemoteStoreNode() {
return this.getAttributes().keySet().stream().anyMatch(key -> key.equals(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY))
&& this.getAttributes().keySet().stream().anyMatch(key -> key.equals(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY));
return isClusterStateRepoConfigured(this.getAttributes()) && RemoteStoreNodeAttribute.isSegmentRepoConfigured(this.getAttributes());
}

/**
* Returns whether settings required for remote cluster state publication is configured
* @return true if the node contains remote cluster state node attribute and remote routing table node attribute
*/
public boolean isRemoteStatePublicationEnabled() {
return this.getAttributes()
.keySet()
.stream()
.anyMatch(key -> (key.equals(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY)))
&& this.getAttributes().keySet().stream().anyMatch(key -> key.equals(REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY));
return isClusterStateRepoConfigured(this.getAttributes()) && isRoutingTableRepoConfigured(this.getAttributes());
}

/**
Expand Down Expand Up @@ -587,13 +582,16 @@ public String toString() {
sb.append('}');
}
if (!attributes.isEmpty()) {
sb.append(
attributes.entrySet()
.stream()
.filter(entry -> !entry.getKey().startsWith(REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX)) // filter remote_store attributes
// from logging to reduce noise.
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
);
sb.append(attributes.entrySet().stream().filter(entry -> {
for (String prefix : REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX) {
if (entry.getKey().startsWith(prefix)) {
return false;
}
}
return true;
}) // filter remote_store attributes
// from logging to reduce noise.
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
}
return sb.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable;
import org.opensearch.gateway.remote.routingtable.RemoteRoutingTableDiff;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
Expand Down Expand Up @@ -235,9 +234,7 @@ protected void doClose() throws IOException {
@Override
protected void doStart() {
assert isRemoteRoutingTableConfigured(settings) == true : "Remote routing table is not enabled";
final String remoteStoreRepo = settings.get(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY
);
final String remoteStoreRepo = RemoteStoreNodeAttribute.getRoutingTableRepoName(settings);
assert remoteStoreRepo != null : "Remote routing table repository is not configured";
final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata;
import org.opensearch.gateway.remote.routingtable.RemoteRoutingTableDiff;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
Expand Down Expand Up @@ -1069,9 +1068,8 @@ public void close() throws IOException {

public void start() {
assert isRemoteClusterStateConfigured(settings) == true : "Remote cluster state is not enabled";
final String remoteStoreRepo = settings.get(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY
);
final String remoteStoreRepo = RemoteStoreNodeAttribute.getClusterStateRepoName(settings);

assert remoteStoreRepo != null : "Remote Cluster State repository is not configured";
final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.opensearch.gateway.remote.RemoteStateTransferException;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
Expand Down Expand Up @@ -70,11 +69,6 @@ public class RemoteIndexPathUploader extends IndexMetadataUploadListener {

private static final String TIMEOUT_EXCEPTION_MSG = "Timed out waiting while uploading remote index path file for indexes=%s";
private static final String UPLOAD_EXCEPTION_MSG = "Exception occurred while uploading remote index paths for indexes=%s";
static final String TRANSLOG_REPO_NAME_KEY = Node.NODE_ATTRIBUTES.getKey()
+ RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY;
static final String SEGMENT_REPO_NAME_KEY = Node.NODE_ATTRIBUTES.getKey()
+ RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;

private static final Logger logger = LogManager.getLogger(RemoteIndexPathUploader.class);

private final Settings settings;
Expand Down Expand Up @@ -226,9 +220,8 @@ private void writePathToRemoteStore(
}
}

private Repository validateAndGetRepository(String repoSetting) {
final String repo = settings.get(repoSetting);
assert repo != null : "Remote " + repoSetting + " repository is not configured";
private Repository validateAndGetRepository(String repo) {
assert repo != null : "Remote repository is not configured";
final Repository repository = repositoriesService.get().repository(repo);
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
return repository;
Expand All @@ -240,15 +233,16 @@ public void start() {
// If remote store data attributes are not present than we skip this.
return;
}
translogRepository = (BlobStoreRepository) validateAndGetRepository(TRANSLOG_REPO_NAME_KEY);
segmentRepository = (BlobStoreRepository) validateAndGetRepository(SEGMENT_REPO_NAME_KEY);

translogRepository = (BlobStoreRepository) validateAndGetRepository(RemoteStoreNodeAttribute.getRemoteStoreTranslogRepo(settings));
segmentRepository = (BlobStoreRepository) validateAndGetRepository(RemoteStoreNodeAttribute.getRemoteStoreSegmentRepo(settings));
}

private boolean isTranslogSegmentRepoSame() {
// TODO - The current comparison checks the repository name. But it is also possible that the repository are same
// by attributes, but different by name. We need to handle this.
String translogRepoName = settings.get(TRANSLOG_REPO_NAME_KEY);
String segmentRepoName = settings.get(SEGMENT_REPO_NAME_KEY);
String translogRepoName = RemoteStoreNodeAttribute.getRemoteStoreTranslogRepo(settings);
String segmentRepoName = RemoteStoreNodeAttribute.getRemoteStoreSegmentRepo(settings);
return Objects.equals(translogRepoName, segmentRepoName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;

import java.util.List;
import java.util.Map;
Expand All @@ -30,8 +31,6 @@
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.index.remote.RemoteStoreUtils.determineRemoteStoreCustomMetadataDuringMigration;
import static org.opensearch.index.remote.RemoteStoreUtils.getRemoteStoreRepoName;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY;

/**
* Utils for checking and mutating cluster state during remote migration
Expand Down Expand Up @@ -74,8 +73,9 @@ public void maybeAddRemoteIndexSettings(IndexMetadata.Builder indexMetadataBuild
index
);
Map<String, String> remoteRepoNames = getRemoteStoreRepoName(discoveryNodes);
String segmentRepoName = remoteRepoNames.get(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY);
String tlogRepoName = remoteRepoNames.get(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY);
String segmentRepoName = RemoteStoreNodeAttribute.getSegmentRepoName(remoteRepoNames);
String tlogRepoName = RemoteStoreNodeAttribute.getTranslogRepoName(remoteRepoNames);

assert Objects.nonNull(segmentRepoName) && Objects.nonNull(tlogRepoName) : "Remote repo names cannot be null";
Settings.Builder indexSettingsBuilder = Settings.builder().put(currentIndexSettings);
updateRemoteStoreSettings(indexSettingsBuilder, segmentRepoName, tlogRepoName);
Expand Down
Loading

0 comments on commit a98a5de

Please sign in to comment.