Skip to content

Commit

Permalink
Refresh rules if they've been modified
Browse files Browse the repository at this point in the history
  • Loading branch information
gillesbergerp committed Nov 20, 2023
1 parent 919a4d2 commit a706842
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 39 deletions.
37 changes: 27 additions & 10 deletions sampling/xray/lib/opentelemetry/sampling/xray/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,17 @@ def fetch_sampling_rules
end

# @param [Array<SamplingRule>] sampling_rules
# @return [Array<SamplingTargetDocument>]
# @return [SamplingTargetResponse]
def fetch_sampling_targets(sampling_rules)
OpenTelemetry.logger.debug("Fetching sampling targets for rules: #{sampling_rules}")

post(
response = post(
path: '/SamplingTargets',
body: {
SamplingStatisticsDocuments: sampling_rules.map { |rule| SamplingStatisticsDocument.from_rule(rule, @client_id) }.map(&:to_request)
}
)
.fetch(:SamplingTargetDocuments, [])
.map { |item| SamplingTargetDocument.from_response(item) }
SamplingTargetResponse.from_response(response)
end

private
Expand All @@ -50,12 +49,10 @@ def fetch_sampling_targets(sampling_rules)
def post(path:, body: nil)
response = Net::HTTP.post(URI("#{@endpoint}#{path}"), body&.to_json)

if response.kind_of?(Net::HTTPSuccess)
OpenTelemetry.logger.debug("X-Ray response for #{path}: #{response.body}")
JSON.parse(response.body, symbolize_names: true)
else
raise("Received #{response.code} (#{response.message}): #{response.body}")
end
raise("Received #{response.code} (#{response.message}): #{response.body}") unless response.is_a?(Net::HTTPSuccess)

OpenTelemetry.logger.debug("X-Ray response for #{path}: #{response.body}")
JSON.parse(response.body, symbolize_names: true)
rescue StandardError => e
OpenTelemetry.logger.error("Error while posting to X-Ray: #{e.message}")
{}
Expand Down Expand Up @@ -196,6 +193,26 @@ def self.from_response(response)
)
end
end

class SamplingTargetResponse
attr_reader(:last_rule_modification, :sampling_target_documents)

def initialize(last_rule_modification:, sampling_target_documents:)
@last_rule_modification = last_rule_modification
@sampling_target_documents = sampling_target_documents
end

# @param [Hash] response
# @return [SamplingTargetResponse]
def self.from_response(response)
return nil if response.empty?

SamplingTargetResponse.new(
last_rule_modification: Time.at(response[:LastRuleModification]),
sampling_target_documents: response[:SamplingTargetDocuments].map { |item| SamplingTargetDocument.from_response(item) }
)
end
end
end
end
end
Expand Down
15 changes: 9 additions & 6 deletions sampling/xray/lib/opentelemetry/sampling/xray/poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,17 @@ def start_worker
Thread.new do
while @running
sleep(@target_interval)
@rule_interval_elapsed += @target_interval

refresh_targets
refresh_rules if @rule_interval_elapsed >= @rule_interval
refresh_rules if (Time.now - @last_rule_refresh).to_i >= @rule_interval
end
end
end

def refresh_rules
OpenTelemetry.logger.debug('Refreshing sampling rules')
@cache.update_rules(@client.fetch_sampling_rules.map(&:sampling_rule))
@rule_interval_elapsed = 0
@last_rule_refresh = Time.now
end

def refresh_targets
Expand All @@ -63,9 +62,13 @@ def refresh_targets
end

OpenTelemetry.logger.debug('Refreshing sampling targets')
@cache.update_targets(
@client.fetch_sampling_targets(matched_rules)
)
response = @client.fetch_sampling_targets(matched_rules)
return if response.nil?

@cache.update_targets(response.sampling_target_documents)
return unless response.last_rule_modification > @last_rule_refresh

refresh_rules
end
end
end
Expand Down
33 changes: 20 additions & 13 deletions sampling/xray/test/opentelemetry/sampling/xray/client_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -102,16 +102,19 @@
before do
stub_request(:post, "#{endpoint}/SamplingTargets")
.to_return(
body: { SamplingTargetDocuments: [] }.to_json,
body: {
SamplingTargetDocuments: [],
LastRuleModification: Time.now.to_i
}.to_json,
headers: { content_type: 'application/json' }
)
end
it do
rules = OpenTelemetry::Sampling::XRay::Client
.new(endpoint: endpoint)
.fetch_sampling_targets([])
response = OpenTelemetry::Sampling::XRay::Client
.new(endpoint: endpoint)
.fetch_sampling_targets([])

_(rules).must_equal([])
_(response.sampling_target_documents).must_equal([])
end
end

Expand All @@ -123,11 +126,11 @@
.to_return(status: 500)
end
it do
rules = OpenTelemetry::Sampling::XRay::Client
.new(endpoint: endpoint)
.fetch_sampling_targets([])
response = OpenTelemetry::Sampling::XRay::Client
.new(endpoint: endpoint)
.fetch_sampling_targets([])

_(rules).must_equal([])
_(response).must_be_nil
end
end

Expand All @@ -146,15 +149,19 @@
before do
stub_request(:post, "#{endpoint}/SamplingTargets")
.to_return(
body: { SamplingTargetDocuments: [document] }.to_json,
body: {
SamplingTargetDocuments: [document],
LastRuleModification: Time.now.to_i
}.to_json,
headers: { content_type: 'application/json' }
)
end
it do
targets = OpenTelemetry::Sampling::XRay::Client
.new(endpoint: endpoint)
.fetch_sampling_targets([])
response = OpenTelemetry::Sampling::XRay::Client
.new(endpoint: endpoint)
.fetch_sampling_targets([])

targets = response.sampling_target_documents
_(targets.size).must_equal(1)

target = targets.first
Expand Down
31 changes: 21 additions & 10 deletions sampling/xray/test/opentelemetry/sampling/xray/poller_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
rules = [
OpenTelemetry::Sampling::XRay::Client::SamplingRuleRecord.new(
sampling_rule: build_rule,
created_at: DateTime.now,
modified_at: DateTime.now
created_at: Time.now,
modified_at: Time.now
)
]

Expand Down Expand Up @@ -44,15 +44,15 @@
first_rules = [
OpenTelemetry::Sampling::XRay::Client::SamplingRuleRecord.new(
sampling_rule: build_rule,
created_at: DateTime.now,
modified_at: DateTime.now
created_at: Time.now,
modified_at: Time.now
)
]
second_rules = [
OpenTelemetry::Sampling::XRay::Client::SamplingRuleRecord.new(
sampling_rule: build_rule,
created_at: DateTime.now,
modified_at: DateTime.now
created_at: Time.now,
modified_at: Time.now
)
]

Expand Down Expand Up @@ -83,18 +83,29 @@
rules = [
OpenTelemetry::Sampling::XRay::Client::SamplingRuleRecord.new(
sampling_rule: build_rule,
created_at: DateTime.now,
modified_at: DateTime.now
created_at: Time.now,
modified_at: Time.now
)
]
matched_rules = [build_rule]
targets = [SecureRandom.uuid.to_s]
targets = OpenTelemetry::Sampling::XRay::Client::SamplingTargetResponse.new(
last_rule_modification: Time.now,
sampling_target_documents: [
OpenTelemetry::Sampling::XRay::Client::SamplingTargetDocument.new(
rule_name: SecureRandom.uuid.to_s,
fixed_rate: rand,
reservoir_quota: rand(0..100),
reservoir_quota_ttl: rand(0..100),
interval: rand(0..100)
)
]
)

client.expect(:fetch_sampling_rules, rules)
cache.expect(:update_rules, nil, [rules.map(&:sampling_rule)])
cache.expect(:get_matched_rules, matched_rules)
client.expect(:fetch_sampling_targets, targets, [matched_rules])
cache.expect(:update_targets, nil, [targets])
cache.expect(:update_targets, nil, [targets.sampling_target_documents])

poller = OpenTelemetry::Sampling::XRay::Poller.new(
client: client,
Expand Down

0 comments on commit a706842

Please sign in to comment.