Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[prometheusremotewriteexporter] reduce allocations in createAttributes #35184

27 changes: 27 additions & 0 deletions .chloggen/prometheusremotewrite-optimize-createattributes.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'breaking'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's an enhancement, right? Not a breaking change

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dashpole suggested this should be considered breaking, unless I misread the comment #35184 (comment).

I agree it's more of an enhancement though it is technically changing a public function (FromMetrics).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FromMetrics doesn't need to change to achieve this, does it? Rather than storing a converter pool in the PRW exporter it could be stored in the translator package and used within FromMetrics since a single invocation of that function encompasses a complete usage of the converter. Alternately, we can complete the effort that was started to formalize that interface and add a Reset() method to it, if required.

I think that should probably happen separately from optimization efforts, though. It already seems like there are several different threads being pulled at in this area. Can we work through what this API should look like to enable those desired changes, change the API, then deal with concurrency within the exporter and optimization within the translator independently?


# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: prometheusremotewriteexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: reduce allocations in createAttributes

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35184]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
10 changes: 9 additions & 1 deletion exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ func (p *prwTelemetryOtel) recordTranslatedTimeSeries(ctx context.Context, numTS
p.telemetryBuilder.ExporterPrometheusremotewriteTranslatedTimeSeries.Add(ctx, int64(numTS), metric.WithAttributes(p.otelAttrs...))
}

var converterPool = sync.Pool{
New: func() any {
return prometheusremotewrite.NewPrometheusConverter()
},
}

// prwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint.
type prwExporter struct {
endpointURL *url.URL
Expand Down Expand Up @@ -172,8 +178,10 @@ func (prwe *prwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) er
case <-prwe.closeChan:
return errors.New("shutdown has been called")
default:
converter := converterPool.Get().(*prometheusremotewrite.PrometheusConverter)
defer converterPool.Put(converter)

tsMap, err := prometheusremotewrite.FromMetrics(md, prwe.exporterSettings)
tsMap, err := converter.FromMetrics(md, prwe.exporterSettings)
if err != nil {
prwe.telemetry.recordTranslationFailure(ctx)
prwe.settings.Logger.Debug("failed to translate metrics, exporting remaining metrics", zap.Error(err), zap.Int("translated", len(tsMap)))
Expand Down
67 changes: 31 additions & 36 deletions pkg/translator/prometheusremotewrite/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"slices"
"sort"
"strconv"
"strings"
"time"
"unicode/utf8"

Expand Down Expand Up @@ -96,46 +97,40 @@ var seps = []byte{'\xff'}
// createAttributes creates a slice of Prometheus Labels with OTLP attributes and pairs of string values.
// Unpaired string values are ignored. String pairs overwrite OTLP labels if collisions happen and
// if logOnOverwrite is true, the overwrite is logged. Resulting label names are sanitized.
func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externalLabels map[string]string,
func (c *PrometheusConverter) createAttributes(resource pcommon.Resource, attributes pcommon.Map, externalLabels map[string]string,
ignoreAttrs []string, logOnOverwrite bool, extras ...string) []prompb.Label {
resourceAttrs := resource.Attributes()
serviceName, haveServiceName := resourceAttrs.Get(conventions.AttributeServiceName)
instance, haveInstanceID := resourceAttrs.Get(conventions.AttributeServiceInstanceID)

// Calculate the maximum possible number of labels we could return so we can preallocate l
maxLabelCount := attributes.Len() + len(externalLabels) + len(extras)/2

if haveServiceName {
maxLabelCount++
}

if haveInstanceID {
maxLabelCount++
}

// map ensures no duplicate label name
l := make(map[string]string, maxLabelCount)
l := c.labelsMap
clear(l)
Comment on lines +116 to +117
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we also clear labelMap when we call reset()? I think it would be cleaner if we reset the state in one single place instead, or is there any particular reason to do it here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the same reason as https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/35184/files/928529a1cf587e8e5b29bd4880f2c36157eb8194#r1829677356 we want to isolate the contents of this map between calls to createAttributes, so we do that by clearing it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense, thank you! Could we add a comment explaining it?


// store duplicate labels separately in a throwaway map
// assuming this is the less common case
collisions := make(map[string][]string)

// Ensure attributes are sorted by key for consistent merging of keys which
// collide when sanitized.
labels := make([]prompb.Label, 0, maxLabelCount)
// XXX: Should we always drop service namespace/service name/service instance ID from the labels
// (as they get mapped to other Prometheus labels)?
attributes.Range(func(key string, value pcommon.Value) bool {
if !slices.Contains(ignoreAttrs, key) {
labels = append(labels, prompb.Label{Name: key, Value: value.AsString()})
var finalKey = prometheustranslator.NormalizeLabel(key)
if _, alreadyExists := l[finalKey]; alreadyExists {
collisions[finalKey] = append(collisions[finalKey], value.AsString())
} else {
l[finalKey] = value.AsString()
}
}
return true
})
sort.Stable(ByLabelName(labels))

for _, label := range labels {
var finalKey = prometheustranslator.NormalizeLabel(label.Name)
if existingValue, alreadyExists := l[finalKey]; alreadyExists {
l[finalKey] = existingValue + ";" + label.Value
} else {
l[finalKey] = label.Value
}
for key, values := range collisions {
values = append(values, l[key])
// Ensure attributes are sorted by key for consistent merging of keys which
// collide when sanitized.
sort.Strings(values)
l[key] = strings.Join(values, ";")
}

// Map service.name + service.namespace to job
Expand Down Expand Up @@ -175,12 +170,12 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa
l[name] = extras[i+1]
}

labels = labels[:0]
startIndex := len(c.labels)
for k, v := range l {
labels = append(labels, prompb.Label{Name: k, Value: v})
c.labels = append(c.labels, prompb.Label{Name: k, Value: v})
}

return labels
return c.labels[startIndex:]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any possibility that len(c.labels) is not 0 here? It's reset every time we call FromMetrics and I couldn't find any other place in the code where we write to this array, so why not just return c.labels and not worry about startIndex? I might be missing something but it feels like we're overcomplicating things here

for k, v := range l {
	c.labels = append(c.labels, prompb.Label{Name: k, Value: v})
}
return c.labels

Copy link
Contributor Author

@edma2 edma2 Nov 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

startIndex is important for keeping the returned slices isolated from each other while sharing the same underlying array within a single FromMetrics call. It is 0 only for the first series of a batch.

Here is how it works: FromMetrics is called once per batch, and createAttributes for every series within the batch. We want to re-use the backing array of the labels slice for all series within a single batch. We do that by appending the labels of each series to the end of the slice. Finally we return only starting from startIndex so the caller doesn't see labels from other series (while reusing the same backing array which naturally grows up to the size needed to fit a single FromMetrics call).

For example, if X1...X4 are labels from series X and Y1...Y3 are labels from series Y, then the backing array of c.labels will look like [X1, X2, X3, X4, Y1, Y2, Y3] after calling createAttributes twice (this is a simplification as the backing array will probably have excess capacity from resizing or previous calls). Meanwhile, the first call to createAttributes will have returned [X1, X2, X3, X4] and the second call returned [Y1, Y2, Y3]. On the next FromMetrics call the index is reset to 0 and we can re-use the entire array with zero allocations.

Copy link
Member

@ArthurSens ArthurSens Nov 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perfect, thank you for the explanation :)

Now what I'm thinking is if we have tests that assure concurrency works. Mostly to make sure we don't break the non-thread-safe promise by accident

}

// isValidAggregationTemporality checks whether an OTel metric has a valid
Expand All @@ -200,12 +195,12 @@ func isValidAggregationTemporality(metric pmetric.Metric) bool {
return false
}

func (c *prometheusConverter) addHistogramDataPoints(dataPoints pmetric.HistogramDataPointSlice,
func (c *PrometheusConverter) addHistogramDataPoints(dataPoints pmetric.HistogramDataPointSlice,
resource pcommon.Resource, settings Settings, baseName string) {
for x := 0; x < dataPoints.Len(); x++ {
pt := dataPoints.At(x)
timestamp := convertTimeStamp(pt.Timestamp())
baseLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nil, false)
baseLabels := c.createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nil, false)

// If the sum is unset, it indicates the _sum metric point should be
// omitted
Expand Down Expand Up @@ -383,12 +378,12 @@ func maxTimestamp(a, b pcommon.Timestamp) pcommon.Timestamp {
return b
}

func (c *prometheusConverter) addSummaryDataPoints(dataPoints pmetric.SummaryDataPointSlice, resource pcommon.Resource,
func (c *PrometheusConverter) addSummaryDataPoints(dataPoints pmetric.SummaryDataPointSlice, resource pcommon.Resource,
settings Settings, baseName string) {
for x := 0; x < dataPoints.Len(); x++ {
pt := dataPoints.At(x)
timestamp := convertTimeStamp(pt.Timestamp())
baseLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nil, false)
baseLabels := c.createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nil, false)

// treat sum as a sample in an individual TimeSeries
sum := &prompb.Sample{
Expand Down Expand Up @@ -456,7 +451,7 @@ func createLabels(name string, baseLabels []prompb.Label, extras ...string) []pr

// getOrCreateTimeSeries returns the time series corresponding to the label set if existent, and false.
// Otherwise it creates a new one and returns that, and true.
func (c *prometheusConverter) getOrCreateTimeSeries(lbls []prompb.Label) (*prompb.TimeSeries, bool) {
func (c *PrometheusConverter) getOrCreateTimeSeries(lbls []prompb.Label) (*prompb.TimeSeries, bool) {
h := timeSeriesSignature(lbls)
ts := c.unique[h]
if ts != nil {
Expand Down Expand Up @@ -492,7 +487,7 @@ func (c *prometheusConverter) getOrCreateTimeSeries(lbls []prompb.Label) (*promp
// addTimeSeriesIfNeeded adds a corresponding time series if it doesn't already exist.
// If the time series doesn't already exist, it gets added with startTimestamp for its value and timestamp for its timestamp,
// both converted to milliseconds.
func (c *prometheusConverter) addTimeSeriesIfNeeded(lbls []prompb.Label, startTimestamp pcommon.Timestamp, timestamp pcommon.Timestamp) {
func (c *PrometheusConverter) addTimeSeriesIfNeeded(lbls []prompb.Label, startTimestamp pcommon.Timestamp, timestamp pcommon.Timestamp) {
ts, created := c.getOrCreateTimeSeries(lbls)
if created {
ts.Samples = []prompb.Sample{
Expand All @@ -506,7 +501,7 @@ func (c *prometheusConverter) addTimeSeriesIfNeeded(lbls []prompb.Label, startTi
}

// addResourceTargetInfo converts the resource to the target info metric.
func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timestamp pcommon.Timestamp, converter *prometheusConverter) {
func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timestamp pcommon.Timestamp, converter *PrometheusConverter) {
if settings.DisableTargetInfo || timestamp == 0 {
return
}
Expand Down Expand Up @@ -534,7 +529,7 @@ func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timesta
name = settings.Namespace + "_" + name
}

labels := createAttributes(resource, attributes, settings.ExternalLabels, identifyingAttrs, false, model.MetricNameLabel, name)
labels := converter.createAttributes(resource, attributes, settings.ExternalLabels, identifyingAttrs, false, model.MetricNameLabel, name)
haveIdentifier := false
for _, l := range labels {
if l.Name == model.JobLabel || l.Name == model.InstanceLabel {
Expand Down
24 changes: 15 additions & 9 deletions pkg/translator/prometheusremotewrite/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func TestPrometheusConverter_addSample(t *testing.T) {
}

t.Run("empty_case", func(t *testing.T) {
converter := newPrometheusConverter()
converter := NewPrometheusConverter()
converter.addSample(nil, nil)
assert.Empty(t, converter.unique)
assert.Empty(t, converter.conflicts)
Expand Down Expand Up @@ -159,7 +159,7 @@ func TestPrometheusConverter_addSample(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
converter := newPrometheusConverter()
converter := NewPrometheusConverter()
converter.addSample(&tt.testCase[0].sample, tt.testCase[0].labels)
converter.addSample(&tt.testCase[1].sample, tt.testCase[1].labels)
assert.Exactly(t, tt.want, converter.unique)
Expand Down Expand Up @@ -359,8 +359,9 @@ func Test_createLabelSet(t *testing.T) {
}
// run tests
for _, tt := range tests {
c := NewPrometheusConverter()
t.Run(tt.name, func(t *testing.T) {
assert.ElementsMatch(t, tt.want, createAttributes(tt.resource, tt.orig, tt.externalLabels, nil, true, tt.extras...))
assert.ElementsMatch(t, tt.want, c.createAttributes(tt.resource, tt.orig, tt.externalLabels, nil, true, tt.extras...))
})
}
}
Expand All @@ -375,10 +376,15 @@ func BenchmarkCreateAttributes(b *testing.B) {
m.PutInt("test-int-key", 123)
m.PutBool("test-bool-key", true)

c := NewPrometheusConverter()
// preallocate slice to simulate a fully-grown buffer
c.labels = make([]prompb.Label, 0, b.N*m.Len())

b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
createAttributes(r, m, ext, nil, true)
c.createAttributes(r, m, ext, nil, true)
}
}

Expand Down Expand Up @@ -439,7 +445,7 @@ func TestPrometheusConverter_addExemplars(t *testing.T) {
// run tests
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
converter := &prometheusConverter{
converter := &PrometheusConverter{
unique: tt.orig,
}
converter.addExemplars(tt.dataPoint, tt.bucketBounds)
Expand Down Expand Up @@ -620,7 +626,7 @@ func TestAddResourceTargetInfo(t *testing.T) {
},
} {
t.Run(tc.desc, func(t *testing.T) {
converter := newPrometheusConverter()
converter := NewPrometheusConverter()

addResourceTargetInfo(tc.resource, tc.settings, tc.timestamp, converter)

Expand Down Expand Up @@ -765,7 +771,7 @@ func TestPrometheusConverter_AddSummaryDataPoints(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
metric := tt.metric()
converter := newPrometheusConverter()
converter := NewPrometheusConverter()

converter.addSummaryDataPoints(
metric.Summary().DataPoints(),
Expand Down Expand Up @@ -875,7 +881,7 @@ func TestPrometheusConverter_AddHistogramDataPoints(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
metric := tt.metric()
converter := newPrometheusConverter()
converter := NewPrometheusConverter()

converter.addHistogramDataPoints(
metric.Histogram().DataPoints(),
Expand All @@ -893,7 +899,7 @@ func TestPrometheusConverter_AddHistogramDataPoints(t *testing.T) {
}

func TestPrometheusConverter_getOrCreateTimeSeries(t *testing.T) {
converter := newPrometheusConverter()
converter := NewPrometheusConverter()
lbls := []prompb.Label{
{
Name: "key1",
Expand Down
4 changes: 2 additions & 2 deletions pkg/translator/prometheusremotewrite/histograms.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ import (

const defaultZeroThreshold = 1e-128

func (c *prometheusConverter) addExponentialHistogramDataPoints(dataPoints pmetric.ExponentialHistogramDataPointSlice,
func (c *PrometheusConverter) addExponentialHistogramDataPoints(dataPoints pmetric.ExponentialHistogramDataPointSlice,
resource pcommon.Resource, settings Settings, baseName string) error {
for x := 0; x < dataPoints.Len(); x++ {
pt := dataPoints.At(x)
lbls := createAttributes(
lbls := c.createAttributes(
resource,
pt.Attributes(),
settings.ExternalLabels,
Expand Down
2 changes: 1 addition & 1 deletion pkg/translator/prometheusremotewrite/histograms_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,7 @@ func TestPrometheusConverter_addExponentialHistogramDataPoints(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
metric := tt.metric()

converter := newPrometheusConverter()
converter := NewPrometheusConverter()
require.NoError(t, converter.addExponentialHistogramDataPoints(
metric.ExponentialHistogram().DataPoints(),
pcommon.NewResource(),
Expand Down
Loading
Loading