Skip to content

Commit

Permalink
Merge pull request nsqio#24 from StoneYunZhao/zhaoyun/peek
Browse files Browse the repository at this point in the history
Add PeekChan function
  • Loading branch information
ploxiln authored Oct 17, 2021
2 parents 02dd623 + 2cb4338 commit cc41549
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 0 deletions.
13 changes: 13 additions & 0 deletions diskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func (l LogLevel) String() string {
type Interface interface {
Put([]byte) error
ReadChan() <-chan []byte // this is expected to be an *unbuffered* channel
PeekChan() <-chan []byte // this is expected to be an *unbuffered* channel
Close() error
Delete() error
Depth() int64
Expand Down Expand Up @@ -91,6 +92,9 @@ type diskQueue struct {
// exposed via ReadChan()
readChan chan []byte

// exposed via PeekChan()
peekChan chan []byte

// internal channels
depthChan chan int64
writeChan chan []byte
Expand All @@ -115,6 +119,7 @@ func New(name string, dataPath string, maxBytesPerFile int64,
minMsgSize: minMsgSize,
maxMsgSize: maxMsgSize,
readChan: make(chan []byte),
peekChan: make(chan []byte),
depthChan: make(chan int64),
writeChan: make(chan []byte),
writeResponseChan: make(chan error),
Expand Down Expand Up @@ -152,6 +157,10 @@ func (d *diskQueue) ReadChan() <-chan []byte {
return d.readChan
}

// PeekChan returns a receive-only channel from which the message at the head
// of the queue can be received without consuming it: in ioLoop, a send on
// peekChan (unlike readChan) does not increment the read count or move the
// read position forward, so Depth() is unchanged and a subsequent peek or
// read sees the same message. Like ReadChan, this is an unbuffered channel.
func (d *diskQueue) PeekChan() <-chan []byte {
	return d.peekChan
}

// Put writes a []byte to the queue
func (d *diskQueue) Put(data []byte) error {
d.RLock()
Expand Down Expand Up @@ -648,6 +657,7 @@ func (d *diskQueue) ioLoop() {
var err error
var count int64
var r chan []byte
var p chan []byte

syncTicker := time.NewTicker(d.syncTimeout)

Expand Down Expand Up @@ -676,13 +686,16 @@ func (d *diskQueue) ioLoop() {
}
}
r = d.readChan
p = d.peekChan
} else {
r = nil
p = nil
}

select {
// the Go channel spec dictates that nil channel operations (read or write)
// in a select are skipped, we set r to d.readChan only when there is data to read
case p <- dataRead:
case r <- dataRead:
count++
// moveForward sets needSync flag if a file is removed
Expand Down
77 changes: 77 additions & 0 deletions diskqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,83 @@ func TestDiskQueueRoll(t *testing.T) {
}
}

// TestDiskQueuePeek verifies that PeekChan returns the head message without
// consuming it, across three interleavings: peek-then-read ("roll"),
// double-peek-then-read ("peek-read"), and read-then-peek ("read-peek").
func TestDiskQueuePeek(t *testing.T) {
	l := NewTestLogger(t)
	dqName := "test_disk_queue_peek" + strconv.Itoa(int(time.Now().Unix()))
	tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
	if err != nil {
		// fail this test rather than panicking the whole test binary
		t.Fatal(err)
	}
	defer os.RemoveAll(tmpDir)
	msg := bytes.Repeat([]byte{0}, 10)
	ml := int64(len(msg))
	dq := New(dqName, tmpDir, 10*(ml+4), int32(ml), 1<<10, 2500, 2*time.Second, l)
	defer dq.Close()
	NotNil(t, dq)
	Equal(t, int64(0), dq.Depth())

	// fill puts 10 copies of msg onto the queue, checking depth after each Put.
	// Each subtest below starts from this same state and drains back to empty.
	fill := func(t *testing.T) {
		for i := 0; i < 10; i++ {
			err := dq.Put(msg)
			Nil(t, err)
			Equal(t, int64(i+1), dq.Depth())
		}
	}

	t.Run("roll", func(t *testing.T) {
		fill(t)

		for i := 10; i > 0; i-- {
			// a peek must not consume the message or change depth ...
			Equal(t, msg, <-dq.PeekChan())
			Equal(t, int64(i), dq.Depth())

			// ... while a read consumes it and decrements depth
			Equal(t, msg, <-dq.ReadChan())
			Equal(t, int64(i-1), dq.Depth())
		}

		Nil(t, dq.Empty())
	})

	t.Run("peek-read", func(t *testing.T) {
		fill(t)

		for i := 10; i > 0; i-- {
			// two consecutive peeks must observe the same message at the
			// same depth
			Equal(t, msg, <-dq.PeekChan())
			Equal(t, int64(i), dq.Depth())

			Equal(t, msg, <-dq.PeekChan())
			Equal(t, int64(i), dq.Depth())

			Equal(t, msg, <-dq.ReadChan())
			Equal(t, int64(i-1), dq.Depth())
		}

		Nil(t, dq.Empty())
	})

	t.Run("read-peek", func(t *testing.T) {
		fill(t)

		// stop at i > 1 so the trailing peek of each iteration always has a
		// remaining message to observe
		for i := 10; i > 1; i-- {
			Equal(t, msg, <-dq.PeekChan())
			Equal(t, int64(i), dq.Depth())

			Equal(t, msg, <-dq.ReadChan())
			Equal(t, int64(i-1), dq.Depth())

			// peeking after a read sees the new head at the reduced depth
			Equal(t, msg, <-dq.PeekChan())
			Equal(t, int64(i-1), dq.Depth())
		}

		Nil(t, dq.Empty())
	})

}

func assertFileNotExist(t *testing.T, fn string) {
f, err := os.OpenFile(fn, os.O_RDONLY, 0600)
Equal(t, (*os.File)(nil), f)
Expand Down

0 comments on commit cc41549

Please sign in to comment.