Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Datadog tracing to client, server, & worker instrumentation code #27

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:

steps:
- name: Checkout project
uses: actions/checkout@v3
uses: actions/checkout@v4

- name: Set up Ruby
uses: ruby/setup-ruby@v1
Expand Down
26 changes: 17 additions & 9 deletions lib/sidekiq/instrument/middleware/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,28 @@

require 'sidekiq/instrument/mixin'
require 'active_support/core_ext/string/inflections'
require 'ddtrace'

module Sidekiq::Instrument
class ClientMiddleware
include Sidekiq::Instrument::MetricNames

def call(worker_class, job, queue, redis_pool)
# worker_class is a const in sidekiq >= 6.x
klass = Object.const_get(worker_class.to_s)
class_instance = klass.new
Statter.statsd.increment(metric_name(class_instance, 'enqueue'))
Statter.dogstatsd&.increment('sidekiq.enqueue', worker_dog_options(class_instance))
WorkerMetrics.trace_workers_increment_counter(klass.name.underscore, redis_pool)
result = yield
Statter.dogstatsd&.flush(sync: true)
def call(worker_class, job, _queue, redis_pool)
span = Datadog::Tracing.trace('sidekiq.job.enqueue', service: Datadog.configuration[:service], resource: job['class'])
begin
# worker_class is a const in sidekiq >= 6.x
klass = Object.const_get(worker_class.to_s)
class_instance = klass.new
Statter.statsd.increment(metric_name(class_instance, 'enqueue'))
Statter.dogstatsd&.increment('sidekiq.enqueue', worker_dog_options(class_instance))
WorkerMetrics.trace_workers_increment_counter(klass.name.underscore, redis_pool)
result = yield
rescue StandardError => e
span.set_error(e)
ensure
span.finish
Statter.dogstatsd&.flush(sync: true)
end
result
end
end
Expand Down
36 changes: 21 additions & 15 deletions lib/sidekiq/instrument/middleware/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,33 @@

require 'sidekiq/instrument/mixin'
require 'active_support/core_ext/string/inflections'
require 'ddtrace'

module Sidekiq::Instrument
class ServerMiddleware
include Sidekiq::Instrument::MetricNames

def call(worker, _job, _queue, &block)
Statter.statsd.increment(metric_name(worker, 'dequeue'))
Statter.dogstatsd&.increment('sidekiq.dequeue', worker_dog_options(worker))
def call(worker, job, _queue, &block)
span = Datadog::Tracing.trace('sidekiq.job.dequeue', service: Datadog.configuration[:service], resource: job['class'])
begin
Statter.statsd.increment(metric_name(worker, 'dequeue'))
Statter.dogstatsd&.increment('sidekiq.dequeue', worker_dog_options(worker))

start_time = Time.now
yield block
execution_time_ms = (Time.now - start_time) * 1000
Statter.statsd.measure(metric_name(worker, 'runtime'), execution_time_ms)
Statter.dogstatsd&.timing('sidekiq.runtime', execution_time_ms, worker_dog_options(worker))
rescue StandardError => e
Statter.statsd.increment(metric_name(worker, 'error'))
Statter.dogstatsd&.increment('sidekiq.error', worker_dog_options(worker))
raise e
ensure
WorkerMetrics.trace_workers_decrement_counter(worker.class.to_s.underscore)
Statter.dogstatsd&.flush(sync: true)
start_time = Time.now
yield block
execution_time_ms = (Time.now - start_time) * 1000
Statter.statsd.measure(metric_name(worker, 'runtime'), execution_time_ms)
Statter.dogstatsd&.timing('sidekiq.runtime', execution_time_ms, worker_dog_options(worker))
rescue StandardError => e
span.set_error(e)
Statter.statsd.increment(metric_name(worker, 'error'))
Statter.dogstatsd&.increment('sidekiq.error', worker_dog_options(worker))
raise e
ensure
span.finish
WorkerMetrics.trace_workers_decrement_counter(worker.class.to_s.underscore)
Statter.dogstatsd&.flush(sync: true)
end
end
end
end
2 changes: 1 addition & 1 deletion lib/sidekiq/instrument/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module Sidekiq
module Instrument
VERSION = '0.6.2'
VERSION = '0.7.0'
end
end
44 changes: 26 additions & 18 deletions lib/sidekiq/instrument/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

require 'sidekiq'
require 'sidekiq/api'
require 'ddtrace'

module Sidekiq::Instrument
class Worker
Expand All @@ -18,35 +19,42 @@ class Worker
}.freeze

def perform
info = Sidekiq::Stats.new
span = Datadog::Tracing.trace('sidekiq.job.perform', service: Datadog.configuration[:service], resource: self.class.to_s)
begin
info = Sidekiq::Stats.new

self.class::METRIC_NAMES.each do |method, stat|
stat ||= method
self.class::METRIC_NAMES.each do |method, stat|
stat ||= method

Statter.statsd.gauge("shared.sidekiq.stats.#{stat}", info.send(method))
Statter.dogstatsd&.gauge("sidekiq.#{stat}", info.send(method))
end
Statter.statsd.gauge("shared.sidekiq.stats.#{stat}", info.send(method))
Statter.dogstatsd&.gauge("sidekiq.#{stat}", info.send(method))
end

working = Sidekiq::Workers.new.count
Statter.statsd.gauge('shared.sidekiq.stats.working', working)
Statter.dogstatsd&.gauge('sidekiq.working', working)
send_worker_metrics
Sidekiq::Queue.all.each do |queue|
Statter.statsd.gauge("shared.sidekiq.#{queue.name}.size", queue.size)
Statter.dogstatsd&.gauge('sidekiq.queue.size', queue.size, tags: dd_tags(queue))
working = Sidekiq::Workers.new.count
Statter.statsd.gauge('shared.sidekiq.stats.working', working)
Statter.dogstatsd&.gauge('sidekiq.working', working)
send_worker_metrics
Sidekiq::Queue.all.each do |queue|
Statter.statsd.gauge("shared.sidekiq.#{queue.name}.size", queue.size)
Statter.dogstatsd&.gauge('sidekiq.queue.size', queue.size, tags: dd_tags(queue))

Statter.statsd.gauge("shared.sidekiq.#{queue.name}.latency", queue.latency)
Statter.dogstatsd&.gauge('sidekiq.queue.latency', queue.latency, tags: dd_tags(queue))
Statter.statsd.gauge("shared.sidekiq.#{queue.name}.latency", queue.latency)
Statter.dogstatsd&.gauge('sidekiq.queue.latency', queue.latency, tags: dd_tags(queue))
end
rescue StandardError => e
span.set_error(e)
raise e
ensure
span.finish
Statter.dogstatsd&.flush(sync: true)
end

Statter.dogstatsd&.flush(sync: true)
end

private

# @param [Sidekiq::Queue] queue used for stats emission
# @return [Array<String>] an array of tags
# @example this method can be override to add more tags
# @example this method can be overridden to add more tags
# class MyStatsWorker < Sidekiq::Instrument::Worker
# private
#
Expand Down
1 change: 1 addition & 0 deletions sidekiq-instrument.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Gem::Specification.new do |spec|

spec.add_dependency 'sidekiq', '>= 4.2', '< 7'
spec.add_dependency 'statsd-instrument', '>= 2.0.4'
spec.add_dependency 'ddtrace', '~> 1.0'
spec.add_dependency 'dogstatsd-ruby', '~> 5.5'
spec.add_dependency 'activesupport', '>= 5.1', '< 7'
spec.add_dependency "redis-client", ">= 0.14.1"
Expand Down
Loading