Skip to content

Commit

Permalink
Tracing for deep search path
Browse files Browse the repository at this point in the history
Signed-off-by: David Zane <[email protected]>
  • Loading branch information
dzane17 committed Feb 6, 2024
1 parent 52b27f4 commit 15bceb2
Show file tree
Hide file tree
Showing 21 changed files with 282 additions and 44 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Allow to pass the list settings through environment variables (like [], ["a", "b", "c"], ...) ([#10625](https://github.com/opensearch-project/OpenSearch/pull/10625))
- [Admission Control] Integrate CPU AC with ResourceUsageCollector and add CPU AC stats to nodes/stats ([#10887](https://github.com/opensearch-project/OpenSearch/pull/10887))
- [S3 Repository] Add setting to control connection count for sync client ([#12028](https://github.com/opensearch-project/OpenSearch/pull/12028))
- Tracing for deep search path ([#12103](https://github.com/opensearch-project/OpenSearch/pull/12103))

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
Expand Down
2 changes: 2 additions & 0 deletions distribution/src/config/jvm.options
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,5 @@ ${error.file}

# HDFS ForkJoinPool.common() support by SecurityManager
-Djava.util.concurrent.ForkJoinPool.common.threadFactory=org.opensearch.secure_sm.SecuredForkJoinWorkerThreadFactory

-Dopensearch.experimental.feature.telemetry.enabled=true
9 changes: 9 additions & 0 deletions distribution/src/config/opensearch.yml
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,12 @@ ${path.logs}
# Once there is no observed impact on performance, this feature flag can be removed.
#
#opensearch.experimental.optimization.datetime_formatter_caching.enabled: false

telemetry.tracer.enabled: true
#telemetry.metrics.enabled: true
telemetry.feature.tracer.enabled: true
#telemetry.feature.metrics.enabled: true
#search.query.metrics.enabled: true
telemetry.otel.tracer.span.exporter.class: io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter
#telemetry.otel.metrics.exporter.class: io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter
telemetry.tracer.sampler.probability: 1.0
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
* @opensearch.internal
*/
@InternalApi
class DefaultTracer implements Tracer {
public class DefaultTracer implements Tracer {
/**
* Current thread name.
*/
Expand All @@ -39,7 +39,7 @@ class DefaultTracer implements Tracer {
* @param tracingTelemetry tracing telemetry instance
* @param tracerContextStorage storage used for storing current span context
*/
public DefaultTracer(TracingTelemetry tracingTelemetry, TracerContextStorage<String, Span> tracerContextStorage) {
DefaultTracer(TracingTelemetry tracingTelemetry, TracerContextStorage<String, Span> tracerContextStorage) {
this.tracingTelemetry = tracingTelemetry;
this.tracerContextStorage = tracerContextStorage;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,13 @@ public boolean isEnabled() {
}

@Override
public void onPhaseStart(SearchPhaseContext context) {}
public void onPhaseStart(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

@Override
public void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

@Override
public void onPhaseFailure(SearchPhaseContext context) {}
public void onPhaseFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

@Override
public void onRequestStart(SearchRequestContext searchRequestContext) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ public final void start() {
null
)
);
onRequestEnd(searchRequestContext);
return;
}
executePhase(this);
Expand Down Expand Up @@ -439,10 +440,10 @@ private void onPhaseEnd(SearchRequestContext searchRequestContext) {
}
}

void onPhaseStart(SearchPhase phase) {
void onPhaseStart(SearchPhase phase, SearchRequestContext searchRequestContext) {
setCurrentPhase(phase);
if (SearchPhaseName.isValidName(phase.getName())) {
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseStart(this);
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseStart(this, searchRequestContext);
}
}

Expand All @@ -452,7 +453,7 @@ private void onRequestEnd(SearchRequestContext searchRequestContext) {

private void executePhase(SearchPhase phase) {
try {
onPhaseStart(phase);
onPhaseStart(phase, searchRequestContext);
phase.recordAndRun();
} catch (Exception e) {
if (logger.isDebugEnabled()) {
Expand Down Expand Up @@ -717,7 +718,7 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At
@Override
public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) {
if (SearchPhaseName.isValidName(phase.getName())) {
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseFailure(this);
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseFailure(this, searchRequestContext);
}
raisePhaseFailure(new SearchPhaseExecutionException(phase.getName(), msg, cause, buildShardFailures()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,19 @@
*/
@InternalApi
public class SearchRequestContext {
private final SearchRequest searchRequest;
private final SearchRequestOperationsListener searchRequestOperationsListener;
private long absoluteStartNanos;
private final Map<String, Long> phaseTookMap;
private TotalHits totalHits;
private final EnumMap<ShardStatsFieldNames, Integer> shardStats;

private final SearchRequest searchRequest;

SearchRequestContext(final SearchRequestOperationsListener searchRequestOperationsListener, final SearchRequest searchRequest) {
SearchRequestContext(SearchRequestOperationsListener searchRequestOperationsListener, SearchRequest searchRequest) {
this.searchRequestOperationsListener = searchRequestOperationsListener;
this.searchRequest = searchRequest;
this.absoluteStartNanos = System.nanoTime();
this.phaseTookMap = new HashMap<>();
this.shardStats = new EnumMap<>(ShardStatsFieldNames.class);
this.searchRequest = searchRequest;
}

SearchRequestOperationsListener getSearchRequestOperationsListener() {
Expand Down Expand Up @@ -78,7 +77,7 @@ void setTotalHits(TotalHits totalHits) {
this.totalHits = totalHits;
}

TotalHits totalHits() {
public TotalHits totalHits() {
return totalHits;
}

Expand All @@ -89,7 +88,7 @@ void setShardStats(int total, int successful, int skipped, int failed) {
this.shardStats.put(ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_FAILED, failed);
}

String formattedShardStats() {
public String formattedShardStats() {
if (shardStats.isEmpty()) {
return "";
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ protected SearchRequestOperationsListener(final boolean enabled) {
this.enabled = enabled;
}

protected abstract void onPhaseStart(SearchPhaseContext context);
protected abstract void onPhaseStart(SearchPhaseContext context, SearchRequestContext searchRequestContext);

protected abstract void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext);

protected abstract void onPhaseFailure(SearchPhaseContext context);
protected abstract void onPhaseFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext);

protected void onRequestStart(SearchRequestContext searchRequestContext) {}

Expand Down Expand Up @@ -69,10 +69,10 @@ static final class CompositeListener extends SearchRequestOperationsListener {
}

@Override
protected void onPhaseStart(SearchPhaseContext context) {
protected void onPhaseStart(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
for (SearchRequestOperationsListener listener : listeners) {
try {
listener.onPhaseStart(context);
listener.onPhaseStart(context, searchRequestContext);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onPhaseStart listener [{}] failed", listener), e);
}
Expand All @@ -91,10 +91,10 @@ protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searc
}

@Override
protected void onPhaseFailure(SearchPhaseContext context) {
protected void onPhaseFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
for (SearchRequestOperationsListener listener : listeners) {
try {
listener.onPhaseFailure(context);
listener.onPhaseFailure(context, searchRequestContext);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onPhaseFailure listener [{}] failed", listener), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,13 @@ public SearchRequestSlowLog(ClusterService clusterService) {
}

@Override
protected void onPhaseStart(SearchPhaseContext context) {}
protected void onPhaseStart(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

@Override
protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

@Override
protected void onPhaseFailure(SearchPhaseContext context) {}
protected void onPhaseFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

@Override
protected void onRequestStart(SearchRequestContext searchRequestContext) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public long getPhaseMetric(SearchPhaseName searchPhaseName) {
}

@Override
protected void onPhaseStart(SearchPhaseContext context) {
protected void onPhaseStart(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()).current.inc();
}

Expand All @@ -71,7 +71,7 @@ protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searc
}

@Override
protected void onPhaseFailure(SearchPhaseContext context) {
protected void onPhaseFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()).current.dec();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanBuilder;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.listener.TraceableActionListener;
import org.opensearch.telemetry.tracing.listener.TraceableSearchRequestOperationsListener;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.RemoteClusterAware;
import org.opensearch.transport.RemoteClusterService;
Expand Down Expand Up @@ -173,6 +179,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
private final CircuitBreaker circuitBreaker;
private final SearchPipelineService searchPipelineService;
private final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory;
private final Tracer tracer;

private volatile boolean searchQueryMetricsEnabled;

Expand All @@ -195,7 +202,8 @@ public TransportSearchAction(
NamedWriteableRegistry namedWriteableRegistry,
SearchPipelineService searchPipelineService,
MetricsRegistry metricsRegistry,
SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory
SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory,
Tracer tracer
) {
super(SearchAction.NAME, transportService, actionFilters, (Writeable.Reader<SearchRequest>) SearchRequest::new);
this.client = client;
Expand All @@ -215,6 +223,7 @@ public TransportSearchAction(
this.searchRequestOperationsCompositeListenerFactory = searchRequestOperationsCompositeListenerFactory;
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(SEARCH_QUERY_METRICS_ENABLED_SETTING, this::setSearchQueryMetricsEnabled);
this.tracer = tracer;
}

private void setSearchQueryMetricsEnabled(boolean searchQueryMetricsEnabled) {
Expand Down Expand Up @@ -431,13 +440,26 @@ private void executeRequest(
if (originalSearchRequest.isPhaseTook() == null) {
originalSearchRequest.setPhaseTook(clusterService.getClusterSettings().get(SEARCH_PHASE_TOOK_ENABLED));
}
SearchRequestOperationsListener.CompositeListener requestOperationsListeners = searchRequestOperationsCompositeListenerFactory
.buildCompositeListener(originalSearchRequest, logger);
SearchRequestOperationsListener.CompositeListener requestOperationsListeners;
TraceableSearchRequestOperationsListener traceableSearchRequestOperationsListener = null;
if (tracer.isRecording()) {
traceableSearchRequestOperationsListener = new TraceableSearchRequestOperationsListener(tracer);
requestOperationsListeners = searchRequestOperationsCompositeListenerFactory
.buildCompositeListener(originalSearchRequest, logger, traceableSearchRequestOperationsListener);
} else {
requestOperationsListeners = searchRequestOperationsCompositeListenerFactory
.buildCompositeListener(originalSearchRequest, logger);
}
SearchRequestContext searchRequestContext = new SearchRequestContext(requestOperationsListeners, originalSearchRequest);
searchRequestContext.getSearchRequestOperationsListener().onRequestStart(searchRequestContext);

PipelinedRequest searchRequest;
ActionListener<SearchResponse> listener;

if (tracer.isRecording() && traceableSearchRequestOperationsListener != null) {
originalListener = TraceableActionListener.create(originalListener, traceableSearchRequestOperationsListener.getRequestSpan(), tracer);
}

try {
searchRequest = searchPipelineService.resolvePipeline(originalSearchRequest);
listener = searchRequest.transformResponseListener(originalListener);
Expand Down Expand Up @@ -473,7 +495,9 @@ private void executeRequest(
);
}
}, listener::onFailure);
searchRequest.transformRequest(requestTransformListener);
try (SpanScope spanScope = tracer.withSpanInScope(traceableSearchRequestOperationsListener.getRequestSpan())) {
searchRequest.transformRequest(requestTransformListener);
}
}

private ActionListener<SearchSourceBuilder> buildRewriteListener(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,19 @@ private AttributeNames() {
* Refresh Policy
*/
public static final String REFRESH_POLICY = "refresh_policy";

/**
* Search Request Source
*/
public static final String SOURCE = "source";

/**
* Search Response Shard Stats
*/
public static final String SHARDS = "shards";

/**
* Search Response Total Hits
*/
public static final String TOTAL_HITS = "total_hits";
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,15 @@ private SpanBuilder() {

}

/**
* Creates {@link SpanCreationContext} from String
* @param spanName String.
* @return context.
*/
public static SpanCreationContext from(String spanName) {
return SpanCreationContext.server().name(spanName);
}

/**
* Creates {@link SpanCreationContext} from the {@link HttpRequest}
* @param request Http request.
Expand Down Expand Up @@ -170,4 +179,13 @@ private static Attributes buildSpanAttributes(String nodeId, ReplicatedWriteRequ
return attributes;
}

/**
* Creates {@link SpanCreationContext} with parent set to specified SpanContext.
* @param spanName name of span.
* @param parentSpan target parent span.
* @return context
*/
public static SpanCreationContext from(String spanName, SpanContext parentSpan) {
return SpanCreationContext.server().name(spanName).parent(parentSpan);
}
}
Loading

0 comments on commit 15bceb2

Please sign in to comment.