From a0694020f6c023fe631e75f27e6273e73eebcef2 Mon Sep 17 00:00:00 2001
From: Manoj Naduviledath
Date: Mon, 3 Apr 2023 16:01:27 -0500
Subject: [PATCH] Add stats for Workers in queue (#21)

* Add stats for Workers in queue

* review

* redis_version update

* specify host port

* upgrade workflow step

* more specs

* execute tests only for redis 7

* fix reset_counters

---------

Co-authored-by: Manoj Naduviledath
---
Review notes (this section is ignored by `git am` / `git apply`):
 * The stored copy of this patch had lost its line breaks and indentation.
   Line structure was rebuilt from the hunk headers; indentation of context
   lines follows the usual 2-space convention and may need
   `git apply --ignore-whitespace` if it does not match the tree.
 * worker_metrics.rb: reset_redis now clears the memoized pool
   (@redis_pool_user) instead of an unrelated @redis variable.
 * worker_metrics.rb: reset_counters issues DEL on the counters hash instead
   of HGETALL + HDEL, which sent HDEL without fields when the hash was empty.
 * server_middleware_spec.rb: example titles now say "decrements".
 * lib/sidekiq/instrument/middleware/instrument.rb duplicates
   lib/sidekiq/instrument.rb (same blob); left untouched, please confirm it
   is intended.

 .github/workflows/ci.yml                      |   4 +-
 lib/sidekiq/instrument.rb                     |   1 +
 lib/sidekiq/instrument/middleware/client.rb   |   4 +
 .../instrument/middleware/instrument.rb       |  14 ++
 lib/sidekiq/instrument/middleware/server.rb   |   5 +-
 lib/sidekiq/instrument/version.rb             |   2 +-
 lib/sidekiq/instrument/worker.rb              |  11 +-
 lib/sidekiq/instrument/worker_metrics.rb      | 110 ++++++++++++++
 sidekiq-instrument.gemspec                    |   1 +
 .../client_middleware_spec.rb                 |  33 +++++
 .../server_middleware_spec.rb                 |  33 +++++
 spec/sidekiq-instrument/worker_spec.rb        | 133 ++++++++++++------
 12 files changed, 302 insertions(+), 49 deletions(-)
 create mode 100644 lib/sidekiq/instrument/middleware/instrument.rb
 create mode 100644 lib/sidekiq/instrument/worker_metrics.rb

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 0ed3fed..04d0448 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -15,7 +15,7 @@ jobs:
     strategy:
       matrix:
         ruby-version: [2.6, 2.7, 3.0]
-        redis-version: [4, 5, 6]
+        redis-version: [7]

     steps:
       - name: Checkout project
@@ -28,7 +28,7 @@ jobs:
           bundler-cache: true # runs 'bundle install' and caches installed gems automatically

       - name: Start Redis
-        uses: supercharge/redis-github-action@1.4.0
+        uses: supercharge/redis-github-action@1.5.0
         with:
           redis-version: ${{ matrix.redis-version }}

diff --git a/lib/sidekiq/instrument.rb b/lib/sidekiq/instrument.rb
index 3da8192..bbd48b0 100644
--- a/lib/sidekiq/instrument.rb
+++ b/lib/sidekiq/instrument.rb
@@ -6,6 +6,7 @@
 require "sidekiq/instrument/worker"
 require "sidekiq/instrument/middleware/client"
 require "sidekiq/instrument/middleware/server"
+require "sidekiq/instrument/worker_metrics"

 module Sidekiq
   module Instrument
diff --git a/lib/sidekiq/instrument/middleware/client.rb b/lib/sidekiq/instrument/middleware/client.rb
index 8cd7d43..1e40816 100644
--- a/lib/sidekiq/instrument/middleware/client.rb
+++ b/lib/sidekiq/instrument/middleware/client.rb
@@ -1,4 +1,7 @@
+# frozen_string_literal: true
+
 require 'sidekiq/instrument/mixin'
+require 'active_support/core_ext/string/inflections'

 module Sidekiq::Instrument
   class ClientMiddleware
@@ -10,6 +13,7 @@ def call(worker_class, job, queue, redis_pool)
       class_instance = klass.new
       Statter.statsd.increment(metric_name(class_instance, 'enqueue'))
       Statter.dogstatsd&.increment('sidekiq.enqueue', worker_dog_options(class_instance))
+      WorkerMetrics.trace_workers_increment_counter(klass.name.underscore, redis_pool)
       result = yield
       Statter.dogstatsd&.flush(sync: true)
       result
diff --git a/lib/sidekiq/instrument/middleware/instrument.rb b/lib/sidekiq/instrument/middleware/instrument.rb
new file mode 100644
index 0000000..bbd48b0
--- /dev/null
+++ b/lib/sidekiq/instrument/middleware/instrument.rb
@@ -0,0 +1,14 @@
+require "active_support/core_ext/class/attribute"
+require "statsd/instrument"
+
+require "sidekiq/instrument/statter"
+require "sidekiq/instrument/version"
+require "sidekiq/instrument/worker"
+require "sidekiq/instrument/middleware/client"
+require "sidekiq/instrument/middleware/server"
+require "sidekiq/instrument/worker_metrics"
+
+module Sidekiq
+  module Instrument
+  end
+end
diff --git a/lib/sidekiq/instrument/middleware/server.rb b/lib/sidekiq/instrument/middleware/server.rb
index 8a4fad3..78c953e 100644
--- a/lib/sidekiq/instrument/middleware/server.rb
+++ b/lib/sidekiq/instrument/middleware/server.rb
@@ -1,4 +1,7 @@
+# frozen_string_literal: true
+
 require 'sidekiq/instrument/mixin'
+require 'active_support/core_ext/string/inflections'

 module Sidekiq::Instrument
   class ServerMiddleware
@@ -9,6 +12,7 @@ def call(worker, job, queue, &block)
       Statter.dogstatsd&.increment('sidekiq.dequeue', worker_dog_options(worker))

       start_time = Time.now
+      WorkerMetrics.trace_workers_decrement_counter(worker.class.to_s.underscore)
       yield block
       execution_time_ms = (Time.now - start_time) * 1000
       Statter.statsd.measure(metric_name(worker, 'runtime'), execution_time_ms)
@@ -22,4 +26,3 @@ def call(worker, job, queue, &block)
     end
   end
 end
-
diff --git a/lib/sidekiq/instrument/version.rb b/lib/sidekiq/instrument/version.rb
index 0f0cd7d..9f99739 100644
--- a/lib/sidekiq/instrument/version.rb
+++ b/lib/sidekiq/instrument/version.rb
@@ -1,5 +1,5 @@
 module Sidekiq
   module Instrument
-    VERSION = '0.5.5'
+    VERSION = '0.5.6'
   end
 end
diff --git a/lib/sidekiq/instrument/worker.rb b/lib/sidekiq/instrument/worker.rb
index b2fc4d0..6b626f0 100644
--- a/lib/sidekiq/instrument/worker.rb
+++ b/lib/sidekiq/instrument/worker.rb
@@ -28,7 +28,7 @@ def perform
       working = Sidekiq::Workers.new.count
       Statter.statsd.gauge('shared.sidekiq.stats.working', working)
       Statter.dogstatsd&.gauge('sidekiq.working', working)
-
+      send_worker_metrics
       Sidekiq::Queue.all.each do |queue|
         Statter.statsd.gauge("shared.sidekiq.#{queue.name}.size", queue.size)
         Statter.dogstatsd&.gauge('sidekiq.queue.size', queue.size, tags: dd_tags(queue))
@@ -59,5 +59,14 @@ def perform
     def dd_tags(queue)
       ["queue:#{queue.name}"]
     end
+
+    def send_worker_metrics
+      return unless WorkerMetrics.enabled
+
+      WorkerMetrics.workers_in_queue.each do |key, value|
+        Statter.statsd.gauge("shared.sidekiq.trace.inqueue.#{key}", value)
+        Statter.dogstatsd&.gauge("shared.sidekiq.trace.inqueue.#{key}", value)
+      end
+    end
   end
 end
diff --git a/lib/sidekiq/instrument/worker_metrics.rb b/lib/sidekiq/instrument/worker_metrics.rb
new file mode 100644
index 0000000..f8dcfdf
--- /dev/null
+++ b/lib/sidekiq/instrument/worker_metrics.rb
@@ -0,0 +1,110 @@
+# frozen_string_literal: true
+
+require 'redis'
+require 'redis-client'
+module Sidekiq
+  module Instrument
+    # Stores worker count with a key sidekiq_instrument_trace_workers:#{namespace}:in_queue
+    # Values are hash having keys as worker names.
+    class WorkerMetrics
+      class_attribute :enabled, :namespace, :redis_config
+
+      class_attribute :redis_password
+
+      class << self
+        # Memoized RedisClient pool built from redis_config on first use.
+        def redis_pool_user
+          @redis_pool_user ||= RedisClient.config(**redis_config).new_pool(
+            timeout: 0.5, size: Integer(ENV.fetch('RAILS_MAX_THREADS', 5))
+          )
+        end
+
+        # Drops the memoized pool so the next call rebuilds it from redis_config.
+        def reset_redis
+          @redis_pool_user = nil
+        end
+
+        # Adds one to the counter of klass_name; uses the given Sidekiq pool unless redis_config is set.
+        def trace_workers_increment_counter(klass_name, sidekiq_redis_pool_user)
+          return unless enabled?
+
+          if redis_config?
+            redis_pool_user.with do |redis|
+              redis.call 'HINCRBY', worker_metric_name, klass_name, 1
+            end
+          else
+            sidekiq_redis_pool_user.with do |redis|
+              redis.hincrby worker_metric_name, klass_name, 1
+            end
+          end
+        end
+
+        # Subtracts one from the counter of klass_name.
+        def trace_workers_decrement_counter(klass_name)
+          return unless enabled?
+
+          if redis_config?
+            redis_pool_user.with do |redis|
+              redis.call 'HINCRBY', worker_metric_name, klass_name, -1
+            end
+          else
+            Sidekiq.redis do |redis|
+              redis.hincrby worker_metric_name, klass_name, -1
+            end
+          end
+        end
+
+        # Clears every counter by deleting the whole hash.
+        # DEL is used because HDEL needs at least one field and fails on an empty hash.
+        def reset_counters
+          return unless enabled?
+
+          if redis_config?
+            redis_pool_user.with do |redis|
+              redis.call 'DEL', worker_metric_name
+            end
+          else
+            Sidekiq.redis do |redis|
+              redis.del worker_metric_name
+            end
+          end
+        end
+
+        # Removes the counter stored under key.
+        def reset_counter(key)
+          return unless enabled?
+
+          if redis_config?
+            redis_pool_user.with do |redis|
+              redis.call 'HDEL', worker_metric_name, key
+            end
+          else
+            Sidekiq.redis do |redis|
+              redis.hdel worker_metric_name, key
+            end
+          end
+        end
+
+        # Returns the HGETALL reply for the counters hash, or nil when disabled.
+        def workers_in_queue
+          return unless enabled?
+
+          if redis_config?
+            redis_pool_user.with do |redis|
+              redis.call 'HGETALL', worker_metric_name
+            end
+          else
+            Sidekiq.redis do |redis|
+              redis.hgetall worker_metric_name
+            end
+          end
+        end
+
+        # Redis key of the hash that holds the per-worker counters.
+        def worker_metric_name
+          "sidekiq_instrument_trace_workers:#{namespace}:in_queue"
+        end
+      end
+    end
+  end
+end
diff --git a/sidekiq-instrument.gemspec b/sidekiq-instrument.gemspec
index f3fc6e0..1bd619a 100644
--- a/sidekiq-instrument.gemspec
+++ b/sidekiq-instrument.gemspec
@@ -20,6 +20,7 @@ Gem::Specification.new do |spec|
   spec.add_dependency 'statsd-instrument', '>= 2.0.4'
   spec.add_dependency 'dogstatsd-ruby', '~> 5.5'
   spec.add_dependency 'activesupport', '>= 5.1', '< 7'
+  spec.add_dependency "redis-client", ">= 0.11.0"

   spec.add_development_dependency 'bundler', '~> 2.0', '>= 2.0.2'
   spec.add_development_dependency 'rake', '~> 12.0'
diff --git a/spec/sidekiq-instrument/client_middleware_spec.rb b/spec/sidekiq-instrument/client_middleware_spec.rb
index 8784671..8b9c162 100644
--- a/spec/sidekiq-instrument/client_middleware_spec.rb
+++ b/spec/sidekiq-instrument/client_middleware_spec.rb
@@ -39,6 +39,39 @@
       end
     end

+    context 'with WorkerMetrics.enabled true' do
+      let(:worker_metric_name) do
+        "sidekiq_instrument_trace_workers::in_queue"
+      end
+      it 'increments the enqueue counter' do
+        Sidekiq::Instrument::WorkerMetrics.enabled = true
+        Redis.new.hdel worker_metric_name ,'my_other_worker'
+        Sidekiq::Instrument::WorkerMetrics.redis_config = {
+          host: ENV['REDIS_HOST'],
+          port: ENV['REDIS_PORT'],
+          db: 0
+        }
+        MyOtherWorker.perform_async
+        expect(
+          Redis.new.hget worker_metric_name ,'my_other_worker'
+        ).to eq('1')
+      end
+    end
+
+    context 'with WorkerMetrics.enabled true and redis_config not provided' do
+      let(:worker_metric_name) do
+        "sidekiq_instrument_trace_workers::in_queue"
+      end
+      it 'increments the enqueue counter' do
+        Sidekiq::Instrument::WorkerMetrics.enabled = true
+        Redis.new.hdel worker_metric_name ,'my_other_worker'
+        MyOtherWorker.perform_async
+        expect(
+          Redis.new.hget worker_metric_name ,'my_other_worker'
+        ).to eq('1')
+      end
+    end
+
     context 'without optional DogStatsD client' do
       before do
         @tmp = Sidekiq::Instrument::Statter.dogstatsd
diff --git a/spec/sidekiq-instrument/server_middleware_spec.rb b/spec/sidekiq-instrument/server_middleware_spec.rb
index e24e795..40f11e2 100644
--- a/spec/sidekiq-instrument/server_middleware_spec.rb
+++ b/spec/sidekiq-instrument/server_middleware_spec.rb
@@ -38,6 +38,39 @@
         expect(Sidekiq::Instrument::Statter.dogstatsd).to receive(:timing).once
         MyWorker.perform_async
       end
+
+      context 'with WorkerMetrics.enabled true' do
+        let(:worker_metric_name) do
+          "sidekiq_instrument_trace_workers::in_queue"
+        end
+        it 'decrements the in-queue counter' do
+          Sidekiq::Instrument::WorkerMetrics.enabled = true
+          Sidekiq::Instrument::WorkerMetrics.redis_config = {
+            host: ENV['REDIS_HOST'],
+            port: ENV['REDIS_PORT'],
+            db: 0
+          }
+          Redis.new.hdel worker_metric_name ,'my_other_worker'
+          MyOtherWorker.perform_async
+          expect(
+            Redis.new.hget worker_metric_name ,'my_other_worker'
+          ).to eq('-1')
+        end
+      end
+
+      context 'with WorkerMetrics.enabled true, and redis_config not given' do
+        let(:worker_metric_name) do
+          "sidekiq_instrument_trace_workers::in_queue"
+        end
+        it 'decrements the in-queue counter' do
+          Sidekiq::Instrument::WorkerMetrics.enabled = true
+          Redis.new.hdel worker_metric_name ,'my_other_worker'
+          MyOtherWorker.perform_async
+          expect(
+            Redis.new.hget worker_metric_name ,'my_other_worker'
+          ).to eq('-1')
+        end
+      end
     end

     context 'when a job fails' do
diff --git a/spec/sidekiq-instrument/worker_spec.rb b/spec/sidekiq-instrument/worker_spec.rb
index 4dadec1..6be8570 100644
--- a/spec/sidekiq-instrument/worker_spec.rb
+++ b/spec/sidekiq-instrument/worker_spec.rb
@@ -3,69 +3,114 @@
 RSpec.describe Sidekiq::Instrument::Worker do
   describe '#perform' do
     let(:worker) { described_class.new }
-
-    it 'triggers the correct default gauges' do
-      expect { worker.perform }.to trigger_statsd_gauge('shared.sidekiq.stats.processed')
-      expect { worker.perform }.to trigger_statsd_gauge('shared.sidekiq.stats.workers')
-      expect { worker.perform }.to trigger_statsd_gauge('shared.sidekiq.stats.pending')
-      expect { worker.perform }.to trigger_statsd_gauge('shared.sidekiq.stats.failed')
-      expect { worker.perform }.to trigger_statsd_gauge('shared.sidekiq.stats.working')
+    let(:worker_metric_name) do
+      "sidekiq_instrument_trace_workers::in_queue"
     end
-
-    it 'allows overriding gauges via constant' do
-      stub_const("#{described_class}::METRIC_NAMES", { enqueued: nil })
-
-      expect { worker.perform }.to trigger_statsd_gauge('shared.sidekiq.stats.enqueued')
-      expect { worker.perform }.to trigger_statsd_gauge('shared.sidekiq.stats.working')
+    before do
+      Redis.new.hdel worker_metric_name ,'my_other_worker'
+      Redis.new.hdel worker_metric_name ,'my_worker'
     end
+    shared_examples 'worker behavior' do |expected_stats|
+      it 'triggers the correct default gauges' do
+        expect { worker.perform }.to trigger_statsd_gauge('shared.sidekiq.stats.processed')
+        expect { worker.perform }.to trigger_statsd_gauge('shared.sidekiq.stats.workers')
+        expect { worker.perform }.to trigger_statsd_gauge('shared.sidekiq.stats.pending')
+        expect { worker.perform }.to trigger_statsd_gauge('shared.sidekiq.stats.failed')
+        expect { worker.perform }.to trigger_statsd_gauge('shared.sidekiq.stats.working')
+      end

-    context 'with DogStatsD client' do
-      let(:dogstatsd) { Sidekiq::Instrument::Statter.dogstatsd }
+      it 'allows overriding gauges via constant' do
+        stub_const("#{described_class}::METRIC_NAMES", { enqueued: nil })

-      it 'sends the appropriate metrics via DogStatsD' do
-        allow(dogstatsd).to receive(:gauge).with('sidekiq.queue.size', any_args).at_least(:once)
-        allow(dogstatsd).to receive(:gauge).with('sidekiq.queue.latency', any_args).at_least(:once)
+        expect { worker.perform }.to trigger_statsd_gauge('shared.sidekiq.stats.enqueued')
+        expect { worker.perform }.to trigger_statsd_gauge('shared.sidekiq.stats.working')
+      end

-        expect(dogstatsd).to receive(:gauge).with('sidekiq.processed', anything)
-        expect(dogstatsd).to receive(:gauge).with('sidekiq.workers', anything)
-        expect(dogstatsd).to receive(:gauge).with('sidekiq.pending', anything)
-        expect(dogstatsd).to receive(:gauge).with('sidekiq.failed', anything)
-        expect(dogstatsd).to receive(:gauge).with('sidekiq.working', anything)
+      context 'with DogStatsD client' do
+        let(:dogstatsd) { Sidekiq::Instrument::Statter.dogstatsd }

-        worker.perform
+        it 'sends the appropriate metrics via DogStatsD' do
+          allow(dogstatsd).to receive(:gauge).with('sidekiq.queue.size', any_args).at_least(:once)
+          allow(dogstatsd).to receive(:gauge).with('sidekiq.queue.latency', any_args).at_least(:once)
+          expected_stats.each do |ex|
+            expect(dogstatsd).to receive(:gauge).with(ex, anything)
+          end
+          worker.perform
+        end
       end
-    end

-    context 'without optional DogStatsD client' do
-      before do
-        @tmp = Sidekiq::Instrument::Statter.dogstatsd
-        Sidekiq::Instrument::Statter.dogstatsd = nil
-      end
+      context 'without optional DogStatsD client' do
+        before do
+          @tmp = Sidekiq::Instrument::Statter.dogstatsd
+          Sidekiq::Instrument::Statter.dogstatsd = nil
+          Sidekiq::Instrument::WorkerMetrics.enabled = false
+        end

-      after do
-        Sidekiq::Instrument::Statter.dogstatsd = @tmp
+        after do
+          Sidekiq::Instrument::Statter.dogstatsd = @tmp
+        end
+
+        it 'does not error' do
+          expect { MyWorker.perform_async }.not_to raise_error
+        end
       end

-      it 'does not error' do
-        expect { MyWorker.perform_async }.not_to raise_error
+      context 'when jobs in queues' do
+        before do
+          Sidekiq::Testing.disable! do
+            Sidekiq::Queue.all.each(&:clear)
+            MyWorker.perform_async
+          end
+        end
+
+        it 'gauges the size of the queues' do
+          expect { worker.perform }.to trigger_statsd_gauge('shared.sidekiq.default.size')
+        end
+
+        it 'gauges the latency of the queues' do
+          expect { worker.perform }.to trigger_statsd_gauge('shared.sidekiq.default.latency')
+        end
+      end
+    end
+    context 'when WorkerMetrics disabled' do
+      before do
+        Sidekiq::Instrument::WorkerMetrics.enabled = false
       end
+      it_behaves_like 'worker behavior', %w[
+        sidekiq.processed
+        sidekiq.workers
+        sidekiq.pending
+        sidekiq.failed
+        sidekiq.working
+      ]
     end

-    context 'when jobs in queues' do
+    context 'when WorkerMetrics enabled' do
       before do
-        Sidekiq::Testing.disable! do
-          Sidekiq::Queue.all.each(&:clear)
-          MyWorker.perform_async
+        Sidekiq::Instrument::WorkerMetrics.enabled = true
+        Sidekiq.configure_client do |c|
+          c.client_middleware do |chain|
+            chain.add Sidekiq::Instrument::ClientMiddleware
+          end
         end
-      end

-      it 'gauges the size of the queues' do
-        expect { worker.perform }.to trigger_statsd_gauge('shared.sidekiq.default.size')
-      end
+        MyOtherWorker.perform_async

-      it 'gauges the latency of the queues' do
-        expect { worker.perform }.to trigger_statsd_gauge('shared.sidekiq.default.latency')
+        Sidekiq.configure_client do |c|
+          c.client_middleware do |chain|
+            chain.remove Sidekiq::Instrument::ClientMiddleware
+          end
+        end
       end
+
+      it_behaves_like 'worker behavior', %w[
+        shared.sidekiq.trace.inqueue.my_other_worker
+        sidekiq.processed
+        sidekiq.workers
+        sidekiq.pending
+        sidekiq.failed
+        sidekiq.working
+      ]
     end
   end
 end