Skip to content

Commit

Permalink
Use SourceID when spread events
Browse files Browse the repository at this point in the history
  • Loading branch information
vadimalekseev committed Oct 3, 2023
1 parent 1b30680 commit 1384a01
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 2 deletions.
2 changes: 1 addition & 1 deletion pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func (p *Pipeline) streamEvent(event *Event) uint64 {

// spread events across all processors
if p.useSpread {
streamID = StreamID(event.SeqID % uint64(p.procCount.Load()))
streamID = StreamID(uint64(event.SourceID) % uint64(p.procCount.Load()))
}

if !p.disableStreams {
Expand Down
38 changes: 38 additions & 0 deletions pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package pipeline_test

import (
"math/rand"
"reflect"
"sync"
"testing"

"github.com/ozontech/file.d/pipeline"
"github.com/ozontech/file.d/plugin/input/fake"
"github.com/ozontech/file.d/test"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -121,3 +124,38 @@ func TestInInvalidMessages(t *testing.T) {
})
}
}

func TestPipelineSequenceWhenUseSpread(t *testing.T) {
t.Parallel()

p, input, output := test.NewPipelineMock(nil, "passive", "parallel")
p.UseSpread()
p.Start()

const iters = 1000

wg := &sync.WaitGroup{}
wg.Add(iters)

outEvents := make([]string, 0)
mu := sync.Mutex{}
offsetSequence := int64(1)
output.SetOutFn(func(event *pipeline.Event) {
outEvents = append(outEvents, event.Root.EncodeToString())
mu.Lock()
defer mu.Unlock()

require.Equal(t, offsetSequence, event.Offset)
offsetSequence++

wg.Done()
})

sourceID := pipeline.SourceID(rand.Int())
for offset := int64(1); offset <= iters; offset++ {
input.In(sourceID, "", offset, []byte(`{}`))
}

wg.Wait()
p.Stop()
}
2 changes: 1 addition & 1 deletion pipeline/pipeline_whitebox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestPipeline_streamEvent(t *testing.T) {
p.UseSpread()
p.streamEvent(event)

expectedStreamID := StreamID(event.SeqID % uint64(procs))
expectedStreamID := StreamID(uint64(event.SourceID) % uint64(procs))

assert.Equal(t, event, p.streamer.getStream(expectedStreamID, DefaultStreamName).first)
}
Expand Down

0 comments on commit 1384a01

Please sign in to comment.