From d263cacd6fe04d71f1d90abec382fa959950a3fd Mon Sep 17 00:00:00 2001 From: David Zane Date: Tue, 6 Feb 2024 14:11:29 -0800 Subject: [PATCH] Tracing for deep search path Signed-off-by: David Zane --- CHANGELOG.md | 1 + .../telemetry/tracing/DefaultTracer.java | 2 +- .../core/listener/QueryInsightsListener.java | 4 +- .../search/AbstractSearchAsyncAction.java | 9 +- .../action/search/SearchRequestContext.java | 11 ++- .../SearchRequestOperationsListener.java | 12 +-- .../action/search/SearchRequestSlowLog.java | 4 +- .../action/search/SearchRequestStats.java | 4 +- .../action/search/TransportSearchAction.java | 32 ++++++- .../telemetry/tracing/AttributeNames.java | 15 ++++ .../telemetry/tracing/SpanBuilder.java | 18 ++++ ...ceableSearchRequestOperationsListener.java | 85 +++++++++++++++++++ .../AbstractSearchAsyncActionTests.java | 21 ++++- ...erationsCompositeListenerFactoryTests.java | 4 +- ...earchRequestOperationsListenerSupport.java | 8 +- .../SearchRequestOperationsListenerTests.java | 13 ++- .../search/SearchRequestStatsTests.java | 48 +++++++++-- .../snapshots/SnapshotResiliencyTests.java | 3 +- .../telemetry/tracing/SpanBuilderTests.java | 19 +++++ 19 files changed, 270 insertions(+), 43 deletions(-) create mode 100644 server/src/main/java/org/opensearch/telemetry/tracing/listener/TraceableSearchRequestOperationsListener.java diff --git a/CHANGELOG.md b/CHANGELOG.md index da03b2e4430a6..4a908d1b27007 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Allow to pass the list settings through environment variables (like [], ["a", "b", "c"], ...) ([#10625](https://github.com/opensearch-project/OpenSearch/pull/10625)) - [Admission Control] Integrate CPU AC with ResourceUsageCollector and add CPU AC stats to nodes/stats ([#10887](https://github.com/opensearch-project/OpenSearch/pull/10887)) - [S3 Repository] Add setting to control connection count for sync client ([#12028](https://github.com/opensearch-project/OpenSearch/pull/12028)) +- Tracing for deep search path ([#12103](https://github.com/opensearch-project/OpenSearch/pull/12103)) ### Dependencies - Bump `log4j-core` from 2.18.0 to 2.19.0 diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java index 8f1a26d99e725..64b0df06bb604 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java @@ -39,7 +39,7 @@ class DefaultTracer implements Tracer { * @param tracingTelemetry tracing telemetry instance * @param tracerContextStorage storage used for storing current span context */ - public DefaultTracer(TracingTelemetry tracingTelemetry, TracerContextStorage tracerContextStorage) { + DefaultTracer(TracingTelemetry tracingTelemetry, TracerContextStorage tracerContextStorage) { this.tracingTelemetry = tracingTelemetry; this.tracerContextStorage = tracerContextStorage; } diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java index 705273f52a567..c56805ac21601 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java @@ -110,13 +110,13 @@ public boolean isEnabled() { } @Override - public void onPhaseStart(SearchPhaseContext context) {} + public void onPhaseStart(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} @Override public void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} @Override - public void onPhaseFailure(SearchPhaseContext context) {} + public void onPhaseFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} @Override public void onRequestStart(SearchRequestContext searchRequestContext) {} diff --git a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java index 3c27d3ce59e4c..f28221686945f 100644 --- a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java @@ -220,6 +220,7 @@ public final void start() { null ) ); + onRequestEnd(searchRequestContext); return; } executePhase(this); @@ -439,10 +440,10 @@ private void onPhaseEnd(SearchRequestContext searchRequestContext) { } } - void onPhaseStart(SearchPhase phase) { + void onPhaseStart(SearchPhase phase, SearchRequestContext searchRequestContext) { setCurrentPhase(phase); if (SearchPhaseName.isValidName(phase.getName())) { - this.searchRequestContext.getSearchRequestOperationsListener().onPhaseStart(this); + this.searchRequestContext.getSearchRequestOperationsListener().onPhaseStart(this, searchRequestContext); } } @@ -452,7 +453,7 @@ private void onRequestEnd(SearchRequestContext searchRequestContext) { private void executePhase(SearchPhase phase) { try { - onPhaseStart(phase); + onPhaseStart(phase, searchRequestContext); phase.recordAndRun(); } catch (Exception e) { if (logger.isDebugEnabled()) { @@ -717,7 +718,7 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At @Override public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) { if (SearchPhaseName.isValidName(phase.getName())) { - this.searchRequestContext.getSearchRequestOperationsListener().onPhaseFailure(this); + this.searchRequestContext.getSearchRequestOperationsListener().onPhaseFailure(this, searchRequestContext); } raisePhaseFailure(new SearchPhaseExecutionException(phase.getName(), msg, cause, buildShardFailures())); } diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java b/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java index 383d9b5e82fe2..e3eb33bd06fd1 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java @@ -23,20 +23,19 @@ */ @InternalApi public class SearchRequestContext { + private final SearchRequest searchRequest; private final SearchRequestOperationsListener searchRequestOperationsListener; private long absoluteStartNanos; private final Map phaseTookMap; private TotalHits totalHits; private final EnumMap shardStats; - private final SearchRequest searchRequest; - - SearchRequestContext(final SearchRequestOperationsListener searchRequestOperationsListener, final SearchRequest searchRequest) { + SearchRequestContext(SearchRequestOperationsListener searchRequestOperationsListener, SearchRequest searchRequest) { this.searchRequestOperationsListener = searchRequestOperationsListener; + this.searchRequest = searchRequest; this.absoluteStartNanos = System.nanoTime(); this.phaseTookMap = new HashMap<>(); this.shardStats = new EnumMap<>(ShardStatsFieldNames.class); - this.searchRequest = searchRequest; } SearchRequestOperationsListener getSearchRequestOperationsListener() { @@ -78,7 +77,7 @@ void setTotalHits(TotalHits totalHits) { this.totalHits = totalHits; } - TotalHits totalHits() { + public TotalHits totalHits() { return totalHits; } @@ -89,7 +88,7 @@ void setShardStats(int total, int successful, int skipped, int failed) { this.shardStats.put(ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_FAILED, failed); } - String formattedShardStats() { + public String formattedShardStats() { if (shardStats.isEmpty()) { return ""; } else { diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java index 2acb35af667f0..6ee882aa54562 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java @@ -31,11 +31,11 @@ protected SearchRequestOperationsListener(final boolean enabled) { this.enabled = enabled; } - protected abstract void onPhaseStart(SearchPhaseContext context); + protected abstract void onPhaseStart(SearchPhaseContext context, SearchRequestContext searchRequestContext); protected abstract void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext); - protected abstract void onPhaseFailure(SearchPhaseContext context); + protected abstract void onPhaseFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext); protected void onRequestStart(SearchRequestContext searchRequestContext) {} @@ -69,10 +69,10 @@ static final class CompositeListener extends SearchRequestOperationsListener { } @Override - protected void onPhaseStart(SearchPhaseContext context) { + protected void onPhaseStart(SearchPhaseContext context, SearchRequestContext searchRequestContext) { for (SearchRequestOperationsListener listener : listeners) { try { - listener.onPhaseStart(context); + listener.onPhaseStart(context, searchRequestContext); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("onPhaseStart listener [{}] failed", listener), e); } @@ -91,10 +91,10 @@ protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searc } @Override - protected void onPhaseFailure(SearchPhaseContext context) { + protected void onPhaseFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) { for (SearchRequestOperationsListener listener : listeners) { try { - listener.onPhaseFailure(context); + listener.onPhaseFailure(context, searchRequestContext); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("onPhaseFailure listener [{}] failed", listener), e); } diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java b/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java index 74e04d976cb1c..56efcbef83f66 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java @@ -134,13 +134,13 @@ public SearchRequestSlowLog(ClusterService clusterService) { } @Override - protected void onPhaseStart(SearchPhaseContext context) {} + protected void onPhaseStart(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} @Override protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} @Override - protected void onPhaseFailure(SearchPhaseContext context) {} + protected void onPhaseFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} @Override protected void onRequestStart(SearchRequestContext searchRequestContext) {} diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java b/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java index ac32b08afb7f6..638142461135e 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java @@ -58,7 +58,7 @@ public long getPhaseMetric(SearchPhaseName searchPhaseName) { } @Override - protected void onPhaseStart(SearchPhaseContext context) { + protected void onPhaseStart(SearchPhaseContext context, SearchRequestContext searchRequestContext) { phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()).current.inc(); } @@ -71,7 +71,7 @@ protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searc } @Override - protected void onPhaseFailure(SearchPhaseContext context) { + protected void onPhaseFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) { phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()).current.dec(); } diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 79e599ec9387b..dd6f870c04c7d 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -88,6 +88,12 @@ import org.opensearch.tasks.CancellableTask; import org.opensearch.tasks.Task; import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.tracing.Span; +import org.opensearch.telemetry.tracing.SpanBuilder; +import org.opensearch.telemetry.tracing.SpanScope; +import org.opensearch.telemetry.tracing.Tracer; +import org.opensearch.telemetry.tracing.listener.TraceableActionListener; +import org.opensearch.telemetry.tracing.listener.TraceableSearchRequestOperationsListener; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.RemoteClusterAware; import org.opensearch.transport.RemoteClusterService; @@ -173,6 +179,7 @@ public class TransportSearchAction extends HandledTransportAction) SearchRequest::new); this.client = client; @@ -215,6 +223,7 @@ public TransportSearchAction( this.searchRequestOperationsCompositeListenerFactory = searchRequestOperationsCompositeListenerFactory; clusterService.getClusterSettings() .addSettingsUpdateConsumer(SEARCH_QUERY_METRICS_ENABLED_SETTING, this::setSearchQueryMetricsEnabled); + this.tracer = tracer; } private void setSearchQueryMetricsEnabled(boolean searchQueryMetricsEnabled) { @@ -431,13 +440,26 @@ private void executeRequest( if (originalSearchRequest.isPhaseTook() == null) { originalSearchRequest.setPhaseTook(clusterService.getClusterSettings().get(SEARCH_PHASE_TOOK_ENABLED)); } - SearchRequestOperationsListener.CompositeListener requestOperationsListeners = searchRequestOperationsCompositeListenerFactory - .buildCompositeListener(originalSearchRequest, logger); + SearchRequestOperationsListener.CompositeListener requestOperationsListeners; + TraceableSearchRequestOperationsListener traceableSearchRequestOperationsListener = null; + if (tracer.isRecording()) { + traceableSearchRequestOperationsListener = new TraceableSearchRequestOperationsListener(tracer); + requestOperationsListeners = searchRequestOperationsCompositeListenerFactory + .buildCompositeListener(originalSearchRequest, logger, traceableSearchRequestOperationsListener); + } else { + requestOperationsListeners = searchRequestOperationsCompositeListenerFactory + .buildCompositeListener(originalSearchRequest, logger); + } SearchRequestContext searchRequestContext = new SearchRequestContext(requestOperationsListeners, originalSearchRequest); searchRequestContext.getSearchRequestOperationsListener().onRequestStart(searchRequestContext); PipelinedRequest searchRequest; ActionListener listener; + + if (tracer.isRecording() && traceableSearchRequestOperationsListener != null) { + originalListener = TraceableActionListener.create(originalListener, traceableSearchRequestOperationsListener.getRequestSpan(), tracer); + } + try { searchRequest = searchPipelineService.resolvePipeline(originalSearchRequest); listener = searchRequest.transformResponseListener(originalListener); @@ -473,7 +495,9 @@ private void executeRequest( ); } }, listener::onFailure); - searchRequest.transformRequest(requestTransformListener); + try (SpanScope spanScope = tracer.withSpanInScope(traceableSearchRequestOperationsListener.getRequestSpan())) { + searchRequest.transformRequest(requestTransformListener); + } } private ActionListener buildRewriteListener( diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java b/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java index b6b2cf360d1c5..024b74ab6d2d0 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java @@ -94,4 +94,19 @@ private AttributeNames() { * Refresh Policy */ public static final String REFRESH_POLICY = "refresh_policy"; + + /** + * Search Request Source + */ + public static final String SOURCE = "source"; + + /** + * Search Response Shard Stats + */ + public static final String SHARDS = "shards"; + + /** + * Search Response Total Hits + */ + public static final String TOTAL_HITS = "total_hits"; } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java b/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java index 1dce422943b7a..16071479bba23 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java @@ -42,6 +42,15 @@ private SpanBuilder() { } + /** + * Creates {@link SpanCreationContext} from String + * @param spanName String. + * @return context. + */ + public static SpanCreationContext from(String spanName) { + return SpanCreationContext.server().name(spanName); + } + /** * Creates {@link SpanCreationContext} from the {@link HttpRequest} * @param request Http request. @@ -170,4 +179,13 @@ private static Attributes buildSpanAttributes(String nodeId, ReplicatedWriteRequ return attributes; } + /** + * Creates {@link SpanCreationContext} with parent set to specified SpanContext. + * @param spanName name of span. + * @param parentSpan target parent span. + * @return context + */ + public static SpanCreationContext from(String spanName, SpanContext parentSpan) { + return SpanCreationContext.server().name(spanName).parent(parentSpan); + } } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/listener/TraceableSearchRequestOperationsListener.java b/server/src/main/java/org/opensearch/telemetry/tracing/listener/TraceableSearchRequestOperationsListener.java new file mode 100644 index 0000000000000..ec4e4cb156c3d --- /dev/null +++ b/server/src/main/java/org/opensearch/telemetry/tracing/listener/TraceableSearchRequestOperationsListener.java @@ -0,0 +1,85 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry.tracing.listener; + +import org.opensearch.action.search.SearchPhaseContext; +import org.opensearch.action.search.SearchRequestContext; +import org.opensearch.action.search.SearchRequestOperationsListener; +import org.opensearch.telemetry.tracing.AttributeNames; +import org.opensearch.telemetry.tracing.Span; +import org.opensearch.telemetry.tracing.SpanBuilder; +import org.opensearch.telemetry.tracing.SpanContext; +import org.opensearch.telemetry.tracing.SpanScope; +import org.opensearch.telemetry.tracing.Tracer; +import org.opensearch.telemetry.tracing.noop.NoopSpan; + +import static org.opensearch.core.common.Strings.capitalize; + +/** + * SearchRequestOperationsListener subscriber for search request tracing + * + * @opensearch.internal + */ +public final class TraceableSearchRequestOperationsListener extends SearchRequestOperationsListener { + private final Tracer tracer; + private Span requestSpan; + private Span phaseSpan; + private SpanScope phaseSpanScope; + + public TraceableSearchRequestOperationsListener(Tracer tracer) { + this.tracer = tracer; + this.requestSpan = NoopSpan.INSTANCE; + this.phaseSpan = NoopSpan.INSTANCE; + } + + @Override + protected void onPhaseStart(SearchPhaseContext context, SearchRequestContext searchRequestContext) { + phaseSpan = tracer.startSpan( + SpanBuilder.from( + "coord" + capitalize(context.getCurrentPhase().getName()), + new SpanContext(requestSpan) + ) + ); + phaseSpanScope = tracer.withSpanInScope(phaseSpan); + } + + @Override + protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { + phaseSpan.endSpan(); + phaseSpanScope.close(); + } + + @Override + protected void onPhaseFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) { + phaseSpan.endSpan(); + phaseSpanScope.close(); + } + + @Override + public void onRequestStart(SearchRequestContext searchRequestContext) { + requestSpan = tracer.startSpan(SpanBuilder.from("coordReq")); + } + + @Override + public void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { + // add response-related attributes on request end + requestSpan.addAttribute( + AttributeNames.TOTAL_HITS, + searchRequestContext.totalHits() == null ? 0 : searchRequestContext.totalHits().value + ); + requestSpan.addAttribute( + AttributeNames.SHARDS, + searchRequestContext.formattedShardStats().isEmpty() ? "null" : searchRequestContext.formattedShardStats() + ); + } + + public Span getRequestSpan() { + return requestSpan; + } +} diff --git a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java index a7cbbffc51ed4..5aa04ef04db26 100644 --- a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java @@ -420,17 +420,32 @@ public void testOnPhaseStart() { action.onPhaseStart(new SearchPhase("test") { @Override public void run() {} - }); + }, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ); action.onPhaseStart(new SearchPhase("none") { @Override public void run() {} - }); + }, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ); assertEquals(0, testListener.getPhaseCurrent(action.getSearchPhaseName())); action.onPhaseStart(new SearchPhase(action.getName()) { @Override public void run() {} - }); + }, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ); assertEquals(1, testListener.getPhaseCurrent(action.getSearchPhaseName())); } diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactoryTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactoryTests.java index 1cb336e18b12c..88737148c2738 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactoryTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactoryTests.java @@ -119,13 +119,13 @@ public void testStandardListenerAndPerRequestListenerDisabled() { public SearchRequestOperationsListener createTestSearchRequestOperationsListener() { return new SearchRequestOperationsListener() { @Override - protected void onPhaseStart(SearchPhaseContext context) {} + protected void onPhaseStart(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} @Override protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} @Override - protected void onPhaseFailure(SearchPhaseContext context) {} + protected void onPhaseFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} }; } } diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerSupport.java b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerSupport.java index 0f737e00478cb..e13b55b1eab24 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerSupport.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerSupport.java @@ -17,7 +17,13 @@ */ public interface SearchRequestOperationsListenerSupport { default void onPhaseStart(SearchRequestOperationsListener listener, SearchPhaseContext context) { - listener.onPhaseStart(context); + listener.onPhaseStart( + context, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ); } default void onPhaseEnd(SearchRequestOperationsListener listener, SearchPhaseContext context) { diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerTests.java index a53c35a8401b3..00403692e4513 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerTests.java @@ -8,6 +8,7 @@ package org.opensearch.action.search; +import org.apache.logging.log4j.LogManager; import org.opensearch.test.OpenSearchTestCase; import java.util.ArrayList; @@ -29,7 +30,7 @@ public void testListenersAreExecuted() { SearchRequestOperationsListener testListener = new SearchRequestOperationsListener() { @Override - public void onPhaseStart(SearchPhaseContext context) { + public void onPhaseStart(SearchPhaseContext context, SearchRequestContext searchRequestContext) { searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseName()).current.inc(); } @@ -40,7 +41,7 @@ public void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRe } @Override - public void onPhaseFailure(SearchPhaseContext context) { + public void onPhaseFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) { searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseName()).current.dec(); } }; @@ -62,7 +63,13 @@ public void onPhaseFailure(SearchPhaseContext context) { for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { when(ctx.getCurrentPhase()).thenReturn(searchPhase); when(searchPhase.getSearchPhaseName()).thenReturn(searchPhaseName); - compositeListener.onPhaseStart(ctx); + compositeListener.onPhaseStart( + ctx, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ); assertEquals(totalListeners, searchPhaseMap.get(searchPhaseName).current.count()); } } diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java index 377ccebbfd418..555be465dfed1 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java @@ -34,9 +34,21 @@ public void testSearchRequestPhaseFailure() { for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName); - testRequestStats.onPhaseStart(ctx); + testRequestStats.onPhaseStart( + ctx, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ); assertEquals(1, testRequestStats.getPhaseCurrent(searchPhaseName)); - testRequestStats.onPhaseFailure(ctx); + testRequestStats.onPhaseFailure( + ctx, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ); assertEquals(0, testRequestStats.getPhaseCurrent(searchPhaseName)); } } @@ -52,7 +64,13 @@ public void testSearchRequestStats() { for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName); long tookTimeInMillis = randomIntBetween(1, 10); - testRequestStats.onPhaseStart(ctx); + testRequestStats.onPhaseStart( + ctx, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ); long startTime = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(tookTimeInMillis); when(mockSearchPhase.getStartTimeInNanos()).thenReturn(startTime); assertEquals(1, testRequestStats.getPhaseCurrent(searchPhaseName)); @@ -84,7 +102,13 @@ public void testSearchRequestStatsOnPhaseStartConcurrently() throws InterruptedE for (int i = 0; i < numTasks; i++) { threads[i] = new Thread(() -> { phaser.arriveAndAwaitAdvance(); - testRequestStats.onPhaseStart(ctx); + testRequestStats.onPhaseStart( + ctx, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ); countDownLatch.countDown(); }); threads[i].start(); @@ -155,8 +179,20 @@ public void testSearchRequestStatsOnPhaseFailureConcurrently() throws Interrupte for (int i = 0; i < numTasks; i++) { threads[i] = new Thread(() -> { phaser.arriveAndAwaitAdvance(); - testRequestStats.onPhaseStart(ctx); - testRequestStats.onPhaseFailure(ctx); + testRequestStats.onPhaseStart( + ctx, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ); + testRequestStats.onPhaseFailure( + ctx, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ); countDownLatch.countDown(); }); threads[i].start(); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 7c50e961853b5..4b43b5f9c3ff5 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -2310,7 +2310,8 @@ public void onFailure(final Exception e) { client ), NoopMetricsRegistry.INSTANCE, - searchRequestOperationsCompositeListenerFactory + searchRequestOperationsCompositeListenerFactory, + NoopTracer.INSTANCE ) ); actions.put( diff --git a/server/src/test/java/org/opensearch/telemetry/tracing/SpanBuilderTests.java b/server/src/test/java/org/opensearch/telemetry/tracing/SpanBuilderTests.java index b4183412cdf02..d1ead0e25ef8a 100644 --- a/server/src/test/java/org/opensearch/telemetry/tracing/SpanBuilderTests.java +++ b/server/src/test/java/org/opensearch/telemetry/tracing/SpanBuilderTests.java @@ -19,6 +19,7 @@ import org.opensearch.http.HttpResponse; import org.opensearch.rest.RestRequest; import org.opensearch.telemetry.tracing.attributes.Attributes; +import org.opensearch.telemetry.tracing.noop.NoopSpan; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.transport.Transport; import org.opensearch.transport.TransportException; @@ -32,6 +33,14 @@ public class SpanBuilderTests extends OpenSearchTestCase { + public void testString() { + String spanName = "test-name"; + SpanCreationContext context = SpanBuilder.from(spanName); + Attributes attributes = context.getAttributes(); + assertEquals(spanName, context.getSpanName()); + assertNull(attributes); + } + public void testHttpRequestContext() { HttpRequest httpRequest = createHttpRequest(); SpanCreationContext context = SpanBuilder.from(httpRequest); @@ -67,6 +76,16 @@ public void testTransportContext() { assertEquals(connection.getNode().getHostAddress(), attributes.getAttributesMap().get(AttributeNames.TRANSPORT_TARGET_HOST)); } + public void testParentSpan() { + String spanName = "test-name"; + SpanContext parentSpanContext = new SpanContext(NoopSpan.INSTANCE); + SpanCreationContext context = SpanBuilder.from(spanName, parentSpanContext); + Attributes attributes = context.getAttributes(); + assertNull(attributes); + assertEquals(spanName, context.getSpanName()); + assertEquals(parentSpanContext, context.getParent()); + } + private static Transport.Connection createTransportConnection() { return new Transport.Connection() { @Override