From 677b87e3ab5c615bc3f93b8f99bb1fa5be951751 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Tue, 4 Feb 2025 13:51:05 -0800 Subject: [PATCH] Fix concurrent issues caused by accessing request after calling Send (#12281) This can happen when items are added to the memory queue and processed by the batcher in the same time with reading the number of items from this code. Though our queue does not return error and still add the item unless the Context is cancelled, but likely because of a bug that does not work as expected, see https://github.com/open-telemetry/opentelemetry-collector/issues/12282 Signed-off-by: Bogdan Drutu --- .chloggen/fix-concurrent-issue.yaml | 25 +++++++++++++++++++ .../exporterhelper/internal/base_exporter.go | 5 +++- exporter/exporterhelper/internal/obs_queue.go | 2 ++ .../internal/obs_report_sender.go | 2 ++ .../exporterhelper/internal/queue_sender.go | 5 +++- 5 files changed, 37 insertions(+), 2 deletions(-) create mode 100644 .chloggen/fix-concurrent-issue.yaml diff --git a/.chloggen/fix-concurrent-issue.yaml b/.chloggen/fix-concurrent-issue.yaml new file mode 100644 index 00000000000..5db06b2f27f --- /dev/null +++ b/.chloggen/fix-concurrent-issue.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix undefined behavior access to request after send to next component. This causes random memory access. + +# One or more tracking issues or pull requests related to the change +issues: [12281] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/exporterhelper/internal/base_exporter.go b/exporter/exporterhelper/internal/base_exporter.go index 494c3aa0c92..2c8da490b5a 100644 --- a/exporter/exporterhelper/internal/base_exporter.go +++ b/exporter/exporterhelper/internal/base_exporter.go @@ -126,10 +126,13 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe // Send sends the request using the first sender in the chain. func (be *BaseExporter) Send(ctx context.Context, req internal.Request) error { + // Have to read the number of items before sending the request since the request can + // be modified by the downstream components like the batcher. + itemsCount := req.ItemsCount() err := be.firstSender.Send(ctx, req) if err != nil { be.Set.Logger.Error("Exporting failed. Rejecting data."+be.ExportFailureMessage, - zap.Error(err), zap.Int("rejected_items", req.ItemsCount())) + zap.Error(err), zap.Int("rejected_items", itemsCount)) } return err } diff --git a/exporter/exporterhelper/internal/obs_queue.go b/exporter/exporterhelper/internal/obs_queue.go index 594affd14bb..53cb8031454 100644 --- a/exporter/exporterhelper/internal/obs_queue.go +++ b/exporter/exporterhelper/internal/obs_queue.go @@ -71,6 +71,8 @@ func (or *obsQueue[T]) Shutdown(ctx context.Context) error { } func (or *obsQueue[T]) Offer(ctx context.Context, req T) error { + // Have to read the number of items before sending the request since the request can + // be modified by the downstream components like the batcher. numItems := req.ItemsCount() err := or.Queue.Offer(ctx, req) // No metrics recorded for profiles, remove enqueueFailedInst check with nil when profiles metrics available. diff --git a/exporter/exporterhelper/internal/obs_report_sender.go b/exporter/exporterhelper/internal/obs_report_sender.go index d33a43e0e0d..439fbdcb08d 100644 --- a/exporter/exporterhelper/internal/obs_report_sender.go +++ b/exporter/exporterhelper/internal/obs_report_sender.go @@ -22,6 +22,8 @@ func NewObsReportSender[K internal.Request](obsrep *ObsReport, next Sender[K]) S } func (ors *obsReportSender[K]) Send(ctx context.Context, req K) error { + // Have to read the number of items before sending the request since the request can + // be modified by the downstream components like the batcher. c := ors.obsrep.StartOp(ctx) items := req.ItemsCount() // Forward the data to the next consumer (this pusher is the next). diff --git a/exporter/exporterhelper/internal/queue_sender.go b/exporter/exporterhelper/internal/queue_sender.go index dbacc5e860e..623746ac4cd 100644 --- a/exporter/exporterhelper/internal/queue_sender.go +++ b/exporter/exporterhelper/internal/queue_sender.go @@ -78,10 +78,13 @@ func NewQueueSender( next Sender[internal.Request], ) (*QueueSender, error) { exportFunc := func(ctx context.Context, req internal.Request) error { + // Have to read the number of items before sending the request since the request can + // be modified by the downstream components like the batcher. + itemsCount := req.ItemsCount() err := next.Send(ctx, req) if err != nil { qSet.ExporterSettings.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage, - zap.Error(err), zap.Int("dropped_items", req.ItemsCount())) + zap.Error(err), zap.Int("dropped_items", itemsCount)) } return err }