Fix Doubled Enqueue Metrics (#30)
* fix metrics

* update README

* bump version
orioldsm authored Aug 27, 2024
1 parent e64bf1b commit b3d3437
Showing 9 changed files with 242 additions and 63 deletions.
19 changes: 17 additions & 2 deletions README.md
@@ -84,6 +84,12 @@ For each job, the following metrics will be reported:
4. **shared.sidekiq._queue_._job_.error**: counter incremented each time a
job fails.

For job retry attempts, the above 4 metrics will still be reported but the enqueue/dequeue metrics
will have a `.retry` appended:

1. **shared.sidekiq._queue_._job_.enqueue.retry**
2. **shared.sidekiq._queue_._job_.dequeue.retry**

The metric names can be changed by overriding the `statsd_metric_name`
method in your worker classes.

@@ -92,7 +98,7 @@ For each queue, the following metrics will be reported:
2. **shared.sidekiq._queue_.latency**: gauge of how long the oldest job has been in the queue

For each worker, the following metrics and tags will be reported:
1. **sidekiq.worker_metrics.in_queue.#{key}**: number of jobs "in queue" per worker, uses redis to track increment/decrement
1. **sidekiq.worker_metrics.in_queue.#{key}**: number of jobs "in queue" per worker, uses redis to track increment/decrement (**this metric is currently inaccurate**)

## DogStatsD Keys
For each job, the following metrics and tags will be reported:
@@ -106,14 +112,23 @@ For each job, the following metrics and tags will be reported:
4. **sidekiq.error (tags: {queue: _queue_, worker: _job_})**: counter incremented each time a
job fails.

For job retry attempts, the above 4 metrics will still be reported but the enqueue/dequeue metrics
will have a `.retry` appended:

1. **sidekiq.enqueue.retry (tags: {queue: _queue_, worker: _job_})**
2. **sidekiq.dequeue.retry (tags: {queue: _queue_, worker: _job_})**

For each queue, the following metrics and tags will be reported:
1. **sidekiq.queue.size (tags: {queue: _queue_})**: gauge of how many jobs are in the queue
2. **sidekiq.queue.latency (tags: {queue: _queue_})**: gauge of how long the oldest job has been in the queue

For each worker, the following metrics and tags will be reported:
1. **sidekiq.worker_metrics.in_queue.#{key}**: number of jobs "in queue" per worker, uses redis to track increment/decrement
1. **sidekiq.worker_metrics.in_queue.#{key}**: number of jobs "in queue" per worker, uses redis to track increment/decrement (**this metric is currently inaccurate**)

## Worker

**WARNING: The metrics reported by this Worker are currently inaccurate.**

There is a worker, `Sidekiq::Instrument::Worker`, that submits gauges
for various interesting statistics; namely, the bulk of the information in `Sidekiq::Stats`
and the sizes of each individual queue. While the worker class is a fully valid Sidekiq worker,
20 changes: 14 additions & 6 deletions lib/sidekiq/instrument/middleware/client.rb
@@ -7,16 +7,24 @@ module Sidekiq::Instrument
class ClientMiddleware
include Sidekiq::Instrument::MetricNames

def call(worker_class, job, queue, redis_pool)
def call(worker_class, job, queue, _redis_pool)
# worker_class is a const in sidekiq >= 6.x
klass = Object.const_get(worker_class.to_s)
class_instance = klass.new
Statter.statsd.increment(metric_name(class_instance, 'enqueue'))
Statter.dogstatsd&.increment('sidekiq.enqueue', worker_dog_options(class_instance))
WorkerMetrics.trace_workers_increment_counter(klass.name.underscore, redis_pool)
result = yield

# This is needed because the ClientMiddleware is called twice for scheduled jobs
# - Once when it gets scheduled
# - Once when it gets dequeued for processing
# We only want to increment the enqueue metric when the job is scheduled and
# Sidekiq::Context.current[:class] is only ever set when the job is scheduled
if Sidekiq::Context.current[:class].present?
WorkerMetrics.trace_workers_increment_counter(klass.name.underscore)
Statter.statsd.increment(metric_name(class_instance, 'enqueue'))
Statter.dogstatsd&.increment('sidekiq.enqueue', worker_dog_options(class_instance))
end

Statter.dogstatsd&.flush(sync: true)
result
yield
end
end
end
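
The guard added to `ClientMiddleware#call` can be illustrated in isolation. This is a minimal sketch, not the gem's code: `CountingStatter` and `record_enqueue` are hypothetical stand-ins, a plain Hash plays the role of `Sidekiq::Context.current`, and plain-Ruby checks replace ActiveSupport's `present?`. It assumes, as the comment in the diff states, that the `:class` entry is set only when the job is being scheduled.

```ruby
# Hypothetical in-memory counter standing in for the StatsD client.
class CountingStatter
  attr_reader :counts

  def initialize
    @counts = Hash.new(0)
  end

  def increment(name)
    @counts[name] += 1
  end
end

# Mirrors the guard in the diff: count an enqueue only when the context
# carries a non-empty :class entry (plain-Ruby substitute for `present?`).
def record_enqueue(statter, context, metric)
  klass = context[:class]
  return false if klass.nil? || klass.to_s.empty?

  statter.increment(metric)
  true
end

statter = CountingStatter.new
metric = 'shared.sidekiq.default.MyWorker.enqueue'

record_enqueue(statter, { class: 'MyWorker' }, metric) # first pass: counted
record_enqueue(statter, {}, metric)                    # second pass: skipped

puts statter.counts[metric] # prints 1
```

Two passes through the middleware produce a single increment, which is the behavior the commit title describes.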
24 changes: 21 additions & 3 deletions lib/sidekiq/instrument/middleware/server.rb
@@ -7,22 +7,40 @@ module Sidekiq::Instrument
class ServerMiddleware
include Sidekiq::Instrument::MetricNames

def call(worker, _job, _queue, &block)
Statter.statsd.increment(metric_name(worker, 'dequeue'))
Statter.dogstatsd&.increment('sidekiq.dequeue', worker_dog_options(worker))
def call(worker, job, _queue, &block)
dequeue_string = is_retry(job) ? 'dequeue.retry' : 'dequeue'
Statter.dogstatsd&.increment("sidekiq.#{dequeue_string}", worker_dog_options(worker))
Statter.statsd.increment(metric_name(worker, dequeue_string))

start_time = Time.now
yield block
execution_time_ms = (Time.now - start_time) * 1000
Statter.statsd.measure(metric_name(worker, 'runtime'), execution_time_ms)
Statter.dogstatsd&.timing('sidekiq.runtime', execution_time_ms, worker_dog_options(worker))
rescue StandardError => e
# if we have retries left, increment the enqueue.retry counter to indicate the job is going back on the queue
if max_retries(worker) > current_retries(job) + 1
WorkerMetrics.trace_workers_increment_counter(worker.class.to_s.underscore)
Statter.dogstatsd&.increment('sidekiq.enqueue.retry', worker_dog_options(worker))
end

Statter.statsd.increment(metric_name(worker, 'error'))
Statter.dogstatsd&.increment('sidekiq.error', worker_dog_options(worker))
raise e
ensure
WorkerMetrics.trace_workers_decrement_counter(worker.class.to_s.underscore)
Statter.dogstatsd&.flush(sync: true)
end

private

# returns -1 if no retries have been attempted
def current_retries(job)
job["retry_count"] || -1
end

def is_retry(job)
current_retries(job) >= 0
end
end
end
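
The retry bookkeeping in `ServerMiddleware` rests on the `retry_count` key of the job payload, which Sidekiq leaves unset until a job has failed once. The sketch below restates that logic as standalone functions so the boundary cases are easy to check. `retry?`, `dequeue_metric`, and `will_retry?` are illustrative names and not part of the gem; the limit of 25 is only an example value.

```ruby
# "retry_count" is absent on the first run, 0 on the first retry, and so on.
# Returns -1 when no retry has been attempted, matching the diff.
def current_retries(job)
  job['retry_count'] || -1
end

def retry?(job)
  current_retries(job) >= 0
end

# Picks the DogStatsD dequeue metric name the way the diff does.
def dequeue_metric(job)
  retry?(job) ? 'sidekiq.dequeue.retry' : 'sidekiq.dequeue'
end

# True when a failure of this run would put the job back on the queue.
def will_retry?(job, max_retries)
  max_retries > current_retries(job) + 1
end

first_run  = {}
last_retry = { 'retry_count' => 24 }

puts dequeue_metric(first_run)       # prints sidekiq.dequeue
puts dequeue_metric(last_retry)      # prints sidekiq.dequeue.retry
puts will_retry?(first_run, 25)      # prints true
puts will_retry?(last_retry, 25)     # prints false
```

With a limit of 0 (retries disabled), `will_retry?` is false even on the first run, so no `enqueue.retry` increment would be emitted.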
7 changes: 7 additions & 0 deletions lib/sidekiq/instrument/mixin.rb
@@ -12,6 +12,13 @@ def worker_dog_options(worker)
{ tags: ["queue:#{queue_name(worker)}", "worker:#{underscore(class_name(worker))}"] }
end

def max_retries(worker)
retries = worker.class.get_sidekiq_options['retry'] || Sidekiq[:max_retries]
return Sidekiq[:max_retries] if retries.to_s.eql?("true")
return 0 if retries.eql?("false")
retries
end

private

def queue_name(worker)
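
The `retry` option of a Sidekiq worker can be `true`, `false`, an integer, or unset, so `max_retries` has to normalize several shapes. One thing to watch in the version above: a boolean `false` is consumed by the `||` fallback before the `"false"` string comparison runs, so it appears to resolve to the default rather than 0. The following is a hedged sketch of a normalization that treats each shape explicitly; `normalize_max_retries` and `DEFAULT_MAX_RETRIES` are hypothetical names, and 25 is assumed as the default because it is Sidekiq's documented default retry limit.

```ruby
# Assumed default; in the gem this value comes from Sidekiq's configuration.
DEFAULT_MAX_RETRIES = 25

# Maps the raw `retry` option to a retry limit.
def normalize_max_retries(option, default = DEFAULT_MAX_RETRIES)
  case option
  when nil, true, 'true' then default # unset or "use the default"
  when false, 'false'    then 0       # retries disabled
  when Integer           then option  # explicit limit
  else Integer(option)                # numeric string such as "5"
  end
end

puts normalize_max_retries(nil)   # prints 25
puts normalize_max_retries(true)  # prints 25
puts normalize_max_retries(false) # prints 0
puts normalize_max_retries(3)     # prints 3
```

Matching on the value with `case` keeps the boolean and string forms side by side, which avoids depending on the order of `||` and string comparisons.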
2 changes: 1 addition & 1 deletion lib/sidekiq/instrument/version.rb
@@ -1,5 +1,5 @@
module Sidekiq
module Instrument
VERSION = '0.6.2'
VERSION = '0.7.0'
end
end
2 changes: 1 addition & 1 deletion lib/sidekiq/instrument/worker_metrics.rb
@@ -10,7 +10,7 @@ class WorkerMetrics
class_attribute :enabled, :namespace

class << self
def trace_workers_increment_counter(klass_name, sidekiq_redis_pool_user)
def trace_workers_increment_counter(klass_name)
return unless enabled?

Sidekiq.redis do |redis|
98 changes: 69 additions & 29 deletions spec/sidekiq-instrument/client_middleware_spec.rb
@@ -28,51 +28,91 @@
end
end

context 'without statsd_metric_name' do
it 'increments the StatsD enqueue counter' do
expect do
context 'with Sidekiq::Context.current[:class] (job being enqueued)' do
before do
Sidekiq::Context.current[:class] = 'MyWorker'
end

context 'without statsd_metric_name' do
it 'increments the StatsD enqueue counter' do
expect do
MyWorker.perform_async
end.to trigger_statsd_increment('shared.sidekiq.default.MyWorker.enqueue')
end

it 'increments the DogStatsD enqueue counter' do
expect(
Sidekiq::Instrument::Statter.dogstatsd
).to receive(:increment).with('sidekiq.enqueue', { tags: ['queue:default', 'worker:my_worker'] }).once
MyWorker.perform_async
end.to trigger_statsd_increment('shared.sidekiq.default.MyWorker.enqueue')
end
end

it 'increments the DogStatsD enqueue counter' do
expect(
Sidekiq::Instrument::Statter.dogstatsd
).to receive(:increment).with('sidekiq.enqueue', { tags: ['queue:default', 'worker:my_worker'] }).once
MyWorker.perform_async
context 'with statsd_metric_name' do
it 'does the enqueue counter' do
expect do
MyOtherWorker.perform_async
end.to trigger_statsd_increment('my_other_worker.enqueue')
end
end
end

context 'with statsd_metric_name' do
it 'increments the enqueue counter' do
expect do
context 'with WorkerMetrics.enabled true' do
it 'increments the in_queue counter' do
Sidekiq::Instrument::WorkerMetrics.enabled = true
MyOtherWorker.perform_async
end.to trigger_statsd_increment('my_other_worker.enqueue')
expect(Redis.new.hget(worker_metric_name, 'my_other_worker')).to eq('1')
MyOtherWorker.perform_async
expect(Redis.new.hget(worker_metric_name, 'my_other_worker')).to eq('2')
end
end
end

context 'with WorkerMetrics.enabled true' do
it 'increments the in_queue counter' do
Sidekiq::Instrument::WorkerMetrics.enabled = true
MyOtherWorker.perform_async
expect(Redis.new.hget(worker_metric_name, 'my_other_worker')).to eq('1')
MyOtherWorker.perform_async
expect(Redis.new.hget(worker_metric_name, 'my_other_worker')).to eq('2')
context 'without optional DogStatsD client' do
before do
@tmp = Sidekiq::Instrument::Statter.dogstatsd
Sidekiq::Instrument::Statter.dogstatsd = nil
end

after do
Sidekiq::Instrument::Statter.dogstatsd = @tmp
end

it 'does not error' do
expect { MyWorker.perform_async }.not_to raise_error
end
end
end

context 'without optional DogStatsD client' do
context 'without the Sidekiq::Context.current[:class] (job being dequeued)' do
before do
@tmp = Sidekiq::Instrument::Statter.dogstatsd
Sidekiq::Instrument::Statter.dogstatsd = nil
Sidekiq::Context.current[:class] = nil
end

it 'does not increment the StatsD enqueue counter' do
expect do
MyWorker.perform_async
end.not_to trigger_statsd_increment('shared.sidekiq.default.MyWorker.enqueue')
end

after do
Sidekiq::Instrument::Statter.dogstatsd = @tmp
it 'does not increment the DogStatsD enqueue counter' do
expect(
Sidekiq::Instrument::Statter.dogstatsd
).not_to receive(:increment).with('sidekiq.enqueue', { tags: ['queue:default', 'worker:my_worker'] })
MyWorker.perform_async
end

it 'does not error' do
expect { MyWorker.perform_async }.not_to raise_error
context 'with WorkerMetrics.enabled true' do
before do
Redis.new.flushall
Redis.new.hset(worker_metric_name, 'my_other_worker', 0)
end

it 'does not increment the in_queue counter' do
Sidekiq::Instrument::WorkerMetrics.enabled = true
MyOtherWorker.perform_async
expect(Redis.new.hget(worker_metric_name, 'my_other_worker')).to eq('0')
MyOtherWorker.perform_async
expect(Redis.new.hget(worker_metric_name, 'my_other_worker')).to eq('0')
end
end
end
end