From 1384a01c4633fb95297720e60bd9722b71dd9562 Mon Sep 17 00:00:00 2001 From: Vadim Alekseev Date: Tue, 3 Oct 2023 15:21:26 +0300 Subject: [PATCH] Use SourceID when spread events --- pipeline/pipeline.go | 2 +- pipeline/pipeline_test.go | 38 ++++++++++++++++++++++++++++++ pipeline/pipeline_whitebox_test.go | 2 +- 3 files changed, 40 insertions(+), 2 deletions(-) diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index 4da2a6589..cd8b5e56c 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -471,7 +471,7 @@ func (p *Pipeline) streamEvent(event *Event) uint64 { // spread events across all processors if p.useSpread { - streamID = StreamID(event.SeqID % uint64(p.procCount.Load())) + streamID = StreamID(uint64(event.SourceID) % uint64(p.procCount.Load())) } if !p.disableStreams { diff --git a/pipeline/pipeline_test.go b/pipeline/pipeline_test.go index 6dce69506..bcc56f522 100644 --- a/pipeline/pipeline_test.go +++ b/pipeline/pipeline_test.go @@ -1,11 +1,14 @@ package pipeline_test import ( + "math/rand" "reflect" + "sync" "testing" "github.com/ozontech/file.d/pipeline" "github.com/ozontech/file.d/plugin/input/fake" + "github.com/ozontech/file.d/test" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" ) @@ -121,3 +124,38 @@ func TestInInvalidMessages(t *testing.T) { }) } } + +func TestPipelineSequenceWhenUseSpread(t *testing.T) { + t.Parallel() + + p, input, output := test.NewPipelineMock(nil, "passive", "parallel") + p.UseSpread() + p.Start() + + const iters = 1000 + + wg := &sync.WaitGroup{} + wg.Add(iters) + + outEvents := make([]string, 0) + mu := sync.Mutex{} + offsetSequence := int64(1) + output.SetOutFn(func(event *pipeline.Event) { + outEvents = append(outEvents, event.Root.EncodeToString()) + mu.Lock() + defer mu.Unlock() + + require.Equal(t, offsetSequence, event.Offset) + offsetSequence++ + + wg.Done() + }) + + sourceID := pipeline.SourceID(rand.Int()) + for offset := int64(1); offset <= iters; offset++ { + input.In(sourceID, "", offset, []byte(`{}`)) + } + + wg.Wait() + p.Stop() +} diff --git a/pipeline/pipeline_whitebox_test.go b/pipeline/pipeline_whitebox_test.go index f5958b63c..98511e0e3 100644 --- a/pipeline/pipeline_whitebox_test.go +++ b/pipeline/pipeline_whitebox_test.go @@ -31,7 +31,7 @@ func TestPipeline_streamEvent(t *testing.T) { p.UseSpread() p.streamEvent(event) - expectedStreamID := StreamID(event.SeqID % uint64(procs)) + expectedStreamID := StreamID(uint64(event.SourceID) % uint64(procs)) assert.Equal(t, event, p.streamer.getStream(expectedStreamID, DefaultStreamName).first) }