New optimization based on Bogdan's feedback
sfc-gh-sili committed Jan 15, 2025
1 parent 716bbda commit a8a28cd
Showing 3 changed files with 50 additions and 27 deletions.
19 changes: 15 additions & 4 deletions exporter/exporterhelper/logs.go
@@ -25,9 +25,9 @@ var (
 )
 
 type logsRequest struct {
-    ld       plog.Logs
-    pusher   consumer.ConsumeLogsFunc
-    byteSize int
+    ld             plog.Logs
+    pusher         consumer.ConsumeLogsFunc
+    cachedByteSize int
 }
 
 func newLogsRequest(ld plog.Logs, pusher consumer.ConsumeLogsFunc) Request {
@@ -68,7 +68,18 @@ func (req *logsRequest) ItemsCount() int {
 }
 
 func (req *logsRequest) ByteSize() int {
-    return req.ld.ByteSize()
+    if req.cachedByteSize == 0 {
+        req.cachedByteSize = req.ld.ByteSize()
+    }
+    return req.cachedByteSize
 }
+
+func (req *logsRequest) invalidateCachedByteSize() {
+    req.cachedByteSize = 0
+}
+
+func (req *logsRequest) updateCachedByteSize(delta int) {
+    req.cachedByteSize += delta
+}
 
 type logsExporter struct {
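The logs.go change above replaces the eagerly maintained byteSize field with a lazily computed cache: ByteSize() encodes at most once, invalidateCachedByteSize() forces a recompute after the underlying plog.Logs has been mutated in a way whose size effect is unknown, and updateCachedByteSize() adjusts the cache by a known delta so merging another request does not trigger a full re-encode. A minimal standalone sketch of the same pattern, assuming a toy payload and size function as stand-ins for plog.Logs and its ByteSize():

package main

import "fmt"

// request mimics logsRequest: it caches an expensive-to-compute encoded size.
type request struct {
    payload        []string
    cachedByteSize int // 0 means "not computed yet"
}

// byteSizeSlow stands in for plog.Logs.ByteSize(), which walks the payload.
func (r *request) byteSizeSlow() int {
    n := 0
    for _, s := range r.payload {
        n += len(s)
    }
    return n
}

// ByteSize computes the size once and serves the cached value afterwards.
func (r *request) ByteSize() int {
    if r.cachedByteSize == 0 {
        r.cachedByteSize = r.byteSizeSlow()
    }
    return r.cachedByteSize
}

// invalidateCachedByteSize must be called after any mutation whose size
// effect is unknown (e.g. extracting an arbitrary chunk of the payload).
func (r *request) invalidateCachedByteSize() { r.cachedByteSize = 0 }

// updateCachedByteSize adjusts the cache by a known delta, avoiding a
// full recompute when another request's bytes are merged in.
func (r *request) updateCachedByteSize(delta int) { r.cachedByteSize += delta }

func main() {
    r := &request{payload: []string{"hello", "world"}}
    fmt.Println(r.ByteSize()) // 10, computed
    r.payload = append(r.payload, "!!")
    r.invalidateCachedByteSize()
    fmt.Println(r.ByteSize()) // 12, recomputed after invalidation
}

One consequence of using 0 as the "not computed" marker is that a genuinely empty request recomputes its size on every call, which is harmless precisely because the request is empty.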
45 changes: 28 additions & 17 deletions exporter/exporterhelper/logs_batch.go
@@ -6,6 +6,7 @@ package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporte
 import (
     "context"
     "errors"
+    math_bits "math/bits"
 
     "go.opentelemetry.io/collector/exporter/exporterbatcher"
     "go.opentelemetry.io/collector/pdata/plog"
@@ -46,35 +47,32 @@ func (req *logsRequest) mergeSplitBasedOnByteSize(cfg exporterbatcher.MaxSizeCon
             continue
         }
 
-        ByteSize := srcReq.byteSize
-        if ByteSize == 0 {
-            ByteSize = srcReq.ld.ByteSize()
-        }
-        if ByteSize > capacityLeft && capacityLeft < cfg.MaxSizeBytes {
-            res = append(res, destReq)
-            destReq = nil
-            capacityLeft = cfg.MaxSizeBytes
-        }
 
+        ByteSize := srcReq.ByteSize()
         if ByteSize <= capacityLeft {
             if destReq == nil {
                 destReq = srcReq
             } else {
+                destReq.updateCachedByteSize(ByteSize)
                 srcReq.ld.ResourceLogs().MoveAndAppendTo(destReq.ld.ResourceLogs())
-                destReq.byteSize += ByteSize
             }
             capacityLeft -= ByteSize
             continue
         }
 
         for {
+            srcReq.invalidateCachedByteSize()
             extractedLogs, capacityReached := extractLogsBasedOnByteSize(srcReq.ld, capacityLeft)
 
             if extractedLogs.LogRecordCount() == 0 {
                 break
             }
             if destReq == nil {
-                destReq = &logsRequest{ld: extractedLogs, pusher: srcReq.pusher}
+                destReq = &logsRequest{
+                    ld:     extractedLogs,
+                    pusher: srcReq.pusher,
+                }
             } else {
+                destReq.updateCachedByteSize(extractedLogs.ByteSize())
                 extractedLogs.ResourceLogs().MoveAndAppendTo(destReq.ld.ResourceLogs())
             }
             // Create new batch once capacity is reached.
@@ -98,17 +96,19 @@ func (req *logsRequest) mergeSplitBasedOnByteSize(cfg exporterbatcher.MaxSizeCon
 func extractLogsBasedOnByteSize(srcLogs plog.Logs, capacity int) (plog.Logs, bool) {
     capacityReached := false
     destLogs := plog.NewLogs()
+    capacityLeft := capacity - destLogs.ByteSize()
     srcLogs.ResourceLogs().RemoveIf(func(srcRL plog.ResourceLogs) bool {
         if capacityReached {
             return false
         }
-        needToExtract := srcRL.Size() > capacity-destLogs.ByteSize()
+        needToExtract := srcRL.Size() > capacityLeft
         if needToExtract {
-            srcRL, capacityReached = extractResourceLogsBasedOnByteSize(srcRL, capacity-destLogs.ByteSize())
+            srcRL, capacityReached = extractResourceLogsBasedOnByteSize(srcRL, capacityLeft)
             if srcRL.ScopeLogs().Len() == 0 {
                 return false
             }
         }
+        capacityLeft -= deltaCapacity(srcRL.Size())
         srcRL.MoveTo(destLogs.ResourceLogs().AppendEmpty())
         return !needToExtract
     })
@@ -121,17 +121,20 @@ func extractResourceLogsBasedOnByteSize(srcRL plog.ResourceLogs, capacity int) (
     destRL := plog.NewResourceLogs()
     destRL.SetSchemaUrl(srcRL.SchemaUrl())
     srcRL.Resource().CopyTo(destRL.Resource())
+    capacityLeft := capacity - destRL.Size()
     srcRL.ScopeLogs().RemoveIf(func(srcSL plog.ScopeLogs) bool {
         if capacityReached {
             return false
         }
-        needToExtract := srcSL.Size() > capacity-destRL.Size()
+        needToExtract := srcSL.Size() > capacityLeft
        if needToExtract {
-            srcSL, capacityReached = extractScopeLogsBasedOnByteSize(srcSL, capacity-destRL.Size())
+            srcSL, capacityReached = extractScopeLogsBasedOnByteSize(srcSL, capacityLeft)
             if srcSL.LogRecords().Len() == 0 {
                 return false
             }
         }
+
+        capacityLeft -= deltaCapacity(srcSL.Size())
         srcSL.MoveTo(destRL.ScopeLogs().AppendEmpty())
         return !needToExtract
     })
@@ -144,11 +147,14 @@ func extractScopeLogsBasedOnByteSize(srcSL plog.ScopeLogs, capacity int) (plog.S
     destSL := plog.NewScopeLogs()
     destSL.SetSchemaUrl(srcSL.SchemaUrl())
     srcSL.Scope().CopyTo(destSL.Scope())
+    capacityLeft := capacity - destSL.Size()
+
     srcSL.LogRecords().RemoveIf(func(srcLR plog.LogRecord) bool {
-        if capacityReached || srcLR.Size()+destSL.Size() > capacity {
+        if capacityReached || srcLR.Size() > capacityLeft {
             capacityReached = true
             return false
         }
+        capacityLeft -= deltaCapacity(srcLR.Size())
         srcLR.MoveTo(destSL.LogRecords().AppendEmpty())
         return true
     })
@@ -203,6 +209,11 @@ func (req *logsRequest) mergeSplitBasedOnItemCount(cfg exporterbatcher.MaxSizeCo
     return res, nil
 }
 
+// deltaCapacity() returns the delta size of a proto slice when a new item is added.
+func deltaCapacity(newItemSize int) int {
+    return 1 + newItemSize + (math_bits.Len64(uint64(newItemSize)|1)+6)/7
+}
+
 // extractLogs extracts logs from the input logs and returns a new logs with the specified number of log records.
 func extractLogs(srcLogs plog.Logs, count int) plog.Logs {
     destLogs := plog.NewLogs()
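The new deltaCapacity helper is what lets the three extract functions keep capacityLeft up to date incrementally, replacing the repeated capacity-destLogs.ByteSize() re-encodings of the old code. Appending an item of encoded size n to a protobuf repeated field costs n bytes for the item, 1 byte for the field tag (assuming field numbers below 16, the common case for these messages), and (bits.Len64(n|1)+6)/7 bytes for the varint length prefix, the same formula as the sov* size helpers in protoc-generated Go code. A small self-contained check of the arithmetic, mirroring the helper in the hunk above:

package main

import (
    "fmt"
    math_bits "math/bits"
)

// deltaCapacity mirrors the helper in the diff: item bytes, plus a 1-byte
// field tag, plus a varint length prefix of (bits.Len64(n|1)+6)/7 bytes.
func deltaCapacity(newItemSize int) int {
    return 1 + newItemSize + (math_bits.Len64(uint64(newItemSize)|1)+6)/7
}

func main() {
    fmt.Println(deltaCapacity(0))   // 2: tag + 1-byte length prefix for an empty item
    fmt.Println(deltaCapacity(100)) // 102: 100 fits in a 1-byte varint
    fmt.Println(deltaCapacity(300)) // 303: 300 needs a 2-byte varint
}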
13 changes: 7 additions & 6 deletions exporter/exporterhelper/logs_batch_test.go
@@ -123,7 +123,7 @@ func TestMergeSplitLogsBasedOnItemCount(t *testing.T) {
             require.NoError(t, err)
             assert.Equal(t, len(tt.expected), len(res))
             for i, r := range res {
-                assert.Equal(t, tt.expected[i], r.(*logsRequest))
+                assert.Equal(t, tt.expected[i].ld, r.(*logsRequest).ld)
             }
         })
     }
@@ -246,7 +246,8 @@ func TestMergeSplitLogsBasedOnByteSize(t *testing.T) {
             require.NoError(t, err)
             assert.Equal(t, len(tt.expected), len(res))
             for i, r := range res {
-                assert.Equal(t, tt.expected[i], r.(*logsRequest))
+                assert.Equal(t, tt.expected[i].ld, r.(*logsRequest).ld)
+                assert.Equal(t, r.(*logsRequest).ByteSize(), r.(*logsRequest).ld.ByteSize())
             }
         })
     }
@@ -286,7 +287,7 @@ func BenchmarkSplittingBasedOnItemCountManyLogsSlightlyAboveLimit(b *testing.B)
 }
 
 func BenchmarkSplittingBasedOnByteSizeManyLogsSlightlyAboveLimit(b *testing.B) {
-    cfg := exporterbatcher.MaxSizeConfig{MaxSizeBytes: 960052} // 960052 corresponds to 10000 generated logs
+    cfg := exporterbatcher.MaxSizeConfig{MaxSizeBytes: testdata.GenerateLogs(10000).ByteSize()}
     for i := 0; i < b.N; i++ {
         lr1 := &logsRequest{ld: testdata.GenerateLogs(10001)}
         for j := 0; j < 10; j++ {
@@ -308,7 +309,7 @@ func BenchmarkSplittingBasedOnItemCountManyLogsSlightlyBelowLimit(b *testing.B)
 }
 
 func BenchmarkSplittingBasedOnByteSizeManyLogsSlightlyBelowLimit(b *testing.B) {
-    cfg := exporterbatcher.MaxSizeConfig{MaxSizeBytes: 960052} // 960052 corresponds to 10000 generated logs
+    cfg := exporterbatcher.MaxSizeConfig{MaxSizeBytes: testdata.GenerateLogs(10000).ByteSize()}
     for i := 0; i < b.N; i++ {
         lr1 := &logsRequest{ld: testdata.GenerateLogs(9999)}
         for j := 0; j < 10; j++ {
@@ -322,13 +323,13 @@ func BenchmarkSplittingBasedOnItemCountHugeLog(b *testing.B) {
     cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000}
     for i := 0; i < b.N; i++ {
         lr1 := &logsRequest{ld: testdata.GenerateLogs(1)}
-        lr2 := &logsRequest{ld: testdata.GenerateLogs(100000)} // l2 is of size 9.600054 MB
+        lr2 := &logsRequest{ld: testdata.GenerateLogs(100000)}
         lr1.MergeSplit(context.Background(), cfg, lr2)
     }
 }
 
 func BenchmarkSplittingBasedOnByteSizeHugeLog(b *testing.B) {
-    cfg := exporterbatcher.MaxSizeConfig{MaxSizeBytes: 970000}
+    cfg := exporterbatcher.MaxSizeConfig{MaxSizeBytes: testdata.GenerateLogs(10000).ByteSize()}
     for i := 0; i < b.N; i++ {
         lr1 := &logsRequest{ld: testdata.GenerateLogs(1)}
         lr2 := &logsRequest{ld: testdata.GenerateLogs(100000)}
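For orientation, the benchmarks drive MergeSplit directly, and deriving MaxSizeBytes from testdata.GenerateLogs(10000).ByteSize() instead of the hard-coded 960052 keeps the byte budget in sync if the generator or the encoding changes. A hedged sketch of the call shape, using only identifiers that appear above; it would compile only inside the exporterhelper package's tests, and the final loop is illustrative glue:

// Sketch (test scope): how the benchmarks exercise the byte-size path.
cfg := exporterbatcher.MaxSizeConfig{MaxSizeBytes: testdata.GenerateLogs(10000).ByteSize()}
lr1 := &logsRequest{ld: testdata.GenerateLogs(1)}
lr2 := &logsRequest{ld: testdata.GenerateLogs(100000)}

// lr2 alone exceeds the budget, so MergeSplit merges it into lr1 and
// splits the result into multiple requests, each within MaxSizeBytes.
reqs, err := lr1.MergeSplit(context.Background(), cfg, lr2)
if err != nil {
    panic(err) // illustrative; a test would use require.NoError(t, err)
}
for _, r := range reqs {
    _ = r.(*logsRequest).ByteSize() // each should be <= cfg.MaxSizeBytes
}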
