diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java index 6ce51ee136861..4a9b07c12821d 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java @@ -707,6 +707,10 @@ private void finishHim() { } }, releasable::close), span, tracer) ); + } catch (Exception e) { + span.setError(e); + span.endSpan(); + throw e; } } bulkRequest = null; // allow memory for bulk request items to be reclaimed before all items have been completed diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java index 539fbff83a44a..65358db9b8085 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java @@ -204,11 +204,6 @@ public TransportShardBulkAction( ); } - @Override - protected Tracer getTracer() { - return tracer; - } - protected void handlePrimaryTermValidationRequest( final PrimaryTermValidationRequest request, final TransportChannel channel, @@ -420,9 +415,9 @@ protected void dispatchedShardOperationOnPrimary( IndexShard primary, ActionListener> listener ) { - Span span = getTracer().startSpan(SpanBuilder.from("shardPrimaryAction", clusterService.localNode().getId(), request)); + Span span = tracer.startSpan(SpanBuilder.from("shardPrimaryAction", clusterService.localNode().getId(), request)); ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext()); - try (SpanScope spanScope = getTracer().withSpanInScope(span)) { + try (SpanScope spanScope = tracer.withSpanInScope(span)) { performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis, (update, shardId, mappingListener) -> { assert update != null; assert shardId != null; @@ -442,7 +437,7 @@ public void onClusterServiceClose() { public void onTimeout(TimeValue timeout) { mappingUpdateListener.onFailure(new MapperException("timed out while waiting for a dynamic mapping update")); } - }), TraceableActionListener.create(listener, span, getTracer()), threadPool, executor(primary)); + }), TraceableActionListener.create(listener, span, tracer), threadPool, executor(primary)); } } @@ -818,9 +813,9 @@ static BulkItemResponse processUpdateResponse( @Override protected void dispatchedShardOperationOnReplica(BulkShardRequest request, IndexShard replica, ActionListener listener) { - Span span = getTracer().startSpan(SpanBuilder.from("shardReplicaAction", clusterService.localNode().getId(), request)); - try (SpanScope spanScope = getTracer().withSpanInScope(span)) { - ActionListener.completeWith(TraceableActionListener.create(listener, span, getTracer()), () -> { + Span span = tracer.startSpan(SpanBuilder.from("shardReplicaAction", clusterService.localNode().getId(), request)); + try (SpanScope spanScope = tracer.withSpanInScope(span)) { + ActionListener.completeWith(TraceableActionListener.create(listener, span, tracer), () -> { final Translog.Location location = performOnReplica(request, replica); return new WriteReplicaResult<>(request, location, null, replica, logger); }); diff --git a/server/src/main/java/org/opensearch/action/support/TransportAction.java b/server/src/main/java/org/opensearch/action/support/TransportAction.java index 1432c762d1bf1..72aae210d61ae 100644 --- a/server/src/main/java/org/opensearch/action/support/TransportAction.java +++ b/server/src/main/java/org/opensearch/action/support/TransportAction.java @@ -46,8 +46,6 @@ import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskListener; import org.opensearch.tasks.TaskManager; -import org.opensearch.telemetry.tracing.Tracer; -import org.opensearch.telemetry.tracing.noop.NoopTracer; import java.util.concurrent.atomic.AtomicInteger; @@ -81,10 +79,6 @@ private Releasable registerChildNode(TaskId parentTask) { } } - protected Tracer getTracer() { - return NoopTracer.INSTANCE; - } - /** * Use this method when the transport action call should result in creation of a new task associated with the call. *

diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java b/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java index 2f92b69f16960..f2b421a8aa13a 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java @@ -83,7 +83,7 @@ private AttributeNames() { /** * Number of request items in bulk request */ - public static final String NUM_BULK_ITEMS = "num_bulk_items"; + public static final String BULK_REQUEST_ITEMS = "bulk_request_items"; /** * Node ID diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java b/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java index b7baa8ac3e689..81e5665b3886c 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java @@ -74,10 +74,6 @@ public static SpanCreationContext from(String spanName, String nodeId, BulkShard return SpanCreationContext.server().name(spanName).attributes(buildSpanAttributes(nodeId, bulkShardRequest)); } - public static SpanCreationContext from(String spanName, String nodeId, ShardId shardId) { - return SpanCreationContext.server().name(spanName).attributes(buildSpanAttributes(nodeId, shardId)); - } - private static String createSpanName(HttpRequest httpRequest) { return httpRequest.method().name() + SEPARATOR + httpRequest.uri(); } @@ -139,16 +135,13 @@ private static Attributes buildSpanAttributes(String action, Transport.Connectio } private static Attributes buildSpanAttributes(String nodeId, BulkShardRequest bulkShardRequest) { - Attributes attributes = buildSpanAttributes(nodeId, bulkShardRequest.shardId()); - attributes.addAttribute(AttributeNames.NUM_BULK_ITEMS, bulkShardRequest.items().length); - return attributes; - } - - private static Attributes buildSpanAttributes(String nodeId, ShardId shardId) { Attributes attributes = Attributes.create() .addAttribute(AttributeNames.NODE_ID, nodeId) - .addAttribute(AttributeNames.INDEX, (shardId != null) ? shardId.getIndexName() : "NULL") - .addAttribute(AttributeNames.SHARD_ID, (shardId != null) ? shardId.getId() : -1); + .addAttribute(AttributeNames.BULK_REQUEST_ITEMS, bulkShardRequest.items().length); + if (bulkShardRequest.shardId() != null) { + attributes.addAttribute(AttributeNames.INDEX, bulkShardRequest.shardId().getIndexName()) + .addAttribute(AttributeNames.SHARD_ID, bulkShardRequest.shardId().getId()); + } return attributes; }