Skip to content

Commit

Permalink
Add span scopes
Browse files Browse the repository at this point in the history
Signed-off-by: David Zane <[email protected]>
  • Loading branch information
dzane17 committed Jan 11, 2024
1 parent 68f1723 commit 8d9e4a1
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.internal.ShardSearchRequest;
import org.opensearch.search.pipeline.PipelinedRequest;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.transport.Transport;

import java.util.ArrayDeque;
Expand Down Expand Up @@ -452,7 +453,9 @@ private void onRequestEnd(SearchRequestContext searchRequestContext) {
private void executePhase(SearchPhase phase) {
try {
onPhaseStart(phase, searchRequestContext);
phase.recordAndRun();
try (SpanScope spanScope = searchRequestContext.getTracer().withSpanInScope(searchRequestContext.getPhaseSpan())) {
phase.recordAndRun();
}
} catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug(new ParameterizedMessage("Failed to execute [{}] while moving to [{}] phase", request, phase.getName()), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.apache.lucene.search.TotalHits;
import org.opensearch.common.annotation.InternalApi;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.noop.NoopTracer;

import java.util.EnumMap;
import java.util.HashMap;
Expand All @@ -33,6 +35,7 @@ class SearchRequestContext {
private final Map<String, Long> phaseTookMap;
private TotalHits totalHits;
private final EnumMap<ShardStatsFieldNames, Integer> shardStats;
private Tracer tracer;
private Span requestSpan;
private Span phaseSpan;

Expand All @@ -50,9 +53,17 @@ class SearchRequestContext {
this(searchRequest, new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()));
}

/**
* This constructor is for testing only
*/
SearchRequestContext(SearchRequest searchRequest, SearchRequestOperationsListener searchRequestOperationsListener) {
this(searchRequest, searchRequestOperationsListener, NoopTracer.INSTANCE);
}

SearchRequestContext(SearchRequest searchRequest, SearchRequestOperationsListener searchRequestOperationsListener, Tracer tracer) {
this.searchRequest = searchRequest;
this.searchRequestOperationsListener = searchRequestOperationsListener;
this.tracer = tracer;
this.absoluteStartNanos = System.nanoTime();
this.phaseTookMap = new HashMap<>();
this.shardStats = new EnumMap<>(ShardStatsFieldNames.class);
Expand Down Expand Up @@ -131,6 +142,10 @@ public String formattedShardStats() {
}
}

public Tracer getTracer() {
return tracer;
}

public void setRequestSpan(Span requestSpan) {
this.requestSpan = requestSpan;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanBuilder;
import org.opensearch.telemetry.tracing.SpanContext;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;

import static org.opensearch.core.common.Strings.capitalize;
Expand All @@ -37,24 +36,21 @@ void onPhaseStart(SearchPhaseContext context, SearchRequestContext searchRequest
searchRequestContext.setPhaseSpan(
tracer.startSpan(
SpanBuilder.from(
"coordinator" + capitalize(context.getCurrentPhase().getName()),
"coord" + capitalize(context.getCurrentPhase().getName()),
new SpanContext(searchRequestContext.getRequestSpan())
)
)
);
SpanScope spanScope = tracer.withSpanInScope(searchRequestContext.getPhaseSpan());
}

@Override
void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
searchRequestContext.getPhaseSpan().endSpan();
searchRequestContext.setPhaseSpan(null);
}

@Override
void onPhaseFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
searchRequestContext.getPhaseSpan().endSpan();
searchRequestContext.setPhaseSpan(null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanBuilder;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.listener.TraceableActionListener;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -503,19 +504,17 @@ private void executeRequest(
final List<SearchRequestOperationsListener> searchListenersList = createSearchListenerList(originalSearchRequest, timeProvider);
SearchRequestContext searchRequestContext = new SearchRequestContext(
originalSearchRequest,
new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger)
new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger),
tracer
);
searchRequestContext.getSearchRequestOperationsListener().onRequestStart(searchRequestContext);

PipelinedRequest searchRequest;
ActionListener<SearchResponse> listener;
Span requestSpan;

if (FeatureFlags.isEnabled(TELEMETRY)) {
requestSpan = tracer.startSpan(SpanBuilder.from("coordinatorRequest"));
originalListener = TraceableActionListener.create(originalListener, requestSpan, tracer);
searchRequestContext.setRequestSpan(requestSpan);
}
Span requestSpan = tracer.startSpan(SpanBuilder.from("coordReq"));
originalListener = TraceableActionListener.create(originalListener, requestSpan, tracer);
searchRequestContext.setRequestSpan(requestSpan);

try {
searchRequest = searchPipelineService.resolvePipeline(originalSearchRequest);
Expand Down Expand Up @@ -552,7 +551,9 @@ private void executeRequest(
);
}
}, listener::onFailure);
searchRequest.transformRequest(requestTransformListener);
try (SpanScope spanScope = tracer.withSpanInScope(requestSpan)) {
searchRequest.transformRequest(requestTransformListener);
}
}

private ActionListener<SearchSourceBuilder> buildRewriteListener(
Expand Down
57 changes: 31 additions & 26 deletions server/src/main/java/org/opensearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.BigArrays;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.common.util.concurrent.ConcurrentMapLong;
import org.opensearch.common.util.io.IOUtils;
Expand Down Expand Up @@ -139,6 +138,7 @@
import org.opensearch.search.suggest.completion.CompletionSuggestion;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanBuilder;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.Scheduler.Cancellable;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -163,7 +163,6 @@
import static org.opensearch.common.unit.TimeValue.timeValueHours;
import static org.opensearch.common.unit.TimeValue.timeValueMillis;
import static org.opensearch.common.unit.TimeValue.timeValueMinutes;
import static org.opensearch.common.util.FeatureFlags.TELEMETRY;

/**
* The main search service
Expand Down Expand Up @@ -615,7 +614,10 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchSh
SearchContext context = createContext(readerContext, request, task, true)
) {
final long afterQueryTime;
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, tracer)) {
try (
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, tracer);
SpanScope spanScope = tracer.withSpanInScope(executor.getQuerySpan())
) {
loadOrExecuteQueryPhase(request, context);
if (context.queryResult().hasSearchContext() == false && readerContext.singleSession()) {
freeReaderContext(readerContext.id());
Expand Down Expand Up @@ -646,7 +648,10 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchSh
}

private QueryFetchSearchResult executeFetchPhase(ReaderContext reader, SearchContext context, long afterQueryTime) {
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, tracer, true, afterQueryTime)) {
try (
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, tracer, true, afterQueryTime);
SpanScope spanScope = tracer.withSpanInScope(executor.getFetchSpan())
) {
shortcutDocIdsToLoad(context);
fetchPhase.execute(context);
if (reader.singleSession()) {
Expand Down Expand Up @@ -675,7 +680,8 @@ public void executeQueryPhase(
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null);
try (
SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false);
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext, tracer)
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext, tracer);
SpanScope spanScope = tracer.withSpanInScope(executor.getQuerySpan())
) {
searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null));
processScroll(request, readerContext, searchContext);
Expand All @@ -699,7 +705,8 @@ public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task,
readerContext.setAggregatedDfs(request.dfs());
try (
SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, true);
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext, tracer)
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext, tracer);
SpanScope spanScope = tracer.withSpanInScope(executor.getQuerySpan())
) {
searchContext.searcher().setAggregatedDfs(request.dfs());
queryPhase.execute(searchContext);
Expand Down Expand Up @@ -754,7 +761,8 @@ public void executeFetchPhase(
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null);
try (
SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false);
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext, tracer)
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext, tracer);
SpanScope spanScope = tracer.withSpanInScope(executor.getQuerySpan())
) {
searchContext.assignRescoreDocIds(readerContext.getRescoreDocIds(null));
searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null));
Expand Down Expand Up @@ -790,7 +798,8 @@ public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, A
tracer,
true,
System.nanoTime()
)
);
SpanScope spanScope = tracer.withSpanInScope(executor.getFetchSpan())
) {
fetchPhase.execute(searchContext);
if (readerContext.singleSession()) {
Expand Down Expand Up @@ -1756,14 +1765,10 @@ private static final class SearchOperationListenerExecutor implements AutoClosea
this.fetch = fetch;
if (fetch) {
listener.onPreFetchPhase(context);
if (FeatureFlags.isEnabled(TELEMETRY)) {
fetchSpan = tracer.startSpan(SpanBuilder.from("shardFetch", context));
}
fetchSpan = tracer.startSpan(SpanBuilder.from("shardFetch", context));
} else {
listener.onPreQueryPhase(context);
if (FeatureFlags.isEnabled(TELEMETRY)) {
querySpan = tracer.startSpan(SpanBuilder.from("shardQuery", context));
}
querySpan = tracer.startSpan(SpanBuilder.from("shardQuery", context));
}
}

Expand All @@ -1779,29 +1784,29 @@ public void close() {
if (afterQueryTime != -1) {
if (fetch) {
listener.onFetchPhase(context, afterQueryTime - time);
if (fetchSpan != null) {
fetchSpan.endSpan();
}
fetchSpan.endSpan();
} else {
listener.onQueryPhase(context, afterQueryTime - time);
if (querySpan != null) {
querySpan.endSpan();
}
querySpan.endSpan();
}
} else {
if (fetch) {
listener.onFailedFetchPhase(context);
if (fetchSpan != null) {
fetchSpan.endSpan();
}
fetchSpan.endSpan();
} else {
listener.onFailedQueryPhase(context);
if (querySpan != null) {
querySpan.endSpan();
}
querySpan.endSpan();
}
}
}
}

Span getQuerySpan() {
return querySpan;
}

Span getFetchSpan() {
return fetchSpan;
}
}
}

0 comments on commit 8d9e4a1

Please sign in to comment.