diff --git a/exporter/exporterbatcher/config.go b/exporter/exporterbatcher/config.go index 239dc2dd4fe..f9cb604ff77 100644 --- a/exporter/exporterbatcher/config.go +++ b/exporter/exporterbatcher/config.go @@ -31,6 +31,7 @@ type MinSizeConfig struct { // sent regardless of the timeout. There is no guarantee that the batch size always greater than this value. // This option requires the Request to implement RequestItemsCounter interface. Otherwise, it will be ignored. MinSizeItems int `mapstructure:"min_size_items"` + MinSizeBytes int `mapstructure:"min_size_bytes"` } // MaxSizeConfig defines the configuration for the maximum number of items in a batch. @@ -41,18 +42,32 @@ type MaxSizeConfig struct { // If the batch size exceeds this value, it will be broken up into smaller batches if possible. // Setting this value to zero disables the maximum size limit. MaxSizeItems int `mapstructure:"max_size_items"` + MaxSizeBytes int `mapstructure:"max_size_bytes"` } func (c Config) Validate() error { + if (c.MinSizeBytes != 0 && c.MinSizeItems != 0) || (c.MinSizeBytes != 0 && c.MaxSizeItems != 0) || (c.MinSizeItems != 0 && c.MaxSizeBytes != 0) || (c.MaxSizeBytes != 0 && c.MaxSizeItems != 0) { + return errors.New("size limit and bytes limit cannot be specified at the same time") + } + if c.MinSizeItems < 0 { return errors.New("min_size_items must be greater than or equal to zero") } + if c.MinSizeBytes < 0 { + return errors.New("min_size_bytes must be greater than or equal to zero") + } if c.MaxSizeItems < 0 { return errors.New("max_size_items must be greater than or equal to zero") } + if c.MaxSizeBytes < 0 { + return errors.New("max_size_bytes must be greater than or equal to zero") + } if c.MaxSizeItems != 0 && c.MaxSizeItems < c.MinSizeItems { return errors.New("max_size_items must be greater than or equal to min_size_items") } + if c.MaxSizeBytes != 0 && c.MaxSizeBytes < c.MinSizeBytes { + return errors.New("max_size_bytes must be greater than or equal to min_size_bytes") + } if c.FlushTimeout <= 0 { return
errors.New("timeout must be greater than zero") } diff --git a/exporter/exporterhelper/internal/retry_sender_test.go b/exporter/exporterhelper/internal/retry_sender_test.go index 470f4c62e82..e6b5ae94ce2 100644 --- a/exporter/exporterhelper/internal/retry_sender_test.go +++ b/exporter/exporterhelper/internal/retry_sender_test.go @@ -418,6 +418,10 @@ func (mer *mockErrorRequest) ItemsCount() int { return 7 } +func (mer *mockErrorRequest) ByteSize() int { + return 7 +} + func (mer *mockErrorRequest) MergeSplit(context.Context, exporterbatcher.MaxSizeConfig, internal.Request) ([]internal.Request, error) { return nil, nil } @@ -464,6 +468,10 @@ func (m *mockRequest) ItemsCount() int { return m.cnt } +func (m *mockRequest) ByteSize() int { + return m.cnt +} + func (m *mockRequest) MergeSplit(context.Context, exporterbatcher.MaxSizeConfig, internal.Request) ([]internal.Request, error) { return nil, nil } diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index 74a658b98fe..67414e8b9af 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -25,8 +25,9 @@ var ( ) type logsRequest struct { - ld plog.Logs - pusher consumer.ConsumeLogsFunc + ld plog.Logs + pusher consumer.ConsumeLogsFunc + cachedByteSize int } func newLogsRequest(ld plog.Logs, pusher consumer.ConsumeLogsFunc) Request { @@ -66,6 +67,21 @@ func (req *logsRequest) ItemsCount() int { return req.ld.LogRecordCount() } +func (req *logsRequest) ByteSize() int { + if req.cachedByteSize == 0 { + req.cachedByteSize = logsMarshaler.LogsSize(req.ld) + } + return req.cachedByteSize +} + +func (req *logsRequest) invalidateCachedByteSize() { + req.cachedByteSize = 0 +} + +func (req *logsRequest) updateCachedByteSize(delta int) { + req.cachedByteSize += delta +} + type logsExporter struct { *internal.BaseExporter consumer.Logs diff --git a/exporter/exporterhelper/logs_batch.go b/exporter/exporterhelper/logs_batch.go index 8bdad75e744..a119060a4f5 100644 --- 
a/exporter/exporterhelper/logs_batch.go +++ b/exporter/exporterhelper/logs_batch.go @@ -6,6 +6,7 @@ package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporte import ( "context" "errors" + math_bits "math/bits" "go.opentelemetry.io/collector/exporter/exporterbatcher" "go.opentelemetry.io/collector/pdata/plog" @@ -23,11 +24,144 @@ func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSiz } } - if cfg.MaxSizeItems == 0 { - req2.ld.ResourceLogs().MoveAndAppendTo(req.ld.ResourceLogs()) + if cfg.MaxSizeItems == 0 && cfg.MaxSizeBytes == 0 { + if req2 != nil { + req2.ld.ResourceLogs().MoveAndAppendTo(req.ld.ResourceLogs()) + } return []Request{req}, nil } + if cfg.MaxSizeBytes > 0 { + return req.mergeSplitBasedOnByteSize(cfg, req2) + } + return req.mergeSplitBasedOnItemCount(cfg, req2) +} +func (req *logsRequest) mergeSplitBasedOnByteSize(cfg exporterbatcher.MaxSizeConfig, req2 *logsRequest) ([]Request, error) { + var ( + res []Request + destReq *logsRequest + capacityLeft = cfg.MaxSizeBytes + ) + for _, srcReq := range []*logsRequest{req, req2} { + if srcReq == nil { + continue + } + + ByteSize := srcReq.ByteSize() + if ByteSize <= capacityLeft { + if destReq == nil { + destReq = srcReq + } else { + destReq.updateCachedByteSize(ByteSize) + srcReq.ld.ResourceLogs().MoveAndAppendTo(destReq.ld.ResourceLogs()) + } + capacityLeft -= ByteSize + continue + } + + for { + srcReq.invalidateCachedByteSize() + extractedLogs, capacityReached := extractLogsBasedOnByteSize(srcReq.ld, capacityLeft) + + if extractedLogs.LogRecordCount() == 0 { + break + } + if destReq == nil { + destReq = &logsRequest{ + ld: extractedLogs, + pusher: srcReq.pusher, + } + } else { + destReq.updateCachedByteSize(logsMarshaler.LogsSize(extractedLogs)) + extractedLogs.ResourceLogs().MoveAndAppendTo(destReq.ld.ResourceLogs()) + } + // Create new batch once capacity is reached. 
+ if capacityReached { + res = append(res, destReq) + destReq = nil + capacityLeft = cfg.MaxSizeBytes + } else { + capacityLeft = cfg.MaxSizeBytes - destReq.ByteSize() + } + } + } + + if destReq != nil { + res = append(res, destReq) + } + return res, nil +} + +// extractLogsBasedOnByteSize extracts logs from srcLogs into a new plog.Logs until the given byte capacity is reached; the second return value reports whether the capacity was reached. +func extractLogsBasedOnByteSize(srcLogs plog.Logs, capacity int) (plog.Logs, bool) { + capacityReached := false + destLogs := plog.NewLogs() + capacityLeft := capacity - logsMarshaler.LogsSize(destLogs) + srcLogs.ResourceLogs().RemoveIf(func(srcRL plog.ResourceLogs) bool { + if capacityReached { + return false + } + needToExtract := logsMarshaler.ResourceLogsSize(srcRL) > capacityLeft + if needToExtract { + srcRL, capacityReached = extractResourceLogsBasedOnByteSize(srcRL, capacityLeft) + if srcRL.ScopeLogs().Len() == 0 { + return false + } + } + capacityLeft -= deltaCapacity(logsMarshaler.ResourceLogsSize(srcRL)) + srcRL.MoveTo(destLogs.ResourceLogs().AppendEmpty()) + return !needToExtract + }) + return destLogs, capacityReached +} + +// extractResourceLogsBasedOnByteSize extracts log records from srcRL into a new plog.ResourceLogs until the given byte capacity is reached; the second return value reports whether the capacity was reached.
+func extractResourceLogsBasedOnByteSize(srcRL plog.ResourceLogs, capacity int) (plog.ResourceLogs, bool) { + capacityReached := false + destRL := plog.NewResourceLogs() + destRL.SetSchemaUrl(srcRL.SchemaUrl()) + srcRL.Resource().CopyTo(destRL.Resource()) + capacityLeft := capacity - logsMarshaler.ResourceLogsSize(destRL) + srcRL.ScopeLogs().RemoveIf(func(srcSL plog.ScopeLogs) bool { + if capacityReached { + return false + } + needToExtract := logsMarshaler.ScopeLogsSize(srcSL) > capacityLeft + if needToExtract { + srcSL, capacityReached = extractScopeLogsBasedOnByteSize(srcSL, capacityLeft) + if srcSL.LogRecords().Len() == 0 { + return false + } + } + + capacityLeft -= deltaCapacity(logsMarshaler.ScopeLogsSize(srcSL)) + srcSL.MoveTo(destRL.ScopeLogs().AppendEmpty()) + return !needToExtract + }) + return destRL, capacityReached +} + +// extractScopeLogsBasedOnByteSize extracts log records from srcSL into a new plog.ScopeLogs until the given byte capacity is reached; the second return value reports whether the capacity was reached. +func extractScopeLogsBasedOnByteSize(srcSL plog.ScopeLogs, capacity int) (plog.ScopeLogs, bool) { + capacityReached := false + destSL := plog.NewScopeLogs() + destSL.SetSchemaUrl(srcSL.SchemaUrl()) + srcSL.Scope().CopyTo(destSL.Scope()) + capacityLeft := capacity - logsMarshaler.ScopeLogsSize(destSL) + + srcSL.LogRecords().RemoveIf(func(srcLR plog.LogRecord) bool { + if capacityReached || logsMarshaler.LogRecordSize(srcLR) > capacityLeft { + capacityReached = true + return false + } + capacityLeft -= deltaCapacity(logsMarshaler.LogRecordSize(srcLR)) + srcLR.MoveTo(destSL.LogRecords().AppendEmpty()) + return true + }) + return destSL, capacityReached +} + +func (req *logsRequest) mergeSplitBasedOnItemCount(cfg exporterbatcher.MaxSizeConfig, req2 *logsRequest) ([]Request, error) { var ( res []Request destReq *logsRequest @@ -75,6 +209,11 @@ func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSiz return res, nil } +// deltaCapacity returns the increase in serialized proto size when an item of newItemSize bytes is appended to a repeated field: one tag byte, the varint-encoded length of the item, and the item payload itself.
+func deltaCapacity(newItemSize int) int { + return 1 + newItemSize + (math_bits.Len64(uint64(newItemSize|1))+6)/7 +} + // extractLogs extracts logs from the input logs and returns a new logs with the specified number of log records. func extractLogs(srcLogs plog.Logs, count int) plog.Logs { destLogs := plog.NewLogs() diff --git a/exporter/exporterhelper/logs_batch_test.go b/exporter/exporterhelper/logs_batch_test.go index 6f7a87be0e3..d9c401759d4 100644 --- a/exporter/exporterhelper/logs_batch_test.go +++ b/exporter/exporterhelper/logs_batch_test.go @@ -31,7 +31,7 @@ func TestMergeLogsInvalidInput(t *testing.T) { require.Error(t, err) } -func TestMergeSplitLogs(t *testing.T) { +func TestMergeSplitLogsBasedOnItemCount(t *testing.T) { tests := []struct { name string cfg exporterbatcher.MaxSizeConfig @@ -123,7 +123,7 @@ func TestMergeSplitLogs(t *testing.T) { require.NoError(t, err) assert.Equal(t, len(tt.expected), len(res)) for i, r := range res { - assert.Equal(t, tt.expected[i], r.(*logsRequest)) + assert.Equal(t, tt.expected[i].ld, r.(*logsRequest).ld) } }) } @@ -152,3 +152,187 @@ func TestExtractLogs(t *testing.T) { assert.Equal(t, 10-i, ld.LogRecordCount()) } } + +func TestMergeSplitLogsBasedOnByteSize(t *testing.T) { + // Magic number is the byte size of testdata.GenerateLogs(10) + tests := []struct { + name string + cfg exporterbatcher.MaxSizeConfig + lr1 internal.Request + lr2 internal.Request + expected []*logsRequest + }{ + { + name: "both_requests_empty", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeBytes: logsMarshaler.LogsSize(testdata.GenerateLogs(10))}, + lr1: &logsRequest{ld: plog.NewLogs()}, + lr2: &logsRequest{ld: plog.NewLogs()}, + expected: []*logsRequest{{ld: plog.NewLogs()}}, + }, + { + name: "first_request_empty", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeBytes: logsMarshaler.LogsSize(testdata.GenerateLogs(10))}, + lr1: &logsRequest{ld: plog.NewLogs()}, + lr2: &logsRequest{ld: testdata.GenerateLogs(5)}, + expected: []*logsRequest{{ld:
testdata.GenerateLogs(5)}}, + }, + { + name: "first_empty_second_nil", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeBytes: logsMarshaler.LogsSize(testdata.GenerateLogs(10))}, + lr1: &logsRequest{ld: plog.NewLogs()}, + lr2: nil, + expected: []*logsRequest{{ld: plog.NewLogs()}}, + }, + { + name: "merge_only", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeBytes: logsMarshaler.LogsSize(testdata.GenerateLogs(11))}, + lr1: &logsRequest{ld: testdata.GenerateLogs(4)}, + lr2: &logsRequest{ld: testdata.GenerateLogs(6)}, + expected: []*logsRequest{{ld: func() plog.Logs { + logs := testdata.GenerateLogs(4) + testdata.GenerateLogs(6).ResourceLogs().MoveAndAppendTo(logs.ResourceLogs()) + return logs + }()}}, + }, + { + name: "split_only", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeBytes: logsMarshaler.LogsSize(testdata.GenerateLogs(4))}, + lr1: &logsRequest{ld: plog.NewLogs()}, + lr2: &logsRequest{ld: testdata.GenerateLogs(10)}, + expected: []*logsRequest{ + {ld: testdata.GenerateLogs(4)}, + {ld: testdata.GenerateLogs(4)}, + {ld: testdata.GenerateLogs(2)}, + }, + }, + { + name: "merge_and_split", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeBytes: (logsMarshaler.LogsSize(testdata.GenerateLogs(10)) + logsMarshaler.LogsSize(testdata.GenerateLogs(11))) / 2}, + lr1: &logsRequest{ld: testdata.GenerateLogs(8)}, + lr2: &logsRequest{ld: testdata.GenerateLogs(20)}, + expected: []*logsRequest{ + {ld: func() plog.Logs { + logs := testdata.GenerateLogs(8) + testdata.GenerateLogs(2).ResourceLogs().MoveAndAppendTo(logs.ResourceLogs()) + return logs + }()}, + {ld: testdata.GenerateLogs(10)}, + {ld: testdata.GenerateLogs(8)}, + }, + }, + { + name: "scope_logs_split", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeBytes: logsMarshaler.LogsSize(testdata.GenerateLogs(4))}, + lr1: &logsRequest{ld: func() plog.Logs { + ld := testdata.GenerateLogs(4) + ld.ResourceLogs().At(0).ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("extra log") + return ld + }()}, + lr2: &logsRequest{ld: 
testdata.GenerateLogs(2)}, + expected: []*logsRequest{ + {ld: testdata.GenerateLogs(4)}, + {ld: func() plog.Logs { + ld := testdata.GenerateLogs(0) + ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().AppendEmpty().Body().SetStr("extra log") + testdata.GenerateLogs(2).ResourceLogs().MoveAndAppendTo(ld.ResourceLogs()) + return ld + }()}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + res, err := tt.lr1.MergeSplit(context.Background(), tt.cfg, tt.lr2) + require.NoError(t, err) + assert.Equal(t, len(tt.expected), len(res)) + for i, r := range res { + assert.Equal(t, tt.expected[i].ld, r.(*logsRequest).ld) + assert.Equal(t, r.(*logsRequest).ByteSize(), logsMarshaler.LogsSize(r.(*logsRequest).ld)) + } + }) + } +} + +func BenchmarkSplittingBasedOnItemCountManySmallLogs(b *testing.B) { + cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000} + for i := 0; i < b.N; i++ { + lr1 := &logsRequest{ld: testdata.GenerateLogs(10)} + for j := 0; j < 1000; j++ { + lr2 := &logsRequest{ld: testdata.GenerateLogs(10)} + lr1.MergeSplit(context.Background(), cfg, lr2) + } + } +} + +func BenchmarkSplittingBasedOnByteSizeManySmallLogs(b *testing.B) { + cfg := exporterbatcher.MaxSizeConfig{MaxSizeBytes: 1010000} + for i := 0; i < b.N; i++ { + lr1 := &logsRequest{ld: testdata.GenerateLogs(10)} + for j := 0; j < 1000; j++ { + lr2 := &logsRequest{ld: testdata.GenerateLogs(10)} + lr1.MergeSplit(context.Background(), cfg, lr2) + } + } +} + +func BenchmarkSplittingBasedOnItemCountManyLogsSlightlyAboveLimit(b *testing.B) { + cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000} + for i := 0; i < b.N; i++ { + lr1 := &logsRequest{ld: testdata.GenerateLogs(10001)} + for j := 0; j < 10; j++ { + lr2 := &logsRequest{ld: testdata.GenerateLogs(10001)} + lr1.MergeSplit(context.Background(), cfg, lr2) + } + } +} + +func BenchmarkSplittingBasedOnByteSizeManyLogsSlightlyAboveLimit(b *testing.B) { + cfg := exporterbatcher.MaxSizeConfig{MaxSizeBytes: 
logsMarshaler.LogsSize(testdata.GenerateLogs(10000))} + for i := 0; i < b.N; i++ { + lr1 := &logsRequest{ld: testdata.GenerateLogs(10001)} + for j := 0; j < 10; j++ { + lr2 := &logsRequest{ld: testdata.GenerateLogs(10001)} + lr1.MergeSplit(context.Background(), cfg, lr2) + } + } +} + +func BenchmarkSplittingBasedOnItemCountManyLogsSlightlyBelowLimit(b *testing.B) { + cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000} + for i := 0; i < b.N; i++ { + lr1 := &logsRequest{ld: testdata.GenerateLogs(9999)} + for j := 0; j < 10; j++ { + lr2 := &logsRequest{ld: testdata.GenerateLogs(9999)} + lr1.MergeSplit(context.Background(), cfg, lr2) + } + } +} + +func BenchmarkSplittingBasedOnByteSizeManyLogsSlightlyBelowLimit(b *testing.B) { + cfg := exporterbatcher.MaxSizeConfig{MaxSizeBytes: logsMarshaler.LogsSize(testdata.GenerateLogs(10000))} + for i := 0; i < b.N; i++ { + lr1 := &logsRequest{ld: testdata.GenerateLogs(9999)} + for j := 0; j < 10; j++ { + lr2 := &logsRequest{ld: testdata.GenerateLogs(9999)} + lr1.MergeSplit(context.Background(), cfg, lr2) + } + } +} + +func BenchmarkSplittingBasedOnItemCountHugeLog(b *testing.B) { + cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000} + for i := 0; i < b.N; i++ { + lr1 := &logsRequest{ld: testdata.GenerateLogs(1)} + lr2 := &logsRequest{ld: testdata.GenerateLogs(100000)} + lr1.MergeSplit(context.Background(), cfg, lr2) + } +} + +func BenchmarkSplittingBasedOnByteSizeHugeLog(b *testing.B) { + cfg := exporterbatcher.MaxSizeConfig{MaxSizeBytes: logsMarshaler.LogsSize(testdata.GenerateLogs(10000))} + for i := 0; i < b.N; i++ { + lr1 := &logsRequest{ld: testdata.GenerateLogs(1)} + lr2 := &logsRequest{ld: testdata.GenerateLogs(100000)} + lr1.MergeSplit(context.Background(), cfg, lr2) + } +} diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index e3d4ccbd2ae..58460342d3e 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -66,6 +66,10 @@ func (req 
*metricsRequest) ItemsCount() int { return req.md.DataPointCount() } +func (req *metricsRequest) ByteSize() int { + return metricsMarshaler.MetricsSize(req.md) +} + type metricsExporter struct { *internal.BaseExporter consumer.Metrics diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index f8387d5a3b8..3311006c2b9 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -66,6 +66,10 @@ func (req *tracesRequest) ItemsCount() int { return req.td.SpanCount() } +func (req *tracesRequest) ByteSize() int { + return tracesMarshaler.TracesSize(req.td) +} + type tracesExporter struct { *internal.BaseExporter consumer.Traces diff --git a/exporter/exporterhelper/xexporterhelper/profiles.go b/exporter/exporterhelper/xexporterhelper/profiles.go index d9eb55b3ef7..42369859773 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles.go +++ b/exporter/exporterhelper/xexporterhelper/profiles.go @@ -69,6 +69,11 @@ func (req *profilesRequest) ItemsCount() int { return req.pd.SampleCount() } +func (req *profilesRequest) ByteSize() int { + // TODO(review): SampleCount is an item count, not a byte size; return the serialized size (as logs/metrics/traces do) once pprofile exposes a size API. + return req.pd.SampleCount() +} + type profileExporter struct { *internal.BaseExporter xconsumer.Profiles diff --git a/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go b/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go index 2981d11830b..1516f0657d4 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go +++ b/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go @@ -155,6 +155,10 @@ func (req *dummyRequest) ItemsCount() int { return 1 } +func (req *dummyRequest) ByteSize() int { + return 1 +} + func (req *dummyRequest) MergeSplit(_ context.Context, _ exporterbatcher.MaxSizeConfig, _ exporterhelper.Request) ( []exporterhelper.Request, error, ) { diff --git a/exporter/internal/queue/default_batcher.go b/exporter/internal/queue/default_batcher.go index dced12e9e2b..eece6445483 100644 ---
a/exporter/internal/queue/default_batcher.go +++ b/exporter/internal/queue/default_batcher.go @@ -28,6 +28,19 @@ func (qb *DefaultBatcher) resetTimer() { } } +func (qb *DefaultBatcher) maxSizeLimitExists() bool { + return qb.batchCfg.MaxSizeItems > 0 || qb.batchCfg.MaxSizeBytes > 0 +} + +func (qb *DefaultBatcher) reachedMinSizeThreshold(req internal.Request) bool { + if qb.batchCfg.MinSizeItems > 0 { + return req.ItemsCount() >= qb.batchCfg.MinSizeItems + } else if qb.batchCfg.MinSizeBytes > 0 { + return req.ByteSize() >= qb.batchCfg.MinSizeBytes + } + return true +} + // startReadingFlushingGoroutine starts a goroutine that reads and then flushes. func (qb *DefaultBatcher) startReadingFlushingGoroutine() { qb.stopWG.Add(1) @@ -43,7 +56,7 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() { qb.currentBatchMu.Lock() - if qb.batchCfg.MaxSizeItems > 0 { + if qb.maxSizeLimitExists() { var reqList []internal.Request var mergeSplitErr error if qb.currentBatch == nil || qb.currentBatch.req == nil { @@ -60,7 +73,7 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() { } // If there was a split, we flush everything immediately. - if reqList[0].ItemsCount() >= qb.batchCfg.MinSizeItems || len(reqList) > 1 { + if qb.reachedMinSizeThreshold(reqList[0]) || len(reqList) > 1 { qb.currentBatch = nil qb.currentBatchMu.Unlock() for i := 0; i < len(reqList); i++ { @@ -103,7 +116,7 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() { } } - if qb.currentBatch.req.ItemsCount() >= qb.batchCfg.MinSizeItems { + if qb.reachedMinSizeThreshold(qb.currentBatch.req) { batchToFlush := *qb.currentBatch qb.currentBatch = nil qb.currentBatchMu.Unlock() diff --git a/exporter/internal/request.go b/exporter/internal/request.go index 88a914b9e36..c14ba6e60f1 100644 --- a/exporter/internal/request.go +++ b/exporter/internal/request.go @@ -19,6 +19,8 @@ type Request interface { // sent.
For example, for OTLP exporter, this value represents the number of spans, // metric data points or log records. ItemsCount() int + // ByteSize returns the serialized size of the request. + ByteSize() int // MergeSplit is a function that merge and/or splits this request with another one into multiple requests based on the // configured limit provided in MaxSizeConfig. // MergeSplit does not split if all fields in MaxSizeConfig are not initialized (zero). diff --git a/pdata/plog/pb.go b/pdata/plog/pb.go index bb102591bf2..a4cb09eb6ea 100644 --- a/pdata/plog/pb.go +++ b/pdata/plog/pb.go @@ -22,6 +22,18 @@ func (e *ProtoMarshaler) LogsSize(ld Logs) int { return pb.Size() } +func (e *ProtoMarshaler) ResourceLogsSize(rl ResourceLogs) int { + return rl.orig.Size() +} + +func (e *ProtoMarshaler) ScopeLogsSize(sl ScopeLogs) int { + return sl.orig.Size() +} + +func (e *ProtoMarshaler) LogRecordSize(lr LogRecord) int { + return lr.orig.Size() +} + var _ Unmarshaler = (*ProtoUnmarshaler)(nil) type ProtoUnmarshaler struct{}