Skip to content

Commit

Permalink
Add stats for Workers in queue (#21)
Browse files Browse the repository at this point in the history
* Add stats for Workers in queue

* review

* redis_version update

* specify host port

* upgrade workflow step

* more specs

* execute tests only for redis 7

* fix reset_counters

---------

Co-authored-by: Manoj Naduviledath <[email protected]>
  • Loading branch information
naduvima and Manoj Naduviledath authored Apr 3, 2023
1 parent c39b255 commit a069402
Show file tree
Hide file tree
Showing 12 changed files with 298 additions and 49 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
strategy:
matrix:
ruby-version: [2.6, 2.7, 3.0]
redis-version: [4, 5, 6]
redis-version: [7]

steps:
- name: Checkout project
Expand All @@ -28,7 +28,7 @@ jobs:
bundler-cache: true # runs 'bundle install' and caches installed gems automatically

- name: Start Redis
uses: supercharge/redis-github-action@1.4.0
uses: supercharge/redis-github-action@1.5.0
with:
redis-version: ${{ matrix.redis-version }}

Expand Down
1 change: 1 addition & 0 deletions lib/sidekiq/instrument.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
require "sidekiq/instrument/worker"
require "sidekiq/instrument/middleware/client"
require "sidekiq/instrument/middleware/server"
require "sidekiq/instrument/worker_metrics"

module Sidekiq
module Instrument
Expand Down
4 changes: 4 additions & 0 deletions lib/sidekiq/instrument/middleware/client.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
# frozen_string_literal: true

require 'sidekiq/instrument/mixin'
require 'active_support/core_ext/string/inflections'

module Sidekiq::Instrument
class ClientMiddleware
Expand All @@ -10,6 +13,7 @@ def call(worker_class, job, queue, redis_pool)
class_instance = klass.new
Statter.statsd.increment(metric_name(class_instance, 'enqueue'))
Statter.dogstatsd&.increment('sidekiq.enqueue', worker_dog_options(class_instance))
WorkerMetrics.trace_workers_increment_counter(klass.name.underscore, redis_pool)
result = yield
Statter.dogstatsd&.flush(sync: true)
result
Expand Down
14 changes: 14 additions & 0 deletions lib/sidekiq/instrument/middleware/instrument.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
require "active_support/core_ext/class/attribute"
require "statsd/instrument"

require "sidekiq/instrument/statter"
require "sidekiq/instrument/version"
require "sidekiq/instrument/worker"
require "sidekiq/instrument/middleware/client"
require "sidekiq/instrument/middleware/server"
require "sidekiq/instrument/worker_metrics"

module Sidekiq
module Instrument
end
end
5 changes: 4 additions & 1 deletion lib/sidekiq/instrument/middleware/server.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
# frozen_string_literal: true

require 'sidekiq/instrument/mixin'
require 'active_support/core_ext/string/inflections'

module Sidekiq::Instrument
class ServerMiddleware
Expand All @@ -9,6 +12,7 @@ def call(worker, job, queue, &block)
Statter.dogstatsd&.increment('sidekiq.dequeue', worker_dog_options(worker))

start_time = Time.now
WorkerMetrics.trace_workers_decrement_counter(worker.class.to_s.underscore)
yield block
execution_time_ms = (Time.now - start_time) * 1000
Statter.statsd.measure(metric_name(worker, 'runtime'), execution_time_ms)
Expand All @@ -22,4 +26,3 @@ def call(worker, job, queue, &block)
end
end
end

2 changes: 1 addition & 1 deletion lib/sidekiq/instrument/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module Sidekiq
module Instrument
VERSION = '0.5.5'
VERSION = '0.5.6'
end
end
11 changes: 10 additions & 1 deletion lib/sidekiq/instrument/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def perform
working = Sidekiq::Workers.new.count
Statter.statsd.gauge('shared.sidekiq.stats.working', working)
Statter.dogstatsd&.gauge('sidekiq.working', working)

send_worker_metrics
Sidekiq::Queue.all.each do |queue|
Statter.statsd.gauge("shared.sidekiq.#{queue.name}.size", queue.size)
Statter.dogstatsd&.gauge('sidekiq.queue.size', queue.size, tags: dd_tags(queue))
Expand Down Expand Up @@ -59,5 +59,14 @@ def perform
def dd_tags(queue)
["queue:#{queue.name}"]
end

def send_worker_metrics
return unless WorkerMetrics.enabled

WorkerMetrics.workers_in_queue.each do |key, value|
Statter.statsd.gauge("shared.sidekiq.trace.inqueue.#{key}", value)
Statter.dogstatsd&.gauge("shared.sidekiq.trace.inqueue.#{key}", value)
end
end
end
end
106 changes: 106 additions & 0 deletions lib/sidekiq/instrument/worker_metrics.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# frozen_string_literal: true

require 'redis'
require 'redis-client'
module Sidekiq
module Instrument
# Stores worker count with a key sidekiq_instrument_trace_workers:#{namespace}:in_queue
# Values are hash having keys as worker names.
class WorkerMetrics
class_attribute :enabled, :namespace, :redis_config

class_attribute :redis_password

class << self
def redis_pool_user
@redis_pool_user ||= begin
redis_client_config = RedisClient.config(**redis_config)
@redis = redis_client_config.new_pool(
timeout: 0.5, size: Integer(ENV.fetch('RAILS_MAX_THREADS', 5))
)
end
end

def reset_redis
@redis = nil
end

def trace_workers_increment_counter(klass_name, sidekiq_redis_pool_user)
return unless enabled?

if redis_config?
redis_pool_user.with do |redis|
redis.call 'HINCRBY', worker_metric_name, klass_name, 1
end
else
sidekiq_redis_pool_user.with do |redis|
redis.hincrby worker_metric_name, klass_name, 1
end
end
end

def trace_workers_decrement_counter(klass_name)
return unless enabled?

if redis_config?
redis_pool_user.with do |redis|
redis.call 'HINCRBY', worker_metric_name, klass_name, -1
end
else
Sidekiq.redis do |redis|
redis.hincrby worker_metric_name, klass_name, -1
end
end
end

def reset_counters
return unless enabled?

if redis_config?
redis_pool_user.with do |redis|
all_keys = redis.call 'HGETALL', worker_metric_name
redis.call 'HDEL', worker_metric_name, all_keys.keys
end
else
Sidekiq.redis do |redis|
all_keys = redis.hgetall worker_metric_name
redis.hdel worker_metric_name, all_keys.keys
end
end
end

def reset_counter(key)
return unless enabled?

if redis_config?
redis_pool_user.with do |redis|
redis.call 'HDEL', worker_metric_name, key
end
else
Sidekiq.redis do |redis|
redis.hdel worker_metric_name, key
end
end
end

def workers_in_queue
return unless enabled?

if redis_config?
redis_pool_user.with do |redis|
redis.call 'HGETALL', worker_metric_name
end
else
Sidekiq.redis do |redis|
redis.hgetall worker_metric_name
end
end
end

def worker_metric_name
"sidekiq_instrument_trace_workers:#{namespace}:in_queue"
end
end
end
end
end
1 change: 1 addition & 0 deletions sidekiq-instrument.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Gem::Specification.new do |spec|
spec.add_dependency 'statsd-instrument', '>= 2.0.4'
spec.add_dependency 'dogstatsd-ruby', '~> 5.5'
spec.add_dependency 'activesupport', '>= 5.1', '< 7'
spec.add_dependency "redis-client", ">= 0.11.0"

spec.add_development_dependency 'bundler', '~> 2.0', '>= 2.0.2'
spec.add_development_dependency 'rake', '~> 12.0'
Expand Down
33 changes: 33 additions & 0 deletions spec/sidekiq-instrument/client_middleware_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,39 @@
end
end

context 'with WorkerMetrics.enabled true' do
let(:worker_metric_name) do
"sidekiq_instrument_trace_workers::in_queue"
end
it 'increments the enqueue counter' do
Sidekiq::Instrument::WorkerMetrics.enabled = true
Redis.new.hdel worker_metric_name ,'my_other_worker'
Sidekiq::Instrument::WorkerMetrics.redis_config = {
host: ENV['REDIS_HOST'],
port: ENV['REDIS_PORT'],
db: 0
}
MyOtherWorker.perform_async
expect(
Redis.new.hget worker_metric_name ,'my_other_worker'
).to eq('1')
end
end

context 'with WorkerMetrics.enabled true and redis_config not provided' do
let(:worker_metric_name) do
"sidekiq_instrument_trace_workers::in_queue"
end
it 'increments the enqueue counter' do
Sidekiq::Instrument::WorkerMetrics.enabled = true
Redis.new.hdel worker_metric_name ,'my_other_worker'
MyOtherWorker.perform_async
expect(
Redis.new.hget worker_metric_name ,'my_other_worker'
).to eq('1')
end
end

context 'without optional DogStatsD client' do
before do
@tmp = Sidekiq::Instrument::Statter.dogstatsd
Expand Down
33 changes: 33 additions & 0 deletions spec/sidekiq-instrument/server_middleware_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,39 @@
expect(Sidekiq::Instrument::Statter.dogstatsd).to receive(:timing).once
MyWorker.perform_async
end

context 'with WorkerMetrics.enabled true' do
let(:worker_metric_name) do
"sidekiq_instrument_trace_workers::in_queue"
end
it 'increments the enqueue counter' do
Sidekiq::Instrument::WorkerMetrics.enabled = true
Sidekiq::Instrument::WorkerMetrics.redis_config = {
host: ENV['REDIS_HOST'],
port: ENV['REDIS_PORT'],
db: 0
}
Redis.new.hdel worker_metric_name ,'my_other_worker'
MyOtherWorker.perform_async
expect(
Redis.new.hget worker_metric_name ,'my_other_worker'
).to eq('-1')
end
end

context 'with WorkerMetrics.enabled true, and redis_config not given' do
let(:worker_metric_name) do
"sidekiq_instrument_trace_workers::in_queue"
end
it 'increments the enqueue counter' do
Sidekiq::Instrument::WorkerMetrics.enabled = true
Redis.new.hdel worker_metric_name ,'my_other_worker'
MyOtherWorker.perform_async
expect(
Redis.new.hget worker_metric_name ,'my_other_worker'
).to eq('-1')
end
end
end

context 'when a job fails' do
Expand Down
Loading

0 comments on commit a069402

Please sign in to comment.