Move bufferpool to dedicated package
felixbarny committed Jan 11, 2025
1 parent d150493 commit 90f46f7
Showing 3 changed files with 47 additions and 46 deletions.
40 changes: 0 additions & 40 deletions exporter/elasticsearchexporter/bufferpool.go

This file was deleted.

13 changes: 7 additions & 6 deletions exporter/elasticsearchexporter/exporter.go
@@ -7,6 +7,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/pool"
 	"runtime"
 	"sync"
 	"time"
@@ -34,7 +35,7 @@ type elasticsearchExporter struct {
 	wg          sync.WaitGroup // active sessions
 	bulkIndexer bulkIndexer
 
-	bufferPool *bufferPool
+	bufferPool *pool.BufferPool
 }
 
 func newExporter(
@@ -68,7 +69,7 @@ func newExporter(
 		model: model,
 		logstashFormat: cfg.LogstashFormat,
 		otel: otel,
-		bufferPool: newBufferPool(),
+		bufferPool: pool.NewBufferPool(),
 	}
 }
 
@@ -173,7 +174,7 @@ func (e *elasticsearchExporter) pushLogRecord(
 		fIndex = formattedIndex
 	}
 
-	buffer := e.bufferPool.newPooledBuffer()
+	buffer := e.bufferPool.NewPooledBuffer()
 	err := e.model.encodeLog(resource, resourceSchemaURL, record, scope, scopeSchemaURL, buffer.Buffer)
 	if err != nil {
 		return fmt.Errorf("failed to encode log event: %w", err)
@@ -288,7 +289,7 @@ func (e *elasticsearchExporter) pushMetricsData(
 
 	for fIndex, groupedDataPoints := range groupedDataPointsByIndex {
 		for _, dataPoints := range groupedDataPoints {
-			buf := e.bufferPool.newPooledBuffer()
+			buf := e.bufferPool.NewPooledBuffer()
 			dynamicTemplates, err := e.model.encodeMetrics(resource, resourceMetric.SchemaUrl(), scope, scopeMetrics.SchemaUrl(), dataPoints, &validationErrs, buf.Buffer)
 			if err != nil {
 				errs = append(errs, err)
@@ -409,7 +410,7 @@ func (e *elasticsearchExporter) pushTraceRecord(
 		fIndex = formattedIndex
 	}
 
-	buf := e.bufferPool.newPooledBuffer()
+	buf := e.bufferPool.NewPooledBuffer()
 	err := e.model.encodeSpan(resource, resourceSchemaURL, span, scope, scopeSchemaURL, buf.Buffer)
 	if err != nil {
 		return fmt.Errorf("failed to encode trace record: %w", err)
@@ -439,7 +440,7 @@ func (e *elasticsearchExporter) pushSpanEvent(
 		}
 		fIndex = formattedIndex
 	}
-	buf := e.bufferPool.newPooledBuffer()
+	buf := e.bufferPool.NewPooledBuffer()
 	e.model.encodeSpanEvent(resource, resourceSchemaURL, span, spanEvent, scope, scopeSchemaURL, buf.Buffer)
 	if buf.Buffer.Len() == 0 {
 		return nil
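Every updated call site in exporter.go follows the same borrow-encode-hand-off pattern: take a buffer from the shared pool, encode one document into buf.Buffer, and pass the PooledBuffer on so that whoever eventually calls WriteTo (defined in the new internal/pool package below) returns the buffer to the pool. The standalone sketch below only illustrates that pattern; encodeDoc and the use of os.Stdout as the destination are stand-ins, not the exporter's real model or bulk-indexer API.

package main

import (
	"bytes"
	"fmt"
	"os"

	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/pool"
)

// encodeDoc is a hypothetical stand-in for the exporter's model.encode* calls.
func encodeDoc(msg string, buf *bytes.Buffer) error {
	_, err := fmt.Fprintf(buf, `{"message":%q}`, msg)
	return err
}

func main() {
	bp := pool.NewBufferPool()

	buf := bp.NewPooledBuffer() // borrow a *bytes.Buffer from the shared pool
	if err := encodeDoc("hello", buf.Buffer); err != nil {
		// On an encode error the exporter returns early; the buffer is simply
		// dropped rather than returned to the pool.
		panic(err)
	}

	// Handing the PooledBuffer to a writer (os.Stdout here, the bulk request
	// body in the exporter) triggers WriteTo, whose deferred recycle resets
	// the buffer and puts it back into the pool.
	if _, err := buf.WriteTo(os.Stdout); err != nil {
		panic(err)
	}
}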
40 changes: 40 additions & 0 deletions exporter/elasticsearchexporter/internal/pool/bufferpool.go
@@ -0,0 +1,40 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package pool // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/pool"

import (
	"bytes"
	"io"
	"sync"
)

type BufferPool struct {
	pool *sync.Pool
}

func NewBufferPool() *BufferPool {
	return &BufferPool{pool: &sync.Pool{New: func() any { return &bytes.Buffer{} }}}
}

func (w *BufferPool) NewPooledBuffer() PooledBuffer {
	return PooledBuffer{
		Buffer: w.pool.Get().(*bytes.Buffer),
		pool:   w.pool,
	}
}

type PooledBuffer struct {
	Buffer *bytes.Buffer
	pool   *sync.Pool
}

func (p PooledBuffer) recycle() {
	p.Buffer.Reset()
	p.pool.Put(p.Buffer)
}

func (p PooledBuffer) WriteTo(w io.Writer) (n int64, err error) {
	defer p.recycle()
	return p.Buffer.WriteTo(w)
}
