diff --git a/diskqueue.go b/diskqueue.go index ee6c22d..f01ca71 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -47,6 +47,7 @@ func (l LogLevel) String() string { type Interface interface { Put([]byte) error ReadChan() <-chan []byte // this is expected to be an *unbuffered* channel + PeekChan() <-chan []byte // this is expected to be an *unbuffered* channel Close() error Delete() error Depth() int64 @@ -91,6 +92,9 @@ type diskQueue struct { // exposed via ReadChan() readChan chan []byte + // exposed via PeekChan() + peekChan chan []byte + // internal channels depthChan chan int64 writeChan chan []byte @@ -115,6 +119,7 @@ func New(name string, dataPath string, maxBytesPerFile int64, minMsgSize: minMsgSize, maxMsgSize: maxMsgSize, readChan: make(chan []byte), + peekChan: make(chan []byte), depthChan: make(chan int64), writeChan: make(chan []byte), writeResponseChan: make(chan error), @@ -152,6 +157,10 @@ func (d *diskQueue) ReadChan() <-chan []byte { return d.readChan } +func (d *diskQueue) PeekChan() <-chan []byte { + return d.peekChan +} + // Put writes a []byte to the queue func (d *diskQueue) Put(data []byte) error { d.RLock() @@ -648,6 +657,7 @@ func (d *diskQueue) ioLoop() { var err error var count int64 var r chan []byte + var p chan []byte syncTicker := time.NewTicker(d.syncTimeout) @@ -676,13 +686,16 @@ func (d *diskQueue) ioLoop() { } } r = d.readChan + p = d.peekChan } else { r = nil + p = nil } select { // the Go channel spec dictates that nil channel operations (read or write) // in a select are skipped, we set r to d.readChan only when there is data to read + case p <- dataRead: case r <- dataRead: count++ // moveForward sets needSync flag if a file is removed diff --git a/diskqueue_test.go b/diskqueue_test.go index fc72406..9d9ae0b 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -130,6 +130,83 @@ func TestDiskQueueRoll(t *testing.T) { } } +func TestDiskQueuePeek(t *testing.T) { + l := NewTestLogger(t) + dqName := "test_disk_queue_peek" + strconv.Itoa(int(time.Now().Unix())) + tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + msg := bytes.Repeat([]byte{0}, 10) + ml := int64(len(msg)) + dq := New(dqName, tmpDir, 10*(ml+4), int32(ml), 1<<10, 2500, 2*time.Second, l) + defer dq.Close() + NotNil(t, dq) + Equal(t, int64(0), dq.Depth()) + + t.Run("roll", func(t *testing.T) { + for i := 0; i < 10; i++ { + err := dq.Put(msg) + Nil(t, err) + Equal(t, int64(i+1), dq.Depth()) + } + + for i := 10; i > 0; i-- { + Equal(t, msg, <-dq.PeekChan()) + Equal(t, int64(i), dq.Depth()) + + Equal(t, msg, <-dq.ReadChan()) + Equal(t, int64(i-1), dq.Depth()) + } + + Nil(t, dq.Empty()) + }) + + t.Run("peek-read", func(t *testing.T) { + for i := 0; i < 10; i++ { + err := dq.Put(msg) + Nil(t, err) + Equal(t, int64(i+1), dq.Depth()) + } + + for i := 10; i > 0; i-- { + Equal(t, msg, <-dq.PeekChan()) + Equal(t, int64(i), dq.Depth()) + + Equal(t, msg, <-dq.PeekChan()) + Equal(t, int64(i), dq.Depth()) + + Equal(t, msg, <-dq.ReadChan()) + Equal(t, int64(i-1), dq.Depth()) + } + + Nil(t, dq.Empty()) + }) + + t.Run("read-peek", func(t *testing.T) { + for i := 0; i < 10; i++ { + err := dq.Put(msg) + Nil(t, err) + Equal(t, int64(i+1), dq.Depth()) + } + + for i := 10; i > 1; i-- { + Equal(t, msg, <-dq.PeekChan()) + Equal(t, int64(i), dq.Depth()) + + Equal(t, msg, <-dq.ReadChan()) + Equal(t, int64(i-1), dq.Depth()) + + Equal(t, msg, <-dq.PeekChan()) + Equal(t, int64(i-1), dq.Depth()) + } + + Nil(t, dq.Empty()) + }) + +} + func assertFileNotExist(t *testing.T, fn string) { f, err := os.OpenFile(fn, os.O_RDONLY, 0600) Equal(t, (*os.File)(nil), f)