Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OpenTelemetry Metrics Support For Live Metrics #43564

Open
wants to merge 57 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
b5748bc
sending metrics to openTel exporter
t-nsukumar Jul 3, 2024
e072720
working base version
t-nsukumar Jul 3, 2024
8cc3cff
Merge branch 'Azure:main' into t-nsukumar/experimentation
navsukumar Jul 3, 2024
59db9df
testing metric interval
t-nsukumar Jul 3, 2024
14bbaca
Merge branch 'Azure:main' into t-nsukumar/experimentation
navsukumar Jul 5, 2024
d7d9a6c
Merge branch 't-nsukumar/experimentation' of https://github.com/navsu…
t-nsukumar Jul 5, 2024
7280ddf
added etag variable
t-nsukumar Jul 5, 2024
d0a4401
Merge branch 'Azure:main' into t-nsukumar/experimentation
navsukumar Jul 9, 2024
26ddfba
code for sending opentel metrics
t-nsukumar Jul 9, 2024
2b1e66c
Merge branch 'Azure:main' into t-nsukumar/experimentation
navsukumar Jul 11, 2024
26f2934
fixed custom metrics to display on Live
t-nsukumar Jul 11, 2024
cb1ee6e
Merge branch 'Azure:main' into t-nsukumar/experimentation
navsukumar Jul 11, 2024
f069e2d
Resolved intermittent display issue in Live Metrics
t-nsukumar Jul 12, 2024
f9c2a03
Merge branch 'Azure:main' into t-nsukumar/experimentation
navsukumar Jul 12, 2024
59e7318
Merge branch 't-nsukumar/experimentation' into main
navsukumar Jul 15, 2024
13a2561
Merge pull request #1 from navsukumar/main
navsukumar Jul 15, 2024
b9058f2
resolved reliance on breeze interval
t-nsukumar Jul 15, 2024
ec3a3e0
Merge remote-tracking branch 'origin/t-nsukumar/experimentation' into…
t-nsukumar Jul 15, 2024
6de7c2f
Merge branch 'Azure:main' into main
navsukumar Jul 16, 2024
5aef823
Merge branch 'Azure:main' into t-nsukumar/experimentation
navsukumar Jul 16, 2024
87609a5
Merge branch 't-nsukumar/experimentation' of https://github.com/navsu…
t-nsukumar Jul 16, 2024
b1209e6
Merge branch 'Azure:main' into main
navsukumar Jul 16, 2024
109d18e
Merge branch 'Azure:main' into t-nsukumar/experimentation
navsukumar Jul 16, 2024
39168fe
Merge branch 't-nsukumar/experimentation' of https://github.com/navsu…
t-nsukumar Jul 16, 2024
a1cf9ca
working version
t-nsukumar Jul 16, 2024
8aee2f1
Merge branch 'Azure:main' into main
navsukumar Jul 17, 2024
824b200
Merge branch 'Azure:main' into t-nsukumar/experimentation
navsukumar Jul 17, 2024
925b457
refactoring QuickPulseConfiguration
t-nsukumar Jul 18, 2024
97026e2
Merge branch 'Azure:main' into main
navsukumar Jul 18, 2024
7314a75
Merge branch 'Azure:main' into t-nsukumar/experimentation
navsukumar Jul 18, 2024
56c58d4
Merge branch 'Azure:main' into main
navsukumar Jul 19, 2024
1c863e0
Merge branch 'Azure:main' into t-nsukumar/experimentation
navsukumar Jul 19, 2024
6b302bf
Merge branch 'Azure:main' into main
navsukumar Jul 19, 2024
2231e28
Merge branch 'Azure:main' into t-nsukumar/experimentation
navsukumar Jul 19, 2024
258b89c
fixed same metric on multiple dashboards bug
t-nsukumar Jul 19, 2024
46af557
Merge branch 'Azure:main' into main
navsukumar Jul 24, 2024
4c86a30
Merge branch 'Azure:main' into t-nsukumar/experimentation
navsukumar Jul 24, 2024
e2113f7
temporarily removing timestamp
t-nsukumar Jul 24, 2024
472287f
updated functionality
t-nsukumar Jul 24, 2024
52fc348
updated invariant version in tests
t-nsukumar Jul 24, 2024
cea6b53
Merge branch 'Azure:main' into main
navsukumar Jul 24, 2024
267658c
Merge branch 'Azure:main' into t-nsukumar/experimentation
navsukumar Jul 24, 2024
ac7631a
deleted reset function in config
t-nsukumar Jul 24, 2024
9ec6d16
refactored storage of derived metric info
t-nsukumar Aug 1, 2024
52b0d3f
Merge branch 'main' into t-nsukumar/experimentation
t-nsukumar Aug 1, 2024
7cbc371
added filtering support
t-nsukumar Aug 9, 2024
eb9e78c
big merge from sdk main
t-nsukumar Aug 10, 2024
dcc0e67
working on fixing implementation
t-nsukumar Aug 12, 2024
640a32e
adapted to use azure-json
t-nsukumar Aug 13, 2024
00a2332
merged main into branch
t-nsukumar Aug 13, 2024
f78b58c
removed unused import statements
t-nsukumar Aug 13, 2024
aed5a9c
removed unused imports
t-nsukumar Aug 13, 2024
62a8fd0
Merge branch 'main' into t-nsukumar/experimentation
t-nsukumar Aug 13, 2024
6903595
Merge remote-tracking branch 'upstream/main'
t-nsukumar Aug 13, 2024
632bcca
Merge branch 'main' into t-nsukumar/experimentation
t-nsukumar Aug 13, 2024
e48e6ff
fixed metric receiver
t-nsukumar Aug 16, 2024
72e4d79
Merge branch 'main' into t-nsukumar/experimentation
Dec 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,23 @@ public MetricDataMapper(BiConsumer<AbstractTelemetryBuilder, Resource> telemetry
this.captureHttpServer4xxAsError = captureHttpServer4xxAsError;
}

public void mapMetrics(MetricData metricData, Consumer<TelemetryItem> consumer) {
MetricDataType type = metricData.getType();
if (type == DOUBLE_SUM || type == DOUBLE_GAUGE || type == LONG_SUM || type == LONG_GAUGE || type == HISTOGRAM) {

// DO NOT emit unstable metrics from the OpenTelemetry auto instrumentation libraries
// custom metrics are always emitted
if (OTEL_UNSTABLE_METRICS_TO_EXCLUDE.contains(metricData.getName())
&& metricData.getInstrumentationScopeInfo().getName().startsWith(OTEL_INSTRUMENTATION_NAME_PREFIX)) {
return;
}
List<TelemetryItem> stableOtelMetrics = convertOtelMetricToAzureMonitorMetric(metricData, false);
stableOtelMetrics.forEach(consumer::accept);
} else {
logger.warning("metric data type {} is not supported yet.", metricData.getType());
}
}

public void map(MetricData metricData, Consumer<TelemetryItem> consumer) {
MetricDataType type = metricData.getType();
if (type == DOUBLE_SUM || type == DOUBLE_GAUGE || type == LONG_SUM || type == LONG_GAUGE || type == HISTOGRAM) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.azure.core.http.HttpPipeline;
import com.azure.core.http.HttpRequest;
import com.azure.monitor.opentelemetry.exporter.implementation.MetricDataMapper;
import com.azure.monitor.opentelemetry.exporter.implementation.models.TelemetryItem;
import com.azure.monitor.opentelemetry.exporter.implementation.utils.HostName;
import com.azure.monitor.opentelemetry.exporter.implementation.utils.Strings;
Expand All @@ -20,13 +21,15 @@

public class QuickPulse {

static final int QP_INVARIANT_VERSION = 1;
// 6 represents filtering support for Otel metrics only is enabled
static final int QP_INVARIANT_VERSION = 6;

private volatile QuickPulseDataCollector collector;

public static QuickPulse create(HttpPipeline httpPipeline, Supplier<URL> endpointUrl,
Supplier<String> instrumentationKey, @Nullable String roleName, @Nullable String roleInstance,
String sdkVersion) {
boolean useNormalizedValueForNonNormalizedCpuPercentage, QuickPulseMetricReader quickPulseMetricReader,
MetricDataMapper metricDataMapper, String sdkVersion) {

QuickPulse quickPulse = new QuickPulse();

Expand All @@ -40,7 +43,8 @@ public static QuickPulse create(HttpPipeline httpPipeline, Supplier<URL> endpoin
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
quickPulse.initialize(httpPipeline, endpointUrl, instrumentationKey, roleName, roleInstance, sdkVersion);
quickPulse.initialize(httpPipeline, endpointUrl, instrumentationKey, roleName, roleInstance,
useNormalizedValueForNonNormalizedCpuPercentage, quickPulseMetricReader, metricDataMapper, sdkVersion);
});
// the condition below will always be false, but by referencing the executor it ensures the
// executor can't become unreachable in the middle of the execute() method execution above
Expand All @@ -64,12 +68,16 @@ public void add(TelemetryItem telemetryItem) {
}

private void initialize(HttpPipeline httpPipeline, Supplier<URL> endpointUrl, Supplier<String> instrumentationKey,
@Nullable String roleName, @Nullable String roleInstance, String sdkVersion) {
@Nullable String roleName, @Nullable String roleInstance,
boolean useNormalizedValueForNonNormalizedCpuPercentage, QuickPulseMetricReader quickPulseMetricReader,
MetricDataMapper metricDataMapper, String sdkVersion) {

String quickPulseId = UUID.randomUUID().toString().replace("-", "");
ArrayBlockingQueue<HttpRequest> sendQueue = new ArrayBlockingQueue<>(256, true);
QuickPulseConfiguration quickPulseConfiguration = new QuickPulseConfiguration();

QuickPulseDataSender quickPulseDataSender = new QuickPulseDataSender(httpPipeline, sendQueue);
QuickPulseDataSender quickPulseDataSender
= new QuickPulseDataSender(httpPipeline, sendQueue, quickPulseConfiguration);

String instanceName = roleInstance;
String machineName = HostName.get();
Expand All @@ -81,12 +89,13 @@ private void initialize(HttpPipeline httpPipeline, Supplier<URL> endpointUrl, Su
instanceName = "Unknown host";
}

QuickPulseDataCollector collector = new QuickPulseDataCollector();
QuickPulseDataCollector collector
= new QuickPulseDataCollector(useNormalizedValueForNonNormalizedCpuPercentage, quickPulseConfiguration);

QuickPulsePingSender quickPulsePingSender = new QuickPulsePingSender(httpPipeline, endpointUrl,
instrumentationKey, roleName, instanceName, machineName, quickPulseId, sdkVersion);
instrumentationKey, roleName, instanceName, machineName, quickPulseId, sdkVersion, quickPulseConfiguration);
QuickPulseDataFetcher quickPulseDataFetcher = new QuickPulseDataFetcher(collector, sendQueue, endpointUrl,
instrumentationKey, roleName, instanceName, machineName, quickPulseId);
instrumentationKey, roleName, instanceName, machineName, quickPulseId, quickPulseConfiguration);

QuickPulseCoordinatorInitData coordinatorInitData
= new QuickPulseCoordinatorInitDataBuilder().withPingSender(quickPulsePingSender)
Expand All @@ -97,6 +106,14 @@ private void initialize(HttpPipeline httpPipeline, Supplier<URL> endpointUrl, Su

QuickPulseCoordinator coordinator = new QuickPulseCoordinator(coordinatorInitData);

QuickPulseMetricReceiver quickPulseMetricReceiver
= new QuickPulseMetricReceiver(quickPulseMetricReader, metricDataMapper, collector);

Thread metricReceiverThread
= new Thread(quickPulseMetricReceiver, QuickPulseMetricReceiver.class.getSimpleName());
metricReceiverThread.setDaemon(true);
metricReceiverThread.start();

Thread senderThread = new Thread(quickPulseDataSender, QuickPulseDataSender.class.getSimpleName());
senderThread.setDaemon(true);
senderThread.start();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
package com.azure.monitor.opentelemetry.exporter.implementation.quickpulse;

import com.azure.core.http.HttpResponse;
import com.azure.core.util.logging.ClientLogger;
import com.azure.json.*;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

public class QuickPulseConfiguration {
private static final ClientLogger logger = new ClientLogger(QuickPulseDataFetcher.class);
private AtomicReference<String> etag = new AtomicReference<>();
private ConcurrentHashMap<String, ArrayList<DerivedMetricInfo>> derivedMetrics = new ConcurrentHashMap<>();

public synchronized String getEtag() {
return this.etag.get();
}

public synchronized void setEtag(String etag) {
this.etag.set(etag);
}

public synchronized ConcurrentHashMap<String, ArrayList<DerivedMetricInfo>> getDerivedMetrics() {
return this.derivedMetrics;
}

public synchronized void setDerivedMetrics(ConcurrentHashMap<String, ArrayList<DerivedMetricInfo>> metrics) {
this.derivedMetrics = metrics;
}

public synchronized void updateConfig(String etagValue,
ConcurrentHashMap<String, ArrayList<DerivedMetricInfo>> otelMetrics) {
if (!Objects.equals(this.getEtag(), etagValue)) {
this.setEtag(etagValue);
this.setDerivedMetrics(otelMetrics);
}

}

public ConcurrentHashMap<String, ArrayList<DerivedMetricInfo>> parseDerivedMetrics(HttpResponse response)
throws IOException {

ConcurrentHashMap<String, ArrayList<DerivedMetricInfo>> requestedMetrics = new ConcurrentHashMap<>();
try {

String responseBody = response.getBodyAsString().block();
if (responseBody == null || responseBody.isEmpty()) {
return new ConcurrentHashMap<String, ArrayList<DerivedMetricInfo>>();
}

try (JsonReader jsonReader = JsonProviders.createReader(responseBody)) {
jsonReader.nextToken();
while (jsonReader.nextToken() != JsonToken.END_OBJECT) {
if ("Metrics".equals(jsonReader.getFieldName())) {
jsonReader.nextToken();

while (jsonReader.nextToken() != JsonToken.END_ARRAY) {
DerivedMetricInfo metric = new DerivedMetricInfo();

while (jsonReader.nextToken() != JsonToken.END_OBJECT) {

String fieldName = jsonReader.getFieldName();
jsonReader.nextToken();

switch (fieldName) {
case "Id":
metric.setId(jsonReader.getString());
break;

case "Aggregation":
metric.setAggregation(jsonReader.getString());
break;

case "TelemetryType":
metric.setTelemetryType(jsonReader.getString());
break;

case "Projection":
metric.setProjection(jsonReader.getString());
break;

case "FilterGroups":
// Handle "FilterGroups" field
if (jsonReader.currentToken() == JsonToken.START_ARRAY) {
while (jsonReader.nextToken() != JsonToken.END_ARRAY) {
if (jsonReader.currentToken() == JsonToken.START_OBJECT) {
while (jsonReader.nextToken() != JsonToken.END_OBJECT) {
if (jsonReader.currentToken() == JsonToken.FIELD_NAME
&& jsonReader.getFieldName().equals("Filters")) {
jsonReader.nextToken();
if (jsonReader.currentToken() == JsonToken.START_ARRAY) {
while (jsonReader.nextToken() != JsonToken.END_ARRAY) {
if (jsonReader.currentToken()
== JsonToken.START_OBJECT) {
String innerFieldName = "";
String predicate = "";
String comparand = "";

while (jsonReader.nextToken()
!= JsonToken.END_OBJECT) {
String filterFieldName
= jsonReader.getFieldName();
jsonReader.nextToken();

switch (filterFieldName) {
case "FieldName":
innerFieldName
= jsonReader.getString();
if (innerFieldName.contains(".")) {
innerFieldName = innerFieldName
.split("\\.")[1];
}
break;

case "Predicate":
predicate = jsonReader.getString();
break;

case "Comparand":
comparand = jsonReader.getString();
break;
}
}

if (!innerFieldName.isEmpty()
&& !innerFieldName.equals("undefined")
&& !predicate.isEmpty()
&& !comparand.isEmpty()) {
metric.addFilterGroup(innerFieldName,
predicate, comparand);
}
}
}
}
}
}
}
}
}
break;

default:
jsonReader.skipChildren();
break;
}
}
requestedMetrics.computeIfAbsent(metric.getTelemetryType(), k -> new ArrayList<>())
.add(metric);
}
} else {
jsonReader.skipChildren();

}
}
}
return requestedMetrics;
} catch (Exception e) {
logger.verbose("Failed to parse metrics from response: %s", e.getMessage());
}
return new ConcurrentHashMap<String, ArrayList<DerivedMetricInfo>>();
}

public class DerivedMetricInfo {
private String id;
private String projection;
private String telemetryType;
private String aggregation;
private ArrayList<FilterGroup> filterGroups = new ArrayList<FilterGroup>();

public String getId() {
return this.id;
}

public void setId(String id) {
this.id = id;
}

public String getProjection() {
return projection;
}

public void setTelemetryType(String telemetryType) {
this.telemetryType = telemetryType;
}

public String getTelemetryType() {
return this.telemetryType;
}

public void setProjection(String projection) {
this.projection = projection;
}

public String getAggregation() {
return this.aggregation;
}

public void setAggregation(String aggregation) {
this.aggregation = aggregation;
}

public ArrayList<FilterGroup> getFilterGroups() {
return this.filterGroups;
}

public void addFilterGroup(String fieldName, String predicate, String comparand) {
this.filterGroups.add(new FilterGroup(fieldName, predicate, comparand));
}
}

class FilterGroup {
private String fieldName;
private String operator;
private String comparand;

public FilterGroup(String fieldName, String predicate, String comparand) {
this.setFieldName(fieldName);
this.setOperator(predicate);
this.setComparand(comparand);
}

public String getFieldName() {
return this.fieldName;
}

private void setFieldName(String fieldName) {
this.fieldName = fieldName;
}

public String getOperator() {
return this.operator;
}

private void setOperator(String operator) {
this.operator = operator;
}

public String getComparand() {
return this.comparand;
}

public void setComparand(String comparand) {
this.comparand = comparand;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,14 @@ private long sendData() {

case QP_IS_OFF:
pingMode = true;
collector.flushOtelMetrics();
QuickPulseMetricReceiver.setQuickPulseHeaderInfo(currentQuickPulseHeaderInfo);
return qpsServicePollingIntervalHintMillis > 0
? qpsServicePollingIntervalHintMillis
: waitBetweenPingsInMillis;

case QP_IS_ON:
QuickPulseMetricReceiver.setQuickPulseHeaderInfo(currentQuickPulseHeaderInfo);
return waitBetweenPostsInMillis;
}

Expand Down
Loading
Loading