From 2b61a41893a2422320ac20ef5a46c9587dd9ead9 Mon Sep 17 00:00:00 2001
From: Termina1
Date: Fri, 10 Jan 2025 18:33:34 +0200
Subject: [PATCH 1/2] add tests to queue

---
 utils/queue.go      |   2 +-
 utils/queue_test.go | 162 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 163 insertions(+), 1 deletion(-)

diff --git a/utils/queue.go b/utils/queue.go
index 2054dec..f2bbfe9 100644
--- a/utils/queue.go
+++ b/utils/queue.go
@@ -88,7 +88,7 @@ func (q *FDQueue[S, E]) Feed(ctx context.Context) (recs S, err error) {
 			return
 		}
 		recs = append(recs, pkg)
-		if len(recs) > q.batchSize {
+		if len(recs) >= q.batchSize {
 			return
 		}
 	}
diff --git a/utils/queue_test.go b/utils/queue_test.go
index 17e90f4..e8b26b1 100644
--- a/utils/queue_test.go
+++ b/utils/queue_test.go
@@ -3,6 +3,7 @@ package utils
 import (
 	"context"
 	"encoding/binary"
+	"sync"
 	"testing"
 	"time"
 
@@ -49,3 +50,164 @@ func TestBlockingRecordQueue_Drain(t *testing.T) {
 	_, err2 := queue.Feed(context.Background())
 	assert.Equal(t, ErrClosed, err2)
 }
+
+func TestNewFDQueue(t *testing.T) {
+	queue := NewFDQueue[[]int, int](10, time.Second, 5)
+	assert.NotNil(t, queue, "FDQueue creation failed")
+	assert.Equal(t, 0, queue.Len(), "Expected queue length to be 0")
+}
+
+func TestFDQueue_DrainAndFeed(t *testing.T) {
+	queue := NewFDQueue[[]int, int](10, time.Second, 5)
+	ctx := context.Background()
+
+	records := []int{1, 2, 3, 4, 5}
+	go func() {
+		err := queue.Drain(ctx, records)
+		assert.NoError(t, err, "Unexpected error in Drain")
+	}()
+
+	result, err := queue.Feed(ctx)
+	assert.NoError(t, err, "Unexpected error in Feed")
+	assert.Equal(t, len(records), len(result), "Mismatch in record length")
+	assert.Equal(t, records, result, "Mismatch in records")
+}
+
+func TestFDQueue_Close(t *testing.T) {
+	queue := NewFDQueue[[]int, int](10, time.Second, 5)
+	err := queue.Close()
+	assert.NoError(t, err, "Unexpected error during Close")
+
+	err = queue.Drain(context.Background(), []int{1, 2, 3})
+	assert.ErrorIs(t, err, ErrClosed, "Expected ErrClosed after Close")
+
+	_, err = queue.Feed(context.Background())
+	assert.ErrorIs(t, err, ErrClosed, "Expected ErrClosed after Close")
+
+	assert.Equal(t, 0, queue.Len(), "Expected queue length to be 0 after close")
+}
+
+func TestFDQueue_ConcurrentDrainAndFeed(t *testing.T) {
+	queue := NewFDQueue[[]int, int](15, time.Second, 10)
+	ctx := context.Background()
+
+	records := []int{1, 2, 3, 4, 5}
+	wg := sync.WaitGroup{}
+
+	// Add 15 elements to the queue in 3 batches
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for i := 0; i < 3; i++ {
+			err := queue.Drain(ctx, records)
+			assert.NoError(t, err, "Unexpected error in Drain")
+		}
+	}()
+
+	// Fetch batches of elements from the queue
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		// First batch should return 10 elements (batch size of queue)
+		result, err := queue.Feed(ctx)
+		assert.NoError(t, err, "Unexpected error in Feed")
+		assert.Equal(t, 10, len(result), "Expected 10 elements in first batch")
+		assert.Equal(t, append(records, records...), result, "Mismatch in first batch records")
+
+		// Second batch should return 5 elements (remaining items)
+		result, err = queue.Feed(ctx)
+		assert.NoError(t, err, "Unexpected error in Feed")
+		assert.Equal(t, 5, len(result), "Expected 5 elements in second batch")
+		assert.Equal(t, records, result, "Mismatch in second batch records")
+	}()
+
+	wg.Wait()
+}
+
+func TestFDQueue_TimeLimit(t *testing.T) {
+	queue := NewFDQueue[[]int, int](10, 50*time.Millisecond, 5)
+	ctx := context.Background()
+
+	go func() {
+		time.Sleep(100 * time.Millisecond)
+		queue.Drain(ctx, []int{1, 2, 3})
+	}()
+
+	result, err := queue.Feed(ctx)
+	assert.NoError(t, err, "Unexpected error in Feed")
+	assert.Equal(t, 0, len(result), "Expected 0 records due to timeout")
+}
+
+func TestFDQueue_DrainStopsWhenContextCancelled(t *testing.T) {
+	queue := NewFDQueue[[]int, int](10, time.Second, 5)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	records := []int{1, 2, 3, 4, 5}
+	go func() {
+		time.Sleep(50 * time.Millisecond)
+		cancel()
+	}()
+
+	queue.Drain(ctx, records)
+}
+
+func TestFDQueue_FeedStopsWhenContextCancelled(t *testing.T) {
+	queue := NewFDQueue[[]int, int](10, time.Second, 5)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	go func() {
+		time.Sleep(50 * time.Millisecond)
+		cancel()
+	}()
+
+	recs, _ := queue.Feed(ctx)
+	assert.Nil(t, recs)
+}
+
+func TestFDQueue_CloseStopsDrainAndFeed(t *testing.T) {
+	queue := NewFDQueue[[]int, int](10, time.Second, 5)
+	ctx := context.Background()
+
+	go func() {
+		time.Sleep(50 * time.Millisecond)
+		queue.Close()
+	}()
+
+	records := []int{1, 2, 3}
+	queue.Drain(ctx, records)
+	queue.Feed(ctx)
+}
+
+func TestFDQueue_ChannelLimitBlockingBehavior(t *testing.T) {
+	queue := NewFDQueue[[]int, int](5, time.Second, 5)
+	ctx := context.Background()
+
+	records := []int{1, 2, 3, 4, 5}
+	wg := sync.WaitGroup{}
+
+	// Add more elements than the channel limit to test blocking behavior
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for i := 0; i < 10; i++ {
+			err := queue.Drain(ctx, records)
+			assert.NoError(t, err, "Unexpected error in Drain")
+		}
+	}()
+
+	// Read elements from the queue to allow the above to proceed
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for i := 0; i < 10; i++ {
+			result, err := queue.Feed(ctx)
+			assert.NoError(t, err, "Unexpected error in Feed")
+			assert.Equal(t, len(records), len(result), "Mismatch in record length")
+			assert.Equal(t, records, result, "Mismatch in records")
+		}
+	}()
+
+	wg.Wait()
+}

From f556d6dea5f547a77669907b7778fa0582fcf515 Mon Sep 17 00:00:00 2001
From: Termina1
Date: Tue, 14 Jan 2025 14:42:47 +0200
Subject: [PATCH 2/2] broadcast queue with capacity in total bytes, instead of number of records

---
 chotki.go            |  33 +++--
 protocol/net_test.go |  18 ++-
 utils/queue.go       | 288 +++++++++++++++++++++++++++++++++++--------
 utils/queue_test.go  |  64 +++++-----
 4 files changed, 292 insertions(+), 111 deletions(-)

diff --git a/chotki.go b/chotki.go
index 2ad8a3f..6e831c1 100644
--- a/chotki.go
+++ b/chotki.go
@@ -77,15 +77,16 @@ type Options struct {
 	Src  uint64
 	Name string
 	Log1 protocol.Records
-	MaxLogLen    int64
-	RelaxedOrder bool
 	Logger utils.Logger
-	PingPeriod time.Duration
-	PingWait   time.Duration
+	PingPeriod time.Duration // how often we should ping a neighbour replica if it is silent
+	PingWait   time.Duration // how long we wait until a pong is received
 	PebbleWriteOptions *pebble.WriteOptions
-	BroadcastBatchSize int
-	BroadcastTimeLimit time.Duration
+	BroadcastQueueMaxSize      int // capacity in bytes; once it is reached, all writes block
+	BroadcastQueueMinBatchSize int // reads wait until they have this much data or the time limit expires
+	// if the time limit expires before a read has enough data (BroadcastQueueMinBatchSize), the read returns whatever it has;
+	// writes then cause an overflow error, which results in queue shutdown and the end of the session
+	BroadcastQueueTimeLimit time.Duration
 	ReadAccumTimeLimit time.Duration //
 	ReadMaxBufferSize  int
 	ReadMinBufferSizeToProcess int
 	TcpReadBufferSize int
@@ -95,10 +96,6 @@ func (o *Options) SetDefaults() {
-	if o.MaxLogLen == 0 {
-		o.MaxLogLen = 1 << 23
-	}
-
 	if o.PingPeriod == 0 {
 		o.PingPeriod = 30 * time.Second
 	}
@@ -118,8 +115,11 @@ func (o *Options) SetDefaults() {
 		o.PebbleWriteOptions = &pebble.WriteOptions{Sync: true}
 	}
-	if o.BroadcastTimeLimit == 0 {
-		o.BroadcastTimeLimit = time.Millisecond
+	if o.BroadcastQueueTimeLimit == 0 {
+		o.BroadcastQueueTimeLimit = time.Second
+	}
+	if o.BroadcastQueueMaxSize == 0 {
+		o.BroadcastQueueMaxSize = 10 * 1024 * 1024 // 10Mb
 	}
 	if o.ReadAccumTimeLimit == 0 {
 		o.ReadAccumTimeLimit = 5 * time.Second
 	}
@@ -236,9 +236,8 @@ func Open(dirname string, opts Options) (*Chotki, error) {
 	cho.net = protocol.NewNet(cho.log,
 		func(name string) protocol.FeedDrainCloserTraced { // new connection
-			const outQueueLimit = 1 << 20
-			queue := utils.NewFDQueue[protocol.Records](outQueueLimit, cho.opts.BroadcastTimeLimit, cho.opts.BroadcastBatchSize)
+			queue := utils.NewFDQueue[protocol.Records](cho.opts.BroadcastQueueMaxSize, cho.opts.BroadcastQueueTimeLimit, cho.opts.BroadcastQueueMinBatchSize)
 			if q, loaded := cho.outq.LoadAndStore(name, queue); loaded && q != nil {
 				cho.log.Warn(fmt.Sprintf("closing the old conn to %s", name))
 				if err := q.Close(); err != nil {
@@ -586,8 +585,8 @@ func (n *ChotkiCollector) Collect(m chan<- prometheus.Metric) {
 		need_to_pass[key] = true
 	}
 	n.chotki.outq.Range(func(key string, value protocol.DrainCloser) bool {
-		if q, ok := value.(*utils.FDQueue[protocol.Records, []byte]); ok {
-			m <- prometheus.MustNewConstMetric(n.outq_size, prometheus.GaugeValue, float64(q.Len()), key)
+		if q, ok := value.(*utils.FDQueue[protocol.Records]); ok {
+			m <- prometheus.MustNewConstMetric(n.outq_size, prometheus.GaugeValue, float64(q.Size()), key)
 			nw_collected[key] = struct{}{}
 			need_to_pass[key] = false
 		}
diff --git a/protocol/net_test.go b/protocol/net_test.go
index 37d2091..2584b70 100644
--- a/protocol/net_test.go
+++ b/protocol/net_test.go
@@ -64,11 +64,11 @@ func tlsConfig(servername string) *tls.Config {
 	}
 }
 
-type TracedQueue[S ~[]E, E any] struct {
-	*utils.FDQueue[S, E]
+type TracedQueue[T ~[][]byte] struct {
+	*utils.FDQueue[T]
 }
 
-func (t *TracedQueue[S, E]) GetTraceId() string {
+func (t *TracedQueue[T]) GetTraceId() string {
 	return ""
 }
 
@@ -77,28 +77,26 @@ func TestTCPDepot_Connect(t *testing.T) {
 	log := utils.NewDefaultLogger(slog.LevelDebug)
 
-	lCon := utils.NewFDQueue[Records](16, time.Millisecond, 0)
+	lCon := utils.NewFDQueue[Records](1000, time.Minute, 1)
 	l := NewNet(log, func(_ string) FeedDrainCloserTraced {
-		return &TracedQueue[Records, []byte]{lCon}
+		return &TracedQueue[Records]{lCon}
 	}, func(_ string, t Traced) {
 		lCon.Close()
 	}, &NetTlsConfigOpt{tlsConfig("a.chotki.local")}, &NetWriteTimeoutOpt{Timeout: 1 * time.Minute})
 	err := l.Listen(loop)
 	assert.Nil(t, err)
 
-	cCon := utils.NewFDQueue[Records](16, time.Millisecond, 0)
+	cCon := utils.NewFDQueue[Records](1000, time.Minute, 1)
 	c := NewNet(log, func(_ string) FeedDrainCloserTraced {
-		return &TracedQueue[Records, []byte]{cCon}
+		return &TracedQueue[Records]{cCon}
 	}, func(_ string, t Traced) {
 		cCon.Close()
 	}, &NetTlsConfigOpt{tlsConfig("b.chotki.local")}, &NetWriteTimeoutOpt{Timeout: 1 * time.Minute})
 	err = c.Connect(loop)
 	assert.Nil(t, err)
-	time.Sleep(time.Second) // Wait connection, todo use events
 
 	// send a record
 	err = cCon.Drain(context.Background(), Records{Record('M', []byte("Hi there"))})
 	assert.Nil(t, err)
-	time.Sleep(1000 * time.Millisecond)
 	rec, err := lCon.Feed(context.Background())
 	assert.Nil(t, err)
 	assert.Greater(t, len(rec), 0)
@@ -136,7 +134,7 @@ func TestTCPDepot_ConnectFailed(t *testing.T) {
 	cCon := utils.NewFDQueue[Records](16, time.Millisecond, 0)
 	c := NewNet(log, func(_ string) FeedDrainCloserTraced {
-		return &TracedQueue[Records, []byte]{cCon}
+		return &TracedQueue[Records]{cCon}
 	}, func(_ string, t Traced) {
 		cCon.Close()
 	}, &NetTlsConfigOpt{tlsConfig("b.chotki.local")})
 	err := c.Connect(loop)
diff --git a/utils/queue.go b/utils/queue.go
index f2bbfe9..abef055 100644
--- a/utils/queue.go
+++ b/utils/queue.go
@@ -3,92 +3,282 @@ package utils
 import (
 	"context"
 	"errors"
-	"sync"
+	"sync/atomic"
 	"time"
 )
 
-type FDQueue[S ~[]E, E any] struct {
-	ctx       context.Context
-	close     context.CancelFunc
-	timelimit time.Duration
-	sync      sync.Mutex
-	ch        chan E
-	active    sync.WaitGroup
-	batchSize int
+type accumulator[T ~[][]byte] struct {
+	data T
+	size int
+}
+
+type FDQueue[T ~[][]byte] struct {
+	ctx        context.Context
+	close      context.CancelFunc
+	timelimit  time.Duration
+	batchSize  int
+	accum      atomic.Pointer[accumulator[T]]
+	maxSize    int
+	overflowed atomic.Bool
+
+	readLock  chan struct{}
+	writeLock chan struct{}
+	syncLock  chan struct{}
+
+	writeSignal atomic.Pointer[chan struct{}]
+	readSignal  atomic.Pointer[chan struct{}]
 }
 
 var ErrClosed = errors.New("[chotki] feed/drain queue is closed")
+var ErrOverflow = errors.New("[chotki] feed/drain queue is overflowed")
 
-func NewFDQueue[S ~[]E, E any](limit int, timelimit time.Duration, batchSize int) *FDQueue[S, E] {
+func NewFDQueue[T ~[][]byte](limit int, timelimit time.Duration, batchSize int) *FDQueue[T] {
 	ctx, cancel := context.WithCancel(context.Background())
-	return &FDQueue[S, E]{
+	return &FDQueue[T]{
 		timelimit: timelimit,
-		ch:        make(chan E, limit),
 		ctx:       ctx,
 		close:     cancel,
+		maxSize:   limit,
 		batchSize: batchSize,
+		readLock:  make(chan struct{}, 1),
+		writeLock: make(chan struct{}, 1),
+		syncLock:  make(chan struct{}, 1),
 	}
 }
 
-func (q *FDQueue[S, E]) Close() error {
+func (q *FDQueue[T]) Close() error {
 	q.close()
-	q.active.Wait()
-	q.sync.Lock()
-	defer q.sync.Unlock()
-	if q.ch != nil {
-		close(q.ch)
-	}
-	q.ch = nil
+	q.accum.Store(nil)
 	return nil
 }
 
-func (q *FDQueue[S, E]) Len() int {
+func (q *FDQueue[T]) Size() int {
 	if q.ctx.Err() != nil {
 		return 0
 	}
-	return len(q.ch)
+	if q.accum.Load() != nil {
+		return q.accum.Load().size
+	}
+	return 0
+}
+
+type wait int
+
+const (
+	waitTimeout wait = iota
+	waitOK
+	waitCanceled
+)
+
+func (q *FDQueue[T]) lock(timer *time.Timer, lock chan struct{}, ctx context.Context) wait {
+	select {
+	case lock <- struct{}{}:
+		return waitOK
+	case <-q.ctx.Done():
+		return waitCanceled
+	case <-ctx.Done():
+		return waitCanceled
+	case <-timer.C:
+		return waitTimeout
+	}
 }
 
-func (q *FDQueue[S, E]) Drain(ctx context.Context, recs S) error {
+func (q *FDQueue[T]) waitSignal(timer *time.Timer, signal chan struct{}, ctx context.Context) wait {
+	select {
+	case <-signal:
+		return waitOK
+	case <-q.ctx.Done():
+		return waitCanceled
+	case <-ctx.Done():
+		return waitCanceled
+	case <-timer.C:
+		return waitTimeout
+	}
+}
+
+func (q *FDQueue[T]) Drain(ctx context.Context, recs T) error {
 	if q.ctx.Err() != nil {
 		return ErrClosed
 	}
-	q.active.Add(1)
-	defer q.active.Done()
-	for _, pkg := range recs {
-		select {
-		case <-ctx.Done():
-			break
-		case <-q.ctx.Done():
-			break
-		case q.ch <- pkg:
-		}
+	if q.overflowed.Load() {
+		return ErrOverflow
+	}
+
+	timer := time.NewTimer(q.timelimit)
+	defer timer.Stop()
+	// try to acquire the write lock, so that all writes are ordered
+	switch q.lock(timer, q.writeLock, ctx) {
+	case waitCanceled:
+		return nil
+	case waitTimeout:
+		q.overflowed.Store(true)
+		return ErrOverflow
+	case waitOK:
+		defer func() { <-q.writeLock }()
+	}
+
+	// try to acquire the sync lock, which synchronizes reads and writes
+	switch q.lock(timer, q.syncLock, ctx) {
+	case waitCanceled:
+		return nil
+	case waitTimeout:
+		q.overflowed.Store(true)
+		return ErrOverflow
+	case waitOK:
+	}
+
+	for len(recs) > 0 {
+		accumPointer := q.accum.Load()
+		accum := accumPointer
+		if accum == nil {
+			accum = &accumulator[T]{
+				data: nil,
+				size: 0,
+			}
+		}
+		freespace := q.maxSize - accum.size
+		writeSize := 0
+		towrite := 0
+		for _, pkg := range recs {
+			length := len(pkg)
+			if length <= freespace {
+				towrite++
+				freespace -= length
+				writeSize += length
+			} else {
+				break
+			}
+		}
+		needwait := false
+		if towrite > 0 {
+			accum = &accumulator[T]{
+				data: append(accum.data, recs[:towrite]...),
+				size: accum.size + writeSize,
+			}
+			if q.accum.CompareAndSwap(accumPointer, accum) {
+				recs = recs[towrite:]
+				// if there is a read waiting, signal it to wake up
+				signal := q.readSignal.Swap(nil)
+				if signal != nil {
+					*signal <- struct{}{}
+				}
+				if len(recs) > 0 {
+					needwait = true
+				}
+			}
+		} else {
+			needwait = true
+		}
+		if needwait {
+			// create a signalling channel to wait on
+			signal := make(chan struct{}, 1)
+			q.writeSignal.Store(&signal)
+			// release the sync lock to allow reads
+			<-q.syncLock
+			// now wait for a read to signal us through the write signal
+			switch q.waitSignal(timer, signal, ctx) {
+			case waitOK:
+				// acquire the sync lock again
+				switch q.lock(timer, q.syncLock, ctx) {
+				case waitOK:
+				case waitCanceled:
+					return nil
+				case waitTimeout:
+					q.overflowed.Store(true)
+					return ErrOverflow
+				}
+			case waitCanceled:
+				return nil
+			case waitTimeout:
+				q.overflowed.Store(true)
+				return ErrOverflow
+			}
+		}
 	}
+	<-q.syncLock
 	return nil
 }
 
-func (q *FDQueue[S, E]) Feed(ctx context.Context) (recs S, err error) {
+func (q *FDQueue[T]) Feed(ctx context.Context) (recs T, err error) {
 	if q.ctx.Err() != nil {
 		return nil, ErrClosed
 	}
-	q.active.Add(1)
-	defer q.active.Done()
-	timelimit := time.After(q.timelimit)
+	if q.overflowed.Load() {
+		return nil, ErrOverflow
+	}
+
+	timer := time.NewTimer(q.timelimit)
+	defer timer.Stop()
+
+	// try to acquire the read lock, so that all reads are ordered
+	switch q.lock(timer, q.readLock, ctx) {
+	case waitCanceled, waitTimeout:
+		return
+	case waitOK:
+		defer func() { <-q.readLock }()
+	}
+
+	// try to acquire the sync lock, which synchronizes reads and writes
+	switch q.lock(timer, q.syncLock, ctx) {
+	case waitCanceled, waitTimeout:
+		return
+	case waitOK:
+	}
+
+	payloadSize := 0
 	for {
-		select {
-		case <-q.ctx.Done():
-			return
-		case <-ctx.Done():
-			return
-		case <-timelimit:
-			return
-		case pkg, ok := <-q.ch:
-			if !ok {
-				return
+		data := q.accum.Load()
+		needwait := false
+		if data != nil {
+			read := 0
+			for _, pkg := range data.data {
+				recs = append(recs, pkg)
+				payloadSize += len(pkg)
+				read++
+				if payloadSize >= q.batchSize {
+					break
+				}
+			}
+			// we hold the sync lock here, so this is safe
+			data.data = data.data[read:]
+			data.size = data.size - payloadSize
+
+			// if a write is waiting on us, signal it
+			signal := q.writeSignal.Swap(nil)
+			if signal != nil {
+				*signal <- struct{}{}
 			}
-			recs = append(recs, pkg)
-			if len(recs) >= q.batchSize {
+
+			// we have enough data, release the lock and return
+			if payloadSize >= q.batchSize {
+				<-q.syncLock
+				return recs, nil
+			} else {
+				needwait = true
+			}
+		} else {
+			needwait = true
+		}
+		if needwait {
+			// create a signalling channel to wait on
+			signal := make(chan struct{}, 1)
+			q.readSignal.Store(&signal)
+			// release the sync lock to allow writes
+			<-q.syncLock
+			select {
+			case <-signal:
+				// acquire the sync lock again
+				switch q.lock(timer, q.syncLock, ctx) {
+				case waitCanceled, waitTimeout:
+					return
+				case waitOK:
+				}
+			case <-q.ctx.Done():
+				return
+			case <-ctx.Done():
+				return
+			case <-timer.C:
 				return
 			}
 		}
diff --git a/utils/queue_test.go b/utils/queue_test.go
index e8b26b1..80cc0a1 100644
--- a/utils/queue_test.go
+++ b/utils/queue_test.go
@@ -14,7 +14,7 @@ func TestBlockingRecordQueue_Drain(t *testing.T) {
 	const N = 1 << 10 // 8K
 	const K = 1 << 4  // 16
 
-	queue := NewFDQueue[[][]byte](1024, time.Millisecond, 0)
+	queue := NewFDQueue[[][]byte](1024, 100*time.Millisecond, 0)
 
 	for k := 0; k < K; k++ {
 		go func(k int) {
@@ -52,20 +52,18 @@ func TestBlockingRecordQueue_Drain(t *testing.T) {
 }
 
 func TestNewFDQueue(t *testing.T) {
-	queue := NewFDQueue[[]int, int](10, time.Second, 5)
+	queue := NewFDQueue[[][]byte](10, time.Second, 5)
 	assert.NotNil(t, queue, "FDQueue creation failed")
-	assert.Equal(t, 0, queue.Len(), "Expected queue length to be 0")
+	assert.Equal(t, 0, queue.Size(), "Expected queue length to be 0")
 }
 
 func TestFDQueue_DrainAndFeed(t *testing.T) {
-	queue := NewFDQueue[[]int, int](10, time.Second, 5)
+	queue := NewFDQueue[[][]byte](10, time.Second, 5)
 	ctx := context.Background()
 
-	records := []int{1, 2, 3, 4, 5}
-	go func() {
-		err := queue.Drain(ctx, records)
-		assert.NoError(t, err, "Unexpected error in Drain")
-	}()
+	records := [][]byte{{1}, {2}, {3}, {4}, {5}}
+	err := queue.Drain(ctx, records)
+	assert.NoError(t, err, "Unexpected error in Drain")
 
 	result, err := queue.Feed(ctx)
 	assert.NoError(t, err, "Unexpected error in Feed")
@@ -74,35 +72,31 @@ func TestFDQueue_DrainAndFeed(t *testing.T) {
 }
 
 func TestFDQueue_Close(t *testing.T) {
-	queue := NewFDQueue[[]int, int](10, time.Second, 5)
+	queue := NewFDQueue[[][]byte](10, time.Second, 5)
 	err := queue.Close()
 	assert.NoError(t, err, "Unexpected error during Close")
 
-	err = queue.Drain(context.Background(), []int{1, 2, 3})
+	err = queue.Drain(context.Background(), [][]byte{{1}, {2}, {3}})
 	assert.ErrorIs(t, err, ErrClosed, "Expected ErrClosed after Close")
 
 	_, err = queue.Feed(context.Background())
 	assert.ErrorIs(t, err, ErrClosed, "Expected ErrClosed after Close")
 
-	assert.Equal(t, 0, queue.Len(), "Expected queue length to be 0 after close")
+	assert.Equal(t, 0, queue.Size(), "Expected queue length to be 0 after close")
 }
 
 func TestFDQueue_ConcurrentDrainAndFeed(t *testing.T) {
-	queue := NewFDQueue[[]int, int](15, time.Second, 10)
+	queue := NewFDQueue[[][]byte](15, time.Second, 10)
 	ctx := context.Background()
 
-	records := []int{1, 2, 3, 4, 5}
+	records := [][]byte{{1}, {2}, {3}, {4}, {5}}
 	wg := sync.WaitGroup{}
 
 	// Add 15 elements to the queue in 3 batches
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-		for i := 0; i < 3; i++ {
-			err := queue.Drain(ctx, records)
-			assert.NoError(t, err, "Unexpected error in Drain")
-		}
-	}()
+	for i := 0; i < 3; i++ {
+		err := queue.Drain(ctx, records)
+		assert.NoError(t, err, "Unexpected error in Drain")
+	}
 
 	// Fetch batches of elements from the queue
 	wg.Add(1)
@@ -125,12 +119,12 @@ func TestFDQueue_ConcurrentDrainAndFeed(t *testing.T) {
 }
 
 func TestFDQueue_TimeLimit(t *testing.T) {
-	queue := NewFDQueue[[]int, int](10, 50*time.Millisecond, 5)
+	queue := NewFDQueue[[][]byte](10, 50*time.Millisecond, 5)
 	ctx := context.Background()
 
 	go func() {
 		time.Sleep(100 * time.Millisecond)
-		queue.Drain(ctx, []int{1, 2, 3})
+		queue.Drain(ctx, [][]byte{{1}, {2}, {3}})
 	}()
 
 	result, err := queue.Feed(ctx)
@@ -139,11 +133,11 @@ func TestFDQueue_TimeLimit(t *testing.T) {
 }
 
 func TestFDQueue_DrainStopsWhenContextCancelled(t *testing.T) {
-	queue := NewFDQueue[[]int, int](10, time.Second, 5)
+	queue := NewFDQueue[[][]byte](10, time.Second, 5)
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	records := []int{1, 2, 3, 4, 5}
+	records := [][]byte{{1}, {2}, {3}, {4}, {5}}
 	go func() {
 		time.Sleep(50 * time.Millisecond)
 		cancel()
@@ -153,7 +147,7 @@ func TestFDQueue_DrainStopsWhenContextCancelled(t *testing.T) {
 }
 
 func TestFDQueue_FeedStopsWhenContextCancelled(t *testing.T) {
-	queue := NewFDQueue[[]int, int](10, time.Second, 5)
+	queue := NewFDQueue[[][]byte](10, time.Second, 5)
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
@@ -167,7 +161,7 @@ func TestFDQueue_FeedStopsWhenContextCancelled(t *testing.T) {
 }
 
 func TestFDQueue_CloseStopsDrainAndFeed(t *testing.T) {
-	queue := NewFDQueue[[]int, int](10, time.Second, 5)
+	queue := NewFDQueue[[][]byte](10, time.Second, 5)
 	ctx := context.Background()
 
 	go func() {
@@ -175,16 +169,16 @@ func TestFDQueue_CloseStopsDrainAndFeed(t *testing.T) {
 		queue.Close()
 	}()
 
-	records := []int{1, 2, 3}
+	records := [][]byte{{1}, {2}, {3}}
 	queue.Drain(ctx, records)
 	queue.Feed(ctx)
 }
 
 func TestFDQueue_ChannelLimitBlockingBehavior(t *testing.T) {
-	queue := NewFDQueue[[]int, int](5, time.Second, 5)
+	queue := NewFDQueue[[][]byte](5, time.Second, 5)
 	ctx := context.Background()
 
-	records := []int{1, 2, 3, 4, 5}
+	records := [][]byte{{1}, {2}, {3}, {4}, {5}}
 	wg := sync.WaitGroup{}
 
 	// Add more elements than the channel limit to test blocking behavior
@@ -193,7 +187,7 @@ func TestFDQueue_ChannelLimitBlockingBehavior(t *testing.T) {
 		defer wg.Done()
 		for i := 0; i < 10; i++ {
 			err := queue.Drain(ctx, records)
-			assert.NoError(t, err, "Unexpected error in Drain")
+			assert.NoError(t, err, "Unexpected error in Drain %d", i)
 		}
 	}()
 
@@ -203,9 +197,9 @@ func TestFDQueue_ChannelLimitBlockingBehavior(t *testing.T) {
 		defer wg.Done()
 		for i := 0; i < 10; i++ {
 			result, err := queue.Feed(ctx)
-			assert.NoError(t, err, "Unexpected error in Feed")
-			assert.Equal(t, len(records), len(result), "Mismatch in record length")
-			assert.Equal(t, records, result, "Mismatch in records")
+			assert.NoError(t, err, "Unexpected error in Feed %d", i)
+			assert.Equal(t, len(records), len(result), "Mismatch in record length %d", i)
+			assert.Equal(t, records, result, "Mismatch in records %d", i)
 		}
 	}()