Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Tracing for deep search path (#12103) #12646

Merged
merged 1 commit into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add kuromoji_completion analyzer and filter ([#4835](https://github.com/opensearch-project/OpenSearch/issues/4835))
- [Admission Control] Integrate IO Usage Tracker to the Resource Usage Collector Service and Emit IO Usage Stats ([#11880](https://github.com/opensearch-project/OpenSearch/pull/11880))
- The org.opensearch.bootstrap.Security should support codebase for JAR files with classifiers ([#12586](https://github.com/opensearch-project/OpenSearch/issues/12586))
- Tracing for deep search path ([#12103](https://github.com/opensearch-project/OpenSearch/pull/12103))

### Dependencies
- Bump `com.squareup.okio:okio` from 3.7.0 to 3.8.0 ([#12290](https://github.com/opensearch-project/OpenSearch/pull/12290))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,19 @@ public SpanContext(Span span) {
Span getSpan() {
return span;
}

/**
* Sets the error for the current span behind this context
* @param cause error
*/
public void setError(final Exception cause) {
span.setError(cause);
}

/**
* Ends current span
*/
public void endSpan() {
span.endSpan();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ public SpanCreationContext attributes(Attributes attributes) {
}

/**
* Sets the parent for spann
* @param parent parent
* Sets the parent for span
* @param parent parent span context
* @return spanCreationContext
*/
public SpanCreationContext parent(SpanContext parent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public void onPhaseStart(SearchPhaseContext context) {}
public void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

@Override
public void onPhaseFailure(SearchPhaseContext context) {}
public void onPhaseFailure(SearchPhaseContext context, Throwable cause) {}

@Override
public void onRequestStart(SearchRequestContext searchRequestContext) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.internal.ShardSearchRequest;
import org.opensearch.search.pipeline.PipelinedRequest;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanCreationContext;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.transport.Transport;

import java.util.ArrayDeque;
Expand Down Expand Up @@ -116,6 +120,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final Map<String, PendingExecutions> pendingExecutionsPerNode = new ConcurrentHashMap<>();
private final boolean throttleConcurrentRequests;
private final SearchRequestContext searchRequestContext;
private final Tracer tracer;

private SearchPhase currentPhase;
private boolean currentPhaseHasLifecycle;
Expand All @@ -140,7 +145,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
SearchPhaseResults<Result> resultConsumer,
int maxConcurrentRequestsPerNode,
SearchResponse.Clusters clusters,
SearchRequestContext searchRequestContext
SearchRequestContext searchRequestContext,
Tracer tracer
) {
super(name);
final List<SearchShardIterator> toSkipIterators = new ArrayList<>();
Expand Down Expand Up @@ -177,6 +183,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
this.results = resultConsumer;
this.clusters = clusters;
this.searchRequestContext = searchRequestContext;
this.tracer = tracer;
}

@Override
Expand Down Expand Up @@ -221,6 +228,7 @@ public final void start() {
null
)
);
onRequestEnd(searchRequestContext);
return;
}
executePhase(this);
Expand Down Expand Up @@ -460,15 +468,24 @@ private void onRequestEnd(SearchRequestContext searchRequestContext) {
}

private void executePhase(SearchPhase phase) {
try {
Span phaseSpan = tracer.startSpan(SpanCreationContext.server().name("[phase/" + phase.getName() + "]"));
try (final SpanScope scope = tracer.withSpanInScope(phaseSpan)) {
onPhaseStart(phase);
phase.recordAndRun();
} catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug(new ParameterizedMessage("Failed to execute [{}] while moving to [{}] phase", request, phase.getName()), e);
}

if (currentPhaseHasLifecycle == false) {
phaseSpan.setError(e);
}

onPhaseFailure(phase, "", e);
} finally {
if (currentPhaseHasLifecycle == false) {
phaseSpan.endSpan();
}
}
}

Expand Down Expand Up @@ -733,7 +750,7 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At
@Override
public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) {
if (currentPhaseHasLifecycle) {
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseFailure(this);
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseFailure(this, cause);
}
raisePhaseFailure(new SearchPhaseExecutionException(phase.getName(), msg, cause, buildShardFailures()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.search.sort.FieldSortBuilder;
import org.opensearch.search.sort.MinAndMax;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.transport.Transport;

import java.util.Comparator;
Expand Down Expand Up @@ -91,7 +92,8 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
SearchTask task,
Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory,
SearchResponse.Clusters clusters,
SearchRequestContext searchRequestContext
SearchRequestContext searchRequestContext,
Tracer tracer
) {
// We set max concurrent shard requests to the number of shards so no throttling happens for can_match requests
super(
Expand All @@ -112,7 +114,8 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
new CanMatchSearchPhaseResults(shardsIts.size()),
shardsIts.size(),
clusters,
searchRequestContext
searchRequestContext,
tracer
);
this.phaseFactory = phaseFactory;
this.shardsIts = shardsIts;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.search.dfs.AggregatedDfs;
import org.opensearch.search.dfs.DfsSearchResult;
import org.opensearch.search.internal.AliasFilter;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.transport.Transport;

import java.util.List;
Expand Down Expand Up @@ -77,7 +78,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
final ClusterState clusterState,
final SearchTask task,
SearchResponse.Clusters clusters,
SearchRequestContext searchRequestContext
SearchRequestContext searchRequestContext,
final Tracer tracer
) {
super(
SearchPhaseName.DFS_PRE_QUERY.getName(),
Expand All @@ -97,7 +99,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
new ArraySearchPhaseResults<>(shardsIts.size()),
request.getMaxConcurrentShardRequests(),
clusters,
searchRequestContext
searchRequestContext,
tracer
);
this.queryPhaseResultConsumer = queryPhaseResultConsumer;
this.searchPhaseController = searchPhaseController;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.internal.ShardSearchRequest;
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.transport.Transport;

import java.util.Map;
Expand Down Expand Up @@ -82,7 +83,8 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
ClusterState clusterState,
SearchTask task,
SearchResponse.Clusters clusters,
SearchRequestContext searchRequestContext
SearchRequestContext searchRequestContext,
final Tracer tracer
) {
super(
SearchPhaseName.QUERY.getName(),
Expand All @@ -102,7 +104,8 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
resultConsumer,
request.getMaxConcurrentShardRequests(),
clusters,
searchRequestContext
searchRequestContext,
tracer
);
this.topDocsSize = SearchPhaseController.getTopDocsSize(request);
this.trackTotalHitsUpTo = request.resolveTrackTotalHitsUpTo();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ void setTotalHits(TotalHits totalHits) {
this.totalHits = totalHits;
}

TotalHits totalHits() {
public TotalHits totalHits() {
return totalHits;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@
@InternalApi
public abstract class SearchRequestOperationsListener {
private volatile boolean enabled;
public static final SearchRequestOperationsListener NOOP = new SearchRequestOperationsListener(false) {
@Override
protected void onPhaseStart(SearchPhaseContext context) {}

@Override
protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

@Override
protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {}
};

protected SearchRequestOperationsListener() {
this.enabled = true;
Expand All @@ -35,7 +45,7 @@ protected SearchRequestOperationsListener(final boolean enabled) {

protected abstract void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext);

protected abstract void onPhaseFailure(SearchPhaseContext context);
protected abstract void onPhaseFailure(SearchPhaseContext context, Throwable cause);

protected void onRequestStart(SearchRequestContext searchRequestContext) {}

Expand Down Expand Up @@ -91,10 +101,10 @@ protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searc
}

@Override
protected void onPhaseFailure(SearchPhaseContext context) {
protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {
for (SearchRequestOperationsListener listener : listeners) {
try {
listener.onPhaseFailure(context);
listener.onPhaseFailure(context, cause);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onPhaseFailure listener [{}] failed", listener), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ protected void onPhaseStart(SearchPhaseContext context) {}
protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

@Override
protected void onPhaseFailure(SearchPhaseContext context) {}
protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {}

@Override
protected void onRequestStart(SearchRequestContext searchRequestContext) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searc
}

@Override
protected void onPhaseFailure(SearchPhaseContext context) {
protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {
phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()).current.dec();
}

Expand Down
Loading
Loading