Skip to content

Commit

Permalink
Initial commit with kafka standard monitoring adapter
Browse files Browse the repository at this point in the history
dsalahutdinov committed May 20, 2021
1 parent bc8e1aa commit e829f0e
Showing 17 changed files with 711 additions and 1 deletion.
3 changes: 3 additions & 0 deletions .rspec
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
--format documentation
--color
--require spec_helper
12 changes: 12 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
AllCops:
TargetRubyVersion: 2.5

Style/Documentation:
Enabled: false

Gemspec/RequiredRubyVersion:
Enabled: false

Metrics/BlockLength:
Exclude:
- spec/yabeda/kafka_spec.rb
14 changes: 14 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
language: minimal
sudo: require
services:
- docker

before_script:
- unset BUNDLE_GEMFILE
- docker-compose run app bundle install
script:
- docker-compose run app bundle exec rake

env:
- RAILS_ENABLED=0
- RAILS_ENABLED=1
7 changes: 7 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# frozen_string_literal: true

source 'https://rubygems.org'

git_source(:github) { |repo_name| "https://github.com/#{repo_name}" }

gemspec
83 changes: 83 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
PATH
remote: .
specs:
yabeda-kafka (0.1.0)
activesupport (>= 4.0)
prometheus-client
ruby-kafka
yabeda

GEM
remote: https://rubygems.org/
specs:
activesupport (6.1.3.2)
concurrent-ruby (~> 1.0, >= 1.0.2)
i18n (>= 1.6, < 2)
minitest (>= 5.1)
tzinfo (~> 2.0)
zeitwerk (~> 2.3)
ast (2.4.2)
byebug (11.1.3)
concurrent-ruby (1.1.8)
diff-lcs (1.4.4)
digest-crc (0.6.0)
dry-initializer (3.0.4)
i18n (1.8.10)
concurrent-ruby (~> 1.0)
minitest (5.14.4)
parallel (1.20.1)
parser (3.0.1.1)
ast (~> 2.4.1)
prometheus-client (2.1.0)
rainbow (3.0.0)
rake (10.5.0)
regexp_parser (2.1.1)
rexml (3.2.5)
rspec (3.10.0)
rspec-core (~> 3.10.0)
rspec-expectations (~> 3.10.0)
rspec-mocks (~> 3.10.0)
rspec-core (3.10.1)
rspec-support (~> 3.10.0)
rspec-expectations (3.10.1)
diff-lcs (>= 1.2.0, < 2.0)
rspec-support (~> 3.10.0)
rspec-mocks (3.10.2)
diff-lcs (>= 1.2.0, < 2.0)
rspec-support (~> 3.10.0)
rspec-support (3.10.2)
rubocop (1.15.0)
parallel (~> 1.10)
parser (>= 3.0.0.0)
rainbow (>= 2.2.2, < 4.0)
regexp_parser (>= 1.8, < 3.0)
rexml
rubocop-ast (>= 1.5.0, < 2.0)
ruby-progressbar (~> 1.7)
unicode-display_width (>= 1.4.0, < 3.0)
rubocop-ast (1.5.0)
parser (>= 3.0.1.1)
ruby-kafka (1.3.0)
digest-crc
ruby-progressbar (1.11.0)
tzinfo (2.0.4)
concurrent-ruby (~> 1.0)
unicode-display_width (2.0.0)
yabeda (0.9.0)
concurrent-ruby
dry-initializer
zeitwerk (2.4.2)

PLATFORMS
x86_64-linux

DEPENDENCIES
bundler
byebug
rake (~> 10.0)
rspec (~> 3.0)
rubocop (~> 1.15)
yabeda-kafka!

BUNDLED WITH
2.2.15
21 changes: 21 additions & 0 deletions LICENSE.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
The MIT License (MIT)

Copyright (c) 2021 Dmitriy Salakhutdinov

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
40 changes: 39 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,39 @@
# yabeda-kafka
# Yabeda::Kafka

Welcome to your new gem! In this directory, you'll find the files you need to be able to package up your Ruby library into a gem. Put your Ruby code in the file `lib/yabeda/kafka`. To experiment with that code, run `bin/console` for an interactive prompt.

TODO: Delete this and the text above, and describe your gem

## Installation

Add this line to your application's Gemfile:

```ruby
gem 'yabeda-kafka'
```

And then execute:

$ bundle

Or install it yourself as:

$ gem install yabeda-kafka

## Usage

TODO: Write usage instructions here

## Development

After checking out the repo, run `bin/setup` to install dependencies. Then, run `rake spec` to run the tests. You can also run `bin/console` for an interactive prompt that will allow you to experiment.

To install this gem onto your local machine, run `bundle exec rake install`. To release a new version, update the version number in `version.rb`, and then run `bundle exec rake release`, which will create a git tag for the version, push git commits and tags, and push the `.gem` file to [rubygems.org](https://rubygems.org).

## Contributing

Bug reports and pull requests are welcome on GitHub at https://github.com/[USERNAME]/yabeda-kafka.

## License

The gem is available as open source under the terms of the [MIT License](https://opensource.org/licenses/MIT).
11 changes: 11 additions & 0 deletions Rakefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# frozen_string_literal: true

require 'bundler/gem_tasks'
require 'rspec/core/rake_task'
require 'rubocop/rake_task'

RuboCop::RakeTask.new

RSpec::Core::RakeTask.new(:spec)

task default: %i[rubocop spec]
15 changes: 15 additions & 0 deletions bin/console
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#!/usr/bin/env ruby
# frozen_string_literal: true

require 'bundler/setup'
require 'yabeda/kafka'

# You can add fixtures and/or initialization code here to make experimenting
# with your gem easier. You can also use a different console, if you like.

# (If you use this, don't forget to add pry to your Gemfile!)
# require "pry"
# Pry.start

require 'irb'
IRB.start(__FILE__)
8 changes: 8 additions & 0 deletions bin/setup
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/usr/bin/env bash
set -euo pipefail
IFS=$'\n\t'
set -vx

bundle install

# Do any other automated setup that you need to do here
22 changes: 22 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
version: '3.4'

services:
app:
image: ruby:latest
environment:
- BUNDLE_PATH=/bundle
- BUNDLE_CONFIG=/app/.bundle/config
- REDIS_URL=redis://redis
command: bash
working_dir: /app
volumes:
- .:/app:cached
- bundler_data:/bundle
tmpfs:
- /tmp

redis:
image: redis

volumes:
bundler_data:
24 changes: 24 additions & 0 deletions lib/yabeda/kafka.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# frozen_string_literal: true

require 'yabeda'
require 'kafka'
require 'yabeda/kafka/version'
require 'yabeda/kafka/proxy'

module Yabeda
module Kafka
class << self
attr_accessor :registry
end

Yabeda.configure do
group :kafka

Kernel.const_set('PROMETHEUS_NO_AUTO_START', true) unless defined?(PROMETHEUS_NO_AUTO_START)

require 'kafka/prometheus'
Yabeda::Kafka.registry = ::Yabeda::Kafka::Proxy.new(self)
::Kafka::Prometheus.start(Yabeda::Kafka.registry)
end
end
end
78 changes: 78 additions & 0 deletions lib/yabeda/kafka/proxy.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# frozen_string_literal: true

module Yabeda
module Kafka
class Proxy
attr_reader :group, :metrics

def initialize(group)
@group = group
@metrics = []
end

def counter(name, docstring:, labels:)
metrics.push name

::Yabeda::Kafka::Proxy::Counter.new(
group.counter(name, comment: docstring, tags: labels)
)
end

def histogram(name, docstring:, labels:, buckets: nil)
metrics.push name

::Yabeda::Kafka::Proxy::Histogram.new(
group.histogram(
name,
comment: docstring,
tags: labels,
buckets: buckets
)
)
end

def gauge(name, docstring:, labels:)
metrics.push name
::Yabeda::Kafka::Proxy::Gauge.new(
group.gauge(name, tags: labels, comment: docstring)
)
end

class Counter
attr_reader :yabeda_counter

def initialize(yabeda_counter)
@yabeda_counter = yabeda_counter
end

def increment(labels:, by: 1)
yabeda_counter.increment(labels, by: by)
end
end

class Histogram
attr_reader :yabeda_histogram

def initialize(yabeda_histogram)
@yabeda_histogram = yabeda_histogram
end

def observe(value, labels:)
yabeda_histogram.measure(labels, value)
end
end

class Gauge
attr_reader :yabeda_gauge

def initialize(yabeda_gauge)
@yabeda_gauge = yabeda_gauge
end

def set(value, labels:)
yabeda_gauge.set(labels, value)
end
end
end
end
end
7 changes: 7 additions & 0 deletions lib/yabeda/kafka/version.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# frozen_string_literal: true

module Yabeda
module Kafka
VERSION = '0.1.0'
end
end
17 changes: 17 additions & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# frozen_string_literal: true

require 'bundler/setup'
require 'yabeda/kafka'
require 'byebug'

RSpec.configure do |config|
# Enable flags like --only-failures and --next-failure
config.example_status_persistence_file_path = '.rspec_status'

# Disable RSpec exposing methods globally on `Module` and `main`
config.disable_monkey_patching!

config.expect_with :rspec do |c|
c.syntax = :expect
end
end
307 changes: 307 additions & 0 deletions spec/yabeda/kafka_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,307 @@
# frozen_string_literal: true

RSpec.describe Yabeda::Kafka do
before(:all) { Yabeda.configure! }

it 'has a version number' do
expect(Yabeda::Kafka::VERSION).not_to be nil
end

it 'does something useful' do
expect(Yabeda.kafka.api_calls).to be_kind_of(::Yabeda::Counter)
end

describe 'metrics hooks' do
let(:exception) { false }
let(:registry) { Yabeda.kafka }

before do
# clear yabeda metrics
Yabeda::Kafka.registry.metrics.each { |m| registry.send(m).values.clear }

instrumenter = if exception
Kafka::Instrumenter.new(client_id: 'test', exception: exception)
else
Kafka::Instrumenter.new(client_id: 'test')
end
instrumenter.instrument(hook, payload)
end

context 'when requesting a connection' do
let(:key) { { client: 'test', api: 'foo', broker: 'somehost' } }
let(:payload) { { broker_host: 'somehost', api: 'foo', request_size: 101, response_size: 4000 } }
let(:hook) { 'request.connection' }

it 'emits metrics to the api_calls' do
expect(registry.api_calls.values[key]).to eq 1
end

it 'emits metrics to api_latency' do
expect(registry.api_latency.values[key]).to be_kind_of(Float)
end

it 'emits metrics to api_request_size' do
expect(registry.api_request_size.values[key]).not_to be_nil
end

it 'emits metrics to api_response_size' do
expect(registry.api_response_size.values[key]).not_to be_nil
end

context 'with expection' do
let(:exception) { true }

it 'emits metrics to api_errors' do
expect(registry.api_errors.values[key]).to eq 1
end
end
end

context 'when a consumer is processing a message' do
let(:key) { { client: 'test', group_id: 'group1', topic: 'AAA', partition: 4 } }
let(:payload) do
{
group_id: 'group1',
topic: 'AAA',
partition: 4,
offset: 1,
offset_lag: 500,
create_time: Time.now - 5
}
end
let(:hook) { 'process_message.consumer' }

it 'emits metrics to consumer_offset_lag' do
expect(registry.consumer_offset_lag.values[key]).to eq 500
end

it 'emits metrics to consumer_process_messages' do
expect(registry.consumer_process_messages.values[key]).to eq 1
end

it 'emits metrics to consumer_process_message_latency' do
expect(registry.consumer_process_message_latency.values[key]).not_to be_nil
end

it 'emits metrics to consumer_time_lag' do
expect(registry.consumer_time_lag.values[key]).to be > 0
end

context 'with expection' do
let(:exception) { true }

it 'emits metrics to consumer_process_message_errors' do
expect(registry.consumer_process_message_errors.values[key]).to eq 1
end
end
end

context 'when a consumer is processing a batch' do
let(:key) { { client: 'test', group_id: 'group1', topic: 'AAA', partition: 4 } }
let(:payload) do
{
group_id: 'group1',
topic: 'AAA',
partition: 4,
last_offset: 100,
last_create_time: Time.now,
message_count: 7
}
end
let(:hook) { 'process_batch.consumer' }

it 'emits metrics consumer_process_messages' do
expect(registry.consumer_process_messages.values[key]).to eq 7
end

context 'with expection' do
let(:exception) { true }

it 'emits metrics to consumer_process_batch_errors' do
expect(registry.consumer_process_batch_errors.values[key]).to eq 1
end
end
end

context 'when a consumer is fetching a batch' do
let(:key) { { client: 'test', group_id: 'group1', topic: 'AAA', partition: 4 } }
let(:payload) do
{
group_id: 'group1',
topic: 'AAA',
partition: 4,
offset_lag: 7,
message_count: 123
}
end
let(:hook) { 'fetch_batch.consumer' }

it 'emits metrics consumer_offset_lag' do
expect(registry.consumer_offset_lag.values[key]).to eq 7
end

it 'emits metrics consumer_batch_size' do
expect(registry.consumer_batch_size.values[key]).to be > 0
end
end

context 'when a consumer is joining a group' do
let(:key) { { client: 'test', group_id: 'group1' } }
let(:payload) { { group_id: 'group1' } }
let(:hook) { 'join_group.consumer' }

it 'emits metrics consumer_join_group' do
expect(registry.consumer_join_group.values[key]).to be > 0
end

context 'with expection' do
let(:exception) { true }

it 'emits metrics to consumer_join_group_errors' do
expect(registry.consumer_join_group_errors.values[key]).to eq 1
end
end
end

context 'when a consumer is syncing a group' do
let(:key) { { client: 'test', group_id: 'group1' } }
let(:payload) { { group_id: 'group1' } }
let(:hook) { 'sync_group.consumer' }

it 'emits metrics consumer_sync_group' do
expect(registry.consumer_sync_group.values[key]).to be > 0
end

context 'with expection' do
let(:exception) { true }

it 'emits metrics to consumer_sync_group_errors' do
expect(registry.consumer_sync_group_errors.values[key]).to eq 1
end
end
end

context 'when a consumer is leaving a group' do
let(:key) { { client: 'test', group_id: 'group1' } }
let(:payload) { { group_id: 'group1' } }
let(:hook) { 'leave_group.consumer' }

it 'emits metrics consumer_leave_group' do
expect(registry.consumer_leave_group.values[key]).to be > 0
end

context 'with expection' do
let(:exception) { true }

it 'emits metrics to consumer_leave_group_errors' do
expect(registry.consumer_leave_group_errors.values[key]).to eq 1
end
end
end

context 'when a consumer pauses status' do
let(:key) { { client: 'test', group_id: 'group1', topic: 'AAA', partition: 4 } }
let(:payload) { { group_id: 'group1', topic: 'AAA', partition: 4, duration: 111 } }
let(:hook) { 'pause_status.consumer' }

it 'emits metrics to consumer_pause_duration' do
expect(registry.consumer_pause_duration.values[key]).to eq 111
end
end

context 'when a producer produces a message' do
let(:key) { { client: 'test', topic: 'AAA' } }
let(:payload) do
{
group_id: 'group1',
topic: 'AAA',
partition: 4,
buffer_size: 1000,
max_buffer_size: 10_000,
message_size: 123
}
end
let(:hook) { 'produce_message.producer' }

it 'emits metrics producer_produced_messages' do
expect(registry.producer_produced_messages.values[key]).to eq 1
end

it 'emits metric producer_message_size' do
expect(registry.producer_message_size.values[key]).to be > 0
end

it 'emits metric buffer_fill_ratio' do
expect(registry.producer_buffer_fill_ratio.values[{ client: 'test' }]).to be > 0
end
end

context 'when a producer gets topic error' do
let(:key) { { client: 'test', topic: 'AAA' } }
let(:payload) { { group_id: 'group1', topic: 'AAA' } }
let(:hook) { 'topic_error.producer' }

it 'emits metrics ack_error' do
expect(registry.producer_ack_errors.values[key]).to eq 1
end
end

context 'when a producer gets buffer overflow' do
let(:key) { { client: 'test', topic: 'AAA' } }
let(:payload) { { topic: 'AAA' } }
let(:hook) { 'buffer_overflow.producer' }

it 'emits metrics producer_produce_errors' do
expect(registry.producer_produce_errors.values[key]).to eq 1
end
end

context 'when a producer deliver_messages' do
let(:key) { { client: 'test' } }
let(:payload) { { delivered_message_count: 123, attempts: 2 } }
let(:hook) { 'deliver_messages.producer' }

it 'emits metrics producer_deliver_messages' do
expect(registry.producer_deliver_messages.values[key]).to eq 123
end

it 'emits metrics producer_deliver_attempts' do
expect(registry.producer_deliver_attempts.values[key]).to be > 0
end
end

context 'when a asynch producer enqueues a message' do
let(:key) { { client: 'test', topic: 'AAA' } }
let(:payload) { { group_id: 'group1', topic: 'AAA' } }
let(:hook) { 'topic_error.async_producer' }

it 'emits metrics async_producer_queue_size' do
expect(registry.async_producer_queue_size.values).not_to be_nil
end

it 'emits metrics async_producer_queue fill_ratio' do
expect(registry.async_producer_queue_fill_ratio.values).not_to be_nil
end
end

context 'when a asynch producer gets buffer overflow' do
let(:key) { { client: 'test', topic: 'AAA' } }
let(:payload) { { topic: 'AAA' } }
let(:hook) { 'buffer_overflow.async_producer' }

it 'emits metrics async_producer_produce_errors' do
expect(registry.async_producer_produce_errors.values[key]).to eq 1
end
end

context 'when a asynch producer gets dropped messages' do
let(:key) { { client: 'test' } }
let(:payload) { { message_count: 4 } }
let(:hook) { 'drop_messages.async_producer' }

it 'emits metrics async_producer_dropped_messages' do
expect(registry.async_producer_dropped_messages.values[key]).to eq 4
end
end
end
end
43 changes: 43 additions & 0 deletions yabeda-kafka.gemspec
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# frozen_string_literal: true

lib = File.expand_path('lib', __dir__)
$LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib)
require 'yabeda/kafka/version'

Gem::Specification.new do |spec|
spec.name = 'yabeda-kafka'
spec.version = Yabeda::Kafka::VERSION
spec.authors = ['Dmitry Salahutdinov']
spec.email = ['dsalahutdinov@gmail.com']

spec.summary = 'Monitoring for ruby-kafka based on yabeda toolkit'
spec.description = 'Extends Yabeda metrics to collect ruby-kafka metrics'
spec.homepage = 'https://github.com/yabeda-rb/yabeda-kafka'
spec.license = 'MIT'

spec.metadata['homepage_uri'] = spec.homepage
spec.metadata['source_code_uri'] = 'https://github.com/yabeda-rb/yabeda-kafka'

# Specify which files should be added to the gem when it is released.
# The `git ls-files -z` loads the files in the RubyGem that have been added into git.
spec.files = Dir.chdir(File.expand_path(__dir__)) do
`git ls-files -z`.split("\x0").reject { |f| f.match(%r{^(test|spec|features)/}) }
end
spec.bindir = 'exe'
spec.executables = spec.files.grep(%r{^exe/}) { |f| File.basename(f) }
spec.require_paths = ['lib']

spec.add_dependency 'activesupport', '>= 4.0'
spec.add_dependency 'ruby-kafka'
spec.add_dependency 'yabeda'

# TODO: remove prometheus-client dependency for kafka/monitoring
# https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/prometheus.rb
spec.add_dependency 'prometheus-client'

spec.add_development_dependency 'bundler'
spec.add_development_dependency 'byebug'
spec.add_development_dependency 'rake', '~> 10.0'
spec.add_development_dependency 'rspec', '~> 3.0'
spec.add_development_dependency 'rubocop', '~> 1.15'
end

0 comments on commit e829f0e

Please sign in to comment.