Skip to content

Commit

Permalink
Deprecated ClusterService and Using NodeClient to fetch meta data (#774
Browse files Browse the repository at this point in the history
…) (#792)

Signed-off-by: penghuo <[email protected]>
  • Loading branch information
penghuo authored Aug 31, 2022
1 parent eeb90cf commit c2973a3
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 208 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
package org.opensearch.sql.legacy.plugin;

import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.expression.config.ExpressionConfig;
Expand All @@ -34,8 +33,6 @@
@Configuration
@Import({ExpressionConfig.class})
public class OpenSearchSQLPluginConfig {
@Autowired
private ClusterService clusterService;

@Autowired
private NodeClient nodeClient;
Expand All @@ -48,7 +45,7 @@ public class OpenSearchSQLPluginConfig {

@Bean
public OpenSearchClient client() {
return new OpenSearchNodeClient(clusterService, nodeClient);
return new OpenSearchNodeClient(nodeClient);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import com.google.common.collect.Streams;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
Expand All @@ -18,48 +18,34 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.logging.log4j.ThreadContext;
import org.opensearch.action.admin.indices.get.GetIndexResponse;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.AliasMetadata;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.collect.ImmutableOpenMap;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.IndexSettings;
import org.opensearch.sql.opensearch.mapping.IndexMapping;
import org.opensearch.sql.opensearch.request.OpenSearchRequest;
import org.opensearch.sql.opensearch.response.OpenSearchResponse;
import org.opensearch.threadpool.ThreadPool;

/** OpenSearch connection by node client. */
public class OpenSearchNodeClient implements OpenSearchClient {

public static final Function<String, Predicate<String>> ALL_FIELDS =
(anyIndex -> (anyField -> true));

/** Current cluster state on local node. */
private final ClusterService clusterService;

/** Node client provided by OpenSearch container. */
private final NodeClient client;

/** Index name expression resolver to get concrete index name. */
private final IndexNameExpressionResolver resolver;

private static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker";

/**
* Constructor of ElasticsearchNodeClient.
*/
public OpenSearchNodeClient(ClusterService clusterService,
NodeClient client) {
this.clusterService = clusterService;
public OpenSearchNodeClient(NodeClient client) {
this.client = client;
this.resolver = new IndexNameExpressionResolver(client.threadPool().getThreadContext());
}
Expand All @@ -78,14 +64,16 @@ public OpenSearchNodeClient(ClusterService clusterService,
@Override
public Map<String, IndexMapping> getIndexMappings(String... indexExpression) {
try {
ClusterState state = clusterService.state();
String[] concreteIndices = resolveIndexExpression(state, indexExpression);

return populateIndexMappings(
state.metadata().findMappings(concreteIndices, ALL_FIELDS));
} catch (IOException e) {
GetMappingsResponse mappingsResponse = client.admin().indices()
.prepareGetMappings(indexExpression)
.setLocal(true)
.get();
return Streams.stream(mappingsResponse.mappings().iterator())
.collect(Collectors.toMap(cursor -> cursor.key,
cursor -> new IndexMapping(cursor.value)));
} catch (Exception e) {
throw new IllegalStateException(
"Failed to read mapping in cluster state for index pattern [" + indexExpression + "]", e);
"Failed to read mapping for index pattern [" + indexExpression + "]", e);
}
}

Expand All @@ -97,19 +85,24 @@ public Map<String, IndexMapping> getIndexMappings(String... indexExpression) {
*/
@Override
public Map<String, Integer> getIndexMaxResultWindows(String... indexExpression) {
ClusterState state = clusterService.state();
ImmutableOpenMap<String, IndexMetadata> indicesMetadata = state.metadata().getIndices();
String[] concreteIndices = resolveIndexExpression(state, indexExpression);

ImmutableMap.Builder<String, Integer> result = ImmutableMap.builder();
for (String index : concreteIndices) {
Settings settings = indicesMetadata.get(index).getSettings();
Integer maxResultWindow = settings.getAsInt("index.max_result_window",
IndexSettings.MAX_RESULT_WINDOW_SETTING.getDefault(settings));
result.put(index, maxResultWindow);
try {
GetSettingsResponse settingsResponse =
client.admin().indices().prepareGetSettings(indexExpression).setLocal(true).get();
ImmutableMap.Builder<String, Integer> result = ImmutableMap.builder();
for (ObjectObjectCursor<String, Settings> indexToSetting :
settingsResponse.getIndexToSettings()) {
Settings settings = indexToSetting.value;
result.put(
indexToSetting.key,
settings.getAsInt(
IndexSettings.MAX_RESULT_WINDOW_SETTING.getKey(),
IndexSettings.MAX_RESULT_WINDOW_SETTING.getDefault(settings)));
}
return result.build();
} catch (Exception e) {
throw new IllegalStateException(
"Failed to read setting for index pattern [" + indexExpression + "]", e);
}

return result.build();
}

/**
Expand Down Expand Up @@ -149,9 +142,8 @@ public List<String> indices() {
*/
@Override
public Map<String, String> meta() {
final ImmutableMap.Builder<String, String> builder = new ImmutableMap.Builder<>();
builder.put(META_CLUSTER_NAME, clusterService.getClusterName().value());
return builder.build();
return ImmutableMap.of(META_CLUSTER_NAME,
client.settings().get("cluster.name", "opensearch"));
}

@Override
Expand All @@ -161,40 +153,12 @@ public void cleanup(OpenSearchRequest request) {

@Override
public void schedule(Runnable task) {
ThreadPool threadPool = client.threadPool();
threadPool.schedule(
withCurrentContext(task),
new TimeValue(0),
SQL_WORKER_THREAD_POOL_NAME
);
// at that time, task already running the sql-worker ThreadPool.
task.run();
}

@Override
public NodeClient getNodeClient() {
return client;
}

private String[] resolveIndexExpression(ClusterState state, String[] indices) {
return resolver.concreteIndexNames(state, IndicesOptions.strictExpandOpen(), true, indices);
}

private Map<String, IndexMapping> populateIndexMappings(
ImmutableOpenMap<String, MappingMetadata> indexMappings) {

ImmutableMap.Builder<String, IndexMapping> result = ImmutableMap.builder();
for (ObjectObjectCursor<String, MappingMetadata> cursor:
indexMappings) {
result.put(cursor.key, new IndexMapping(cursor.value));
}
return result.build();
}

/** Copy from LogUtils. */
private static Runnable withCurrentContext(final Runnable task) {
final Map<String, String> currentContext = ThreadContext.getImmutableContext();
return () -> {
ThreadContext.putAll(currentContext);
task.run();
};
}
}
Loading

0 comments on commit c2973a3

Please sign in to comment.