Skip to content

Commit

Permalink
POC byte based batching
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-sili committed Jan 16, 2025
1 parent b9d4e39 commit b7d0c30
Show file tree
Hide file tree
Showing 14 changed files with 422 additions and 9 deletions.
15 changes: 15 additions & 0 deletions exporter/exporterbatcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type MinSizeConfig struct {
// sent regardless of the timeout. There is no guarantee that the batch size is always greater than this value.
// This option requires the Request to implement RequestItemsCounter interface. Otherwise, it will be ignored.
MinSizeItems int `mapstructure:"min_size_items"`
MinSizeBytes int `mapstructure:"min_size_bytes"`
}

// MaxSizeConfig defines the configuration for the maximum number of items in a batch.
Expand All @@ -41,18 +42,32 @@ type MaxSizeConfig struct {
// If the batch size exceeds this value, it will be broken up into smaller batches if possible.
// Setting this value to zero disables the maximum size limit.
MaxSizeItems int `mapstructure:"max_size_items"`
MaxSizeBytes int `mapstructure:"max_size_bytes"`
}

func (c Config) Validate() error {
if c.MinSizeBytes != 0 && c.MinSizeItems != 0 || c.MinSizeBytes != 0 && c.MaxSizeItems != 0 || c.MinSizeItems != 0 && c.MaxSizeBytes != 0 {
return errors.New("size limit and bytes limit cannot be specified at the same time")
}

Check warning on line 51 in exporter/exporterbatcher/config.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterbatcher/config.go#L50-L51

Added lines #L50 - L51 were not covered by tests

if c.MinSizeItems < 0 {
return errors.New("min_size_items must be greater than or equal to zero")
}
if c.MinSizeBytes < 0 {
return errors.New("min_size_bytes must be greater than or equal to zero")
}

Check warning on line 58 in exporter/exporterbatcher/config.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterbatcher/config.go#L57-L58

Added lines #L57 - L58 were not covered by tests
if c.MaxSizeItems < 0 {
return errors.New("max_size_items must be greater than or equal to zero")
}
if c.MaxSizeBytes < 0 {
return errors.New("max_size_bytes must be greater than or equal to zero")
}

Check warning on line 64 in exporter/exporterbatcher/config.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterbatcher/config.go#L63-L64

Added lines #L63 - L64 were not covered by tests
if c.MaxSizeItems != 0 && c.MaxSizeItems < c.MinSizeItems {
return errors.New("max_size_items must be greater than or equal to min_size_items")
}
if c.MaxSizeBytes != 0 && c.MaxSizeBytes < c.MinSizeBytes {
return errors.New("max_size_bytes must be greater than or equal to min_size_bytes")
}

Check warning on line 70 in exporter/exporterbatcher/config.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterbatcher/config.go#L69-L70

Added lines #L69 - L70 were not covered by tests
if c.FlushTimeout <= 0 {
return errors.New("timeout must be greater than zero")
}
Expand Down
4 changes: 4 additions & 0 deletions exporter/exporterhelper/internal/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ func (r *fakeRequest) ItemsCount() int {
return r.items
}

func (r *fakeRequest) ByteSize() int {
return r.items

Check warning on line 60 in exporter/exporterhelper/internal/request.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/internal/request.go#L59-L60

Added lines #L59 - L60 were not covered by tests
}

func (r *fakeRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 internal.Request) ([]internal.Request, error) {
if r.mergeErr != nil {
return nil, r.mergeErr
Expand Down
8 changes: 8 additions & 0 deletions exporter/exporterhelper/internal/retry_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,10 @@ func (mer *mockErrorRequest) ItemsCount() int {
return 7
}

// ByteSize reports a fixed byte size for the mock error request,
// mirroring the constant value returned by ItemsCount.
func (mer *mockErrorRequest) ByteSize() int {
	return 7
}

// MergeSplit is a no-op for the mock error request: it never merges or
// splits and always reports success with no resulting batches.
func (mer *mockErrorRequest) MergeSplit(context.Context, exporterbatcher.MaxSizeConfig, internal.Request) ([]internal.Request, error) {
	return nil, nil
}
Expand Down Expand Up @@ -464,6 +468,10 @@ func (m *mockRequest) ItemsCount() int {
return m.cnt
}

// ByteSize reports the mock request's byte size; the mock reuses its
// item counter as the byte count.
func (m *mockRequest) ByteSize() int {
	return m.cnt
}

// MergeSplit is a no-op for the mock request: it never merges or splits
// and always reports success with no resulting batches.
func (m *mockRequest) MergeSplit(context.Context, exporterbatcher.MaxSizeConfig, internal.Request) ([]internal.Request, error) {
	return nil, nil
}
Expand Down
20 changes: 18 additions & 2 deletions exporter/exporterhelper/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ var (
)

// logsRequest wraps a plog.Logs payload together with the pusher used
// to send it downstream.
type logsRequest struct {
	ld     plog.Logs
	pusher consumer.ConsumeLogsFunc
	// cachedByteSize memoizes the marshaled size of ld in bytes.
	// Zero means "not yet computed"; see ByteSize.
	cachedByteSize int
}

func newLogsRequest(ld plog.Logs, pusher consumer.ConsumeLogsFunc) Request {
Expand Down Expand Up @@ -66,6 +67,21 @@ func (req *logsRequest) ItemsCount() int {
return req.ld.LogRecordCount()
}

// ByteSize returns the marshaled (proto-encoded) size of the request's
// logs in bytes. The value is computed lazily and memoized in
// cachedByteSize; zero doubles as the "not yet computed" sentinel, so an
// empty payload (true size 0) is simply recomputed on each call, which is cheap.
func (req *logsRequest) ByteSize() int {
	if req.cachedByteSize == 0 {
		req.cachedByteSize = logsMarshaler.LogsSize(req.ld)
	}
	return req.cachedByteSize
}

// invalidateCachedByteSize resets the memoized byte size so the next
// ByteSize call re-measures req.ld. Call it after mutating req.ld in a
// way whose size delta is not tracked via updateCachedByteSize.
func (req *logsRequest) invalidateCachedByteSize() {
	req.cachedByteSize = 0
}

// updateCachedByteSize adjusts the memoized byte size by delta after
// logs are moved into or out of req.ld.
// NOTE(review): this assumes the caller's delta equals the true change in
// marshaled size; any mismatch (e.g. varint length-prefix growth) makes
// the cache drift from the real size — confirm callers account for that.
func (req *logsRequest) updateCachedByteSize(delta int) {
	req.cachedByteSize += delta
}

type logsExporter struct {
*internal.BaseExporter
consumer.Logs
Expand Down
143 changes: 141 additions & 2 deletions exporter/exporterhelper/logs_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporte
import (
"context"
"errors"
math_bits "math/bits"

"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/collector/pdata/plog"
Expand All @@ -23,11 +24,144 @@ func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSiz
}
}

if cfg.MaxSizeItems == 0 {
req2.ld.ResourceLogs().MoveAndAppendTo(req.ld.ResourceLogs())
if cfg.MaxSizeItems == 0 && cfg.MaxSizeBytes == 0 {
if req2 != nil {
req2.ld.ResourceLogs().MoveAndAppendTo(req.ld.ResourceLogs())
}
return []Request{req}, nil
}
if cfg.MaxSizeBytes > 0 {
return req.mergeSplitBasedOnByteSize(cfg, req2)
}
return req.mergeSplitBasedOnItemCount(cfg, req2)
}

// mergeSplitBasedOnByteSize merges req and req2 (req2 may be nil) into
// batches whose marshaled size does not exceed cfg.MaxSizeBytes,
// splitting the pdata payloads when a source request does not fit.
func (req *logsRequest) mergeSplitBasedOnByteSize(cfg exporterbatcher.MaxSizeConfig, req2 *logsRequest) ([]Request, error) {
	var (
		res          []Request
		destReq      *logsRequest
		capacityLeft = cfg.MaxSizeBytes
	)
	for _, srcReq := range []*logsRequest{req, req2} {
		if srcReq == nil {
			continue
		}

		// Fast path: the whole source request fits into the current batch.
		srcByteSize := srcReq.ByteSize()
		if srcByteSize <= capacityLeft {
			if destReq == nil {
				destReq = srcReq
			} else {
				destReq.updateCachedByteSize(srcByteSize)
				srcReq.ld.ResourceLogs().MoveAndAppendTo(destReq.ld.ResourceLogs())
			}
			capacityLeft -= srcByteSize
			continue
		}

		// Slow path: carve byte-bounded chunks out of srcReq until it is
		// drained or nothing more can be extracted.
		for {
			srcReq.invalidateCachedByteSize()
			extractedLogs, capacityReached := extractLogsBasedOnByteSize(srcReq.ld, capacityLeft)

			// NOTE(review): if a single log record is larger than
			// cfg.MaxSizeBytes, nothing can ever be extracted and the
			// remainder of srcReq is silently dropped here — confirm
			// whether that is acceptable for this POC.
			if extractedLogs.LogRecordCount() == 0 {
				break
			}
			if destReq == nil {
				destReq = &logsRequest{
					ld:     extractedLogs,
					pusher: srcReq.pusher,
				}
			} else {
				destReq.updateCachedByteSize(logsMarshaler.LogsSize(extractedLogs))
				extractedLogs.ResourceLogs().MoveAndAppendTo(destReq.ld.ResourceLogs())
			}
			// Create new batch once capacity is reached.
			if capacityReached {
				res = append(res, destReq)
				destReq = nil
				capacityLeft = cfg.MaxSizeBytes
			} else {
				capacityLeft = cfg.MaxSizeBytes - destReq.ByteSize()
			}
		}
	}

	if destReq != nil {
		res = append(res, destReq)
	}
	return res, nil
}

// extractLogs extracts logs from the input logs and returns a new logs with the specified number of log records.
func extractLogsBasedOnByteSize(srcLogs plog.Logs, capacity int) (plog.Logs, bool) {
capacityReached := false
destLogs := plog.NewLogs()
capacityLeft := capacity - logsMarshaler.LogsSize(destLogs)
srcLogs.ResourceLogs().RemoveIf(func(srcRL plog.ResourceLogs) bool {
if capacityReached {
return false
}

Check warning on line 103 in exporter/exporterhelper/logs_batch.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/logs_batch.go#L102-L103

Added lines #L102 - L103 were not covered by tests
needToExtract := logsMarshaler.ResourceLogsSize(srcRL) > capacityLeft
if needToExtract {
srcRL, capacityReached = extractResourceLogsBasedOnByteSize(srcRL, capacityLeft)
if srcRL.ScopeLogs().Len() == 0 {
return false
}

Check warning on line 109 in exporter/exporterhelper/logs_batch.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/logs_batch.go#L108-L109

Added lines #L108 - L109 were not covered by tests
}
capacityLeft -= deltaCapacity(logsMarshaler.ResourceLogsSize(srcRL))
srcRL.MoveTo(destLogs.ResourceLogs().AppendEmpty())
return !needToExtract
})
return destLogs, capacityReached
}

// extractResourceLogs extracts resource logs and returns a new resource logs with the specified number of log records.
func extractResourceLogsBasedOnByteSize(srcRL plog.ResourceLogs, capacity int) (plog.ResourceLogs, bool) {
capacityReached := false
destRL := plog.NewResourceLogs()
destRL.SetSchemaUrl(srcRL.SchemaUrl())
srcRL.Resource().CopyTo(destRL.Resource())
capacityLeft := capacity - logsMarshaler.ResourceLogsSize(destRL)
srcRL.ScopeLogs().RemoveIf(func(srcSL plog.ScopeLogs) bool {
if capacityReached {
return false
}

Check warning on line 128 in exporter/exporterhelper/logs_batch.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/logs_batch.go#L127-L128

Added lines #L127 - L128 were not covered by tests
needToExtract := logsMarshaler.ScopeLogsSize(srcSL) > capacityLeft
if needToExtract {
srcSL, capacityReached = extractScopeLogsBasedOnByteSize(srcSL, capacityLeft)
if srcSL.LogRecords().Len() == 0 {
return false
}
}

capacityLeft -= deltaCapacity(logsMarshaler.ScopeLogsSize(srcSL))
srcSL.MoveTo(destRL.ScopeLogs().AppendEmpty())
return !needToExtract
})
return destRL, capacityReached
}

// extractScopeLogsBasedOnByteSize moves log records out of srcSL into a
// new plog.ScopeLogs (carrying over the scope and schema URL) until adding
// another record would exceed capacity (measured in marshaled bytes). The
// boolean result reports whether the capacity limit was hit.
func extractScopeLogsBasedOnByteSize(srcSL plog.ScopeLogs, capacity int) (plog.ScopeLogs, bool) {
	capacityReached := false
	destSL := plog.NewScopeLogs()
	destSL.SetSchemaUrl(srcSL.SchemaUrl())
	srcSL.Scope().CopyTo(destSL.Scope())
	capacityLeft := capacity - logsMarshaler.ScopeLogsSize(destSL)

	srcSL.LogRecords().RemoveIf(func(srcLR plog.LogRecord) bool {
		// Stop moving records as soon as one does not fit; records after
		// that point are kept in srcSL even if a smaller one might fit.
		if capacityReached || logsMarshaler.LogRecordSize(srcLR) > capacityLeft {
			capacityReached = true
			return false
		}
		capacityLeft -= deltaCapacity(logsMarshaler.LogRecordSize(srcLR))
		srcLR.MoveTo(destSL.LogRecords().AppendEmpty())
		return true
	})
	return destSL, capacityReached
}

func (req *logsRequest) mergeSplitBasedOnItemCount(cfg exporterbatcher.MaxSizeConfig, req2 *logsRequest) ([]Request, error) {
var (
res []Request
destReq *logsRequest
Expand Down Expand Up @@ -75,6 +209,11 @@ func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSiz
return res, nil
}

// deltaCapacity returns by how many bytes a proto-encoded repeated field
// grows when an item whose payload marshals to newItemSize bytes is
// appended: one byte for the field tag, the varint-encoded length prefix,
// and the payload itself.
func deltaCapacity(newItemSize int) int {
	// Varint length of v is (bits.Len64(v|1) + 6) / 7. The original placed
	// the "+ 6" inside Len64's argument, which undercounts the length
	// prefix for sizes >= 128 (e.g. 128 needs a 2-byte varint, not 1).
	return 1 + newItemSize + (math_bits.Len64(uint64(newItemSize)|1)+6)/7
}

// extractLogs extracts logs from the input logs and returns a new logs with the specified number of log records.
func extractLogs(srcLogs plog.Logs, count int) plog.Logs {
destLogs := plog.NewLogs()
Expand Down
Loading

0 comments on commit b7d0c30

Please sign in to comment.