Skip to content

Commit

Permalink
Fix concurrent issues caused by accessing request after calling Send (#…
Browse files Browse the repository at this point in the history
…12281)

This can happen when items are added to the memory queue and processed
by the batcher in the same time with reading the number of items from
this code.

Though our queue does not return error and still add the item unless the
Context is cancelled, but likely because of a bug that does not work as
expected, see
#12282

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Feb 4, 2025
1 parent 9b352b2 commit 677b87e
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 2 deletions.
25 changes: 25 additions & 0 deletions .chloggen/fix-concurrent-issue.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix undefined behavior access to request after send to next component. This causes random memory access.

# One or more tracking issues or pull requests related to the change
issues: [12281]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
5 changes: 4 additions & 1 deletion exporter/exporterhelper/internal/base_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,13 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe

// Send sends the request using the first sender in the chain.
func (be *BaseExporter) Send(ctx context.Context, req internal.Request) error {
// Have to read the number of items before sending the request since the request can
// be modified by the downstream components like the batcher.
itemsCount := req.ItemsCount()
err := be.firstSender.Send(ctx, req)
if err != nil {
be.Set.Logger.Error("Exporting failed. Rejecting data."+be.ExportFailureMessage,
zap.Error(err), zap.Int("rejected_items", req.ItemsCount()))
zap.Error(err), zap.Int("rejected_items", itemsCount))
}
return err
}
Expand Down
2 changes: 2 additions & 0 deletions exporter/exporterhelper/internal/obs_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ func (or *obsQueue[T]) Shutdown(ctx context.Context) error {
}

func (or *obsQueue[T]) Offer(ctx context.Context, req T) error {
// Have to read the number of items before sending the request since the request can
// be modified by the downstream components like the batcher.
numItems := req.ItemsCount()
err := or.Queue.Offer(ctx, req)
// No metrics recorded for profiles, remove enqueueFailedInst check with nil when profiles metrics available.
Expand Down
2 changes: 2 additions & 0 deletions exporter/exporterhelper/internal/obs_report_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ func NewObsReportSender[K internal.Request](obsrep *ObsReport, next Sender[K]) S
}

func (ors *obsReportSender[K]) Send(ctx context.Context, req K) error {
// Have to read the number of items before sending the request since the request can
// be modified by the downstream components like the batcher.
c := ors.obsrep.StartOp(ctx)
items := req.ItemsCount()
// Forward the data to the next consumer (this pusher is the next).
Expand Down
5 changes: 4 additions & 1 deletion exporter/exporterhelper/internal/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,13 @@ func NewQueueSender(
next Sender[internal.Request],
) (*QueueSender, error) {
exportFunc := func(ctx context.Context, req internal.Request) error {
// Have to read the number of items before sending the request since the request can
// be modified by the downstream components like the batcher.
itemsCount := req.ItemsCount()
err := next.Send(ctx, req)
if err != nil {
qSet.ExporterSettings.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage,
zap.Error(err), zap.Int("dropped_items", req.ItemsCount()))
zap.Error(err), zap.Int("dropped_items", itemsCount))
}
return err
}
Expand Down

0 comments on commit 677b87e

Please sign in to comment.