Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixes #198 negative send/receive deadlines do not work #216

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 35 additions & 25 deletions protocol/rep/rep.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The Mangos Authors
// Copyright 2020 The Mangos Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use file except in compliance with the License.
Expand Down Expand Up @@ -96,7 +96,7 @@ func (c *context) RecvMsg() (*protocol.Message, error) {
expireTime := c.recvExpire
s.Unlock()

if expireTime > 0 {
if expireTime != 0 {
wq = time.After(expireTime)
}

Expand All @@ -107,10 +107,15 @@ func (c *context) RecvMsg() (*protocol.Message, error) {
select {
case entry := <-s.recvQ:
m, p = entry.m, entry.p
case <-wq:
err = protocol.ErrRecvTimeout
case <-cq:
err = protocol.ErrClosed
default:
select {
case entry := <-s.recvQ:
m, p = entry.m, entry.p
case <-wq:
err = protocol.ErrRecvTimeout
case <-cq:
err = protocol.ErrClosed
}
}

s.Lock()
Expand Down Expand Up @@ -144,7 +149,7 @@ func (c *context) SendMsg(m *protocol.Message) error {
timeQ := nilQ
if bestEffort {
timeQ = closedQ
} else if c.sendExpire > 0 {
} else if c.sendExpire != 0 {
timeQ = time.After(c.sendExpire)
}

Expand All @@ -154,26 +159,31 @@ func (c *context) SendMsg(m *protocol.Message) error {
r.Unlock()

select {
case <-cq:
m.Header = nil
return protocol.ErrClosed
case <-p.closeQ:
// Pipe closed, so no way to get it to the recipient.
// Just discard the message.
m.Free()
case p.sendQ <- m:
return nil
case <-timeQ:
if bestEffort {
// No way to report to caller, so just discard
// the message.
default:
select {
case <-cq:
m.Header = nil
return protocol.ErrClosed
case <-p.closeQ:
// Pipe closed, so no way to get it to the recipient.
// Just discard the message.
m.Free()
return nil
}
m.Header = nil
return protocol.ErrSendTimeout
case <-timeQ:
if bestEffort {
// No way to report to caller, so just discard
// the message.
m.Free()
return nil
}
m.Header = nil
return protocol.ErrSendTimeout

case p.sendQ <- m:
return nil
case p.sendQ <- m:
return nil
}
}
}

Expand Down Expand Up @@ -228,7 +238,7 @@ func (c *context) SetOption(name string, v interface{}) error {
return protocol.ErrBadValue

case protocol.OptionSendDeadline:
if val, ok := v.(time.Duration); ok && val > 0 {
if val, ok := v.(time.Duration); ok {
c.s.Lock()
c.sendExpire = val
c.s.Unlock()
Expand All @@ -237,7 +247,7 @@ func (c *context) SetOption(name string, v interface{}) error {
return protocol.ErrBadValue

case protocol.OptionRecvDeadline:
if val, ok := v.(time.Duration); ok && val > 0 {
if val, ok := v.(time.Duration); ok {
c.s.Lock()
c.recvExpire = val
c.s.Unlock()
Expand Down
55 changes: 53 additions & 2 deletions protocol/rep/rep_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The Mangos Authors
// Copyright 2020 The Mangos Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use file except in compliance with the License.
Expand Down Expand Up @@ -121,6 +121,40 @@ func TestRepBestEffortSend(t *testing.T) {
MustSucceed(t, p.Close())
}

func TestRepSendNonBlocking(t *testing.T) {
s := GetSocket(t, NewSocket)
p := GetSocket(t, xreq.NewSocket)
MustSucceed(t, s.SetOption(mangos.OptionWriteQLen, 1))
MustSucceed(t, p.SetOption(mangos.OptionReadQLen, 1))
MustSucceed(t, s.SetOption(mangos.OptionSendDeadline, time.Duration(-1)))

ConnectPair(t, s, p)
for i := 0; i < 100; i++ {
// We have to make a raw message when using xreq. We
// use xreq because normal req will simply discard
// messages for requests it doesn't have outstanding.
m := mangos.NewMessage(0)
m.Header = make([]byte, 4)
binary.BigEndian.PutUint32(m.Header, uint32(i)|0x80000000)
MustSucceed(t, p.SendMsg(m))
m = MustRecvMsg(t, s)
start := time.Now()
e := s.SendMsg(m)
if e != nil {
MustBeError(t, e, mangos.ErrSendTimeout)
m.Free()
MustBeTrue(t, time.Since(start) < time.Second)
break
} else if i > 20 {
MustBeError(t, e, mangos.ErrSendTimeout)
}
// NB: We never ask the peer to receive it -- this ensures we
// encounter back-pressure.
}
MustSucceed(t, s.Close())
MustSucceed(t, p.Close())
}

// This verifies that closing the socket aborts a blocking send.
// We use a context because closing the socket also closes pipes
// making it less reproducible.
Expand Down Expand Up @@ -190,6 +224,23 @@ func TestRepRecvJunk(t *testing.T) {
MustSucceed(t, self.Close())
}

func TestRepRecvNonBlocking(t *testing.T) {
s := GetSocket(t, NewSocket)
defer MustClose(t, s)
p := GetSocket(t, req.NewSocket)
defer MustClose(t, p)

MustSucceed(t, s.SetOption(mangos.OptionRecvDeadline, time.Duration(-1)))

ConnectPair(t, s, p)

start := time.Now()
m, e := s.Recv()
MustBeNil(t, m)
MustBeError(t, e, mangos.ErrRecvTimeout)
MustBeTrue(t, time.Since(start) < time.Second)
}

func TestRepDoubleRecv(t *testing.T) {
self := GetSocket(t, NewSocket)
MustSucceed(t, self.SetOption(mangos.OptionRecvDeadline, time.Second))
Expand Down Expand Up @@ -266,7 +317,7 @@ func TestRepPipeRecvCloseSocket(t *testing.T) {

// This sets up a bunch of contexts to run in parallel, and verifies that
// they all seem to run with no mis-deliveries.
func TestRespondentMultiContexts(t *testing.T) {
func TestRepMultiContexts(t *testing.T) {
count := 30
repeat := 20

Expand Down
6 changes: 6 additions & 0 deletions protocol/req/req.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,8 @@ func (c *context) SendMsg(m *protocol.Message) error {
}
s.Unlock()
})
} else if c.sendExpire < 0 {
expired = true
}

s.send()
Expand Down Expand Up @@ -322,6 +324,10 @@ func (c *context) RecvMsg() (*protocol.Message, error) {
}

for id == c.reqID && c.repMsg == nil {
if c.recvExpire < 0 {
c.cancel()
return nil, protocol.ErrRecvTimeout
}
c.cond.Wait()
}

Expand Down
31 changes: 30 additions & 1 deletion protocol/req/req_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The Mangos Authors
// Copyright 2020 The Mangos Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use file except in compliance with the License.
Expand Down Expand Up @@ -78,6 +78,22 @@ func TestReqRecvDeadline(t *testing.T) {
MustSucceed(t, peer.Close())
}

func TestReqRecvNonBlocking(t *testing.T) {
self := GetSocket(t, NewSocket)
peer := GetSocket(t, rep.NewSocket)
ConnectPair(t, self, peer)
MustSucceed(t, self.SetOption(mangos.OptionRecvDeadline, time.Duration(-1)))
MustSucceed(t, self.Send([]byte{}))
_ = MustRecv(t, peer)
start := time.Now()
m, e := self.RecvMsg()
MustBeError(t, e, mangos.ErrRecvTimeout)
MustBeTrue(t, time.Since(start) < time.Second)
MustBeNil(t, m)
MustSucceed(t, self.Close())
MustSucceed(t, peer.Close())
}

func TestReqContextClosed(t *testing.T) {
s := GetSocket(t, NewSocket)
c, e := s.OpenContext()
Expand Down Expand Up @@ -160,8 +176,21 @@ func TestReqBestEffort(t *testing.T) {
MustSucceed(t, s.SetOption(mangos.OptionBestEffort, false))
MustBeError(t, s.Send(msg), mangos.ErrSendTimeout)
MustBeError(t, s.Send(msg), mangos.ErrSendTimeout)
MustClose(t, s)
}

func TestReqSendNonBlocking(t *testing.T) {
timeout := -time.Millisecond
msg := []byte{'0', '1', '2', '3'}

s := GetSocket(t, NewSocket)
MustSucceed(t, s.SetOption(mangos.OptionSendDeadline, timeout))
MustSucceed(t, s.Listen(AddrTestInp()))
MustBeError(t, s.Send(msg), mangos.ErrSendTimeout)
MustClose(t, s)
}


// This test demonstrates cancellation before calling receive but after the
// message is received causes the original message to be discarded.
func TestReqRetry(t *testing.T) {
Expand Down
10 changes: 7 additions & 3 deletions protocol/respondent/respondent.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The Mangos Authors
// Copyright 2020 The Mangos Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use file except in compliance with the License.
Expand Down Expand Up @@ -102,6 +102,8 @@ func (c *context) RecvMsg() (*protocol.Message, error) {

if expTime > 0 {
tq = time.After(expTime)
} else if expTime < 0 {
tq = closedQ
}

select {
Expand Down Expand Up @@ -147,6 +149,8 @@ func (c *context) SendMsg(m *protocol.Message) error {
tq = closedQ
} else if c.sendExpire > 0 {
tq = time.After(c.sendExpire)
} else if c.sendExpire < 0 {
tq = closedQ
}

m.Header = bt
Expand Down Expand Up @@ -220,7 +224,7 @@ func (c *context) GetOption(name string) (interface{}, error) {
func (c *context) SetOption(name string, v interface{}) error {
switch name {
case protocol.OptionSendDeadline:
if val, ok := v.(time.Duration); ok && val.Nanoseconds() > 0 {
if val, ok := v.(time.Duration); ok {
c.s.Lock()
c.sendExpire = val
c.s.Unlock()
Expand All @@ -229,7 +233,7 @@ func (c *context) SetOption(name string, v interface{}) error {
return protocol.ErrBadValue

case protocol.OptionRecvDeadline:
if val, ok := v.(time.Duration); ok && val.Nanoseconds() > 0 {
if val, ok := v.(time.Duration); ok {
c.s.Lock()
c.recvExpire = val
c.s.Unlock()
Expand Down
48 changes: 47 additions & 1 deletion protocol/respondent/respondent_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The Mangos Authors
// Copyright 2020 The Mangos Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use file except in compliance with the License.
Expand Down Expand Up @@ -264,6 +264,15 @@ func TestRespondentRecvExpire(t *testing.T) {
MustSucceed(t, s.Close())
}

func TestRespondentRecvNonBlocking(t *testing.T) {
s := GetSocket(t, NewSocket)
MustSucceed(t, s.SetOption(mangos.OptionRecvDeadline, time.Duration(-1)))
v, e := s.RecvMsg()
MustBeError(t, e, mangos.ErrRecvTimeout)
MustBeNil(t, v)
MustSucceed(t, s.Close())
}

func TestRespondentSendState(t *testing.T) {
s := GetSocket(t, NewSocket)
MustBeError(t, s.Send([]byte{}), mangos.ErrProtoState)
Expand Down Expand Up @@ -297,6 +306,43 @@ func TestRespondentBestEffortSend(t *testing.T) {
MustSucceed(t, p.Close())
}

func TestRespondentSendNonBlocking(t *testing.T) {
s := GetSocket(t, NewSocket)
p := GetSocket(t, xsurveyor.NewSocket)
MustSucceed(t, s.SetOption(mangos.OptionWriteQLen, 1))
MustSucceed(t, p.SetOption(mangos.OptionReadQLen, 1))
MustSucceed(t, s.SetOption(mangos.OptionSendDeadline, time.Duration(-1)))

ConnectPair(t, s, p)
for i := 0; i < 100; i++ {
// We have to make a raw message when using xsurveyor. We
// use xsurveyor because normal surveyor will simply discard
// messages for surveys it doesn't have outstanding.
m := mangos.NewMessage(0)
m.Header = make([]byte, 4)
binary.BigEndian.PutUint32(m.Header, uint32(i)|0x80000000)
MustSucceed(t, p.SendMsg(m))
m, e := s.RecvMsg()
MustSucceed(t, e)
MustNotBeNil(t, m)
start := time.Now()
e = s.SendMsg(m)
if e != nil {
MustBeError(t, e, mangos.ErrSendTimeout)
m.Free()
MustBeTrue(t, time.Since(start) < time.Second)
break
} else if i > 20 {
MustBeError(t, e, mangos.ErrSendTimeout)
}

// NB: We never ask the peer to receive it -- this ensures we
// encounter backpressure.
}
MustSucceed(t, s.Close())
MustSucceed(t, p.Close())
}

func TestRespondentSendBackPressure(t *testing.T) {
s := GetSocket(t, NewSocket)
p := GetSocket(t, xsurveyor.NewSocket)
Expand Down
Loading