Skip to content

Commit

Permalink
File based Kubernetes liveness probe
Browse files Browse the repository at this point in the history
- Triggered by instrumentation events
- The probe is a first-class object
- The check script is Ruby 😁
  • Loading branch information
bestie committed Mar 7, 2023
1 parent 2709291 commit 40be758
Show file tree
Hide file tree
Showing 7 changed files with 349 additions and 0 deletions.
56 changes: 56 additions & 0 deletions bin/liveness_probe
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#!/usr/bin/env -S ruby --disable=gems

# Kubernetes exec liveness probe for Racecar.
#
# Reads two environment variables:
#   RACECAR_LIVENESS_PROBE_FILE_PATH    - the file whose mtime marks the last liveness event
#   RACECAR_LIVENESS_PROBE_MAX_INTERVAL - maximum allowed age of that file, in seconds (Integer)
#
# Exits 0 when the file exists and was modified less than the max interval ago.
# Exits 1 (after printing the reason) when the settings are missing or invalid,
# the file cannot be read, or the file is too old.

# Prints the failure reason and terminates the probe with a non-zero status.
def fail_probe(reason)
  $stdout.puts("Racecar healthcheck failed: #{reason}")
  exit(1)
end

file_path = ENV.fetch("RACECAR_LIVENESS_PROBE_FILE_PATH", "")

# Only a missing variable or a non-integer value means "not configured";
# any other error should surface rather than be silently swallowed.
max_interval =
  begin
    Integer(ENV.fetch("RACECAR_LIVENESS_PROBE_MAX_INTERVAL"))
  rescue KeyError, ArgumentError
    nil
  end

if file_path.empty? || max_interval.nil?
  fail_probe(
    "Bad values RACECAR_LIVENESS_PROBE_FILE_PATH=`#{file_path}` RACECAR_LIVENESS_PROBE_MAX_INTERVAL=`#{max_interval}`"
  )
end

# Read the mtime exactly once: a separate existence check followed by a later
# File.mtime call can race with the file being deleted in between.
last_liveness_event_at =
  begin
    File.mtime(file_path)
  rescue SystemCallError
    fail_probe("File not found RACECAR_LIVENESS_PROBE_FILE_PATH=#{file_path}")
  end

# Compute the age once so the comparison and the reported numbers agree.
elapsed_since_liveness_event = Time.now - last_liveness_event_at

unless elapsed_since_liveness_event < max_interval
  fail_probe(
    "No liveness within interval #{max_interval}s. Last liveness at #{last_liveness_event_at}, #{elapsed_since_liveness_event} seconds ago."
  )
end

# Didn't exit so all is well
$stderr.puts("Racecar healthcheck OK #{elapsed_since_liveness_event}")
exit(0)
6 changes: 6 additions & 0 deletions lib/racecar/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
require "fileutils"
require "racecar/rails_config_file_loader"
require "racecar/daemon"
require "racecar/liveness_probe"

module Racecar
class Cli
Expand Down Expand Up @@ -58,6 +59,11 @@ def run
$stderr.puts "=> Ctrl-C to shutdown consumer"
end

if config.liveness_probe_file_path
$stderr.puts "=> Liveness probe enabled"
config.install_liveness_probe
end

processor = consumer_class.new
Racecar.run(processor)
nil
Expand Down
14 changes: 14 additions & 0 deletions lib/racecar/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,12 @@ class Config < KingKonf::Config
for backward compatibility, however this can be quite memory intensive"
integer :statistics_interval, default: 1

# Liveness probe settings. When `liveness_probe_file_path` is set, the CLI
# installs the probe before starting the consumer.
desc "Path to a file Racecar will touch to show liveness"
string :liveness_probe_file_path

# NOTE(review): declared as a string although the description says seconds;
# bin/liveness_probe parses the matching environment variable with Integer().
# Confirm whether an integer setting was intended here.
desc "Used only by the liveness probe script: Max time (in seconds) between liveness events before process is considered not live"
string :liveness_probe_max_interval

# The error handler must be set directly on the object.
attr_reader :error_handler

Expand Down Expand Up @@ -266,6 +272,14 @@ def instrumenter
end
attr_writer :instrumenter

# Builds a LivenessProbe on top of ActiveSupport::Notifications using the
# configured `liveness_probe_file_path`, installs it and remembers it so it
# can be reached through `liveness_probe` afterwards.
#
# ActiveSupport is required here rather than at load time, so it is only
# loaded when the probe is actually installed.
#
# Returns the installed probe. Any error raised by LivenessProbe#install
# propagates, in which case `liveness_probe` is left unchanged.
def install_liveness_probe
  require "active_support/notifications"
  probe = LivenessProbe.new(ActiveSupport::Notifications, liveness_probe_file_path)
  probe.install
  @liveness_probe = probe
end
attr_reader :liveness_probe

private

def rdkafka_security_config
Expand Down
54 changes: 54 additions & 0 deletions lib/racecar/liveness_probe.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
require "fileutils"

module Racecar
  # File based liveness probe.
  #
  # Once installed, the probe touches `file_path` every time the
  # "start_main_loop.racecar" event is published on the message bus and
  # removes the file when "shut_down.racecar" is published. An external check
  # can then judge liveness from the file's existence and modification time.
  #
  # The message bus must respond to `subscribe(name, &block)` (returning a
  # token) and `unsubscribe(token)`.
  class LivenessProbe
    # message_bus - object used to subscribe to / unsubscribe from events.
    # file_path   - path of the liveness file; may be nil, in which case
    #               `install` raises.
    def initialize(message_bus, file_path)
      @message_bus = message_bus
      @file_path = file_path
      @subscribers = []
    end

    attr_reader :message_bus, :file_path, :subscribers

    # Subscribes the probe to the main loop and shut down events.
    #
    # Raises RuntimeError when `file_path` is unset or cannot be written.
    # Returns nil.
    def install
      unless file_path && file_writeable?
        raise(
          "Liveness probe configuration error: `liveness_probe_file_path` must be set to a writable file path.\n" \
          " Set `RACECAR_LIVENESS_PROBE_FILE_PATH` and `RACECAR_LIVENESS_PROBE_MAX_INTERVAL` environment variables."
        )
      end

      subscribers << message_bus.subscribe("start_main_loop.racecar") { touch_liveness_file }

      # Track this subscription as well; otherwise `uninstall` cannot remove
      # it and the shut down handler stays registered.
      subscribers << message_bus.subscribe("shut_down.racecar") { delete_liveness_file }

      nil
    end

    # Removes every subscription created by `install` and forgets the tokens,
    # so a later `install` / `uninstall` cycle starts from a clean slate.
    def uninstall
      subscribers.each { |subscriber| message_bus.unsubscribe(subscriber) }
      subscribers.clear
    end

    private

    def touch_liveness_file
      FileUtils.touch(file_path)
    end

    # Removes the liveness file if present. Uses `rm_f` rather than `rm_rf`:
    # the path is meant to be a single file, so a recursive delete is never
    # needed and would be dangerous on a misconfigured path.
    def delete_liveness_file
      FileUtils.rm_f(file_path)
    end

    # Checks writability by actually creating (or truncating) the file and
    # removing it again. Any StandardError is treated as "not writable".
    def file_writeable?
      File.write(file_path, "")
      File.unlink(file_path)
      true
    rescue StandardError
      false
    end
  end
end
3 changes: 3 additions & 0 deletions lib/racecar/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ def run
loop do
break if @stop_requested
resume_paused_partitions

@instrumenter.instrument("start_main_loop", instrumentation_payload)
@instrumenter.instrument("main_loop", instrumentation_payload) do
case process_method
when :batch then
Expand Down Expand Up @@ -94,6 +96,7 @@ def run
ensure
producer.close
Racecar::Datadog.close if Object.const_defined?("Racecar::Datadog")
@instrumenter.instrument("shut_down", instrumentation_payload || {})
end

def stop
Expand Down
149 changes: 149 additions & 0 deletions spec/integration/kubernetes_probes_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
# frozen_string_literal: true

require "open3"
require "securerandom"
require "timeout"
require "active_support/notifications"
require "racecar/cli"

# Integration spec for the file based liveness probe: runs a consumer through
# Racecar::Cli in a background thread and checks the exit status of the
# `bin/liveness_probe` script at different points of the consumer's life.
RSpec.describe "kubernetes probes", type: :integration do
  describe "liveness probe" do
    before do
      set_env_vars
      reset_config

      input_topic = generate_output_topic_name
      create_topic(topic: input_topic)
      consumer_class.subscribes_to(input_topic)
      Thread.current.name = "main"
    end

    after do
      stop_racecar
      ensure_liveness_file_is_deleted
      unset_env_vars
      reset_notifications
    end

    it "initially fails and passes when the main loop starts" do
      expect(Pathname.new(liveness_file_path)).not_to be_readable
      expect(run_probe).to be false

      start_racecar
      wait_for_main_loop

      expect(run_probe).to be true
    end

    it "fails if processing stalls for too long" do
      start_racecar
      wait_for_main_loop

      expect(run_probe).to be true

      stall_processing(liveness_timeout * 1.1)

      expect(run_probe).to be false
    end

    context "even when the timeout is long" do
      let(:liveness_timeout) { 10 }

      it "fails immediately after stopping" do
        start_racecar
        wait_for_main_loop

        stop_racecar

        expect(run_probe).to be false
      end
    end

    let(:liveness_file_path) { "/tmp/racecar-liveness-file-#{SecureRandom.hex(4)}" }
    let(:liveness_timeout) { 1 }
    let(:racecar_cli) { Racecar::Cli.new([consumer_class.name.to_s]) }

    # Assigned to a constant so the class has a name that can be handed to
    # the CLI as its consumer argument.
    let(:consumer_class) do
      NoOpConsumer = Class.new(Racecar::Consumer) do
        self.group_id = "soon-to-be-ready"

        define_method :process do |_message|
        end
      end
    end

    # Runs the CLI in a background thread so the example can keep probing.
    def start_racecar
      @cli_run_thread = Thread.new { Thread.current.name = "cli thread"; racecar_cli.run }
    end

    # Interrupts the consumer and waits briefly for its thread to finish.
    def stop_racecar
      # The `after` hook also runs when an example failed before
      # `start_racecar`. In that case there is no thread to stop, and sending
      # INT to this process would hit the test run itself (presumably the
      # running consumer is what handles INT - confirm).
      return unless @cli_run_thread

      Process.kill("INT", Process.pid)
      if @cli_run_thread.alive?
        @cli_run_thread.wakeup
        @cli_run_thread.join(2)
        @cli_run_thread.terminate
      end
    end

    # Runs the probe script with the spec's environment; true when it exits 0.
    def run_probe
      command = "bin/liveness_probe"
      _output, status = Open3.capture2e(env_vars, command)
      status.success?
    end

    # Blocks the next main loop notification for `time` seconds, then wakes
    # the test thread and keeps the notifying thread asleep.
    def stall_processing(time)
      test_thread = Thread.current
      subscriber = ActiveSupport::Notifications.subscribe(/main_loop/) do |_event, *_|
        ActiveSupport::Notifications.unsubscribe(subscriber)

        sleep(time)
        test_thread.wakeup
        sleep
      end

      sleep_with_timeout
    end

    # Sleeps the test thread until the next main loop notification wakes it.
    def wait_for_main_loop
      test_thread = Thread.current
      subscriber = ActiveSupport::Notifications.subscribe(/main_loop/) do |_event, *_|
        ActiveSupport::Notifications.unsubscribe(subscriber)

        test_thread.wakeup
      end

      sleep_with_timeout
    end

    # Sleeps until woken by another thread; raises Timeout::Error after
    # `max_sleep` seconds.
    def sleep_with_timeout(max_sleep = 2)
      Timeout.timeout(max_sleep) { sleep }
    end
  end

  def set_env_vars
    env_vars.each { |k, v| ENV[k] = v }
  end

  def unset_env_vars
    env_vars.each { |k, _v| ENV.delete(k) }
  end

  # Environment shared by the in-process consumer and the probe script.
  def env_vars
    {
      "RACECAR_LIVENESS_PROBE_FILE_PATH" => liveness_file_path,
      "RACECAR_LIVENESS_PROBE_MAX_INTERVAL" => liveness_timeout.to_s,
    }
  end

  def ensure_liveness_file_is_deleted
    File.unlink(liveness_file_path) if File.exist?(liveness_file_path)
  end

  def reset_config
    Racecar.config = Racecar::Config.new
    Racecar.config.max_wait_time = 0.05
  end

  def reset_notifications
    # The probe only exists once the CLI has installed it; an example that
    # failed earlier leaves it nil.
    Racecar.config.liveness_probe&.uninstall
  end
end
67 changes: 67 additions & 0 deletions spec/liveness_probe_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# frozen_string_literal: true

require "racecar/liveness_probe"
require "active_support/notifications"

# Unit spec for Racecar::LivenessProbe using ActiveSupport::Notifications as
# the message bus and a randomly named file under /tmp as the liveness file.
RSpec.describe Racecar::LivenessProbe do
  let(:probe) { Racecar::LivenessProbe.new(message_bus, file_path_string) }
  let(:file_path) { Pathname.new(file_path_string) }
  let(:file_path_string) { "/tmp/liveness-probe-test-#{SecureRandom.hex(4)}" }
  let(:message_bus) { ActiveSupport::Notifications }

  after do
    ActiveSupport::Notifications.notifier = ActiveSupport::Notifications::Fanout.new
    # `rm_f`, not `rm_rf`: one context below points `file_path` at "/etc",
    # and a recursive forced delete of that path must never be attempted.
    FileUtils.rm_f(file_path)
  end

  describe "#install" do
    context "when the file is writeable" do
      it "touches the file on 'start_main_loop'" do
        probe.install

        expect { message_bus.publish("start_main_loop.racecar") }.to(change { file_path.exist? }.from(false).to(true))
      end

      it "deletes the file on 'shut_down'" do
        probe.install

        FileUtils.touch(file_path)

        expect { message_bus.publish("shut_down.racecar") }.to(change { file_path.exist? }.from(true).to(false))
      end
    end

    context "when the file is not writable" do
      context "because the directory does not exist" do
        let(:file_path_string) { "directory-does-not-exist/liveness-file" }

        it "raises a helpful message" do
          expect { probe.install }.to(raise_error(/Liveness probe configuration error/))
        end
      end

      context "because the process does not have write permissions" do
        let(:file_path_string) { "/etc" }

        it "raises a helpful message" do
          expect { probe.install }.to(raise_error(/Liveness probe configuration error/))
        end
      end
    end
  end

  describe "#uninstall" do
    before { probe.install }

    it "stops the probe" do
      probe.uninstall

      # Without triggering the event the expectation below would hold even
      # if the probe were still subscribed.
      trigger_probe

      expect(file_path).not_to exist
    end

    def trigger_probe
      message_bus.instrument("start_main_loop.racecar")
    end
  end
end

0 comments on commit 40be758

Please sign in to comment.