Skip to content

Commit

Permalink
Merge branch 'main' into action-pack-instrument-notification
Browse files Browse the repository at this point in the history
  • Loading branch information
arielvalentin authored Nov 22, 2023
2 parents 13b349c + fb31945 commit ec10392
Show file tree
Hide file tree
Showing 22 changed files with 1,077 additions and 560 deletions.
58 changes: 58 additions & 0 deletions instrumentation/active_job/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,64 @@ OpenTelemetry::SDK.configure do |c|
end
```

## Active Support Instrumentation

Earlier versions of this instrumentation relied on registering custom `around_perform` hooks in order to deal with limitations
in `ActiveSupport::Notifications`, however those patches resulted in error reports and inconsistent behavior when combined with other gems.

This instrumentation now relies entirely on `ActiveSupport::Notifications` and registers a custom Subscriber that listens to relevant events to report as spans.

See the table below for details of what [Rails Framework Hook Events](https://guides.rubyonrails.org/active_support_instrumentation.html#active-job) are recorded by this instrumentation:

| Event Name | Creates Span? | Notes |
| - | - | - |
| `enqueue_at.active_job` | :white_check_mark: | Creates an egress span with kind `producer` |
| `enqueue.active_job` | :white_check_mark: | Creates an egress span with kind `producer` |
| `enqueue_retry.active_job` | :white_check_mark: | Creates an `internal` span |
| `perform_start.active_job` | :x: | This is invoked prior to the appropriate ingress point and is therefore ignored |
| `perform.active_job` | :white_check_mark: | Creates an ingress span with kind `consumer` |
| `retry_stopped.active_job` | :white_check_mark: | Creates and `internal` span with an `exception` event |
| `discard.active_job` | :white_check_mark: | Creates and `internal` span with an `exception` event |

## Semantic Conventions

This instrumentation generally uses [Messaging semantic conventions](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/) by treating job enqueuers as `producers` and workers as `consumers`.

Internal spans are named using the name of the `ActiveSupport` event that was provided.

Attributes that are specific to this instrumentation are recorded under `messaging.active_job.*`:

| Attribute Name | Type | Notes |
| - | - | - |
| `code.namespace` | String | `ActiveJob` class name |
| `messaging.system` | String | Static value set to `active_job` |
| `messaging.destination` | String | Set from `ActiveJob#queue_name` |
| `messaging.message.id` | String | Set from `ActiveJob#job_id` |
| `messaging.active_job.adapter.name` | String | The name of the `ActiveJob` adapter implementation |
| `messaging.active_job.message.priority` | String | Present when set by the client from `ActiveJob#priority` |
| `messaging.active_job.message.provider_job_id` | String | Present if the underlying adapter has backend specific message ids |

## Differences between ActiveJob versions

### ActiveJob 6.1

`perform.active_job` events do not include timings for `ActiveJob` callbacks therefore time spent in `before` and `after` hooks will be missing

### ActiveJob 7+

`perform.active_job` no longer includes exceptions handled using `rescue_from` in the payload.

In order to preserve this behavior you will have to update the span yourself, e.g.

```ruby
rescue_from MyCustomError do |e|
# Custom code to handle the error
span = OpenTelemetry::Instrumentation::ActiveJob.current_span
span.record_exception(e)
span.status = OpenTelemetry::Trace::Status.error('Job failed')
end
```

## Examples

Example usage can be seen in the `./example/active_job.rb` file [here](https://github.com/open-telemetry/opentelemetry-ruby-contrib/blob/main/instrumentation/active_job/example/active_job.rb)
Expand Down
8 changes: 0 additions & 8 deletions instrumentation/active_job/example/Gemfile

This file was deleted.

99 changes: 92 additions & 7 deletions instrumentation/active_job/example/active_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,115 @@
#
# SPDX-License-Identifier: Apache-2.0

ENV['OTEL_SERVICE_NAME'] ||= 'otel-active-job-demo'
require 'rubygems'
require 'bundler/setup'
require 'active_job'
require 'bundler/inline'

Bundler.require
gemfile do
source 'https://rubygems.org'
gem 'activejob', '~> 7.0.0', require: 'active_job'
gem 'opentelemetry-instrumentation-active_job', path: '../'
gem 'opentelemetry-sdk'
gem 'opentelemetry-exporter-otlp'
end

ENV['OTEL_LOG_LEVEL'] ||= 'fatal'
ENV['OTEL_TRACES_EXPORTER'] ||= 'console'
OpenTelemetry::SDK.configure do |c|
c.use 'OpenTelemetry::Instrumentation::ActiveJob'
at_exit { OpenTelemetry.tracer_provider.shutdown }
end

class FailingJob < ::ActiveJob::Base
queue_as :demo
def perform
raise 'this job failed'
end
end

class FailingRetryJob < ::ActiveJob::Base
queue_as :demo

retry_on StandardError, attempts: 2, wait: 0
def perform
raise 'this job failed'
end
end

class RetryJob < ::ActiveJob::Base
queue_as :demo

retry_on StandardError, attempts: 3, wait: 0
def perform
if executions < 3
raise 'this job failed'
else
puts <<~EOS
--------------------------------------------------
Done Retrying!
--------------------------------------------------
EOS
end
end
end

class DiscardJob < ::ActiveJob::Base
queue_as :demo

class DiscardError < StandardError; end

discard_on DiscardError

def perform
raise DiscardError, 'this job failed'
end
end

EXAMPLE_TRACER = OpenTelemetry.tracer_provider.tracer('activejob-example', '1.0')

class TestJob < ::ActiveJob::Base
def perform
puts <<~EOS
EXAMPLE_TRACER.in_span("custom span") do
puts <<~EOS
--------------------------------------------------
The computer is doing some work, beep beep boop.
--------------------------------------------------
EOS
end
end
end

class DoItNowJob < ::ActiveJob::Base
def perform
$stderr.puts <<~EOS
--------------------------------------------------
The computer is doing some work, beep beep boop.
Called with perform_now!
--------------------------------------------------
EOS
end
end

class BatchJob < ::ActiveJob::Base
def perform
TestJob.perform_later
FailingJob.perform_later
FailingRetryJob.perform_later
RetryJob.perform_later
DiscardJob.perform_later
end
end

::ActiveJob::Base.queue_adapter = :async

TestJob.perform_later
sleep 0.1 # To ensure we see both spans!
EXAMPLE_TRACER.in_span('run-jobs') do
DoItNowJob.perform_now
BatchJob.perform_later
end

sleep 5 # allow the job to complete
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,39 @@ module OpenTelemetry
module Instrumentation
# Contains the OpenTelemetry instrumentation for the ActiveJob gem
module ActiveJob
extend self

CURRENT_SPAN_KEY = Context.create_key('current-span')
private_constant :CURRENT_SPAN_KEY

# Returns the current span from the current or provided context
#
# @param [optional Context] context The context to lookup the current
# {Span} from. Defaults to Context.current
def current_span(context = nil)
context ||= Context.current
context.value(CURRENT_SPAN_KEY) || OpenTelemetry::Trace::Span::INVALID
end

# Returns a context containing the span, derived from the optional parent
# context, or the current context if one was not provided.
#
# @param [optional Context] context The context to use as the parent for
# the returned context
def context_with_span(span, parent_context: Context.current)
parent_context.set_value(CURRENT_SPAN_KEY, span)
end

# Activates/deactivates the Span within the current Context, which makes the "current span"
# available implicitly.
#
# On exit, the Span that was active before calling this method will be reactivated.
#
# @param [Span] span the span to activate
# @yield [span, context] yields span and a context containing the span to the block.
def with_span(span)
Context.with_value(CURRENT_SPAN_KEY, span) { |c, s| yield s, c }
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

require_relative 'mappers/attribute'
require_relative 'handlers/default'
require_relative 'handlers/enqueue'
require_relative 'handlers/perform'

module OpenTelemetry
module Instrumentation
module ActiveJob
# Module that contains custom event handlers, which are used to generate spans per event
module Handlers
module_function

# Subscribes Event Handlers to relevant ActiveJob notifications
#
# The following events are recorded as spans:
# - enqueue
# - enqueue_at
# - enqueue_retry
# - perform
# - retry_stopped
# - discard
#
# Ingress and Egress spans (perform, enqueue, enqueue_at) use Messaging semantic conventions for naming the span,
# while internal spans keep their ActiveSupport event name.
#
# @note this method is not thread safe and should not be used in a multi-threaded context
# @note Why no perform_start?
# This event causes much heartache as it is the first in a series of events that is triggered.
# It should not be the ingress span because it does not measure anything.
# https://github.com/rails/rails/blob/v6.1.7.6/activejob/lib/active_job/instrumentation.rb#L14
# https://github.com/rails/rails/blob/v7.0.8/activejob/lib/active_job/instrumentation.rb#L19
def subscribe
return unless Array(@subscriptions).empty?

mapper = Mappers::Attribute.new
config = ActiveJob::Instrumentation.instance.config
parent_span_provider = OpenTelemetry::Instrumentation::ActiveJob

# TODO, use delegation instead of inheritance
default_handler = Handlers::Default.new(parent_span_provider, mapper, config)
enqueue_handler = Handlers::Enqueue.new(parent_span_provider, mapper, config)
perform_handler = Handlers::Perform.new(parent_span_provider, mapper, config)

handlers_by_pattern = {
'enqueue' => enqueue_handler,
'enqueue_at' => enqueue_handler,
'enqueue_retry' => default_handler,
'perform' => perform_handler,
'retry_stopped' => default_handler,
'discard' => default_handler
}

@subscriptions = handlers_by_pattern.map do |key, handler|
::ActiveSupport::Notifications.subscribe("#{key}.active_job", handler)
end
end

# Removes Event Handler Subscriptions for ActiveJob notifications
# @note this method is not thread-safe and should not be used in a multi-threaded context
def unsubscribe
@subscriptions&.each { |subscriber| ActiveSupport::Notifications.unsubscribe(subscriber) }
@subscriptions = nil
end
end
end
end
end
Loading

0 comments on commit ec10392

Please sign in to comment.