Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore][exporter] Consolidate merge splitting for the case where maxLimit is set and the case it's not #12104

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 28 additions & 64 deletions exporter/internal/queue/default_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,77 +43,41 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() {

qb.currentBatchMu.Lock()

if qb.batchCfg.MaxSizeItems > 0 {
var reqList []internal.Request
var mergeSplitErr error
if qb.currentBatch == nil || qb.currentBatch.req == nil {
qb.resetTimer()
reqList, mergeSplitErr = req.MergeSplit(ctx, qb.batchCfg.MaxSizeConfig, nil)
} else {
reqList, mergeSplitErr = qb.currentBatch.req.MergeSplit(ctx, qb.batchCfg.MaxSizeConfig, req)
}
var reqList []internal.Request
var mergeSplitErr error
if qb.currentBatch == nil || qb.currentBatch.req == nil {
qb.resetTimer()
reqList, mergeSplitErr = req.MergeSplit(ctx, qb.batchCfg.MaxSizeConfig, nil)
} else {
reqList, mergeSplitErr = qb.currentBatch.req.MergeSplit(ctx, qb.batchCfg.MaxSizeConfig, req)
}

if mergeSplitErr != nil || reqList == nil {
qb.queue.OnProcessingFinished(idx, mergeSplitErr)
qb.currentBatchMu.Unlock()
continue
}
if mergeSplitErr != nil || len(reqList) == 0 {
qb.queue.OnProcessingFinished(idx, mergeSplitErr)
qb.currentBatchMu.Unlock()
continue
}

// If there was a split, we flush everything immediately.
if reqList[0].ItemsCount() >= qb.batchCfg.MinSizeItems || len(reqList) > 1 {
qb.currentBatch = nil
qb.currentBatchMu.Unlock()
for i := 0; i < len(reqList); i++ {
qb.flush(batch{
req: reqList[i],
ctx: ctx,
idxList: []uint64{idx},
})
// TODO: handle partial failure
}
qb.resetTimer()
} else {
qb.currentBatch = &batch{
req: reqList[0],
// If there was a split, we flush everything immediately.
if reqList[0].ItemsCount() >= qb.batchCfg.MinSizeItems || len(reqList) > 1 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if len is 0, get a segfault here?

Copy link
Contributor Author

@sfc-gh-sili sfc-gh-sili Jan 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the exporter.request specification, the result is guaranteed to be non-empty if there is no error.

// marked as not mutable. The length of the returned slice MUST not be 0.

But to be safe, I added a check and return early if length is 0

qb.currentBatch = nil
qb.currentBatchMu.Unlock()
for i := 0; i < len(reqList); i++ {
qb.flush(batch{
req: reqList[i],
ctx: ctx,
idxList: []uint64{idx},
}
qb.currentBatchMu.Unlock()
})
// TODO: handle partial failure
}
qb.resetTimer()
} else {
if qb.currentBatch == nil || qb.currentBatch.req == nil {
qb.resetTimer()
qb.currentBatch = &batch{
req: req,
ctx: ctx,
idxList: []uint64{idx},
}
} else {
// TODO: consolidate implementation for the cases where MaxSizeConfig is specified and the case where it is not specified
mergedReq, mergeErr := qb.currentBatch.req.MergeSplit(qb.currentBatch.ctx, qb.batchCfg.MaxSizeConfig, req)
if mergeErr != nil {
qb.queue.OnProcessingFinished(idx, mergeErr)
qb.currentBatchMu.Unlock()
continue
}
qb.currentBatch = &batch{
req: mergedReq[0],
ctx: qb.currentBatch.ctx,
idxList: append(qb.currentBatch.idxList, idx),
}
}

if qb.currentBatch.req.ItemsCount() >= qb.batchCfg.MinSizeItems {
batchToFlush := *qb.currentBatch
qb.currentBatch = nil
qb.currentBatchMu.Unlock()

// flush() blocks until successfully started a goroutine for flushing.
qb.flush(batchToFlush)
qb.resetTimer()
} else {
qb.currentBatchMu.Unlock()
qb.currentBatch = &batch{
req: reqList[0],
ctx: ctx,
idxList: []uint64{idx},
}
qb.currentBatchMu.Unlock()
}
}
}()
Expand Down
4 changes: 4 additions & 0 deletions exporter/internal/requesttest/fake_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func (r *FakeRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeC

maxItems := cfg.MaxSizeItems
if maxItems == 0 {
if r2 == nil {
return []internal.Request{r}, nil
}

fr2 := r2.(*FakeRequest)
if fr2.MergeErr != nil {
return nil, fr2.MergeErr
Expand Down