Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a new configuration value to run multiple consumers in separate threads. #160

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 56 additions & 1 deletion lib/racecar.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,62 @@ def self.instrumenter
end
end

def self.mutex
@mutex ||= Mutex.new
end

def self.install_signal_handlers
return if @installed

# Stop the consumers on SIGINT, SIGQUIT or SIGTERM.
trap("QUIT") { stop }
trap("INT") { stop }
trap("TERM") { stop }

# Print the consumer config to STDERR on USR1.
trap("USR1") { $stderr.puts config.inspect }
@installed = true
end

def self.runners
@runners ||= []
end

def self.threads
@threads ||= []
end

def self.run(processor)
Runner.new(processor, config: config, logger: logger, instrumenter: instrumenter).run
if config.threaded
run_threaded(processor)
else
@runners << Runner.new(processor, config: config, logger: logger, instrumenter: instrumenter).tap(&:run)
end
end

def self.run_threaded(processor)
# Ensure signal-handlers installation.
install_signal_handlers

# Load the config specific to this processor.
configuration = mutex.synchronize { config.dup }
configuration.load_consumer_class(processor.class)
configuration.validate!

thr = Thread.new do
runner = Runner.new(
processor,
config: configuration,
logger: logger,
instrumenter: instrumenter,
)
mutex.synchronize { runners << runner }
runner.run
end
mutex.synchronize { threads << thr }
end

def self.stop
runners.each(&:stop)
end
end
39 changes: 26 additions & 13 deletions lib/racecar/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require "fileutils"
require "racecar/rails_config_file_loader"
require "racecar/daemon"
require 'pry'

module Racecar
class Cli
Expand All @@ -13,30 +14,27 @@ def self.main(args)
def initialize(args)
@parser = build_parser
@parser.parse!(args)
@consumer_name = args.first or raise Racecar::Error, "no consumer specified"

if args.empty?
raise Racecar::Error, "no consumer specified"
else
@consumer_names = args
end
end

def config
Racecar.config
end

def run
$stderr.puts "=> Starting Racecar consumer #{consumer_name}..."
$stderr.puts "=> Starting Racecar consumer #{consumer_names.join(', ')}..."

RailsConfigFileLoader.load! unless config.without_rails?

if File.exist?("config/racecar.rb")
require "./config/racecar"
end

# Find the consumer class by name.
consumer_class = Kernel.const_get(consumer_name)

# Load config defined by the consumer class itself.
config.load_consumer_class(consumer_class)

config.validate!

if config.logfile
$stderr.puts "=> Logging to #{config.logfile}"
Racecar.logger = Logger.new(config.logfile)
Expand All @@ -58,9 +56,24 @@ def run
$stderr.puts "=> Ctrl-C to shutdown consumer"
end

processor = consumer_class.new
consumer_names.each do |consumer_name|
# Find the consumer class by name.
consumer_class = Kernel.const_get(consumer_name)

# conf = config.dup

# # Load config defined by the consumer class itself.
# conf.load_consumer_class(consumer_class)

# conf.validate!

processor = consumer_class.new

$stderr.puts "Starting consumer #{processor.class}"
Racecar.run(processor)
end

Racecar.run(processor)
Racecar.threads.each(&:join)
rescue => e
$stderr.puts "=> Crashed: #{e.class}: #{e}\n#{e.backtrace.join("\n")}"

Expand All @@ -71,7 +84,7 @@ def run

private

attr_reader :consumer_name
attr_reader :consumer_names

def daemonize!
daemon = Daemon.new(File.expand_path(config.pidfile))
Expand Down
8 changes: 6 additions & 2 deletions lib/racecar/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ class Config < KingKonf::Config
desc "Whether to boot Rails when starting the consumer"
boolean :without_rails, default: false

desc "Run the racecar process with multiple threads and one thread per consumer"
boolean :threaded, default: false

# The error handler must be set directly on the object.
attr_reader :error_handler

Expand Down Expand Up @@ -202,15 +205,16 @@ def on_error(&handler)
end

def rdkafka_consumer
consumer_config = consumer.map do |param|
# TODO: Investigate the intermittent nil values in consumer.
consumer_config = consumer.compact.map do |param|
param.split("=", 2).map(&:strip)
end.to_h
consumer_config.merge!(rdkafka_security_config)
consumer_config
end

def rdkafka_producer
producer_config = producer.map do |param|
producer_config = producer.compact.map do |param|
param.split("=", 2).map(&:strip)
end.to_h
producer_config.merge!(rdkafka_security_config)
Expand Down
8 changes: 4 additions & 4 deletions lib/racecar/consumer_set.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def initialize(config, logger, instrumenter = NullInstrumenter)

def poll(timeout_ms)
maybe_select_next_consumer
started_at ||= Time.now
started_at ||= Process.clock_gettime(Process::CLOCK_MONOTONIC)
try ||= 0
remain ||= timeout_ms

Expand Down Expand Up @@ -43,7 +43,7 @@ def poll(timeout_ms)

# XXX: messages are not guaranteed to be from the same partition
def batch_poll(timeout_ms)
@batch_started_at = Time.now
@batch_started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
@messages = []
while collect_messages_for_batch? do
remain = remaining_time_ms(timeout_ms, @batch_started_at)
Expand Down Expand Up @@ -162,7 +162,7 @@ def commit_rescue_no_offset(consumer)

def collect_messages_for_batch?
@messages.size < @config.fetch_messages &&
(Time.now - @batch_started_at) < @config.max_wait_time
(Process.clock_gettime(Process::CLOCK_MONOTONIC) - @batch_started_at) < @config.max_wait_time
end

def rdkafka_config(subscription)
Expand Down Expand Up @@ -190,7 +190,7 @@ def rdkafka_config(subscription)
end

def remaining_time_ms(limit_ms, started_at_time)
r = limit_ms - ((Time.now - started_at_time)*1000).round
r = limit_ms - ((Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at_time)*1000)
r <= 0 ? 0 : r
end
end
Expand Down
6 changes: 4 additions & 2 deletions lib/racecar/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ def setup_pauses
end

def run
install_signal_handlers
install_signal_handlers unless config.threaded

@stop_requested = false

# Configure the consumer with a producer so it can produce messages and
Expand Down Expand Up @@ -76,13 +77,14 @@ def run
end
end

logger.info "Gracefully shutting down"
logger.info "Gracefully shutting down #{processor.class}"
processor.deliver!
processor.teardown
consumer.commit
@instrumenter.instrument('leave_group') do
consumer.close
end
logger.info "Graceful shutdown of #{processor.class} completed"
end

def stop
Expand Down
24 changes: 11 additions & 13 deletions spec/consumer_set_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def message_generator(messages)

expect { consumer_set.poll(1000) }.to raise_error(Rdkafka::RdkafkaError)

expect(rdconsumer).to have_received(:poll).exactly(4).times
expect(rdconsumer).to have_received(:poll).at_least(:twice).with(less_than(1000))
end

it "skips retries if rescue block was too slow" do
Expand All @@ -160,25 +160,23 @@ def message_generator(messages)
end
end

RSpec::Matchers.define :less_than do |x|
match { |actual| (actual < x) }
end

describe "#batch_poll" do
it "honors timeout on subsequent polls" do
Timecop.freeze do
allow(consumer_set).to receive(:poll) do
Timecop.freeze(Time.now + 0.1)
:fake_msg
end

consumer_set.batch_poll(150)

expect(consumer_set).to have_received(:poll).ordered.with(150)
expect(consumer_set).to have_received(:poll).ordered.with(50)
expect(consumer_set).to have_received(:poll).twice
allow(consumer_set).to receive(:poll) do
:fake_msg
end

consumer_set.batch_poll(150)
expect(consumer_set).to have_received(:poll).at_least(10).with(less_than(150))
end

it "forwards to Rdkafka (as poll)" do
config.fetch_messages = 3
expect(rdconsumer).to receive(:poll).exactly(3).times.with(100).and_return(:msg1, :msg2, :msg3)
expect(rdconsumer).to receive(:poll).exactly(3).times.with(less_than(100)).and_return(:msg1, :msg2, :msg3)
expect(consumer_set.batch_poll(100)).to eq [:msg1, :msg2, :msg3]
end

Expand Down