From d457d9ebfdff59176b33ee797d6ddff27d2d6705 Mon Sep 17 00:00:00 2001 From: Satana de Sant'Ana Date: Wed, 7 Jun 2023 01:18:44 +0100 Subject: [PATCH] delay_serialization: implement feature Add the `delay_serialization` option, allowing users to delay expensive serialization until a more convenient time, such as after an HTTP request has completed. In multi-threaded mode, it causes serialization to happen inside the sender thread. Also, support the `sender_queue_size` in `single_thread` mode, so that it can benefit from the new `delay_serialization` option. Messages are now queued (possibly unserialized) until `sender_queue_size` is reached or `#flush` is called. It may be set to `Float::INFINITY`, so that messages are indefinitely queued until an explicit `#flush`. Fix #271 Co-Authored-By: Blake Williams --- CHANGELOG.md | 15 ++++++ README.md | 12 ++++- lib/datadog/statsd.rb | 13 ++++- lib/datadog/statsd/forwarder.rb | 9 +++- lib/datadog/statsd/message_buffer.rb | 10 +++- lib/datadog/statsd/sender.rb | 5 +- lib/datadog/statsd/single_thread_sender.rb | 18 ++++++- lib/datadog/statsd/version.rb | 2 +- spec/integrations/delay_serialization_spec.rb | 38 +++++++++++++++ spec/statsd/forwarder_spec.rb | 10 ++++ spec/statsd/message_buffer_spec.rb | 5 ++ spec/statsd/single_thread_sender_spec.rb | 47 ++++++++++++++++--- 12 files changed, 167 insertions(+), 17 deletions(-) create mode 100644 spec/integrations/delay_serialization_spec.rb diff --git a/CHANGELOG.md b/CHANGELOG.md index 5be99e68..221b31bd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,19 @@ [//]: # (comment: Don't forget to update lib/datadog/statsd/version.rb:DogStatsd::Statsd::VERSION when releasing a new version) +## 5.6.0 / 2023.06.07 + + * [FEATURE] Add the `delay_serialization` option, allowing users to delay + expensive serialization until a more convenient time, such as after an HTTP + request has completed. In multi-threaded mode, it causes serialization to + happen inside the sender thread. 
[#271][] by [@pudiva][] + + * [FEATURE] Also, support the `sender_queue_size` in `single_thread` mode, so + that it can benefit from the new `delay_serialization` option. Messages are + now queued (possibly unserialized) until `sender_queue_size` is reached or + `#flush` is called. It may be set to `Float::INFINITY`, so that messages + are indefinitely queued until an explicit `#flush`. [#271][] by [@pudiva][] + ## 5.5.0 / 2022.06.01 * [FEATURE] Add `distribution_time` method to facilitate measuring timing of a yielded block. [#248][] by [@jordan-brough][] @@ -431,6 +444,7 @@ Future versions are likely to introduce backward incompatibilities with < Ruby 1 [#257]: https://github.com/DataDog/dogstatsd-ruby/issues/257 [#258]: https://github.com/DataDog/dogstatsd-ruby/issues/258 [#260]: https://github.com/DataDog/dogstatsd-ruby/issues/260 +[#271]: https://github.com/DataDog/dogstatsd-ruby/issues/271 [@AMekss]: https://github.com/AMekss [@abicky]: https://github.com/abicky [@adimitrov]: https://github.com/adimitrov @@ -469,3 +483,4 @@ Future versions are likely to introduce backward incompatibilities with < Ruby 1 [@delner]: https://github.com/delner [@tenderlove]: https://github.com/tenderlove [@zachmccormick]: https://github.com/zachmccormick +[@pudiva]: https://github.com/pudiva diff --git a/README.md b/README.md index 76339b06..6d2eec15 100644 --- a/README.md +++ b/README.md @@ -183,7 +183,7 @@ There is also an implicit message which closes the queue which will cause the se statsd = Datadog::Statsd.new('localhost', 8125) ``` -The message queue's maximum size (in messages) is given by the `sender_queue_size` argument, and has appropriate defaults for UDP (2048) and UDS (512). +The message queue's maximum size (in messages) is given by the `sender_queue_size` argument, and has appropriate defaults for UDP (2048), UDS (512) and `single_thread: true` (1). 
The `buffer_flush_interval`, if enabled, is implemented with an additional thread which manages the timing of those flushes. This additional thread is used even if `single_thread: true`. @@ -209,6 +209,16 @@ By default, instances of `Datadog::Statsd` are thread-safe and we recommend that When using the `single_thread: true` mode, instances of `Datadog::Statsd` are still thread-safe, but you may run into contention on heavily-threaded applications, so we don’t recommend (for performance reasons) reusing these instances. +### Delaying serialization + +By default, message serialization happens synchronously whenever a stat method such as `#increment` gets called, blocking the caller. If the blocking is impacting your program's performance, you may want to consider the `delay_serialization: true` mode. + +The `delay_serialization: true` mode delays the serialization of metrics to avoid the wait when submitting metrics. Serialization will still have to happen at some point, but it might be postponed until a more convenient time, such as after an HTTP request has completed. + +In `single_thread: true` mode, you'll probably want to set `sender_queue_size:` from its default of `1` to some greater value, so that it can benefit from `delay_serialization: true`. Messages will then be queued unserialized in the sender queue and processed normally whenever `sender_queue_size` is reached or `#flush` is called. You might set `sender_queue_size: Float::INFINITY` to allow for an unbounded queue that will only be processed on explicit `#flush`. + +In `single_thread: false` mode, `delay_serialization: true` will cause serialization to happen inside the sender thread. + ## Versioning This Ruby gem is using [Semantic Versioning](https://guides.rubygems.org/patterns/#semantic-versioning) but please note that supported Ruby versions can change in a minor release of this library. 
diff --git a/lib/datadog/statsd.rb b/lib/datadog/statsd.rb index 034769cd..043f4977 100644 --- a/lib/datadog/statsd.rb +++ b/lib/datadog/statsd.rb @@ -76,11 +76,12 @@ def tags # @option [Logger] logger for debugging # @option [Integer] buffer_max_payload_size max bytes to buffer # @option [Integer] buffer_max_pool_size max messages to buffer - # @option [Integer] sender_queue_size size of the sender queue in number of buffers (multi-thread only) + # @option [Integer] sender_queue_size size of the sender queue in number of buffers # @option [Numeric] buffer_flush_interval interval in second to flush buffer # @option [String] socket_path unix socket path # @option [Float] default sample rate if not overridden # @option [Boolean] single_thread flushes the metrics on the main thread instead of in a companion thread + # @option [Boolean] delay_serialization delays stat serialization def initialize( host = nil, port = nil, @@ -100,6 +101,7 @@ def initialize( logger: nil, single_thread: false, + delay_serialization: false, telemetry_enable: true, telemetry_flush_interval: DEFAULT_TELEMETRY_FLUSH_INTERVAL @@ -112,6 +114,7 @@ def initialize( @prefix = @namespace ? "#{@namespace}.".freeze : nil @serializer = Serialization::Serializer.new(prefix: @prefix, global_tags: tags) @sample_rate = sample_rate + @delay_serialization = delay_serialization @forwarder = Forwarder.new( connection_cfg: ConnectionCfg.new( @@ -133,6 +136,7 @@ def initialize( sender_queue_size: sender_queue_size, telemetry_flush_interval: telemetry_enable ? 
telemetry_flush_interval : nil, + serializer: serializer ) end @@ -425,7 +429,12 @@ def send_stats(stat, delta, type, opts = EMPTY_OPTIONS) sample_rate = opts[:sample_rate] || @sample_rate || 1 if sample_rate == 1 || opts[:pre_sampled] || rand <= sample_rate - full_stat = serializer.to_stat(stat, delta, type, tags: opts[:tags], sample_rate: sample_rate) + full_stat = + if @delay_serialization + [[stat, delta, type], {tags: opts[:tags], sample_rate: sample_rate}] + else + serializer.to_stat(stat, delta, type, tags: opts[:tags], sample_rate: sample_rate) + end forwarder.send_message(full_stat) end diff --git a/lib/datadog/statsd/forwarder.rb b/lib/datadog/statsd/forwarder.rb index 88624eb7..4e5f80e5 100644 --- a/lib/datadog/statsd/forwarder.rb +++ b/lib/datadog/statsd/forwarder.rb @@ -21,7 +21,9 @@ def initialize( single_thread: false, - logger: nil + logger: nil, + + serializer: ) @transport_type = connection_cfg.transport_type @@ -52,8 +54,10 @@ def initialize( max_payload_size: buffer_max_payload_size, max_pool_size: buffer_max_pool_size || DEFAULT_BUFFER_POOL_SIZE, overflowing_stategy: buffer_overflowing_stategy, + serializer: serializer ) + sender_queue_size ||= 1 if single_thread sender_queue_size ||= (@transport_type == :udp ? 
UDP_DEFAULT_SENDER_QUEUE_SIZE : UDS_DEFAULT_SENDER_QUEUE_SIZE) @@ -61,7 +65,8 @@ def initialize( SingleThreadSender.new( buffer, logger: logger, - flush_interval: buffer_flush_interval) : + flush_interval: buffer_flush_interval, + queue_size: sender_queue_size) : Sender.new( buffer, logger: logger, diff --git a/lib/datadog/statsd/message_buffer.rb b/lib/datadog/statsd/message_buffer.rb index 0be43d66..48debfef 100644 --- a/lib/datadog/statsd/message_buffer.rb +++ b/lib/datadog/statsd/message_buffer.rb @@ -8,7 +8,8 @@ class MessageBuffer def initialize(connection, max_payload_size: nil, max_pool_size: DEFAULT_BUFFER_POOL_SIZE, - overflowing_stategy: :drop + overflowing_stategy: :drop, + serializer: ) raise ArgumentError, 'max_payload_size keyword argument must be provided' unless max_payload_size raise ArgumentError, 'max_pool_size keyword argument must be provided' unless max_pool_size @@ -17,12 +18,19 @@ def initialize(connection, @max_payload_size = max_payload_size @max_pool_size = max_pool_size @overflowing_stategy = overflowing_stategy + @serializer = serializer @buffer = String.new clear_buffer end def add(message) + # Serializes the message if it hasn't been already. Part of the + # delay_serialization feature. + if message.is_a?(Array) + message = @serializer.to_stat(*message[0], **message[1]) + end + message_size = message.bytesize return nil unless message_size > 0 # to avoid adding empty messages to the buffer diff --git a/lib/datadog/statsd/sender.rb b/lib/datadog/statsd/sender.rb index a5e863e2..5c2010c0 100644 --- a/lib/datadog/statsd/sender.rb +++ b/lib/datadog/statsd/sender.rb @@ -84,7 +84,10 @@ def add(message) if message_queue.length <= @queue_size message_queue << message else - @telemetry.dropped_queue(packets: 1, bytes: message.bytesize) if @telemetry + if @telemetry + bytesize = message.respond_to?(:bytesize) ? 
message.bytesize : 0 + @telemetry.dropped_queue(packets: 1, bytes: bytesize) + end end end diff --git a/lib/datadog/statsd/single_thread_sender.rb b/lib/datadog/statsd/single_thread_sender.rb index b15e1139..7750fe19 100644 --- a/lib/datadog/statsd/single_thread_sender.rb +++ b/lib/datadog/statsd/single_thread_sender.rb @@ -7,10 +7,12 @@ class Statsd # It is using current Process.PID to check it is the result of a recent fork # and it is reseting the MessageBuffer if that's the case. class SingleThreadSender - def initialize(message_buffer, logger: nil, flush_interval: nil) + def initialize(message_buffer, logger: nil, flush_interval: nil, queue_size: 1) @message_buffer = message_buffer @logger = logger @mx = Mutex.new + @queue_size = queue_size + @queue = [] @flush_timer = if flush_interval Datadog::Statsd::Timer.new(flush_interval) { flush } else @@ -26,15 +28,21 @@ def add(message) # not send, they belong to the parent process, let's clear the buffer. if forked? @message_buffer.reset + @queue.clear @flush_timer.start if @flush_timer && @flush_timer.stop? update_fork_pid end - @message_buffer.add(message) + + @queue << message + if @queue.size >= @queue_size + drain_queue + end } end def flush(*) @mx.synchronize { + drain_queue @message_buffer.flush() } end @@ -53,6 +61,12 @@ def rendez_vous() private + def drain_queue + while msg = @queue.shift + @message_buffer.add(msg) + end + end + # below are "fork management" methods to be able to clean the MessageBuffer # if it detects that it is running in a unknown PID. 
diff --git a/lib/datadog/statsd/version.rb b/lib/datadog/statsd/version.rb index fa84ab0d..c7979943 100644 --- a/lib/datadog/statsd/version.rb +++ b/lib/datadog/statsd/version.rb @@ -4,6 +4,6 @@ module Datadog class Statsd - VERSION = '5.5.0' + VERSION = '5.6.0' end end diff --git a/spec/integrations/delay_serialization_spec.rb b/spec/integrations/delay_serialization_spec.rb new file mode 100644 index 00000000..63884dca --- /dev/null +++ b/spec/integrations/delay_serialization_spec.rb @@ -0,0 +1,38 @@ +require "spec_helper" + +describe "Delayed serialization mode" do + it "defers serialization to message buffer" do + buffer = double(Datadog::Statsd::MessageBuffer) + # expects an Array is passed and not a String + expect(buffer) + .to receive(:add) + .with([["boo", 1, "c"], {tags: nil, sample_rate: 1}]) + # and then expect no more adds! + expect(buffer).to receive(:add).exactly(0).times + expect(buffer) + .to receive(:flush) + + allow(Datadog::Statsd::MessageBuffer).to receive(:new).and_return(buffer) + dogstats = Datadog::Statsd.new("localhost", 1234, delay_serialization: true) + + dogstats.increment("boo") + dogstats.flush(sync: true) + end + + it "serializes messages normally" do + socket = FakeUDPSocket.new(copy_message: true) + allow(UDPSocket).to receive(:new).and_return(socket) + dogstats = Datadog::Statsd.new("localhost", 1234, delay_serialization: true) + + dogstats.increment("boo") + dogstats.increment("oob", tags: {tag1: "val1"}) + dogstats.increment("pow", tags: {tag1: "val1"}, sample_rate: 2) + dogstats.flush(sync: true) + + expect(socket.recv[0]).to eq([ + "boo:1|c", + "oob:1|c|#tag1:val1", + "pow:1|c|@2|#tag1:val1" + ].join("\n")) + end +end diff --git a/spec/statsd/forwarder_spec.rb b/spec/statsd/forwarder_spec.rb index 31120824..d9f601ac 100644 --- a/spec/statsd/forwarder_spec.rb +++ b/spec/statsd/forwarder_spec.rb @@ -33,6 +33,10 @@ instance_double(Logger) end + let(:serializer) do + Datadog::Statsd::Serialization::Serializer.new + end + before do 
allow(Datadog::Statsd::MessageBuffer) .to receive(:new) @@ -94,6 +98,8 @@ logger: logger, global_tags: global_tags, + + serializer: serializer, } end @@ -277,6 +283,8 @@ logger: logger, global_tags: global_tags, + + serializer: serializer, } end @@ -464,6 +472,8 @@ logger: logger, global_tags: global_tags, + + serializer: serializer, } end diff --git a/spec/statsd/message_buffer_spec.rb b/spec/statsd/message_buffer_spec.rb index b2accf21..48f7f228 100644 --- a/spec/statsd/message_buffer_spec.rb +++ b/spec/statsd/message_buffer_spec.rb @@ -6,6 +6,7 @@ max_payload_size: max_payload_size, max_pool_size: max_pool_size, overflowing_stategy: overflowing_stategy, + serializer: serializer, ) end @@ -25,6 +26,10 @@ :drop end + let(:serializer) do + Datadog::Statsd::Serialization::Serializer.new + end + describe '#add' do context 'when the message is empty' do it 'returns nil' do diff --git a/spec/statsd/single_thread_sender_spec.rb b/spec/statsd/single_thread_sender_spec.rb index 1d63b9b0..7a497ce6 100644 --- a/spec/statsd/single_thread_sender_spec.rb +++ b/spec/statsd/single_thread_sender_spec.rb @@ -2,13 +2,14 @@ describe Datadog::Statsd::SingleThreadSender do subject do - described_class.new(message_buffer, flush_interval: flush_interval) + described_class.new(message_buffer, flush_interval: flush_interval, queue_size: queue_size) end let(:message_buffer) do instance_double(Datadog::Statsd::MessageBuffer) end let(:flush_interval) { nil } + let(:queue_size) { 6 } describe '#start' do after do @@ -80,12 +81,31 @@ subject.stop end - it 'adds a message to the message buffer asynchronously (needs rendez_vous)' do - expect(message_buffer) - .to receive(:add) - .with('sample message') + context 'when number of messages < queue size' do + it 'does not touch the message buffer' do + n = queue_size - 1 + n.times do |i| + subject.add("sample message #{i}") + end + end + end + + context 'when number of messages == queue size' do + it 'adds queued messages to the message buffer' do + 
n = queue_size + + n.times do |i| + expect(message_buffer) + .to receive(:add) + .with("sample message #{i}") + .ordered + end + expect(message_buffer).to receive(:add).exactly(0).times - subject.add('sample message') + n.times do |i| + subject.add("sample message #{i}") + end + end end end end @@ -100,10 +120,23 @@ subject.stop end - it 'flushes the message buffer' do + it 'adds queued messages to the message buffer and flushes it' do + n = queue_size - 1 + + n.times do |i| + expect(message_buffer) + .to receive(:add) + .with("sample message #{i}") + .ordered + end + # and then expect no more adds! + expect(message_buffer).to receive(:add).exactly(0).times expect(message_buffer) .to receive(:flush) + n.times do |i| + subject.add("sample message #{i}") + end subject.flush end end