Skip to content

Commit

Permalink
implement probe for ruby-kafka (#1)
Browse files Browse the repository at this point in the history
* implement probe for ruby-kafka

* fix rubocop warnings

* code review fixes

* support multi-threaded consumers

* ruby-kafka: add verbose option
  • Loading branch information
mugimaru authored Jul 19, 2022
1 parent c988a1a commit 951c71c
Show file tree
Hide file tree
Showing 8 changed files with 288 additions and 8 deletions.
6 changes: 5 additions & 1 deletion .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,8 @@ Style/Documentation:

Metrics/BlockLength:
Exclude:
- 'spec/**/*.rb'
- "spec/**/*.rb"
- "http_health_check.gemspec"

Style/CommentedKeyword:
Enabled: false
20 changes: 13 additions & 7 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
# CHANGELOG.md

## 0.1.1 (2022-07-17)
## 0.3.0 (unreleased)

Features:

- implement basic functionality
- add builtin sidekiq probe
- add builtin delayed job probe
- add ruby-kafka probe

## 0.2.1 (2022-07-18)

Fix:

- fix gemspec

## 0.2.0 (2022-07-18)

Expand All @@ -18,8 +22,10 @@ Fix:

- fix builtin probes requirement

## 0.2.1 (2022-07-18)
## 0.1.1 (2022-07-17)

Fix:
Features:

- fix gemspec
- implement basic functionality
- add builtin sidekiq probe
- add builtin delayed job probe
13 changes: 13 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,19 @@ PATH
GEM
remote: https://rubygems.org/
specs:
activesupport (5.2.8.1)
concurrent-ruby (~> 1.0, >= 1.0.2)
i18n (>= 0.7, < 2)
minitest (~> 5.1)
tzinfo (~> 1.1)
ast (2.4.2)
concurrent-ruby (1.1.10)
diff-lcs (1.5.0)
docile (1.4.0)
dotenv (2.7.6)
i18n (1.12.0)
concurrent-ruby (~> 1.0)
minitest (5.15.0)
parallel (1.22.1)
parser (3.1.2.0)
ast (~> 2.4.1)
Expand Down Expand Up @@ -57,6 +66,9 @@ GEM
simplecov (~> 0.19)
simplecov-html (0.12.3)
simplecov_json_formatter (0.1.4)
thread_safe (0.3.6)
tzinfo (1.2.9)
thread_safe (~> 0.1)
unicode-display_width (1.8.0)
webrick (1.7.0)

Expand All @@ -66,6 +78,7 @@ PLATFORMS
x86_64-linux

DEPENDENCIES
activesupport (~> 5.0)
dotenv (~> 2.7.6)
http_health_check!
rake (~> 13.0)
Expand Down
67 changes: 67 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,59 @@ module Delayed::AfterFork
end
```

### Karafka ~> 1.4

Ruby-kafka probe is disabled by default as it requires app-specific configuration to work properly. Example usage with karafka framework:

```ruby
# ./karafka.rb

class KarafkaApp < Karafka::App
# ...
# karafka app configuration
# ...
end

KarafkaApp.boot!

HttpHealthCheck.run_server_async(
port: 5555,
rack_app: HttpHealthCheck::RackApp.configure do |c|
c.probe '/readiness/karafka', HttpHealthCheck::Probes::RubyKafka.new(
consumer_groups: KarafkaApp.consumer_groups.map(&:id),
# default heartbeat interval is 3 seconds, but we want to give it
# an ability to skip a few before failing the probe
heartbeat_interval_sec: 10,
# includes a list of topics and partitions into response for every consumer thread. false by default
verbose: false
)
end
)
```

Ruby kafka probe supports multi-threaded setups, i.e. if you are using karafka and you define multiple blocks with the same consumer group like

```ruby
class KarafkaApp < Karafka::App
consumer_groups.draw do
consumer_group 'foo' do
# ...
end
end

consumer_groups.draw do
consumer_group 'foo' do
# ...
end
end
end

KarafkaApp.consumer_groups.map(&:id)
# => ['foo', 'foo']
```

ruby-kafka probe will count heartbeats from multiple threads.

### Kubernetes deployment example

```yaml
Expand Down Expand Up @@ -166,6 +219,20 @@ HttpHealthCheck.configure do |config|
end
```

#### [ruby-kafka](./lib/http_health_check/probes/ruby_kafka.rb)

ruby-kafka probe is expected to be configured with consumer groups list. It subscribes to ruby-kafka's `heartbeat.consumer.kafka` ActiveSupport notification and tracks heartbeats for every given consumer group.
It expects a heartbeat every `:heartbeat_interval_sec` (10 seconds by default).

```ruby
heartbeat_app = HttpHealthCheck::RackApp.configure do |c|
c.probe '/readiness/kafka', HttpHealthCheck::Probes::Karafka.new(
consumer_groups: ['consumer-one', 'consumer-two'],
heartbeat_interval_sec: 42
)
end
```

## Development

After checking out the repo, run `bin/setup` to install dependencies. You can also run `bin/console` for an interactive prompt that will allow you to experiment.
Expand Down
1 change: 1 addition & 0 deletions http_health_check.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ Gem::Specification.new do |spec|
spec.add_dependency 'rack', '~> 2.0'
spec.add_dependency 'webrick'

spec.add_development_dependency 'activesupport', '~> 5.0'
spec.add_development_dependency 'dotenv', '~> 2.7.6'
spec.add_development_dependency 'redis', '~> 4.2.5'
spec.add_development_dependency 'rspec', '~> 3.2'
Expand Down
1 change: 1 addition & 0 deletions lib/http_health_check/probes.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

require_relative 'probes/sidekiq' if defined?(::Sidekiq)
require_relative 'probes/delayed_job' if defined?(::Delayed::Job)
require_relative 'probes/ruby_kafka' if defined?(::Kafka::Consumer)

module HttpHealthCheck
module Probes; end
Expand Down
67 changes: 67 additions & 0 deletions lib/http_health_check/probes/ruby_kafka.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# frozen_string_literal: true

module HttpHealthCheck
module Probes
class RubyKafka
Heartbeat = Struct.new(:time, :group, :topic_partitions)
include ::HttpHealthCheck::Probe

def initialize(opts = {})
@heartbeat_event_name = opts.fetch(:heartbeat_event_name, /heartbeat.consumer.kafka/)
@heartbeat_interval_sec = opts.fetch(:heartbeat_interval_sec, 10)
@verbose = opts.fetch(:verbose, false)
@consumer_groups = opts.fetch(:consumer_groups)
.each_with_object(Hash.new(0)) { |group, hash| hash[group] += 1 }
@heartbeats = {}
@timer = opts.fetch(:timer, Time)

setup_subscriptions
end

def probe(_env)
now = @timer.now
failed_heartbeats = select_failed_heartbeats(now)
return probe_ok groups: meta_from_heartbeats(@heartbeats, now) if failed_heartbeats.empty?

probe_error failed_groups: meta_from_heartbeats(failed_heartbeats, now)
end

private

def select_failed_heartbeats(now)
@consumer_groups.each_with_object({}) do |(group, concurrency), hash|
heartbeats = @heartbeats[group] || {}
ok_heartbeats_count = heartbeats.count { |_id, hb| hb.time + @heartbeat_interval_sec >= now }
hash[group] = heartbeats if ok_heartbeats_count < concurrency
end
end

def meta_from_heartbeats(heartbeats_hash, now) # rubocop: disable Metrics/MethodLength, Metrics/AbcSize
heartbeats_hash.each_with_object({}) do |(group, heartbeats), hash|
concurrency = @consumer_groups[group]
if heartbeats.empty?
hash[group] = { had_heartbeat: false, concurrency: concurrency }
next
end

hash[group] = { had_heartbeat: true, concurrency: concurrency, threads: {} }
heartbeats.each do |thread_id, heartbeat|
thread_meta = { seconds_since_last_heartbeat: now - heartbeat.time }
thread_meta[:topic_partitions] = heartbeat.topic_partitions if @verbose
hash[group][:threads][thread_id] = thread_meta
end
end
end

def setup_subscriptions
ActiveSupport::Notifications.subscribe(@heartbeat_event_name) do |*args|
event = ActiveSupport::Notifications::Event.new(*args)
group = event.payload[:group_id]

@heartbeats[group] ||= {}
@heartbeats[group][event.transaction_id] = Heartbeat.new(event.time, group, event.payload[:topic_partitions])
end
end
end
end
end
121 changes: 121 additions & 0 deletions spec/http_health_check/probes/ruby_kafka_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# frozen_string_literal: true

require 'active_support/notifications'
require_relative '../../../lib/http_health_check/probes/ruby_kafka'

describe HttpHealthCheck::Probes::RubyKafka do
let(:timer) { double(Time) }
let(:event_name) { 'fake.heartbeat.consumer.kafka' }
let(:consumer_groups) { nil }
let(:verbose) { true }
let!(:probe) do
described_class.new(
consumer_groups: consumer_groups,
timer: timer,
heartbeat_event_name: event_name,
verbose: verbose
)
end

def emit_hb_event(group, topic_partitions: nil)
topic_partitions ||= { 'some_topic' => %w[1 2] }
ActiveSupport::Notifications.instrument(event_name, group_id: group, topic_partitions: topic_partitions) {}
end

context 'with list of consumer groups' do
let(:group) { 'important-consumer' }
let(:consumer_groups) { [group] }

context 'when heartbeat is expired' do
it 'returns an error' do
topic_partitions = { 'foo' => %w[1 2], 'bar' => %w[3 4] }
emit_hb_event(group, topic_partitions: topic_partitions)

expect(timer).to receive(:now).and_return(Time.now + 15)
result = probe.call(nil)
expect(result).not_to be_ok

meta = result.meta[:failed_groups][group]
expect(meta[:had_heartbeat]).to eq(true)
expect(meta[:threads].size).to eq(1)

thread_meta = meta[:threads].values.first
expect(thread_meta[:seconds_since_last_heartbeat]).to be_within(1).of(15)
expect(thread_meta[:topic_partitions]).to eq(topic_partitions)
end
end

context 'with multiple threads' do
let(:consumer_groups) { [group, group] }

context 'when number of heartbeats is equal to group concurrency' do
it 'returns ok' do
emit_hb_event(group)
Thread.new { emit_hb_event(group) }
sleep(0.01)

expect(timer).to receive(:now).and_return(Time.now)
result = probe.call(nil)
expect(result).to be_ok
end
end

context 'when some heartbeats are missing' do
it 'returns an error' do
Thread.new { emit_hb_event(group) }
sleep(0.01)

expect(timer).to receive(:now).and_return(Time.now)
result = probe.call(nil)
expect(result).not_to be_ok

meta = result.meta[:failed_groups][group]
expect(meta[:had_heartbeat]).to eq(true)
expect(meta[:threads].size).to eq(1)
end
end
end

context 'when heartbeat had not been emitted yet' do
it 'return an error' do
expect(timer).to receive(:now).and_return(Time.now)

result = probe.call(nil)
expect(result).not_to be_ok

meta = result.meta[:failed_groups][group]
expect(meta[:had_heartbeat]).to eq(false)
end
end

context 'when it noted specified group heartbeat recently' do
it 'returns ok' do
emit_hb_event(group)

expect(timer).to receive(:now).and_return(Time.now + 5)
result = probe.call(nil)
expect(result).to be_ok

meta = result.meta[:groups][group]
thread_meta = meta[:threads].values.first
expect(thread_meta[:seconds_since_last_heartbeat]).to be_within(1).of(5)
end
end

context 'when verbose=false' do
let(:verbose) { false }

it 'does not include topic_partitions into result meta' do
emit_hb_event(group)

expect(timer).to receive(:now).and_return(Time.now + 5)
result = probe.call(nil)
expect(result).to be_ok

meta = result.meta[:groups][group]
thread_meta = meta[:threads].values.first
expect(thread_meta.keys).not_to include(:topic_partitions)
end
end
end
end

0 comments on commit 951c71c

Please sign in to comment.