Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: separate metric_exporter out of metric_reader #1779

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ module Exporter
module OTLP
module Metrics
# An OpenTelemetry metrics exporter that sends metrics over HTTP as Protobuf encoded OTLP ExportMetricsServiceRequest.
class MetricsExporter < ::OpenTelemetry::SDK::Metrics::Export::MetricReader
class MetricsExporter < ::OpenTelemetry::SDK::Metrics::Export::MetricExporter
include Util

attr_reader :metric_snapshots
Expand Down Expand Up @@ -78,8 +78,8 @@ def initialize(endpoint: OpenTelemetry::Common::Utilities.config_opt('OTEL_EXPOR
# consolidate the metrics data into the form of MetricData
#
# return MetricData
def pull
export(collect)
def pull(timeout: nil)
export(collect, timeout: timeout)
end

# metrics Array[MetricData]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@

describe '#export' do
let(:exporter) { OpenTelemetry::Exporter::OTLP::Metrics::MetricsExporter.new }
let(:metric_reader) { OpenTelemetry::SDK::Metrics::Export::MetricReader.new(exporter: exporter) }
let(:meter_provider) { OpenTelemetry::SDK::Metrics::MeterProvider.new(resource: OpenTelemetry::SDK::Resources::Resource.telemetry_sdk) }

it 'integrates with collector' do
Expand Down Expand Up @@ -530,11 +531,11 @@

it 'exports a metric' do
stub_post = stub_request(:post, 'http://localhost:4318/v1/metrics').to_return(status: 200)
meter_provider.add_metric_reader(exporter)
meter_provider.add_metric_reader(metric_reader)
meter = meter_provider.meter('test')
counter = meter.create_counter('test_counter', unit: 'smidgen', description: 'a small amount of something')
counter.add(5, attributes: { 'foo' => 'bar' })
exporter.pull
metric_reader.pull
meter_provider.shutdown

assert_requested(stub_post)
Expand Down Expand Up @@ -574,7 +575,7 @@

it 'translates all the things' do
stub_request(:post, 'http://localhost:4318/v1/metrics').to_return(status: 200)
meter_provider.add_metric_reader(exporter)
meter_provider.add_metric_reader(metric_reader)
meter = meter_provider.meter('test')

counter = meter.create_counter('test_counter', unit: 'smidgen', description: 'a small amount of something')
Expand All @@ -586,7 +587,7 @@
gauge = meter.create_gauge('test_gauge', unit: 'smidgen', description: 'a small amount of something')
gauge.record(15, attributes: { 'baz' => 'qux' })

exporter.pull
metric_reader.pull
meter_provider.shutdown

encoded_etsr = Opentelemetry::Proto::Collector::Metrics::V1::ExportMetricsServiceRequest.encode(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ module Metrics
module Aggregation
# Contains the implementation of the Drop aggregation
class Drop
attr_reader :aggregation_temporality
attr_accessor :aggregation_temporality

def initialize(aggregation_temporality: :delta)
@aggregation_temporality = aggregation_temporality
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ class ExplicitBucketHistogram
DEFAULT_BOUNDARIES = [0, 5, 10, 25, 50, 75, 100, 250, 500, 1000].freeze
private_constant :DEFAULT_BOUNDARIES

attr_reader :aggregation_temporality
attr_accessor :aggregation_temporality

# attr_accessor :aggregation_temporality # approach 2

# The default value for boundaries represents the following buckets:
# (-inf, 0], (0, 5.0], (5.0, 10.0], (10.0, 25.0], (25.0, 50.0],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ module Metrics
module Aggregation
# Contains the implementation of the LastValue aggregation
class LastValue
attr_reader :aggregation_temporality
attr_accessor :aggregation_temporality

def initialize(aggregation_temporality: :delta)
@aggregation_temporality = aggregation_temporality
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ module Aggregation
# Contains the implementation of the Sum aggregation
# https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#sum-aggregation
class Sum
attr_reader :aggregation_temporality
attr_accessor :aggregation_temporality

def initialize(aggregation_temporality: ENV.fetch('OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE', :delta))
# TODO: the default should be :cumulative, see issue #1555
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def wrapped_metric_exporters_from_env
case exporter.strip
when 'none' then nil
when 'console' then OpenTelemetry.meter_provider.add_metric_reader(Metrics::Export::PeriodicMetricReader.new(exporter: Metrics::Export::ConsoleMetricPullExporter.new))
when 'in-memory' then OpenTelemetry.meter_provider.add_metric_reader(Metrics::Export::InMemoryMetricPullExporter.new)
when 'in-memory' then OpenTelemetry.meter_provider.add_metric_reader(Metrics::Export::MetricReader.new(exporter: Metrics::Export::InMemoryMetricPullExporter.new))
when 'otlp'
begin
OpenTelemetry.meter_provider.add_metric_reader(Metrics::Export::PeriodicMetricReader.new(exporter: OpenTelemetry::Exporter::OTLP::Metrics::MetricsExporter.new))
Expand Down
1 change: 1 addition & 0 deletions metrics_sdk/lib/opentelemetry/sdk/metrics/export.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ module Export
end
end

require 'opentelemetry/sdk/metrics/export/metric_exporter'
require 'opentelemetry/sdk/metrics/export/metric_reader'
require 'opentelemetry/sdk/metrics/export/in_memory_metric_pull_exporter'
require 'opentelemetry/sdk/metrics/export/console_metric_pull_exporter'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,12 @@ module Export
# Outputs {MetricData} to the console
#
# Potentially useful for exploratory purposes.
class ConsoleMetricPullExporter < MetricReader
class ConsoleMetricPullExporter < MetricExporter
def initialize
super
@stopped = false
end

def pull
export(collect)
end

def export(metrics, timeout: nil)
return FAILURE if @stopped

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ module Metrics
module Export
# The InMemoryMetricPullExporter behaves as a Metric Reader and Exporter.
# To be used for testing purposes, not production.
class InMemoryMetricPullExporter < MetricReader
class InMemoryMetricPullExporter < MetricExporter
attr_reader :metric_snapshots

def initialize
Expand All @@ -19,10 +19,6 @@ def initialize
@mutex = Mutex.new
end

def pull
export(collect)
end

def export(metrics, timeout: nil)
@mutex.synchronize do
@metric_snapshots.concat(Array(metrics))
Expand All @@ -35,10 +31,6 @@ def reset
@metric_snapshots.clear
end
end

def shutdown
SUCCESS
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module SDK
module Metrics
module Export
# Exporter provides a minimal example implementation.
# It is not required to subclass this class to provide an implementation
# of MetricReader, provided the interface is satisfied.
class MetricExporter
attr_reader :metric_store

def initialize
@metric_store = OpenTelemetry::SDK::Metrics::State::MetricStore.new
end

def collect
@metric_store.collect
end

def pull(timeout: nil)
export(collect, timeout: timeout)
end

def export(metrics, timeout: nil)
Export::SUCCESS
end

def shutdown(timeout: nil)
Export::SUCCESS
end

def force_flush(timeout: nil)
Export::SUCCESS
end
end
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,69 @@ module OpenTelemetry
module SDK
module Metrics
module Export
# MetricReader provides a minimal example implementation.
# It is not required to subclass this class to provide an implementation
# of MetricReader, provided the interface is satisfied.
# MetricReader
class MetricReader
attr_reader :metric_store
attr_reader :exporters

def initialize
@metric_store = OpenTelemetry::SDK::Metrics::State::MetricStore.new
def initialize(exporter: nil)
@mutex = Mutex.new
@exporters = []
register_exporter(exporter: exporter)
end

def collect
@metric_store.collect
# The metrics Reader implementation supports registering metric Exporters
def register_exporter(exporter: nil)
return unless exporter.respond_to?(:pull)

@mutex.synchronize do
@exporters << exporter
end
end

# exporter pull should trigger exporter to send out the metrics
def collect(timeout: nil)
@exporters.each { |exporter| exporter.pull(timeout: timeout) if exporter.respond_to?(:pull) }
end
alias pull collect

# The metrics Reader implementation supports configuring the
# default aggregation on the basis of instrument kind.
def aggregator(aggregator: nil, instrument_kind: nil)
return if aggregator.nil?

@exporters.each do |exporter|
exporter.metric_store.metric_streams.each do |ms|
ms.default_aggregation = aggregator if instrument_kind.nil? || ms.instrument_kind == instrument_kind
end
end
end

# The metrics Reader implementation supports configuring the
# default temporality on the basis of instrument kind.
def temporality(temporality: nil, instrument_kind: nil)
return if temporality.nil?

@exporters.each do |exporter|
exporter.metric_store.metric_streams.each do |ms|
ms.default_aggregation.aggregation_temporality = temporality if instrument_kind.nil? || ms.instrument_kind == instrument_kind
end
end
end

# shutdown all exporters
def shutdown(timeout: nil)
@exporters.each { |exporter| exporter.shutdown(timeout: timeout) if exporter.respond_to?(:shutdown) }
Export::SUCCESS
rescue StandardError
Export::FAILURE
end

# force flush all exporters
def force_flush(timeout: nil)
@exporters.each { |exporter| exporter.force_flush(timeout: timeout) if exporter.respond_to?(:force_flush) }
Export::SUCCESS
rescue StandardError
Export::FAILURE
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,12 @@ class PeriodicMetricReader < MetricReader
def initialize(export_interval_millis: Float(ENV.fetch('OTEL_METRIC_EXPORT_INTERVAL', 60_000)),
export_timeout_millis: Float(ENV.fetch('OTEL_METRIC_EXPORT_TIMEOUT', 30_000)),
exporter: nil)
super()
super(exporter: exporter)

@export_interval = export_interval_millis / 1000.0
@export_timeout = export_timeout_millis / 1000.0
@exporter = exporter
@thread = nil
@continue = false
@mutex = Mutex.new
@export_mutex = Mutex.new

start
Expand All @@ -44,26 +42,21 @@ def shutdown(timeout: nil)
@thread
end
thread&.join(@export_interval)
@exporter.force_flush if @exporter.respond_to?(:force_flush)
@exporter.shutdown
Export::SUCCESS
super(timeout: timeout)
rescue StandardError => e
OpenTelemetry.handle_error(exception: e, message: 'Fail to shutdown PeriodicMetricReader.')
Export::FAILURE
end

def force_flush(timeout: nil)
export(timeout: timeout)
Export::SUCCESS
rescue StandardError
Export::FAILURE
super(timeout: timeout)
end

private

def start
@continue = true
if @exporter.nil?
if @exporters.empty?
OpenTelemetry.logger.warn 'Missing exporter in PeriodicMetricReader.'
elsif @thread&.alive?
OpenTelemetry.logger.warn 'PeriodicMetricReader is still running. Please shutdown it if it needs to restart.'
Expand All @@ -85,8 +78,7 @@ def start

def export(timeout: nil)
@export_mutex.synchronize do
collected_metrics = collect
@exporter.export(collected_metrics, timeout: timeout || @export_timeout) unless collected_metrics.empty?
@exporters.each { |exporter| exporter.pull(timeout: timeout || @export_timeout) if exporter.respond_to?(:pull) }
end
end

Expand Down
4 changes: 3 additions & 1 deletion metrics_sdk/lib/opentelemetry/sdk/metrics/meter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ def initialize(name, version, meter_provider)
# @api private
def add_metric_reader(metric_reader)
@instrument_registry.each_value do |instrument|
instrument.register_with_new_metric_store(metric_reader.metric_store)
metric_reader.exporters.each do |exporter|
instrument.register_with_new_metric_store(exporter.metric_store)
end
end
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,9 @@ def add_metric_reader(metric_reader)
def register_synchronous_instrument(instrument)
@mutex.synchronize do
@metric_readers.each do |mr|
instrument.register_with_new_metric_store(mr.metric_store)
mr.exporters.each do |exporter|
instrument.register_with_new_metric_store(exporter.metric_store)
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ module State
# The MetricStore module provides SDK internal functionality that is not a part of the
# public API.
class MetricStore
attr_reader :metric_streams

def initialize
@mutex = Mutex.new
@epoch_start_time = now_in_nano
Expand All @@ -23,7 +25,6 @@ def initialize
def collect
@mutex.synchronize do
@epoch_end_time = now_in_nano
# snapshot = @metric_streams.map { |ms| ms.collect(@epoch_start_time, @epoch_end_time) }
snapshot = @metric_streams.flat_map { |ms| ms.collect(@epoch_start_time, @epoch_end_time) }
@epoch_start_time = @epoch_end_time
snapshot
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ module State
# public API.
class MetricStream
attr_reader :name, :description, :unit, :instrument_kind, :instrumentation_scope, :data_points
attr_accessor :default_aggregation

def initialize(
name,
Expand Down
Loading
Loading