From a7068421adfea495c4f8ccb34b1c6bec38c815b0 Mon Sep 17 00:00:00 2001 From: gillesbergerp Date: Sat, 11 Nov 2023 21:28:31 +0100 Subject: [PATCH] Refresh rules if they've been modified --- .../lib/opentelemetry/sampling/xray/client.rb | 37 ++++++++++++++----- .../lib/opentelemetry/sampling/xray/poller.rb | 15 +++++--- .../sampling/xray/client_test.rb | 33 ++++++++++------- .../sampling/xray/poller_test.rb | 31 +++++++++++----- 4 files changed, 77 insertions(+), 39 deletions(-) diff --git a/sampling/xray/lib/opentelemetry/sampling/xray/client.rb b/sampling/xray/lib/opentelemetry/sampling/xray/client.rb index b781a1a4e..b484b9cad 100644 --- a/sampling/xray/lib/opentelemetry/sampling/xray/client.rb +++ b/sampling/xray/lib/opentelemetry/sampling/xray/client.rb @@ -28,18 +28,17 @@ def fetch_sampling_rules end # @param [Array] sampling_rules - # @return [Array] + # @return [SamplingTargetResponse] def fetch_sampling_targets(sampling_rules) OpenTelemetry.logger.debug("Fetching sampling targets for rules: #{sampling_rules}") - post( + response = post( path: '/SamplingTargets', body: { SamplingStatisticsDocuments: sampling_rules.map { |rule| SamplingStatisticsDocument.from_rule(rule, @client_id) }.map(&:to_request) } ) - .fetch(:SamplingTargetDocuments, []) - .map { |item| SamplingTargetDocument.from_response(item) } + SamplingTargetResponse.from_response(response) end private @@ -50,12 +49,10 @@ def fetch_sampling_targets(sampling_rules) def post(path:, body: nil) response = Net::HTTP.post(URI("#{@endpoint}#{path}"), body&.to_json) - if response.kind_of?(Net::HTTPSuccess) - OpenTelemetry.logger.debug("X-Ray response for #{path}: #{response.body}") - JSON.parse(response.body, symbolize_names: true) - else - raise("Received #{response.code} (#{response.message}): #{response.body}") - end + raise("Received #{response.code} (#{response.message}): #{response.body}") unless response.is_a?(Net::HTTPSuccess) + + OpenTelemetry.logger.debug("X-Ray response for #{path}: #{response.body}") + JSON.parse(response.body, symbolize_names: true) rescue StandardError => e OpenTelemetry.logger.error("Error while posting to X-Ray: #{e.message}") {} @@ -196,6 +193,26 @@ def self.from_response(response) ) end end + + class SamplingTargetResponse + attr_reader(:last_rule_modification, :sampling_target_documents) + + def initialize(last_rule_modification:, sampling_target_documents:) + @last_rule_modification = last_rule_modification + @sampling_target_documents = sampling_target_documents + end + + # @param [Hash] response + # @return [SamplingTargetResponse] + def self.from_response(response) + return nil if response.empty? + + SamplingTargetResponse.new( + last_rule_modification: Time.at(response[:LastRuleModification]), + sampling_target_documents: response[:SamplingTargetDocuments].map { |item| SamplingTargetDocument.from_response(item) } + ) + end + end end end end diff --git a/sampling/xray/lib/opentelemetry/sampling/xray/poller.rb b/sampling/xray/lib/opentelemetry/sampling/xray/poller.rb index a2b873b6d..681c874dc 100644 --- a/sampling/xray/lib/opentelemetry/sampling/xray/poller.rb +++ b/sampling/xray/lib/opentelemetry/sampling/xray/poller.rb @@ -41,10 +41,9 @@ def start_worker Thread.new do while @running sleep(@target_interval) - @rule_interval_elapsed += @target_interval refresh_targets - refresh_rules if @rule_interval_elapsed >= @rule_interval + refresh_rules if (Time.now - @last_rule_refresh).to_i >= @rule_interval end end end @@ -52,7 +51,7 @@ def start_worker def refresh_rules OpenTelemetry.logger.debug('Refreshing sampling rules') @cache.update_rules(@client.fetch_sampling_rules.map(&:sampling_rule)) - @rule_interval_elapsed = 0 + @last_rule_refresh = Time.now end def refresh_targets @@ -63,9 +62,13 @@ def refresh_targets end OpenTelemetry.logger.debug('Refreshing sampling targets') - @cache.update_targets( - @client.fetch_sampling_targets(matched_rules) - ) + response = @client.fetch_sampling_targets(matched_rules) + return if response.nil? + + @cache.update_targets(response.sampling_target_documents) + return unless response.last_rule_modification > @last_rule_refresh + + refresh_rules end end end diff --git a/sampling/xray/test/opentelemetry/sampling/xray/client_test.rb b/sampling/xray/test/opentelemetry/sampling/xray/client_test.rb index 1e76e74fd..fa9c47efe 100644 --- a/sampling/xray/test/opentelemetry/sampling/xray/client_test.rb +++ b/sampling/xray/test/opentelemetry/sampling/xray/client_test.rb @@ -102,16 +102,19 @@ before do stub_request(:post, "#{endpoint}/SamplingTargets") .to_return( - body: { SamplingTargetDocuments: [] }.to_json, + body: { + SamplingTargetDocuments: [], + LastRuleModification: Time.now.to_i + }.to_json, headers: { content_type: 'application/json' } ) end it do - rules = OpenTelemetry::Sampling::XRay::Client - .new(endpoint: endpoint) - .fetch_sampling_targets([]) + response = OpenTelemetry::Sampling::XRay::Client + .new(endpoint: endpoint) + .fetch_sampling_targets([]) - _(rules).must_equal([]) + _(response.sampling_target_documents).must_equal([]) end end @@ -123,11 +126,11 @@ .to_return(status: 500) end it do - rules = OpenTelemetry::Sampling::XRay::Client - .new(endpoint: endpoint) - .fetch_sampling_targets([]) + response = OpenTelemetry::Sampling::XRay::Client + .new(endpoint: endpoint) + .fetch_sampling_targets([]) - _(rules).must_equal([]) + _(response).must_be_nil end end @@ -146,15 +149,19 @@ before do stub_request(:post, "#{endpoint}/SamplingTargets") .to_return( - body: { SamplingTargetDocuments: [document] }.to_json, + body: { + SamplingTargetDocuments: [document], + LastRuleModification: Time.now.to_i + }.to_json, headers: { content_type: 'application/json' } ) end it do - targets = OpenTelemetry::Sampling::XRay::Client - .new(endpoint: endpoint) - .fetch_sampling_targets([]) + response = OpenTelemetry::Sampling::XRay::Client + .new(endpoint: endpoint) + .fetch_sampling_targets([]) + targets = response.sampling_target_documents _(targets.size).must_equal(1) target = targets.first diff --git a/sampling/xray/test/opentelemetry/sampling/xray/poller_test.rb b/sampling/xray/test/opentelemetry/sampling/xray/poller_test.rb index 31d2b5aef..59f49125f 100644 --- a/sampling/xray/test/opentelemetry/sampling/xray/poller_test.rb +++ b/sampling/xray/test/opentelemetry/sampling/xray/poller_test.rb @@ -15,8 +15,8 @@ rules = [ OpenTelemetry::Sampling::XRay::Client::SamplingRuleRecord.new( sampling_rule: build_rule, - created_at: DateTime.now, - modified_at: DateTime.now + created_at: Time.now, + modified_at: Time.now ) ] @@ -44,15 +44,15 @@ first_rules = [ OpenTelemetry::Sampling::XRay::Client::SamplingRuleRecord.new( sampling_rule: build_rule, - created_at: DateTime.now, - modified_at: DateTime.now + created_at: Time.now, + modified_at: Time.now ) ] second_rules = [ OpenTelemetry::Sampling::XRay::Client::SamplingRuleRecord.new( sampling_rule: build_rule, - created_at: DateTime.now, - modified_at: DateTime.now + created_at: Time.now, + modified_at: Time.now ) ] @@ -83,18 +83,29 @@ rules = [ OpenTelemetry::Sampling::XRay::Client::SamplingRuleRecord.new( sampling_rule: build_rule, - created_at: DateTime.now, - modified_at: DateTime.now + created_at: Time.now, + modified_at: Time.now ) ] matched_rules = [build_rule] - targets = [SecureRandom.uuid.to_s] + targets = OpenTelemetry::Sampling::XRay::Client::SamplingTargetResponse.new( + last_rule_modification: Time.now, + sampling_target_documents: [ + OpenTelemetry::Sampling::XRay::Client::SamplingTargetDocument.new( + rule_name: SecureRandom.uuid.to_s, + fixed_rate: rand, + reservoir_quota: rand(0..100), + reservoir_quota_ttl: rand(0..100), + interval: rand(0..100) + ) + ] + ) client.expect(:fetch_sampling_rules, rules) cache.expect(:update_rules, nil, [rules.map(&:sampling_rule)]) cache.expect(:get_matched_rules, matched_rules) client.expect(:fetch_sampling_targets, targets, [matched_rules]) - cache.expect(:update_targets, nil, [targets]) + cache.expect(:update_targets, nil, [targets.sampling_target_documents]) poller = OpenTelemetry::Sampling::XRay::Poller.new( client: client,