From 5d41096af7f883bbfe41bef2d154ee746bc70d14 Mon Sep 17 00:00:00 2001 From: Stefan Breunig Date: Wed, 24 Feb 2021 16:02:00 +0100 Subject: [PATCH] create a racecar delivery handle wrapper to provide context in error messages --- docker-compose.yml | 2 +- lib/racecar/consumer.rb | 27 ++++----- lib/racecar/ctl.rb | 4 +- lib/racecar/message_delivery_error.rb | 21 +++---- lib/racecar/message_delivery_handle.rb | 48 ++++++++++++++++ spec/integration/consumer_spec.rb | 76 ++++++++++++++++++++++++++ spec/message_delivery_error_spec.rb | 21 +++++-- spec/runner_spec.rb | 2 +- 8 files changed, 167 insertions(+), 34 deletions(-) create mode 100644 lib/racecar/message_delivery_handle.rb diff --git a/docker-compose.yml b/docker-compose.yml index dc0fa86d..05d2f8e7 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,4 +1,4 @@ -version: '2' +version: '2.1' services: zookeeper: image: confluentinc/cp-zookeeper:5.5.1 diff --git a/lib/racecar/consumer.rb b/lib/racecar/consumer.rb index 11e0cacb..554f0b2c 100644 --- a/lib/racecar/consumer.rb +++ b/lib/racecar/consumer.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true require "racecar/message_delivery_error" +require "racecar/message_delivery_handle" module Racecar class Consumer @@ -36,7 +37,7 @@ def subscribes_to(*topics, start_from_beginning: true, max_bytes_per_partition: def configure(producer:, consumer:, instrumenter: NullInstrumenter, config: Racecar.config) @producer = producer - @delivery_handles = [] + @message_delivery_handles = [] @consumer = consumer @@ -52,12 +53,12 @@ def teardown; end # (e.g. downtime, configuration issue) or specific to the message being sent. The # caller must handle the latter cases or run into head of line blocking. def deliver! - @delivery_handles ||= [] - if @delivery_handles.any? - instrumentation_payload = { delivered_message_count: @delivery_handles.size } + @message_delivery_handles ||= [] + if @message_delivery_handles.any? 
+ instrumentation_payload = { delivered_message_count: @message_delivery_handles.size } @instrumenter.instrument('deliver_messages', instrumentation_payload) do - @delivery_handles.each do |handle| + @message_delivery_handles.each do |handle| # rdkafka-ruby checks every wait_timeout seconds if the message was # successfully delivered, up to max_wait_timeout seconds before raising # Rdkafka::AbstractHandle::WaitTimeoutError. librdkafka will (re)try to @@ -69,25 +70,24 @@ def deliver! # changing the interface). handle.wait(max_wait_timeout: 60, wait_timeout: 0.1) rescue Rdkafka::AbstractHandle::WaitTimeoutError => e - partition = MessageDeliveryError.partition_from_delivery_handle(handle) # ideally we could use the logger passed to the Runner, but it is not # available here. The runner sets it for Rdkafka, though, so we can use # that instead. - @config.logger.debug "Still trying to deliver message to (partition #{partition})... (will try up to Racecar.config.message_timeout)" + @config.logger.debug "Still trying to deliver message to (partition #{handle.partition_text})... (will try up to Racecar.config.message_timeout)" retry rescue Rdkafka::RdkafkaError => e raise MessageDeliveryError.new(e, handle) end end end - @delivery_handles.clear + @message_delivery_handles.clear end protected # https://github.com/appsignal/rdkafka-ruby#producing-messages def produce(payload, topic:, key: nil, partition_key: nil, headers: nil, create_time: nil) - @delivery_handles ||= [] + @message_delivery_handles ||= [] message_size = payload.respond_to?(:bytesize) ? 
payload.bytesize : 0 instrumentation_payload = { value: payload, @@ -97,18 +97,19 @@ def produce(payload, topic:, key: nil, partition_key: nil, headers: nil, create_ topic: topic, message_size: message_size, create_time: Time.now, - buffer_size: @delivery_handles.size, + buffer_size: @message_delivery_handles.size, } @instrumenter.instrument("produce_message", instrumentation_payload) do - @delivery_handles << @producer.produce( + params = { topic: topic, - payload: payload, key: key, partition_key: partition_key, timestamp: create_time, headers: headers, - ) + } + handle = @producer.produce(payload: payload, **params) + @message_delivery_handles << MessageDeliveryHandle.new(handle, **params) end end diff --git a/lib/racecar/ctl.rb b/lib/racecar/ctl.rb index 0830d7a8..c4eef7b4 100644 --- a/lib/racecar/ctl.rb +++ b/lib/racecar/ctl.rb @@ -4,6 +4,7 @@ require "racecar/rails_config_file_loader" require "racecar/daemon" require "racecar/message_delivery_error" +require "racecar/message_delivery_handle" module Racecar class Ctl @@ -106,7 +107,8 @@ def produce(args) begin handle.wait(max_wait_timeout: Racecar.config.message_timeout) rescue Rdkafka::RdkafkaError => e - raise MessageDeliveryError.new(e, handle) + wrapped_handle = MessageDeliveryHandle.new(handle, key: message.key, topic: message.topic, partition_key: nil, timestamp: nil, headers: nil) + raise MessageDeliveryError.new(e, wrapped_handle) end $stderr.puts "=> Delivered message to Kafka cluster" diff --git a/lib/racecar/message_delivery_error.rb b/lib/racecar/message_delivery_error.rb index fbc392dd..b1b41baf 100644 --- a/lib/racecar/message_delivery_error.rb +++ b/lib/racecar/message_delivery_error.rb @@ -1,22 +1,17 @@ # frozen_string_literal: true +require "racecar/message_delivery_handle" + module Racecar # MessageDeliveryError wraps an Rdkafka error and tries to give # specific hints on how to debug or resolve the error within the # Racecar context. 
class MessageDeliveryError < StandardError - # partition_from_delivery_handle takes an rdkafka delivery handle - # and returns a human readable version of the partition. It handles - # the case where the partition is unknown. - def self.partition_from_delivery_handle(delivery_handle) - partition = delivery_handle&.create_result&.partition - # -1 is rdkafka-ruby's default value, which gets eventually set by librdkafka - return "no yet known" if partition.nil? || partition == -1 - partition.to_s - end - def initialize(rdkafka_error, delivery_handle) raise rdkafka_error unless rdkafka_error.is_a?(Rdkafka::RdkafkaError) + if !delivery_handle.is_a?(Racecar::MessageDeliveryHandle) + raise TypeError, "expected a Racecar::MessageDeliveryHandle, got #{delivery_handle.class}" + end @rdkafka_error = rdkafka_error @delivery_handle = delivery_handle @@ -66,10 +61,8 @@ def explain EOM when :unknown_topic_or_part # 3 - partition = self.class.partition_from_delivery_handle(@delivery_handle) - <<~EOM - Could not deliver message, since the targeted topic or partition (#{partition}) does not exist. + Could not deliver message, since the targeted topic (#{@delivery_handle.topic}) or partition (#{@delivery_handle.partition_text}) does not exist. Check that there are no typos, or that the broker's "auto.create.topics.enable" is enabled. For freshly created topics with auto create enabled, this may appear in the beginning (race condition on creation and publishing). @@ -92,7 +85,7 @@ def explain when :topic_authorization_failed # 29 <<~EOM - Failed to deliver message because of insufficient authorization to write into the topic. + Failed to deliver message because of insufficient authorization to write into the topic "#{@delivery_handle.topic}". Double check that it is not a race condition on topic creation. If it isn't, verify the ACLs are correct. 
diff --git a/lib/racecar/message_delivery_handle.rb b/lib/racecar/message_delivery_handle.rb new file mode 100644 index 00000000..6a3fc1e5 --- /dev/null +++ b/lib/racecar/message_delivery_handle.rb @@ -0,0 +1,48 @@ +# frozen_string_literal: true + +require "forwardable" + +module Racecar + # MessageDeliveryHandle is a wrapper around Rdkafka::Producer::DeliveryHandle, mostly + # to keep context around. + class MessageDeliveryHandle + extend Forwardable + + attr_reader :rdkafka_handle, :topic, :key, :partition_key, :timestamp, :headers + + def initialize(rdkafka_handle, topic:, key:, partition_key:, timestamp:, headers:) + if !rdkafka_handle.is_a?(Rdkafka::Producer::DeliveryHandle) + raise TypeError, "expected a Rdkafka::Producer::DeliveryHandle, got #{rdkafka_handle.class}" + end + + @rdkafka_handle = rdkafka_handle + @topic = topic + @key = key + @partition_key = partition_key + @timestamp = timestamp + @headers = headers + end + + def_delegators :@rdkafka_handle, :wait, :pending? + + # offset returns the offset of the delivered message. If the offset is not yet + # known it will be set to -1. + def offset + @rdkafka_handle.create_result.offset || -1 + end + + # partition returns the assigned partition of the message. If the partition is not + # yet known it will be set to -1. + def partition + @rdkafka_handle.create_result.partition || -1 + end + + # partition text returns a string describing the partition of the message. If the + # partition is not yet known, it will return a readable message saying so. 
+ def partition_text + part = partition + return "not yet known" if part == -1 + part.to_s + end + end +end diff --git a/spec/integration/consumer_spec.rb b/spec/integration/consumer_spec.rb index f3564b60..0b5caca9 100644 --- a/spec/integration/consumer_spec.rb +++ b/spec/integration/consumer_spec.rb @@ -13,6 +13,82 @@ class NoProcessConsumer < Racecar::Consumer end RSpec.describe "running a Racecar consumer", type: :integration do + context "produce" do + let(:instrumenter) { Racecar::NullInstrumenter } + let(:rdkafka_delivery_handle) { Rdkafka::Producer::DeliveryHandle.new } + let(:rdkafka_producer) { instance_double(Rdkafka::Producer, produce: rdkafka_delivery_handle, close: nil) } + let(:consumer) do + Racecar::Consumer.new.tap do |c| + c.configure(producer: rdkafka_producer, consumer: nil, instrumenter: instrumenter) + end + end + + before { Timecop.freeze } + after { Timecop.return } + + it "instruments" do + allow(instrumenter).to receive(:instrument).and_call_original + + consumer.send(:produce, "a_payload", + topic: "a_topic", + key: "a_key", + partition_key: "a_part_key", + headers: "some_headers", + create_time: Time.parse("2000-01-01") + ) + + expect(instrumenter).to have_received(:instrument).with( + "produce_message", + buffer_size: 0, + create_time: Time.now, + headers: "some_headers", + key: "a_key", + message_size: "a_payload".size, + partition_key: "a_part_key", + topic: "a_topic", + value: "a_payload", + ) + end + + it "passes correct fields to rdkafka" do + timestamp = Time.parse("2000-01-01") + + consumer.send(:produce, "a_payload", + topic: "a_topic", + key: "a_key", + partition_key: "a_part_key", + headers: "some_headers", + create_time: timestamp + ) + + expect(rdkafka_producer).to have_received(:produce).with( + timestamp: timestamp, + headers: "some_headers", + key: "a_key", + partition_key: "a_part_key", + topic: "a_topic", + payload: "a_payload", + ) + end + + it "creates sensible message delivery handles" do + timestamp = 
Time.parse("2000-01-01") + + consumer.send(:produce, "a_payload", + topic: "a_topic", + key: "a_key", + partition_key: "a_part_key", + headers: "some_headers", + create_time: timestamp + ) + + handles = consumer.instance_variable_get(:@message_delivery_handles) + expect(handles.size).to eq 1 + expect(handles.first).to be_kind_of Racecar::MessageDeliveryHandle + expect(handles.first.topic).to eq "a_topic" + end + end + context "when an error occurs trying to start the runner" do context "when there are no subscriptions" do it "raises an exception" do diff --git a/spec/message_delivery_error_spec.rb b/spec/message_delivery_error_spec.rb index 4b51591d..99df9f3c 100644 --- a/spec/message_delivery_error_spec.rb +++ b/spec/message_delivery_error_spec.rb @@ -1,5 +1,7 @@ # frozen_string_literal: true +require "racecar/message_delivery_handle" + RSpec.describe Racecar::MessageDeliveryError do let(:rdkafka_msg_timed_out) { Rdkafka::RdkafkaError.new(-192) } let(:rdkafka_unknown_topic_or_part) { Rdkafka::RdkafkaError.new(3) } @@ -9,18 +11,29 @@ dh[:offset] = 42 end end + let(:racecar_delivery_handle) do + Racecar::MessageDeliveryHandle.new( + rdkafka_delivery_handle, + topic: "a_topic", + key: "a_key", + partition_key: "a_partition_key", + timestamp: "a_timestamp", + headers: "some_headers" + ) + end it "passes through error code" do - error = described_class.new(rdkafka_msg_timed_out, rdkafka_delivery_handle) + error = described_class.new(rdkafka_msg_timed_out, racecar_delivery_handle) expect(error.code).to eq rdkafka_msg_timed_out.code end it "includes partition of delivery handle" do - error = described_class.new(rdkafka_unknown_topic_or_part, rdkafka_delivery_handle) + error = described_class.new(rdkafka_unknown_topic_or_part, racecar_delivery_handle) expect(error.to_s).to include "(37)" end - it "handles delivery handle being nil" do - described_class.new(rdkafka_unknown_topic_or_part, nil).to_s + it "includes topic of delivery handle" do + error = 
described_class.new(rdkafka_unknown_topic_or_part, racecar_delivery_handle) + expect(error.to_s).to include "a_topic" end end diff --git a/spec/runner_spec.rb b/spec/runner_spec.rb index ee68ead7..83c1c0dc 100644 --- a/spec/runner_spec.rb +++ b/spec/runner_spec.rb @@ -181,7 +181,7 @@ def close end end -class FakeDeliveryHandle +class FakeDeliveryHandle < Rdkafka::Producer::DeliveryHandle def initialize(kafka, msg, delivery_callback) @kafka = kafka @msg = msg