From a4ad24ca9c2a6a0b3c311c359f480cc923d23bb4 Mon Sep 17 00:00:00 2001 From: Sindy Li Date: Sat, 4 Jan 2025 17:30:28 -0800 Subject: [PATCH 1/2] POC byte based batching --- exporter/exporterbatcher/config.go | 15 ++ exporter/exporterhelper/internal/request.go | 4 + .../internal/retry_sender_test.go | 8 + exporter/exporterhelper/logs.go | 4 + exporter/exporterhelper/logs_batch.go | 122 ++++++++++++++- exporter/exporterhelper/logs_batch_test.go | 141 +++++++++++++++++- exporter/exporterhelper/metrics.go | 4 + exporter/exporterhelper/traces.go | 4 + .../xexporterhelper/profiles.go | 4 + .../xexporterhelper/profiles_batch_test.go | 4 + .../exporterqueue/persistent_queue_test.go | 24 +++ exporter/go.mod | 1 + exporter/go.sum | 2 + exporter/internal/queue/default_batcher.go | 19 ++- exporter/internal/queue/fake_request_test.go | 4 + exporter/internal/request.go | 2 + .../internal/templates/message.go.tmpl | 4 + .../pcommon/generated_instrumentationscope.go | 4 + pdata/pcommon/generated_resource.go | 4 + pdata/plog/generated_logrecord.go | 4 + pdata/plog/generated_resourcelogs.go | 4 + pdata/plog/generated_scopelogs.go | 4 + pdata/plog/logs.go | 8 + .../generated_exportpartialsuccess.go | 4 + pdata/pmetric/generated_exemplar.go | 4 + .../pmetric/generated_exponentialhistogram.go | 4 + ...generated_exponentialhistogramdatapoint.go | 4 + ...ed_exponentialhistogramdatapointbuckets.go | 4 + pdata/pmetric/generated_gauge.go | 4 + pdata/pmetric/generated_histogram.go | 4 + pdata/pmetric/generated_histogramdatapoint.go | 4 + pdata/pmetric/generated_metric.go | 4 + pdata/pmetric/generated_numberdatapoint.go | 4 + pdata/pmetric/generated_resourcemetrics.go | 4 + pdata/pmetric/generated_scopemetrics.go | 4 + pdata/pmetric/generated_sum.go | 4 + pdata/pmetric/generated_summary.go | 4 + pdata/pmetric/generated_summarydatapoint.go | 4 + ...nerated_summarydatapointvalueatquantile.go | 4 + pdata/pmetric/metrics.go | 4 + .../generated_exportpartialsuccess.go | 4 + 
pdata/pprofile/generated_attribute.go | 4 + pdata/pprofile/generated_attributeunit.go | 4 + pdata/pprofile/generated_function.go | 4 + pdata/pprofile/generated_line.go | 4 + pdata/pprofile/generated_link.go | 4 + pdata/pprofile/generated_location.go | 4 + pdata/pprofile/generated_mapping.go | 4 + pdata/pprofile/generated_profile.go | 4 + pdata/pprofile/generated_resourceprofiles.go | 4 + pdata/pprofile/generated_sample.go | 4 + pdata/pprofile/generated_scopeprofiles.go | 4 + pdata/pprofile/generated_valuetype.go | 4 + .../generated_exportpartialsuccess.go | 4 + pdata/ptrace/generated_resourcespans.go | 4 + pdata/ptrace/generated_scopespans.go | 4 + pdata/ptrace/generated_span.go | 4 + pdata/ptrace/generated_spanevent.go | 4 + pdata/ptrace/generated_spanlink.go | 4 + pdata/ptrace/generated_status.go | 4 + .../generated_exportpartialsuccess.go | 4 + pdata/ptrace/traces.go | 4 + 62 files changed, 544 insertions(+), 6 deletions(-) diff --git a/exporter/exporterbatcher/config.go b/exporter/exporterbatcher/config.go index 239dc2dd4fe..f9cb604ff77 100644 --- a/exporter/exporterbatcher/config.go +++ b/exporter/exporterbatcher/config.go @@ -31,6 +31,7 @@ type MinSizeConfig struct { // sent regardless of the timeout. There is no guarantee that the batch size always greater than this value. // This option requires the Request to implement RequestItemsCounter interface. Otherwise, it will be ignored. MinSizeItems int `mapstructure:"min_size_items"` + MinSizeBytes int `mapstructure:"min_size_bytes"` } // MaxSizeConfig defines the configuration for the maximum number of items in a batch. @@ -41,18 +42,32 @@ type MaxSizeConfig struct { // If the batch size exceeds this value, it will be broken up into smaller batches if possible. // Setting this value to zero disables the maximum size limit. 
MaxSizeItems int `mapstructure:"max_size_items"` + MaxSizeBytes int `mapstructure:"max_size_bytes"` } func (c Config) Validate() error { + if c.MinSizeBytes != 0 && c.MinSizeItems != 0 || c.MinSizeBytes != 0 && c.MaxSizeItems != 0 || c.MinSizeItems != 0 && c.MaxSizeBytes != 0 { + return errors.New("size limit and bytes limit cannot be specified at the same time") + } + if c.MinSizeItems < 0 { return errors.New("min_size_items must be greater than or equal to zero") } + if c.MinSizeBytes < 0 { + return errors.New("min_size_bytes must be greater than or equal to zero") + } if c.MaxSizeItems < 0 { return errors.New("max_size_items must be greater than or equal to zero") } + if c.MaxSizeBytes < 0 { + return errors.New("max_size_bytes must be greater than or equal to zero") + } if c.MaxSizeItems != 0 && c.MaxSizeItems < c.MinSizeItems { return errors.New("max_size_items must be greater than or equal to min_size_items") } + if c.MaxSizeBytes != 0 && c.MaxSizeBytes < c.MinSizeBytes { + return errors.New("max_size_bytes must be greater than or equal to min_size_bytes") + } if c.FlushTimeout <= 0 { return errors.New("timeout must be greater than zero") } diff --git a/exporter/exporterhelper/internal/request.go b/exporter/exporterhelper/internal/request.go index daf06830fec..767d20defd4 100644 --- a/exporter/exporterhelper/internal/request.go +++ b/exporter/exporterhelper/internal/request.go @@ -56,6 +56,10 @@ func (r *fakeRequest) ItemsCount() int { return r.items } +func (r *fakeRequest) ByteSize() int { + return r.items +} + func (r *fakeRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 internal.Request) ([]internal.Request, error) { if r.mergeErr != nil { return nil, r.mergeErr diff --git a/exporter/exporterhelper/internal/retry_sender_test.go b/exporter/exporterhelper/internal/retry_sender_test.go index 470f4c62e82..e6b5ae94ce2 100644 --- a/exporter/exporterhelper/internal/retry_sender_test.go +++ 
b/exporter/exporterhelper/internal/retry_sender_test.go @@ -418,6 +418,10 @@ func (mer *mockErrorRequest) ItemsCount() int { return 7 } +func (mer *mockErrorRequest) ByteSize() int { + return 7 +} + func (mer *mockErrorRequest) MergeSplit(context.Context, exporterbatcher.MaxSizeConfig, internal.Request) ([]internal.Request, error) { return nil, nil } @@ -464,6 +468,10 @@ func (m *mockRequest) ItemsCount() int { return m.cnt } +func (m *mockRequest) ByteSize() int { + return m.cnt +} + func (m *mockRequest) MergeSplit(context.Context, exporterbatcher.MaxSizeConfig, internal.Request) ([]internal.Request, error) { return nil, nil } diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index 74a658b98fe..fafb34c94dc 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -66,6 +66,10 @@ func (req *logsRequest) ItemsCount() int { return req.ld.LogRecordCount() } +func (req *logsRequest) ByteSize() int { + return req.ld.ByteSize() +} + type logsExporter struct { *internal.BaseExporter consumer.Logs diff --git a/exporter/exporterhelper/logs_batch.go b/exporter/exporterhelper/logs_batch.go index 4e4609b18ca..3bd7dc472f9 100644 --- a/exporter/exporterhelper/logs_batch.go +++ b/exporter/exporterhelper/logs_batch.go @@ -23,11 +23,129 @@ func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSiz } } - if cfg.MaxSizeItems == 0 { - req2.ld.ResourceLogs().MoveAndAppendTo(req.ld.ResourceLogs()) + if cfg.MaxSizeItems == 0 && cfg.MaxSizeBytes == 0 { + if req2 != nil { + req2.ld.ResourceLogs().MoveAndAppendTo(req.ld.ResourceLogs()) + } return []Request{req}, nil } + if cfg.MaxSizeBytes > 0 { + return req.mergeSplitBasedOnByteSize(cfg, req2) + } + return req.mergeSplitBasedOnItemCount(cfg, req2) +} + +func (req *logsRequest) mergeSplitBasedOnByteSize(cfg exporterbatcher.MaxSizeConfig, req2 *logsRequest) ([]Request, error) { + var ( + res []Request + destReq *logsRequest + capacityLeft = cfg.MaxSizeBytes + ) + 
for _, srcReq := range []*logsRequest{req, req2} { + if srcReq == nil { + continue + } + + byteSize := srcReq.ld.ByteSize() + if byteSize <= capacityLeft { + if destReq == nil { + destReq = srcReq + } else { + srcReq.ld.ResourceLogs().MoveAndAppendTo(destReq.ld.ResourceLogs()) + } + capacityLeft -= byteSize + continue + } + + for { + extractedLogs, capacityReached := extractLogsBasedOnByteSize(srcReq.ld, capacityLeft) + if extractedLogs.LogRecordCount() == 0 { + break + } + if destReq == nil { + destReq = &logsRequest{ld: extractedLogs, pusher: srcReq.pusher} + } else { + extractedLogs.ResourceLogs().MoveAndAppendTo(destReq.ld.ResourceLogs()) + } + // Create new batch once capacity is reached. + if capacityReached { + res = append(res, destReq) + destReq = nil + capacityLeft = cfg.MaxSizeBytes + } else { + capacityLeft = cfg.MaxSizeBytes - destReq.ByteSize() + } + } + } + + if destReq != nil { + res = append(res, destReq) + } + return res, nil +} + +// extractLogsBasedOnByteSize extracts logs from the input logs and returns a new logs with up to the specified byte size. +func extractLogsBasedOnByteSize(srcLogs plog.Logs, capacity int) (plog.Logs, bool) { + capacityReached := false + destLogs := plog.NewLogs() + srcLogs.ResourceLogs().RemoveIf(func(srcRL plog.ResourceLogs) bool { + if capacityReached { + return false + } + needToExtract := srcRL.Size() > capacity-destLogs.ByteSize() + if needToExtract { + srcRL, capacityReached = extractResourceLogsBasedOnByteSize(srcRL, capacity-destLogs.ByteSize()) + if srcRL.ScopeLogs().Len() == 0 { + return false + } + } + srcRL.MoveTo(destLogs.ResourceLogs().AppendEmpty()) + return !needToExtract + }) + return destLogs, capacityReached +} + +// extractResourceLogsBasedOnByteSize extracts resource logs and returns a new resource logs with up to the specified byte size.
+func extractResourceLogsBasedOnByteSize(srcRL plog.ResourceLogs, capacity int) (plog.ResourceLogs, bool) { + capacityReached := false + destRL := plog.NewResourceLogs() + destRL.SetSchemaUrl(srcRL.SchemaUrl()) + srcRL.Resource().CopyTo(destRL.Resource()) + srcRL.ScopeLogs().RemoveIf(func(srcSL plog.ScopeLogs) bool { + if capacityReached { + return false + } + needToExtract := srcSL.Size() > capacity-destRL.Size() + if needToExtract { + srcSL, capacityReached = extractScopeLogsBasedOnByteSize(srcSL, capacity-destRL.Size()) + if srcSL.LogRecords().Len() == 0 { + return false + } + } + srcSL.MoveTo(destRL.ScopeLogs().AppendEmpty()) + return !needToExtract + }) + return destRL, capacityReached +} + +// extractScopeLogs extracts scope logs and returns a new scope logs with the specified number of log records. +func extractScopeLogsBasedOnByteSize(srcSL plog.ScopeLogs, capacity int) (plog.ScopeLogs, bool) { + capacityReached := false + destSL := plog.NewScopeLogs() + destSL.SetSchemaUrl(srcSL.SchemaUrl()) + srcSL.Scope().CopyTo(destSL.Scope()) + srcSL.LogRecords().RemoveIf(func(srcLR plog.LogRecord) bool { + if capacityReached || srcLR.Size()+destSL.Size() > capacity { + capacityReached = true + return false + } + srcLR.MoveTo(destSL.LogRecords().AppendEmpty()) + return true + }) + return destSL, capacityReached +} +func (req *logsRequest) mergeSplitBasedOnItemCount(cfg exporterbatcher.MaxSizeConfig, req2 *logsRequest) ([]Request, error) { var ( res []Request destReq *logsRequest diff --git a/exporter/exporterhelper/logs_batch_test.go b/exporter/exporterhelper/logs_batch_test.go index d05d87764ee..5fe2920034f 100644 --- a/exporter/exporterhelper/logs_batch_test.go +++ b/exporter/exporterhelper/logs_batch_test.go @@ -31,7 +31,7 @@ func TestMergeLogsInvalidInput(t *testing.T) { require.Error(t, err) } -func TestMergeSplitLogs(t *testing.T) { +func TestMergeSplitLogsBasedOnItemCount(t *testing.T) { tests := []struct { name string cfg exporterbatcher.MaxSizeConfig @@ -152,3 
+152,142 @@ func TestExtractLogs(t *testing.T) { assert.Equal(t, 10-i, ld.LogRecordCount()) } } + +func TestMergeSplitLogsBasedOnByteSize(t *testing.T) { + // Magic number is the byte size testdata.GenerateLogs(10) + tests := []struct { + name string + cfg exporterbatcher.MaxSizeConfig + lr1 internal.Request + lr2 internal.Request + expected []*logsRequest + }{ + { + name: "both_requests_empty", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeBytes: testdata.GenerateLogs(10).ByteSize()}, + lr1: &logsRequest{ld: plog.NewLogs()}, + lr2: &logsRequest{ld: plog.NewLogs()}, + expected: []*logsRequest{{ld: plog.NewLogs()}}, + }, + { + name: "first_request_empty", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeBytes: testdata.GenerateLogs(10).ByteSize()}, + lr1: &logsRequest{ld: plog.NewLogs()}, + lr2: &logsRequest{ld: testdata.GenerateLogs(5)}, + expected: []*logsRequest{{ld: testdata.GenerateLogs(5)}}, + }, + { + name: "first_empty_second_nil", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeBytes: testdata.GenerateLogs(10).ByteSize()}, + lr1: &logsRequest{ld: plog.NewLogs()}, + lr2: nil, + expected: []*logsRequest{{ld: plog.NewLogs()}}, + }, + { + name: "merge_only", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeBytes: testdata.GenerateLogs(11).ByteSize()}, + lr1: &logsRequest{ld: testdata.GenerateLogs(4)}, + lr2: &logsRequest{ld: testdata.GenerateLogs(6)}, + expected: []*logsRequest{{ld: func() plog.Logs { + logs := testdata.GenerateLogs(4) + testdata.GenerateLogs(6).ResourceLogs().MoveAndAppendTo(logs.ResourceLogs()) + return logs + }()}}, + }, + { + name: "split_only", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeBytes: testdata.GenerateLogs(4).ByteSize()}, + lr1: &logsRequest{ld: plog.NewLogs()}, + lr2: &logsRequest{ld: testdata.GenerateLogs(10)}, + expected: []*logsRequest{ + {ld: testdata.GenerateLogs(4)}, + {ld: testdata.GenerateLogs(4)}, + {ld: testdata.GenerateLogs(2)}, + }, + }, + { + name: "merge_and_split", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeBytes: 
(testdata.GenerateLogs(10).ByteSize() + testdata.GenerateLogs(11).ByteSize()) / 2}, + lr1: &logsRequest{ld: testdata.GenerateLogs(8)}, + lr2: &logsRequest{ld: testdata.GenerateLogs(20)}, + expected: []*logsRequest{ + {ld: func() plog.Logs { + logs := testdata.GenerateLogs(8) + testdata.GenerateLogs(2).ResourceLogs().MoveAndAppendTo(logs.ResourceLogs()) + return logs + }()}, + {ld: testdata.GenerateLogs(10)}, + {ld: testdata.GenerateLogs(8)}, + }, + }, + { + name: "scope_logs_split", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeBytes: testdata.GenerateLogs(4).ByteSize()}, + lr1: &logsRequest{ld: func() plog.Logs { + ld := testdata.GenerateLogs(4) + ld.ResourceLogs().At(0).ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("extra log") + return ld + }()}, + lr2: &logsRequest{ld: testdata.GenerateLogs(2)}, + expected: []*logsRequest{ + {ld: testdata.GenerateLogs(4)}, + {ld: func() plog.Logs { + ld := testdata.GenerateLogs(0) + ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().AppendEmpty().Body().SetStr("extra log") + testdata.GenerateLogs(2).ResourceLogs().MoveAndAppendTo(ld.ResourceLogs()) + return ld + }()}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + res, err := tt.lr1.MergeSplit(context.Background(), tt.cfg, tt.lr2) + require.NoError(t, err) + assert.Equal(t, len(tt.expected), len(res)) + for i, r := range res { + assert.Equal(t, tt.expected[i], r.(*logsRequest)) + } + }) + } +} + +func BenchmarkSplittingBasedOnItemCountManyLogs(b *testing.B) { + cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10} + for i := 0; i < b.N; i++ { + lr1 := &logsRequest{ld: testdata.GenerateLogs(9)} + for j := 0; j < 1000; j++ { + lr2 := &logsRequest{ld: testdata.GenerateLogs(9)} + lr1.MergeSplit(context.Background(), cfg, lr2) + } + } +} + +func BenchmarkSplittingBasedOnByteSizeManyLogs(b *testing.B) { + cfg := exporterbatcher.MaxSizeConfig{MaxSizeBytes: 1010} + for i := 0; i < b.N; i++ { + lr1 := &logsRequest{ld: 
testdata.GenerateLogs(9)} + for j := 0; j < 1000; j++ { + lr2 := &logsRequest{ld: testdata.GenerateLogs(9)} + lr1.MergeSplit(context.Background(), cfg, lr2) + } + } +} + +func BenchmarkSplittingBasedOnItemCountHugeLog(b *testing.B) { + cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10} + for i := 0; i < b.N; i++ { + lr1 := &logsRequest{ld: testdata.GenerateLogs(1)} + lr2 := &logsRequest{ld: testdata.GenerateLogs(1000)} + lr1.MergeSplit(context.Background(), cfg, lr2) + } +} + +func BenchmarkSplittingBasedOnByteSizeHugeLog(b *testing.B) { + cfg := exporterbatcher.MaxSizeConfig{MaxSizeBytes: 1010} + for i := 0; i < b.N; i++ { + lr1 := &logsRequest{ld: testdata.GenerateLogs(1)} + lr2 := &logsRequest{ld: testdata.GenerateLogs(1000)} + lr1.MergeSplit(context.Background(), cfg, lr2) + } +} diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index e3d4ccbd2ae..d99bcd11ac1 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -66,6 +66,10 @@ func (req *metricsRequest) ItemsCount() int { return req.md.DataPointCount() } +func (req *metricsRequest) ByteSize() int { + return req.md.ByteSize() +} + type metricsExporter struct { *internal.BaseExporter consumer.Metrics diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index f8387d5a3b8..a505ff59ce8 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -66,6 +66,10 @@ func (req *tracesRequest) ItemsCount() int { return req.td.SpanCount() } +func (req *tracesRequest) ByteSize() int { + return req.td.ByteSize() +} + type tracesExporter struct { *internal.BaseExporter consumer.Traces diff --git a/exporter/exporterhelper/xexporterhelper/profiles.go b/exporter/exporterhelper/xexporterhelper/profiles.go index d9eb55b3ef7..42369859773 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles.go +++ b/exporter/exporterhelper/xexporterhelper/profiles.go @@ -69,6 +69,10 @@ func (req 
*profilesRequest) ItemsCount() int { return req.pd.SampleCount() } +func (req *profilesRequest) ByteSize() int { + return req.pd.SampleCount() +} + type profileExporter struct { *internal.BaseExporter xconsumer.Profiles diff --git a/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go b/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go index 2981d11830b..1516f0657d4 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go +++ b/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go @@ -155,6 +155,10 @@ func (req *dummyRequest) ItemsCount() int { return 1 } +func (req *dummyRequest) ByteSize() int { + return 1 +} + func (req *dummyRequest) MergeSplit(_ context.Context, _ exporterbatcher.MaxSizeConfig, _ exporterhelper.Request) ( []exporterhelper.Request, error, ) { diff --git a/exporter/exporterqueue/persistent_queue_test.go b/exporter/exporterqueue/persistent_queue_test.go index 02c63017a27..d5903553c78 100644 --- a/exporter/exporterqueue/persistent_queue_test.go +++ b/exporter/exporterqueue/persistent_queue_test.go @@ -25,6 +25,7 @@ import ( "go.opentelemetry.io/collector/exporter/internal/storagetest" "go.opentelemetry.io/collector/extension/extensiontest" "go.opentelemetry.io/collector/extension/xextension/storage" + "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/pipeline" ) @@ -49,6 +50,29 @@ func uint64Unmarshaler(bytes []byte) (uint64, error) { return binary.LittleEndian.Uint64(bytes), nil } +type tracesRequest struct { + traces ptrace.Traces +} + +func (tr tracesRequest) ItemsCount() int { + return tr.traces.SpanCount() +} + +func (tr tracesRequest) ByteSize() int { + return tr.traces.SpanCount() +} + +func marshalTracesRequest(tr tracesRequest) ([]byte, error) { + marshaler := &ptrace.ProtoMarshaler{} + return marshaler.MarshalTraces(tr.traces) +} + +func unmarshalTracesRequest(bytes []byte) (tracesRequest, error) { + unmarshaler := &ptrace.ProtoUnmarshaler{} + traces, err := 
unmarshaler.UnmarshalTraces(bytes) + return tracesRequest{traces: traces}, err +} + type mockHost struct { component.Host ext map[component.ID]component.Component diff --git a/exporter/go.mod b/exporter/go.mod index 40ad5c6c743..12f14784db5 100644 --- a/exporter/go.mod +++ b/exporter/go.mod @@ -40,6 +40,7 @@ require ( github.com/json-iterator/go v1.1.12 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/collector/consumer/xconsumer v0.117.0 // indirect go.opentelemetry.io/collector/exporter/xexporter v0.117.0 // indirect diff --git a/exporter/go.sum b/exporter/go.sum index 05057d77799..96997ab721e 100644 --- a/exporter/go.sum +++ b/exporter/go.sum @@ -32,6 +32,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 h1:RWengNIwukTxcDr9M+97sNutRR1RKhG96O6jWumTTnw= +github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826/go.mod h1:TaXosZuwdSHYgviHp1DAtfrULt5eUgsSMsZf+YrPgl8= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= diff --git a/exporter/internal/queue/default_batcher.go b/exporter/internal/queue/default_batcher.go index dced12e9e2b..eece6445483 100644 --- a/exporter/internal/queue/default_batcher.go +++ 
b/exporter/internal/queue/default_batcher.go @@ -28,6 +28,19 @@ func (qb *DefaultBatcher) resetTimer() { } } +func (qb *DefaultBatcher) maxSizeLimitExists() bool { + return qb.batchCfg.MaxSizeItems > 0 || qb.batchCfg.MaxSizeBytes > 0 +} + +func (qb *DefaultBatcher) reachedMinSizeThreshold(req internal.Request) bool { + if qb.batchCfg.MinSizeItems > 0 { + return req.ItemsCount() >= qb.batchCfg.MinSizeItems + } else if qb.batchCfg.MinSizeBytes > 0 { + return req.ByteSize() >= qb.batchCfg.MinSizeBytes + } + return true +} + // startReadingFlushingGoroutine starts a goroutine that reads and then flushes. func (qb *DefaultBatcher) startReadingFlushingGoroutine() { qb.stopWG.Add(1) @@ -43,7 +56,7 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() { qb.currentBatchMu.Lock() - if qb.batchCfg.MaxSizeItems > 0 { + if qb.maxSizeLimitExists() { var reqList []internal.Request var mergeSplitErr error if qb.currentBatch == nil || qb.currentBatch.req == nil { @@ -60,7 +73,7 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() { } // If there was a split, we flush everything immediately. 
- if reqList[0].ItemsCount() >= qb.batchCfg.MinSizeItems || len(reqList) > 1 { + if qb.reachedMinSizeThreshold(reqList[0]) || len(reqList) > 1 { qb.currentBatch = nil qb.currentBatchMu.Unlock() for i := 0; i < len(reqList); i++ { @@ -103,7 +116,7 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() { } } - if qb.currentBatch.req.ItemsCount() >= qb.batchCfg.MinSizeItems { + if qb.reachedMinSizeThreshold(qb.currentBatch.req) { batchToFlush := *qb.currentBatch qb.currentBatch = nil qb.currentBatchMu.Unlock() diff --git a/exporter/internal/queue/fake_request_test.go b/exporter/internal/queue/fake_request_test.go index b062cdd0d68..7d9b433fc51 100644 --- a/exporter/internal/queue/fake_request_test.go +++ b/exporter/internal/queue/fake_request_test.go @@ -53,6 +53,10 @@ func (r *fakeRequest) ItemsCount() int { return r.items } +func (r *fakeRequest) ByteSize() int { + return r.items +} + func (r *fakeRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 internal.Request) ([]internal.Request, error) { if r.mergeErr != nil { return nil, r.mergeErr diff --git a/exporter/internal/request.go b/exporter/internal/request.go index 88a914b9e36..c14ba6e60f1 100644 --- a/exporter/internal/request.go +++ b/exporter/internal/request.go @@ -19,6 +19,8 @@ type Request interface { // sent. For example, for OTLP exporter, this value represents the number of spans, // metric data points or log records. ItemsCount() int + // ByteSize returns the serialized size of the request. + ByteSize() int // MergeSplit is a function that merge and/or splits this request with another one into multiple requests based on the // configured limit provided in MaxSizeConfig. // MergeSplit does not split if all fields in MaxSizeConfig are not initialized (zero). 
diff --git a/pdata/internal/cmd/pdatagen/internal/templates/message.go.tmpl b/pdata/internal/cmd/pdatagen/internal/templates/message.go.tmpl index b6486fbe48a..80716abf3b8 100644 --- a/pdata/internal/cmd/pdatagen/internal/templates/message.go.tmpl +++ b/pdata/internal/cmd/pdatagen/internal/templates/message.go.tmpl @@ -54,6 +54,10 @@ func (ms {{ .structName }}) MoveTo(dest {{ .structName }}) { *ms.{{ .origAccessor }} = {{ .originName }}{} } +func (ms {{ .structName }}) Size() int { + return ms.{{ .origAccessor }}.Size() +} + {{ if .isCommon -}} func (ms {{ .structName }}) getOrig() *{{ .originName }} { return internal.GetOrig{{ .structName }}(internal.{{ .structName }}(ms)) diff --git a/pdata/pcommon/generated_instrumentationscope.go b/pdata/pcommon/generated_instrumentationscope.go index 8a5d23b549f..67e795c478d 100644 --- a/pdata/pcommon/generated_instrumentationscope.go +++ b/pdata/pcommon/generated_instrumentationscope.go @@ -42,6 +42,10 @@ func (ms InstrumentationScope) MoveTo(dest InstrumentationScope) { *ms.getOrig() = otlpcommon.InstrumentationScope{} } +func (ms InstrumentationScope) Size() int { + return ms.getOrig().Size() +} + func (ms InstrumentationScope) getOrig() *otlpcommon.InstrumentationScope { return internal.GetOrigInstrumentationScope(internal.InstrumentationScope(ms)) } diff --git a/pdata/pcommon/generated_resource.go b/pdata/pcommon/generated_resource.go index 12e6cfa7f3b..07ab59c9025 100644 --- a/pdata/pcommon/generated_resource.go +++ b/pdata/pcommon/generated_resource.go @@ -42,6 +42,10 @@ func (ms Resource) MoveTo(dest Resource) { *ms.getOrig() = otlpresource.Resource{} } +func (ms Resource) Size() int { + return ms.getOrig().Size() +} + func (ms Resource) getOrig() *otlpresource.Resource { return internal.GetOrigResource(internal.Resource(ms)) } diff --git a/pdata/plog/generated_logrecord.go b/pdata/plog/generated_logrecord.go index ee79ddec3cd..e12c47cebb3 100644 --- a/pdata/plog/generated_logrecord.go +++ 
b/pdata/plog/generated_logrecord.go @@ -47,6 +47,10 @@ func (ms LogRecord) MoveTo(dest LogRecord) { *ms.orig = otlplogs.LogRecord{} } +func (ms LogRecord) Size() int { + return ms.orig.Size() +} + // ObservedTimestamp returns the observedtimestamp associated with this LogRecord. func (ms LogRecord) ObservedTimestamp() pcommon.Timestamp { return pcommon.Timestamp(ms.orig.ObservedTimeUnixNano) diff --git a/pdata/plog/generated_resourcelogs.go b/pdata/plog/generated_resourcelogs.go index 1d240e03975..b13dcc65bf4 100644 --- a/pdata/plog/generated_resourcelogs.go +++ b/pdata/plog/generated_resourcelogs.go @@ -46,6 +46,10 @@ func (ms ResourceLogs) MoveTo(dest ResourceLogs) { *ms.orig = otlplogs.ResourceLogs{} } +func (ms ResourceLogs) Size() int { + return ms.orig.Size() +} + // Resource returns the resource associated with this ResourceLogs. func (ms ResourceLogs) Resource() pcommon.Resource { return pcommon.Resource(internal.NewResource(&ms.orig.Resource, ms.state)) diff --git a/pdata/plog/generated_scopelogs.go b/pdata/plog/generated_scopelogs.go index 6e45a9627f8..8ec36459a10 100644 --- a/pdata/plog/generated_scopelogs.go +++ b/pdata/plog/generated_scopelogs.go @@ -46,6 +46,10 @@ func (ms ScopeLogs) MoveTo(dest ScopeLogs) { *ms.orig = otlplogs.ScopeLogs{} } +func (ms ScopeLogs) Size() int { + return ms.orig.Size() +} + // Scope returns the scope associated with this ScopeLogs. 
func (ms ScopeLogs) Scope() pcommon.InstrumentationScope { return pcommon.InstrumentationScope(internal.NewInstrumentationScope(&ms.orig.Scope, ms.state)) diff --git a/pdata/plog/logs.go b/pdata/plog/logs.go index 490526090f8..6fac50e9db0 100644 --- a/pdata/plog/logs.go +++ b/pdata/plog/logs.go @@ -21,6 +21,10 @@ func (ms Logs) getOrig() *otlpcollectorlog.ExportLogsServiceRequest { return internal.GetOrigLogs(internal.Logs(ms)) } +func (ms Logs) GetOrig() *otlpcollectorlog.ExportLogsServiceRequest { + return ms.getOrig() +} + func (ms Logs) getState() *internal.State { return internal.GetLogsState(internal.Logs(ms)) } @@ -55,6 +59,10 @@ func (ms Logs) LogRecordCount() int { return logCount } +func (ms Logs) ByteSize() int { + return ms.getOrig().Size() +} + // ResourceLogs returns the ResourceLogsSlice associated with this Logs. func (ms Logs) ResourceLogs() ResourceLogsSlice { return newResourceLogsSlice(&ms.getOrig().ResourceLogs, internal.GetLogsState(internal.Logs(ms))) diff --git a/pdata/plog/plogotlp/generated_exportpartialsuccess.go b/pdata/plog/plogotlp/generated_exportpartialsuccess.go index bcf420c9120..a03c9369964 100644 --- a/pdata/plog/plogotlp/generated_exportpartialsuccess.go +++ b/pdata/plog/plogotlp/generated_exportpartialsuccess.go @@ -45,6 +45,10 @@ func (ms ExportPartialSuccess) MoveTo(dest ExportPartialSuccess) { *ms.orig = otlpcollectorlog.ExportLogsPartialSuccess{} } +func (ms ExportPartialSuccess) Size() int { + return ms.orig.Size() +} + // RejectedLogRecords returns the rejectedlogrecords associated with this ExportPartialSuccess. 
func (ms ExportPartialSuccess) RejectedLogRecords() int64 { return ms.orig.RejectedLogRecords diff --git a/pdata/pmetric/generated_exemplar.go b/pdata/pmetric/generated_exemplar.go index 9937a1b500f..cebce3e5fc6 100644 --- a/pdata/pmetric/generated_exemplar.go +++ b/pdata/pmetric/generated_exemplar.go @@ -50,6 +50,10 @@ func (ms Exemplar) MoveTo(dest Exemplar) { *ms.orig = otlpmetrics.Exemplar{} } +func (ms Exemplar) Size() int { + return ms.orig.Size() +} + // Timestamp returns the timestamp associated with this Exemplar. func (ms Exemplar) Timestamp() pcommon.Timestamp { return pcommon.Timestamp(ms.orig.TimeUnixNano) diff --git a/pdata/pmetric/generated_exponentialhistogram.go b/pdata/pmetric/generated_exponentialhistogram.go index 18ba20d7377..64f36819b5e 100644 --- a/pdata/pmetric/generated_exponentialhistogram.go +++ b/pdata/pmetric/generated_exponentialhistogram.go @@ -46,6 +46,10 @@ func (ms ExponentialHistogram) MoveTo(dest ExponentialHistogram) { *ms.orig = otlpmetrics.ExponentialHistogram{} } +func (ms ExponentialHistogram) Size() int { + return ms.orig.Size() +} + // AggregationTemporality returns the aggregationtemporality associated with this ExponentialHistogram. func (ms ExponentialHistogram) AggregationTemporality() AggregationTemporality { return AggregationTemporality(ms.orig.AggregationTemporality) diff --git a/pdata/pmetric/generated_exponentialhistogramdatapoint.go b/pdata/pmetric/generated_exponentialhistogramdatapoint.go index 3d76368b384..990ef712930 100644 --- a/pdata/pmetric/generated_exponentialhistogramdatapoint.go +++ b/pdata/pmetric/generated_exponentialhistogramdatapoint.go @@ -49,6 +49,10 @@ func (ms ExponentialHistogramDataPoint) MoveTo(dest ExponentialHistogramDataPoin *ms.orig = otlpmetrics.ExponentialHistogramDataPoint{} } +func (ms ExponentialHistogramDataPoint) Size() int { + return ms.orig.Size() +} + // Attributes returns the Attributes associated with this ExponentialHistogramDataPoint. 
func (ms ExponentialHistogramDataPoint) Attributes() pcommon.Map { return pcommon.Map(internal.NewMap(&ms.orig.Attributes, ms.state)) diff --git a/pdata/pmetric/generated_exponentialhistogramdatapointbuckets.go b/pdata/pmetric/generated_exponentialhistogramdatapointbuckets.go index 7acfbc627f3..96a2a4af9ee 100644 --- a/pdata/pmetric/generated_exponentialhistogramdatapointbuckets.go +++ b/pdata/pmetric/generated_exponentialhistogramdatapointbuckets.go @@ -46,6 +46,10 @@ func (ms ExponentialHistogramDataPointBuckets) MoveTo(dest ExponentialHistogramD *ms.orig = otlpmetrics.ExponentialHistogramDataPoint_Buckets{} } +func (ms ExponentialHistogramDataPointBuckets) Size() int { + return ms.orig.Size() +} + // Offset returns the offset associated with this ExponentialHistogramDataPointBuckets. func (ms ExponentialHistogramDataPointBuckets) Offset() int32 { return ms.orig.Offset diff --git a/pdata/pmetric/generated_gauge.go b/pdata/pmetric/generated_gauge.go index ba572f9ca57..2ba2b87a581 100644 --- a/pdata/pmetric/generated_gauge.go +++ b/pdata/pmetric/generated_gauge.go @@ -45,6 +45,10 @@ func (ms Gauge) MoveTo(dest Gauge) { *ms.orig = otlpmetrics.Gauge{} } +func (ms Gauge) Size() int { + return ms.orig.Size() +} + // DataPoints returns the DataPoints associated with this Gauge. func (ms Gauge) DataPoints() NumberDataPointSlice { return newNumberDataPointSlice(&ms.orig.DataPoints, ms.state) diff --git a/pdata/pmetric/generated_histogram.go b/pdata/pmetric/generated_histogram.go index 950fb13127e..ca0f384fe5b 100644 --- a/pdata/pmetric/generated_histogram.go +++ b/pdata/pmetric/generated_histogram.go @@ -45,6 +45,10 @@ func (ms Histogram) MoveTo(dest Histogram) { *ms.orig = otlpmetrics.Histogram{} } +func (ms Histogram) Size() int { + return ms.orig.Size() +} + // AggregationTemporality returns the aggregationtemporality associated with this Histogram. 
func (ms Histogram) AggregationTemporality() AggregationTemporality { return AggregationTemporality(ms.orig.AggregationTemporality) diff --git a/pdata/pmetric/generated_histogramdatapoint.go b/pdata/pmetric/generated_histogramdatapoint.go index 22dc32344a2..62ded836adf 100644 --- a/pdata/pmetric/generated_histogramdatapoint.go +++ b/pdata/pmetric/generated_histogramdatapoint.go @@ -46,6 +46,10 @@ func (ms HistogramDataPoint) MoveTo(dest HistogramDataPoint) { *ms.orig = otlpmetrics.HistogramDataPoint{} } +func (ms HistogramDataPoint) Size() int { + return ms.orig.Size() +} + // Attributes returns the Attributes associated with this HistogramDataPoint. func (ms HistogramDataPoint) Attributes() pcommon.Map { return pcommon.Map(internal.NewMap(&ms.orig.Attributes, ms.state)) diff --git a/pdata/pmetric/generated_metric.go b/pdata/pmetric/generated_metric.go index ecf2dba29ef..950eb45f431 100644 --- a/pdata/pmetric/generated_metric.go +++ b/pdata/pmetric/generated_metric.go @@ -47,6 +47,10 @@ func (ms Metric) MoveTo(dest Metric) { *ms.orig = otlpmetrics.Metric{} } +func (ms Metric) Size() int { + return ms.orig.Size() +} + // Name returns the name associated with this Metric. func (ms Metric) Name() string { return ms.orig.Name diff --git a/pdata/pmetric/generated_numberdatapoint.go b/pdata/pmetric/generated_numberdatapoint.go index bc47c4747d1..e67797a8fc5 100644 --- a/pdata/pmetric/generated_numberdatapoint.go +++ b/pdata/pmetric/generated_numberdatapoint.go @@ -46,6 +46,10 @@ func (ms NumberDataPoint) MoveTo(dest NumberDataPoint) { *ms.orig = otlpmetrics.NumberDataPoint{} } +func (ms NumberDataPoint) Size() int { + return ms.orig.Size() +} + // Attributes returns the Attributes associated with this NumberDataPoint. 
func (ms NumberDataPoint) Attributes() pcommon.Map { return pcommon.Map(internal.NewMap(&ms.orig.Attributes, ms.state)) diff --git a/pdata/pmetric/generated_resourcemetrics.go b/pdata/pmetric/generated_resourcemetrics.go index 43622f3f806..72cda4c590c 100644 --- a/pdata/pmetric/generated_resourcemetrics.go +++ b/pdata/pmetric/generated_resourcemetrics.go @@ -46,6 +46,10 @@ func (ms ResourceMetrics) MoveTo(dest ResourceMetrics) { *ms.orig = otlpmetrics.ResourceMetrics{} } +func (ms ResourceMetrics) Size() int { + return ms.orig.Size() +} + // Resource returns the resource associated with this ResourceMetrics. func (ms ResourceMetrics) Resource() pcommon.Resource { return pcommon.Resource(internal.NewResource(&ms.orig.Resource, ms.state)) diff --git a/pdata/pmetric/generated_scopemetrics.go b/pdata/pmetric/generated_scopemetrics.go index 1151c36dae3..89a8d65ef44 100644 --- a/pdata/pmetric/generated_scopemetrics.go +++ b/pdata/pmetric/generated_scopemetrics.go @@ -46,6 +46,10 @@ func (ms ScopeMetrics) MoveTo(dest ScopeMetrics) { *ms.orig = otlpmetrics.ScopeMetrics{} } +func (ms ScopeMetrics) Size() int { + return ms.orig.Size() +} + // Scope returns the scope associated with this ScopeMetrics. func (ms ScopeMetrics) Scope() pcommon.InstrumentationScope { return pcommon.InstrumentationScope(internal.NewInstrumentationScope(&ms.orig.Scope, ms.state)) diff --git a/pdata/pmetric/generated_sum.go b/pdata/pmetric/generated_sum.go index 7def0749aa5..9aecaeeb20a 100644 --- a/pdata/pmetric/generated_sum.go +++ b/pdata/pmetric/generated_sum.go @@ -45,6 +45,10 @@ func (ms Sum) MoveTo(dest Sum) { *ms.orig = otlpmetrics.Sum{} } +func (ms Sum) Size() int { + return ms.orig.Size() +} + // AggregationTemporality returns the aggregationtemporality associated with this Sum. 
func (ms Sum) AggregationTemporality() AggregationTemporality { return AggregationTemporality(ms.orig.AggregationTemporality) diff --git a/pdata/pmetric/generated_summary.go b/pdata/pmetric/generated_summary.go index 64fbffbefd3..4b0c609c375 100644 --- a/pdata/pmetric/generated_summary.go +++ b/pdata/pmetric/generated_summary.go @@ -45,6 +45,10 @@ func (ms Summary) MoveTo(dest Summary) { *ms.orig = otlpmetrics.Summary{} } +func (ms Summary) Size() int { + return ms.orig.Size() +} + // DataPoints returns the DataPoints associated with this Summary. func (ms Summary) DataPoints() SummaryDataPointSlice { return newSummaryDataPointSlice(&ms.orig.DataPoints, ms.state) diff --git a/pdata/pmetric/generated_summarydatapoint.go b/pdata/pmetric/generated_summarydatapoint.go index 0f0f6dd1e99..aa73d93510f 100644 --- a/pdata/pmetric/generated_summarydatapoint.go +++ b/pdata/pmetric/generated_summarydatapoint.go @@ -46,6 +46,10 @@ func (ms SummaryDataPoint) MoveTo(dest SummaryDataPoint) { *ms.orig = otlpmetrics.SummaryDataPoint{} } +func (ms SummaryDataPoint) Size() int { + return ms.orig.Size() +} + // Attributes returns the Attributes associated with this SummaryDataPoint. func (ms SummaryDataPoint) Attributes() pcommon.Map { return pcommon.Map(internal.NewMap(&ms.orig.Attributes, ms.state)) diff --git a/pdata/pmetric/generated_summarydatapointvalueatquantile.go b/pdata/pmetric/generated_summarydatapointvalueatquantile.go index b4e1fe08f7b..c4e739c1506 100644 --- a/pdata/pmetric/generated_summarydatapointvalueatquantile.go +++ b/pdata/pmetric/generated_summarydatapointvalueatquantile.go @@ -45,6 +45,10 @@ func (ms SummaryDataPointValueAtQuantile) MoveTo(dest SummaryDataPointValueAtQua *ms.orig = otlpmetrics.SummaryDataPoint_ValueAtQuantile{} } +func (ms SummaryDataPointValueAtQuantile) Size() int { + return ms.orig.Size() +} + // Quantile returns the quantile associated with this SummaryDataPointValueAtQuantile. 
func (ms SummaryDataPointValueAtQuantile) Quantile() float64 { return ms.orig.Quantile diff --git a/pdata/pmetric/metrics.go b/pdata/pmetric/metrics.go index 91195ca4dfa..09d653ff70e 100644 --- a/pdata/pmetric/metrics.go +++ b/pdata/pmetric/metrics.go @@ -60,6 +60,10 @@ func (ms Metrics) MetricCount() int { return metricCount } +func (ms Metrics) ByteSize() int { + return ms.getOrig().Size() +} + // DataPointCount calculates the total number of data points. func (ms Metrics) DataPointCount() (dataPointCount int) { rms := ms.ResourceMetrics() diff --git a/pdata/pmetric/pmetricotlp/generated_exportpartialsuccess.go b/pdata/pmetric/pmetricotlp/generated_exportpartialsuccess.go index 60aa6a03e95..e274dd65336 100644 --- a/pdata/pmetric/pmetricotlp/generated_exportpartialsuccess.go +++ b/pdata/pmetric/pmetricotlp/generated_exportpartialsuccess.go @@ -45,6 +45,10 @@ func (ms ExportPartialSuccess) MoveTo(dest ExportPartialSuccess) { *ms.orig = otlpcollectormetrics.ExportMetricsPartialSuccess{} } +func (ms ExportPartialSuccess) Size() int { + return ms.orig.Size() +} + // RejectedDataPoints returns the rejecteddatapoints associated with this ExportPartialSuccess. func (ms ExportPartialSuccess) RejectedDataPoints() int64 { return ms.orig.RejectedDataPoints diff --git a/pdata/pprofile/generated_attribute.go b/pdata/pprofile/generated_attribute.go index 059b89be727..faf10893c49 100644 --- a/pdata/pprofile/generated_attribute.go +++ b/pdata/pprofile/generated_attribute.go @@ -46,6 +46,10 @@ func (ms Attribute) MoveTo(dest Attribute) { *ms.orig = v1.KeyValue{} } +func (ms Attribute) Size() int { + return ms.orig.Size() +} + // Key returns the key associated with this Attribute. 
func (ms Attribute) Key() string { return ms.orig.Key diff --git a/pdata/pprofile/generated_attributeunit.go b/pdata/pprofile/generated_attributeunit.go index a7d98f425d4..7c035654e36 100644 --- a/pdata/pprofile/generated_attributeunit.go +++ b/pdata/pprofile/generated_attributeunit.go @@ -45,6 +45,10 @@ func (ms AttributeUnit) MoveTo(dest AttributeUnit) { *ms.orig = otlpprofiles.AttributeUnit{} } +func (ms AttributeUnit) Size() int { + return ms.orig.Size() +} + // AttributeKeyStrindex returns the attributekeystrindex associated with this AttributeUnit. func (ms AttributeUnit) AttributeKeyStrindex() int32 { return ms.orig.AttributeKeyStrindex diff --git a/pdata/pprofile/generated_function.go b/pdata/pprofile/generated_function.go index 21fe358de8f..e509249b37f 100644 --- a/pdata/pprofile/generated_function.go +++ b/pdata/pprofile/generated_function.go @@ -45,6 +45,10 @@ func (ms Function) MoveTo(dest Function) { *ms.orig = otlpprofiles.Function{} } +func (ms Function) Size() int { + return ms.orig.Size() +} + // NameStrindex returns the namestrindex associated with this Function. func (ms Function) NameStrindex() int32 { return ms.orig.NameStrindex diff --git a/pdata/pprofile/generated_line.go b/pdata/pprofile/generated_line.go index daae7435237..f340776f33f 100644 --- a/pdata/pprofile/generated_line.go +++ b/pdata/pprofile/generated_line.go @@ -45,6 +45,10 @@ func (ms Line) MoveTo(dest Line) { *ms.orig = otlpprofiles.Line{} } +func (ms Line) Size() int { + return ms.orig.Size() +} + // FunctionIndex returns the functionindex associated with this Line. 
func (ms Line) FunctionIndex() int32 { return ms.orig.FunctionIndex diff --git a/pdata/pprofile/generated_link.go b/pdata/pprofile/generated_link.go index 22966f15545..5b374bf7a00 100644 --- a/pdata/pprofile/generated_link.go +++ b/pdata/pprofile/generated_link.go @@ -47,6 +47,10 @@ func (ms Link) MoveTo(dest Link) { *ms.orig = otlpprofiles.Link{} } +func (ms Link) Size() int { + return ms.orig.Size() +} + // TraceID returns the traceid associated with this Link. func (ms Link) TraceID() pcommon.TraceID { return pcommon.TraceID(ms.orig.TraceId) diff --git a/pdata/pprofile/generated_location.go b/pdata/pprofile/generated_location.go index 8b968f973b9..5c7e79bd39c 100644 --- a/pdata/pprofile/generated_location.go +++ b/pdata/pprofile/generated_location.go @@ -46,6 +46,10 @@ func (ms Location) MoveTo(dest Location) { *ms.orig = otlpprofiles.Location{} } +func (ms Location) Size() int { + return ms.orig.Size() +} + // MappingIndex returns the mappingindex associated with this Location. func (ms Location) MappingIndex() int32 { return ms.orig.GetMappingIndex() diff --git a/pdata/pprofile/generated_mapping.go b/pdata/pprofile/generated_mapping.go index 65ef57fa4db..5da8bf8499d 100644 --- a/pdata/pprofile/generated_mapping.go +++ b/pdata/pprofile/generated_mapping.go @@ -46,6 +46,10 @@ func (ms Mapping) MoveTo(dest Mapping) { *ms.orig = otlpprofiles.Mapping{} } +func (ms Mapping) Size() int { + return ms.orig.Size() +} + // MemoryStart returns the memorystart associated with this Mapping. 
func (ms Mapping) MemoryStart() uint64 { return ms.orig.MemoryStart diff --git a/pdata/pprofile/generated_profile.go b/pdata/pprofile/generated_profile.go index 48d11af1e31..230f6e70c87 100644 --- a/pdata/pprofile/generated_profile.go +++ b/pdata/pprofile/generated_profile.go @@ -47,6 +47,10 @@ func (ms Profile) MoveTo(dest Profile) { *ms.orig = otlpprofiles.Profile{} } +func (ms Profile) Size() int { + return ms.orig.Size() +} + // SampleType returns the SampleType associated with this Profile. func (ms Profile) SampleType() ValueTypeSlice { return newValueTypeSlice(&ms.orig.SampleType, ms.state) diff --git a/pdata/pprofile/generated_resourceprofiles.go b/pdata/pprofile/generated_resourceprofiles.go index 649e5bbf33d..11f020736dc 100644 --- a/pdata/pprofile/generated_resourceprofiles.go +++ b/pdata/pprofile/generated_resourceprofiles.go @@ -46,6 +46,10 @@ func (ms ResourceProfiles) MoveTo(dest ResourceProfiles) { *ms.orig = otlpprofiles.ResourceProfiles{} } +func (ms ResourceProfiles) Size() int { + return ms.orig.Size() +} + // Resource returns the resource associated with this ResourceProfiles. func (ms ResourceProfiles) Resource() pcommon.Resource { return pcommon.Resource(internal.NewResource(&ms.orig.Resource, ms.state)) diff --git a/pdata/pprofile/generated_sample.go b/pdata/pprofile/generated_sample.go index c62027753d2..61243ef0e74 100644 --- a/pdata/pprofile/generated_sample.go +++ b/pdata/pprofile/generated_sample.go @@ -46,6 +46,10 @@ func (ms Sample) MoveTo(dest Sample) { *ms.orig = otlpprofiles.Sample{} } +func (ms Sample) Size() int { + return ms.orig.Size() +} + // LocationsStartIndex returns the locationsstartindex associated with this Sample. 
func (ms Sample) LocationsStartIndex() int32 { return ms.orig.LocationsStartIndex diff --git a/pdata/pprofile/generated_scopeprofiles.go b/pdata/pprofile/generated_scopeprofiles.go index 920b578a5be..cf2dcc35805 100644 --- a/pdata/pprofile/generated_scopeprofiles.go +++ b/pdata/pprofile/generated_scopeprofiles.go @@ -46,6 +46,10 @@ func (ms ScopeProfiles) MoveTo(dest ScopeProfiles) { *ms.orig = otlpprofiles.ScopeProfiles{} } +func (ms ScopeProfiles) Size() int { + return ms.orig.Size() +} + // Scope returns the scope associated with this ScopeProfiles. func (ms ScopeProfiles) Scope() pcommon.InstrumentationScope { return pcommon.InstrumentationScope(internal.NewInstrumentationScope(&ms.orig.Scope, ms.state)) diff --git a/pdata/pprofile/generated_valuetype.go b/pdata/pprofile/generated_valuetype.go index 2afc8614048..95cdb9ceddd 100644 --- a/pdata/pprofile/generated_valuetype.go +++ b/pdata/pprofile/generated_valuetype.go @@ -45,6 +45,10 @@ func (ms ValueType) MoveTo(dest ValueType) { *ms.orig = otlpprofiles.ValueType{} } +func (ms ValueType) Size() int { + return ms.orig.Size() +} + // TypeStrindex returns the typestrindex associated with this ValueType. func (ms ValueType) TypeStrindex() int32 { return ms.orig.TypeStrindex diff --git a/pdata/pprofile/pprofileotlp/generated_exportpartialsuccess.go b/pdata/pprofile/pprofileotlp/generated_exportpartialsuccess.go index 284c10f096a..0deb5f9d2cf 100644 --- a/pdata/pprofile/pprofileotlp/generated_exportpartialsuccess.go +++ b/pdata/pprofile/pprofileotlp/generated_exportpartialsuccess.go @@ -45,6 +45,10 @@ func (ms ExportPartialSuccess) MoveTo(dest ExportPartialSuccess) { *ms.orig = otlpcollectorprofile.ExportProfilesPartialSuccess{} } +func (ms ExportPartialSuccess) Size() int { + return ms.orig.Size() +} + // RejectedProfiles returns the rejectedprofiles associated with this ExportPartialSuccess. 
func (ms ExportPartialSuccess) RejectedProfiles() int64 { return ms.orig.RejectedProfiles diff --git a/pdata/ptrace/generated_resourcespans.go b/pdata/ptrace/generated_resourcespans.go index fc2ed01dbbc..1db1272abc7 100644 --- a/pdata/ptrace/generated_resourcespans.go +++ b/pdata/ptrace/generated_resourcespans.go @@ -46,6 +46,10 @@ func (ms ResourceSpans) MoveTo(dest ResourceSpans) { *ms.orig = otlptrace.ResourceSpans{} } +func (ms ResourceSpans) Size() int { + return ms.orig.Size() +} + // Resource returns the resource associated with this ResourceSpans. func (ms ResourceSpans) Resource() pcommon.Resource { return pcommon.Resource(internal.NewResource(&ms.orig.Resource, ms.state)) diff --git a/pdata/ptrace/generated_scopespans.go b/pdata/ptrace/generated_scopespans.go index 6ea0d82fd68..13c915a0347 100644 --- a/pdata/ptrace/generated_scopespans.go +++ b/pdata/ptrace/generated_scopespans.go @@ -46,6 +46,10 @@ func (ms ScopeSpans) MoveTo(dest ScopeSpans) { *ms.orig = otlptrace.ScopeSpans{} } +func (ms ScopeSpans) Size() int { + return ms.orig.Size() +} + // Scope returns the scope associated with this ScopeSpans. func (ms ScopeSpans) Scope() pcommon.InstrumentationScope { return pcommon.InstrumentationScope(internal.NewInstrumentationScope(&ms.orig.Scope, ms.state)) diff --git a/pdata/ptrace/generated_span.go b/pdata/ptrace/generated_span.go index 56a701974c4..91147c36970 100644 --- a/pdata/ptrace/generated_span.go +++ b/pdata/ptrace/generated_span.go @@ -48,6 +48,10 @@ func (ms Span) MoveTo(dest Span) { *ms.orig = otlptrace.Span{} } +func (ms Span) Size() int { + return ms.orig.Size() +} + // TraceID returns the traceid associated with this Span. 
func (ms Span) TraceID() pcommon.TraceID { return pcommon.TraceID(ms.orig.TraceId) diff --git a/pdata/ptrace/generated_spanevent.go b/pdata/ptrace/generated_spanevent.go index a35c88ae93d..3649dca14bd 100644 --- a/pdata/ptrace/generated_spanevent.go +++ b/pdata/ptrace/generated_spanevent.go @@ -47,6 +47,10 @@ func (ms SpanEvent) MoveTo(dest SpanEvent) { *ms.orig = otlptrace.Span_Event{} } +func (ms SpanEvent) Size() int { + return ms.orig.Size() +} + // Timestamp returns the timestamp associated with this SpanEvent. func (ms SpanEvent) Timestamp() pcommon.Timestamp { return pcommon.Timestamp(ms.orig.TimeUnixNano) diff --git a/pdata/ptrace/generated_spanlink.go b/pdata/ptrace/generated_spanlink.go index 3121e263e23..abfa7bb7c24 100644 --- a/pdata/ptrace/generated_spanlink.go +++ b/pdata/ptrace/generated_spanlink.go @@ -49,6 +49,10 @@ func (ms SpanLink) MoveTo(dest SpanLink) { *ms.orig = otlptrace.Span_Link{} } +func (ms SpanLink) Size() int { + return ms.orig.Size() +} + // TraceID returns the traceid associated with this SpanLink. func (ms SpanLink) TraceID() pcommon.TraceID { return pcommon.TraceID(ms.orig.TraceId) diff --git a/pdata/ptrace/generated_status.go b/pdata/ptrace/generated_status.go index 2b3e66a92d0..a5e88de29c2 100644 --- a/pdata/ptrace/generated_status.go +++ b/pdata/ptrace/generated_status.go @@ -46,6 +46,10 @@ func (ms Status) MoveTo(dest Status) { *ms.orig = otlptrace.Status{} } +func (ms Status) Size() int { + return ms.orig.Size() +} + // Code returns the code associated with this Status. 
func (ms Status) Code() StatusCode { return StatusCode(ms.orig.Code) diff --git a/pdata/ptrace/ptraceotlp/generated_exportpartialsuccess.go b/pdata/ptrace/ptraceotlp/generated_exportpartialsuccess.go index 50cf0c805e8..1a4a23be938 100644 --- a/pdata/ptrace/ptraceotlp/generated_exportpartialsuccess.go +++ b/pdata/ptrace/ptraceotlp/generated_exportpartialsuccess.go @@ -45,6 +45,10 @@ func (ms ExportPartialSuccess) MoveTo(dest ExportPartialSuccess) { *ms.orig = otlpcollectortrace.ExportTracePartialSuccess{} } +func (ms ExportPartialSuccess) Size() int { + return ms.orig.Size() +} + // RejectedSpans returns the rejectedspans associated with this ExportPartialSuccess. func (ms ExportPartialSuccess) RejectedSpans() int64 { return ms.orig.RejectedSpans diff --git a/pdata/ptrace/traces.go b/pdata/ptrace/traces.go index a4b71e17853..421127beca7 100644 --- a/pdata/ptrace/traces.go +++ b/pdata/ptrace/traces.go @@ -54,6 +54,10 @@ func (ms Traces) SpanCount() int { return spanCount } +func (ms Traces) ByteSize() int { + return ms.getOrig().Size() +} + // ResourceSpans returns the ResourceSpansSlice associated with this Metrics. 
func (ms Traces) ResourceSpans() ResourceSpansSlice { return newResourceSpansSlice(&ms.getOrig().ResourceSpans, internal.GetTracesState(internal.Traces(ms))) From 716bbda4d425cd8b95090a7a6f4b5c6b721cef0c Mon Sep 17 00:00:00 2001 From: Sindy Li Date: Mon, 13 Jan 2025 18:09:34 -0800 Subject: [PATCH 2/2] Benchmarks and optimizations --- exporter/exporterhelper/logs.go | 5 +- exporter/exporterhelper/logs_batch.go | 12 +++- exporter/exporterhelper/logs_batch_test.go | 68 ++++++++++++++++++---- 3 files changed, 70 insertions(+), 15 deletions(-) diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index fafb34c94dc..c9e5b43ae18 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -25,8 +25,9 @@ var ( ) type logsRequest struct { - ld plog.Logs - pusher consumer.ConsumeLogsFunc + ld plog.Logs + pusher consumer.ConsumeLogsFunc + byteSize int } func newLogsRequest(ld plog.Logs, pusher consumer.ConsumeLogsFunc) Request { diff --git a/exporter/exporterhelper/logs_batch.go b/exporter/exporterhelper/logs_batch.go index 3bd7dc472f9..ef22c86ffe3 100644 --- a/exporter/exporterhelper/logs_batch.go +++ b/exporter/exporterhelper/logs_batch.go @@ -46,12 +46,22 @@ func (req *logsRequest) mergeSplitBasedOnByteSize(cfg exporterbatcher.MaxSizeCon continue } - ByteSize := srcReq.ld.ByteSize() + ByteSize := srcReq.byteSize + if ByteSize == 0 { + ByteSize = srcReq.ld.ByteSize() + } + if ByteSize > capacityLeft && capacityLeft < cfg.MaxSizeBytes { + res = append(res, destReq) + destReq = nil + capacityLeft = cfg.MaxSizeBytes + } + if ByteSize <= capacityLeft { if destReq == nil { destReq = srcReq } else { srcReq.ld.ResourceLogs().MoveAndAppendTo(destReq.ld.ResourceLogs()) + destReq.byteSize += ByteSize } capacityLeft -= ByteSize continue diff --git a/exporter/exporterhelper/logs_batch_test.go b/exporter/exporterhelper/logs_batch_test.go index 5fe2920034f..7c9312ffc93 100644 --- a/exporter/exporterhelper/logs_batch_test.go +++ 
b/exporter/exporterhelper/logs_batch_test.go @@ -252,42 +252,86 @@ func TestMergeSplitLogsBasedOnByteSize(t *testing.T) { } } -func BenchmarkSplittingBasedOnItemCountManyLogs(b *testing.B) { - cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10} +func BenchmarkSplittingBasedOnItemCountManySmallLogs(b *testing.B) { + cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000} for i := 0; i < b.N; i++ { - lr1 := &logsRequest{ld: testdata.GenerateLogs(9)} + lr1 := &logsRequest{ld: testdata.GenerateLogs(10)} for j := 0; j < 1000; j++ { - lr2 := &logsRequest{ld: testdata.GenerateLogs(9)} + lr2 := &logsRequest{ld: testdata.GenerateLogs(10)} lr1.MergeSplit(context.Background(), cfg, lr2) } } } -func BenchmarkSplittingBasedOnByteSizeManyLogs(b *testing.B) { - cfg := exporterbatcher.MaxSizeConfig{MaxSizeBytes: 1010} +func BenchmarkSplittingBasedOnByteSizeManySmallLogs(b *testing.B) { + cfg := exporterbatcher.MaxSizeConfig{MaxSizeBytes: 1010000} for i := 0; i < b.N; i++ { - lr1 := &logsRequest{ld: testdata.GenerateLogs(9)} + lr1 := &logsRequest{ld: testdata.GenerateLogs(10)} for j := 0; j < 1000; j++ { - lr2 := &logsRequest{ld: testdata.GenerateLogs(9)} + lr2 := &logsRequest{ld: testdata.GenerateLogs(10)} + lr1.MergeSplit(context.Background(), cfg, lr2) + } + } +} + +func BenchmarkSplittingBasedOnItemCountManyLogsSlightlyAboveLimit(b *testing.B) { + cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000} + for i := 0; i < b.N; i++ { + lr1 := &logsRequest{ld: testdata.GenerateLogs(10001)} + for j := 0; j < 10; j++ { + lr2 := &logsRequest{ld: testdata.GenerateLogs(10001)} + lr1.MergeSplit(context.Background(), cfg, lr2) + } + } +} + +func BenchmarkSplittingBasedOnByteSizeManyLogsSlightlyAboveLimit(b *testing.B) { + cfg := exporterbatcher.MaxSizeConfig{MaxSizeBytes: 960052} // 960052 corresponds to 10000 generated logs + for i := 0; i < b.N; i++ { + lr1 := &logsRequest{ld: testdata.GenerateLogs(10001)} + for j := 0; j < 10; j++ { + lr2 := &logsRequest{ld: 
testdata.GenerateLogs(10001)} + lr1.MergeSplit(context.Background(), cfg, lr2) + } + } +} + +func BenchmarkSplittingBasedOnItemCountManyLogsSlightlyBelowLimit(b *testing.B) { + cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000} + for i := 0; i < b.N; i++ { + lr1 := &logsRequest{ld: testdata.GenerateLogs(9999)} + for j := 0; j < 10; j++ { + lr2 := &logsRequest{ld: testdata.GenerateLogs(9999)} + lr1.MergeSplit(context.Background(), cfg, lr2) + } + } +} + +func BenchmarkSplittingBasedOnByteSizeManyLogsSlightlyBelowLimit(b *testing.B) { + cfg := exporterbatcher.MaxSizeConfig{MaxSizeBytes: 960052} // 960052 corresponds to 10000 generated logs + for i := 0; i < b.N; i++ { + lr1 := &logsRequest{ld: testdata.GenerateLogs(9999)} + for j := 0; j < 10; j++ { + lr2 := &logsRequest{ld: testdata.GenerateLogs(9999)} lr1.MergeSplit(context.Background(), cfg, lr2) } } } func BenchmarkSplittingBasedOnItemCountHugeLog(b *testing.B) { - cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10} + cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000} for i := 0; i < b.N; i++ { lr1 := &logsRequest{ld: testdata.GenerateLogs(1)} - lr2 := &logsRequest{ld: testdata.GenerateLogs(1000)} + lr2 := &logsRequest{ld: testdata.GenerateLogs(100000)} // l2 is of size 9.600054 MB lr1.MergeSplit(context.Background(), cfg, lr2) } } func BenchmarkSplittingBasedOnByteSizeHugeLog(b *testing.B) { - cfg := exporterbatcher.MaxSizeConfig{MaxSizeBytes: 1010} + cfg := exporterbatcher.MaxSizeConfig{MaxSizeBytes: 970000} for i := 0; i < b.N; i++ { lr1 := &logsRequest{ld: testdata.GenerateLogs(1)} - lr2 := &logsRequest{ld: testdata.GenerateLogs(1000)} + lr2 := &logsRequest{ld: testdata.GenerateLogs(100000)} lr1.MergeSplit(context.Background(), cfg, lr2) } }