diff --git a/bin/liveness_probe b/bin/liveness_probe new file mode 100755 index 00000000..2035d8ad --- /dev/null +++ b/bin/liveness_probe @@ -0,0 +1,56 @@ +#!/usr/bin/env -S ruby --disable=gems + +$file_path = ENV.fetch("RACECAR_LIVENESS_PROBE_FILE_PATH", "") +$liveness_max_interval = Integer(ENV.fetch("RACECAR_LIVENESS_PROBE_MAX_INTERVAL")) rescue nil + +file_path_and_max_interval_set? or exit(1) +liveness_file_exists? or exit(1) +liveness_within_timeout_interval? or exit(1) + +# Didn't exit so all is well +$stderr.puts("Racecar healthcheck OK #{elapased_since_liveness_event}") +exit(0) + + +BEGIN { + def liveness_within_timeout_interval? + result = elapased_since_liveness_event < $liveness_max_interval + if result + true + else + $stdout.puts( + "Racecar healthcheck failed: No liveness within interval #{$liveness_max_interval}s. Last liveness at #{last_liveness_event_at}, #{elapased_since_liveness_event} seconds ago." + ) + false + end + end + + def liveness_file_exists? + result = File.exist?($file_path) + if result + true + else + $stdout.puts("Racecar healthcheck failed: File not found RACECAR_LIVENESS_PROBE_FILE_PATH=#{$file_path}") + false + end + end + + def file_path_and_max_interval_set? + if !$file_path.empty? && $liveness_max_interval.is_a?(Integer) + true + else + $stdout.puts( + "Racecar healthcheck failed: Bad values RACECAR_LIVENESS_PROBE_FILE_PATH=`#{$file_path}` RACECAR_LIVENESS_PROBE_MAX_INTERVAL=`#{$liveness_max_interval}`" + ) + false + end + end + + def elapased_since_liveness_event + Time.now - last_liveness_event_at + end + + def last_liveness_event_at + File.mtime($file_path) + end +} diff --git a/lib/racecar/cli.rb b/lib/racecar/cli.rb index 76081eec..e0951788 100644 --- a/lib/racecar/cli.rb +++ b/lib/racecar/cli.rb @@ -5,6 +5,7 @@ require "fileutils" require "racecar/rails_config_file_loader" require "racecar/daemon" +require "racecar/liveness_probe" module Racecar class Cli @@ -58,6 +59,11 @@ def run $stderr.puts "=> Ctrl-C to shutdown consumer" end + if config.liveness_probe_file_path + $stderr.puts "=> Liveness probe enabled" + config.install_liveness_probe + end + processor = consumer_class.new Racecar.run(processor) nil diff --git a/lib/racecar/config.rb b/lib/racecar/config.rb index ef64ea0f..4452211a 100644 --- a/lib/racecar/config.rb +++ b/lib/racecar/config.rb @@ -167,6 +167,12 @@ class Config < KingKonf::Config for backward compatibility, however this can be quite memory intensive" integer :statistics_interval, default: 1 + desc "Path to a file Racecar will touch to show liveness" + string :liveness_probe_file_path + + desc "Used only by the liveness probe script: Max time (in seconds) between liveness events before process is considered not live" + string :liveness_probe_max_interval + # The error handler must be set directly on the object. attr_reader :error_handler @@ -266,6 +272,14 @@ def instrumenter end attr_writer :instrumenter + def install_liveness_probe + require "active_support/notifications" + @liveness_probe = LivenessProbe + .new(ActiveSupport::Notifications, liveness_probe_file_path) + .tap(&:install) + end + attr_reader :liveness_probe + private def rdkafka_security_config diff --git a/lib/racecar/liveness_probe.rb b/lib/racecar/liveness_probe.rb new file mode 100644 index 00000000..2cd87a1c --- /dev/null +++ b/lib/racecar/liveness_probe.rb @@ -0,0 +1,54 @@ +require "fileutils" + +module Racecar + class LivenessProbe + def initialize(message_bus, file_path) + @message_bus = message_bus + @file_path = file_path + @subscribers = [] + end + + attr_reader :message_bus, :file_path, :subscribers + + def install + unless file_path && file_writeable? + raise( + "Liveness probe configuration error: `liveness_probe_file_path` must be set to a writable file path.\n" \ + " Set `RACECAR_LIVENESS_PROBE_FILE_PATH` and `RACECAR_LIVENESS_MAX_INTERVAL` environment variables." + ) + end + + subscribers << message_bus.subscribe("start_main_loop.racecar") do + touch_liveness_file + end + + subscribers = message_bus.subscribe("shut_down.racecar") do + delete_livess_file + end + + nil + end + + def uninstall + subscribers.each { |s| message_bus.unsubscribe(s) } + end + + private + + def touch_liveness_file + FileUtils.touch(file_path) + end + + def delete_livess_file + FileUtils.rm_rf(file_path) + end + + def file_writeable? + File.write(file_path, "") + File.unlink(file_path) + true + rescue + false + end + end +end diff --git a/lib/racecar/runner.rb b/lib/racecar/runner.rb index f0555748..0536a5f7 100644 --- a/lib/racecar/runner.rb +++ b/lib/racecar/runner.rb @@ -67,6 +67,8 @@ def run loop do break if @stop_requested resume_paused_partitions + + @instrumenter.instrument("start_main_loop", instrumentation_payload) @instrumenter.instrument("main_loop", instrumentation_payload) do case process_method when :batch then @@ -94,6 +96,7 @@ def run ensure producer.close Racecar::Datadog.close if Object.const_defined?("Racecar::Datadog") + @instrumenter.instrument("shut_down", instrumentation_payload || {}) end def stop diff --git a/spec/integration/kubernetes_probes_spec.rb b/spec/integration/kubernetes_probes_spec.rb new file mode 100644 index 00000000..3105e77b --- /dev/null +++ b/spec/integration/kubernetes_probes_spec.rb @@ -0,0 +1,149 @@ +# frozen_string_literal: true + +require "timeout" +require "securerandom" +require "active_support/notifications" +require "racecar/cli" + +RSpec.describe "kubernetes probes", type: :integration do + describe "liveness probe" do + before do + set_env_vars + reset_config + + input_topic = generate_output_topic_name + create_topic(topic: input_topic) + consumer_class.subscribes_to(input_topic) + Thread.current.name = "main" + end + + after do + stop_racecar + ensure_liveness_file_is_deleted + unset_env_vars + reset_notifications + end + + it "initially fails and passes when the main loop starts" do + expect(Pathname.new(liveness_file_path)).not_to be_readable + expect(run_probe).to be false + + start_racecar + wait_for_main_loop + + expect(run_probe).to be true + end + + it "fails if processing stalls for too long" do + start_racecar + wait_for_main_loop + + expect(run_probe).to be true + + stall_processing(liveness_timeout * 1.1) + + expect(run_probe).to be false + end + + context "even when the timeout is long" do + let(:liveness_timeout) { 10 } + + it "fails immediately after stopping" do + start_racecar + wait_for_main_loop + + stop_racecar + + expect(run_probe).to be false + end + end + + let(:liveness_file_path) { "/tmp/racecar-liveness-file-#{SecureRandom.hex(4)}" } + let(:liveness_timeout) { 1 } + let(:racecar_cli) { Racecar::Cli.new([consumer_class.name.to_s]) } + + let(:consumer_class) do + NoOpConsumer = Class.new(Racecar::Consumer) do + self.group_id = "soon-to-be-ready" + + define_method :process do |_message| + end + end + end + + def start_racecar + @cli_run_thread = Thread.new { Thread.current.name = "cli thread"; racecar_cli.run } + end + + def stop_racecar + Process.kill("INT", Process.pid) + if @cli_run_thread.alive? + @cli_run_thread.wakeup + @cli_run_thread.join(2) + @cli_run_thread.terminate + end + end + + def run_probe + command = "bin/liveness_probe" + output, status = Open3.capture2e(env_vars, command) + status.success? + end + + def stall_processing(time) + test_thread = Thread.current + subscriber = ActiveSupport::Notifications.subscribe(/main_loop/) do |event, *_| + ActiveSupport::Notifications.unsubscribe(subscriber) + + sleep(time) + test_thread.wakeup + sleep + end + + sleep_with_timeout + end + + def wait_for_main_loop + test_thread = Thread.current + subscriber = ActiveSupport::Notifications.subscribe(/main_loop/) do |event, *_| + ActiveSupport::Notifications.unsubscribe(subscriber) + + test_thread.wakeup + end + + sleep_with_timeout + end + + def sleep_with_timeout(max_sleep = 2) + Timeout.timeout(max_sleep) { sleep } + end + end + + def set_env_vars + env_vars.each { |k,v| ENV[k] = v } + end + + def unset_env_vars + env_vars.each { |k,_v| ENV.delete(k) } + end + + def env_vars + { + "RACECAR_LIVENESS_PROBE_FILE_PATH" => liveness_file_path, + "RACECAR_LIVENESS_PROBE_MAX_INTERVAL" => liveness_timeout.to_s, + } + end + + def ensure_liveness_file_is_deleted + File.unlink(liveness_file_path) if File.exist?(liveness_file_path) + end + + def reset_config + Racecar.config = Racecar::Config.new + Racecar.config.max_wait_time = 0.05 + end + + def reset_notifications + Racecar.config.liveness_probe.uninstall + end +end diff --git a/spec/liveness_probe_spec.rb b/spec/liveness_probe_spec.rb new file mode 100644 index 00000000..90b707b3 --- /dev/null +++ b/spec/liveness_probe_spec.rb @@ -0,0 +1,67 @@ +# frozen_string_literal: true + +require "racecar/liveness_probe" +require "active_support/notifications" + +RSpec.describe Racecar::LivenessProbe do + let(:probe) { Racecar::LivenessProbe.new(message_bus, file_path_string) } + let(:file_path) { Pathname.new(file_path_string) } + let(:file_path_string) { "/tmp/liveness-probe-test-#{SecureRandom.hex(4)}" } + let(:message_bus) { ActiveSupport::Notifications } + + after do + ActiveSupport::Notifications.notifier = ActiveSupport::Notifications::Fanout.new + FileUtils.rm_rf(file_path) + end + + describe "#install" do + context "when the file is writeable" do + it "touches the file on 'start_main_loop'" do + probe.install + + expect { message_bus.publish("start_main_loop.racecar") }.to(change { file_path.exist? }.from(false).to(true)) + end + + it "deletes the file on 'shut_down'" do + probe.install + + FileUtils.touch(file_path) + + expect { message_bus.publish("shut_down.racecar") }.to(change { file_path.exist? }.from(true).to(false)) + end + end + + context "when the file is not writable" do + context "because the directory does not exist" do + let(:file_path_string) { "directory-does-not-exist/liveness-file" } + + it "raises a helpful message" do + expect { probe.install }.to(raise_error(/Liveness probe configuration error/)) + end + end + + context "because the process does not have write permissions" do + let(:file_path_string) { "/etc" } + + it "raises a helpful message" do + expect { probe.install }.to(raise_error(/Liveness probe configuration error/)) + end + end + end + end + + describe "#uninstall" do + before { probe.install } + + it "stops the probe " do + probe.uninstall + + + expect(file_path).not_to exist + end + + def trigger_probe + message_bus.instrument("start_main_loop.racecar") + end + end +end