From 90f46f7eb6ea4af851577386ad90acbe52067a27 Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Sat, 11 Jan 2025 09:18:29 +0100 Subject: [PATCH] Move bufferpool to dedicated package --- exporter/elasticsearchexporter/bufferpool.go | 40 ------------------- exporter/elasticsearchexporter/exporter.go | 13 +++--- .../internal/pool/bufferpool.go | 40 +++++++++++++++++++ 3 files changed, 47 insertions(+), 46 deletions(-) delete mode 100644 exporter/elasticsearchexporter/bufferpool.go create mode 100644 exporter/elasticsearchexporter/internal/pool/bufferpool.go diff --git a/exporter/elasticsearchexporter/bufferpool.go b/exporter/elasticsearchexporter/bufferpool.go deleted file mode 100644 index b3070885513c..000000000000 --- a/exporter/elasticsearchexporter/bufferpool.go +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter" - -import ( - "bytes" - "io" - "sync" -) - -type bufferPool struct { - pool *sync.Pool -} - -func newBufferPool() *bufferPool { - return &bufferPool{pool: &sync.Pool{New: func() any { return &bytes.Buffer{} }}} -} - -func (w *bufferPool) newPooledBuffer() pooledBuffer { - return pooledBuffer{ - Buffer: w.pool.Get().(*bytes.Buffer), - pool: w.pool, - } -} - -type pooledBuffer struct { - Buffer *bytes.Buffer - pool *sync.Pool -} - -func (p pooledBuffer) recycle() { - p.Buffer.Reset() - p.pool.Put(p.Buffer) -} - -func (p pooledBuffer) WriteTo(w io.Writer) (n int64, err error) { - defer p.recycle() - return p.Buffer.WriteTo(w) -} diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index 392f3ba552c5..173cd9ff2d33 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -7,6 +7,7 @@ import ( "context" "errors" "fmt" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/pool" "runtime" "sync" "time" @@ -34,7 +35,7 @@ type elasticsearchExporter struct { wg sync.WaitGroup // active sessions bulkIndexer bulkIndexer - bufferPool *bufferPool + bufferPool *pool.BufferPool } func newExporter( @@ -68,7 +69,7 @@ func newExporter( model: model, logstashFormat: cfg.LogstashFormat, otel: otel, - bufferPool: newBufferPool(), + bufferPool: pool.NewBufferPool(), } } @@ -173,7 +174,7 @@ func (e *elasticsearchExporter) pushLogRecord( fIndex = formattedIndex } - buffer := e.bufferPool.newPooledBuffer() + buffer := e.bufferPool.NewPooledBuffer() err := e.model.encodeLog(resource, resourceSchemaURL, record, scope, scopeSchemaURL, buffer.Buffer) if err != nil { return fmt.Errorf("failed to encode log event: %w", err) @@ -288,7 +289,7 @@ func (e *elasticsearchExporter) pushMetricsData( for fIndex, groupedDataPoints := range groupedDataPointsByIndex { for _, dataPoints := range groupedDataPoints { - buf := e.bufferPool.newPooledBuffer() + buf := e.bufferPool.NewPooledBuffer() dynamicTemplates, err := e.model.encodeMetrics(resource, resourceMetric.SchemaUrl(), scope, scopeMetrics.SchemaUrl(), dataPoints, &validationErrs, buf.Buffer) if err != nil { errs = append(errs, err) @@ -409,7 +410,7 @@ func (e *elasticsearchExporter) pushTraceRecord( fIndex = formattedIndex } - buf := e.bufferPool.newPooledBuffer() + buf := e.bufferPool.NewPooledBuffer() err := e.model.encodeSpan(resource, resourceSchemaURL, span, scope, scopeSchemaURL, buf.Buffer) if err != nil { return fmt.Errorf("failed to encode trace record: %w", err) @@ -439,7 +440,7 @@ func (e *elasticsearchExporter) pushSpanEvent( } fIndex = formattedIndex } - buf := e.bufferPool.newPooledBuffer() + buf := e.bufferPool.NewPooledBuffer() e.model.encodeSpanEvent(resource, resourceSchemaURL, span, spanEvent, scope, scopeSchemaURL, buf.Buffer) if buf.Buffer.Len() == 0 { return nil diff --git a/exporter/elasticsearchexporter/internal/pool/bufferpool.go b/exporter/elasticsearchexporter/internal/pool/bufferpool.go new file mode 100644 index 000000000000..88f896ae384f --- /dev/null +++ b/exporter/elasticsearchexporter/internal/pool/bufferpool.go @@ -0,0 +1,40 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package pool // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter" + +import ( + "bytes" + "io" + "sync" +) + +type BufferPool struct { + pool *sync.Pool +} + +func NewBufferPool() *BufferPool { + return &BufferPool{pool: &sync.Pool{New: func() any { return &bytes.Buffer{} }}} +} + +func (w *BufferPool) NewPooledBuffer() PooledBuffer { + return PooledBuffer{ + Buffer: w.pool.Get().(*bytes.Buffer), + pool: w.pool, + } +} + +type PooledBuffer struct { + Buffer *bytes.Buffer + pool *sync.Pool +} + +func (p PooledBuffer) recycle() { + p.Buffer.Reset() + p.pool.Put(p.Buffer) +} + +func (p PooledBuffer) WriteTo(w io.Writer) (n int64, err error) { + defer p.recycle() + return p.Buffer.WriteTo(w) +}