diff --git a/protocol/rep/rep.go b/protocol/rep/rep.go index ac98e59..8c17f36 100644 --- a/protocol/rep/rep.go +++ b/protocol/rep/rep.go @@ -1,4 +1,4 @@ -// Copyright 2019 The Mangos Authors +// Copyright 2020 The Mangos Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use file except in compliance with the License. @@ -96,7 +96,7 @@ func (c *context) RecvMsg() (*protocol.Message, error) { expireTime := c.recvExpire s.Unlock() - if expireTime > 0 { + if expireTime != 0 { wq = time.After(expireTime) } @@ -107,10 +107,15 @@ func (c *context) RecvMsg() (*protocol.Message, error) { select { case entry := <-s.recvQ: m, p = entry.m, entry.p - case <-wq: - err = protocol.ErrRecvTimeout - case <-cq: - err = protocol.ErrClosed + default: + select { + case entry := <-s.recvQ: + m, p = entry.m, entry.p + case <-wq: + err = protocol.ErrRecvTimeout + case <-cq: + err = protocol.ErrClosed + } } s.Lock() @@ -144,7 +149,7 @@ func (c *context) SendMsg(m *protocol.Message) error { timeQ := nilQ if bestEffort { timeQ = closedQ - } else if c.sendExpire > 0 { + } else if c.sendExpire != 0 { timeQ = time.After(c.sendExpire) } @@ -154,26 +159,31 @@ func (c *context) SendMsg(m *protocol.Message) error { r.Unlock() select { - case <-cq: - m.Header = nil - return protocol.ErrClosed - case <-p.closeQ: - // Pipe closed, so no way to get it to the recipient. - // Just discard the message. - m.Free() + case p.sendQ <- m: return nil - case <-timeQ: - if bestEffort { - // No way to report to caller, so just discard - // the message. + default: + select { + case <-cq: + m.Header = nil + return protocol.ErrClosed + case <-p.closeQ: + // Pipe closed, so no way to get it to the recipient. + // Just discard the message. m.Free() return nil - } - m.Header = nil - return protocol.ErrSendTimeout + case <-timeQ: + if bestEffort { + // No way to report to caller, so just discard + // the message. + m.Free() + return nil + } + m.Header = nil + return protocol.ErrSendTimeout - case p.sendQ <- m: - return nil + case p.sendQ <- m: + return nil + } } } @@ -228,7 +238,7 @@ func (c *context) SetOption(name string, v interface{}) error { return protocol.ErrBadValue case protocol.OptionSendDeadline: - if val, ok := v.(time.Duration); ok && val > 0 { + if val, ok := v.(time.Duration); ok { c.s.Lock() c.sendExpire = val c.s.Unlock() @@ -237,7 +247,7 @@ func (c *context) SetOption(name string, v interface{}) error { return protocol.ErrBadValue case protocol.OptionRecvDeadline: - if val, ok := v.(time.Duration); ok && val > 0 { + if val, ok := v.(time.Duration); ok { c.s.Lock() c.recvExpire = val c.s.Unlock() diff --git a/protocol/rep/rep_test.go b/protocol/rep/rep_test.go index c2a31c4..a6c6b4f 100644 --- a/protocol/rep/rep_test.go +++ b/protocol/rep/rep_test.go @@ -1,4 +1,4 @@ -// Copyright 2019 The Mangos Authors +// Copyright 2020 The Mangos Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use file except in compliance with the License. @@ -121,6 +121,40 @@ func TestRepBestEffortSend(t *testing.T) { MustSucceed(t, p.Close()) } +func TestRepSendNonBlocking(t *testing.T) { + s := GetSocket(t, NewSocket) + p := GetSocket(t, xreq.NewSocket) + MustSucceed(t, s.SetOption(mangos.OptionWriteQLen, 1)) + MustSucceed(t, p.SetOption(mangos.OptionReadQLen, 1)) + MustSucceed(t, s.SetOption(mangos.OptionSendDeadline, time.Duration(-1))) + + ConnectPair(t, s, p) + for i := 0; i < 100; i++ { + // We have to make a raw message when using xreq. We + // use xreq because normal req will simply discard + // messages for requests it doesn't have outstanding. + m := mangos.NewMessage(0) + m.Header = make([]byte, 4) + binary.BigEndian.PutUint32(m.Header, uint32(i)|0x80000000) + MustSucceed(t, p.SendMsg(m)) + m = MustRecvMsg(t, s) + start := time.Now() + e := s.SendMsg(m) + if e != nil { + MustBeError(t, e, mangos.ErrSendTimeout) + m.Free() + MustBeTrue(t, time.Since(start) < time.Second) + break + } else if i > 20 { + MustBeError(t, e, mangos.ErrSendTimeout) + } + // NB: We never ask the peer to receive it -- this ensures we + // encounter back-pressure. + } + MustSucceed(t, s.Close()) + MustSucceed(t, p.Close()) +} + // This verifies that closing the socket aborts a blocking send. // We use a context because closing the socket also closes pipes // making it less reproducible. @@ -190,6 +224,23 @@ func TestRepRecvJunk(t *testing.T) { MustSucceed(t, self.Close()) } +func TestRepRecvNonBlocking(t *testing.T) { + s := GetSocket(t, NewSocket) + defer MustClose(t, s) + p := GetSocket(t, req.NewSocket) + defer MustClose(t, p) + + MustSucceed(t, s.SetOption(mangos.OptionRecvDeadline, time.Duration(-1))) + + ConnectPair(t, s, p) + + start := time.Now() + m, e := s.Recv() + MustBeNil(t, m) + MustBeError(t, e, mangos.ErrRecvTimeout) + MustBeTrue(t, time.Since(start) < time.Second) +} + func TestRepDoubleRecv(t *testing.T) { self := GetSocket(t, NewSocket) MustSucceed(t, self.SetOption(mangos.OptionRecvDeadline, time.Second)) @@ -266,7 +317,7 @@ func TestRepPipeRecvCloseSocket(t *testing.T) { // This sets up a bunch of contexts to run in parallel, and verifies that // they all seem to run with no mis-deliveries. -func TestRespondentMultiContexts(t *testing.T) { +func TestRepMultiContexts(t *testing.T) { count := 30 repeat := 20 diff --git a/protocol/req/req.go b/protocol/req/req.go index 668363d..a01c85c 100644 --- a/protocol/req/req.go +++ b/protocol/req/req.go @@ -273,6 +273,8 @@ func (c *context) SendMsg(m *protocol.Message) error { } s.Unlock() }) + } else if c.sendExpire < 0 { + expired = true } s.send() @@ -322,6 +324,10 @@ func (c *context) RecvMsg() (*protocol.Message, error) { } for id == c.reqID && c.repMsg == nil { + if c.recvExpire < 0 { + c.cancel() + return nil, protocol.ErrRecvTimeout + } c.cond.Wait() } diff --git a/protocol/req/req_test.go b/protocol/req/req_test.go index 4a1329f..d7d1af2 100644 --- a/protocol/req/req_test.go +++ b/protocol/req/req_test.go @@ -1,4 +1,4 @@ -// Copyright 2019 The Mangos Authors +// Copyright 2020 The Mangos Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use file except in compliance with the License. @@ -78,6 +78,22 @@ func TestReqRecvDeadline(t *testing.T) { MustSucceed(t, peer.Close()) } +func TestReqRecvNonBlocking(t *testing.T) { + self := GetSocket(t, NewSocket) + peer := GetSocket(t, rep.NewSocket) + ConnectPair(t, self, peer) + MustSucceed(t, self.SetOption(mangos.OptionRecvDeadline, time.Duration(-1))) + MustSucceed(t, self.Send([]byte{})) + _ = MustRecv(t, peer) + start := time.Now() + m, e := self.RecvMsg() + MustBeError(t, e, mangos.ErrRecvTimeout) + MustBeTrue(t, time.Since(start) < time.Second) + MustBeNil(t, m) + MustSucceed(t, self.Close()) + MustSucceed(t, peer.Close()) +} + func TestReqContextClosed(t *testing.T) { s := GetSocket(t, NewSocket) c, e := s.OpenContext() @@ -160,8 +176,21 @@ func TestReqBestEffort(t *testing.T) { MustSucceed(t, s.SetOption(mangos.OptionBestEffort, false)) MustBeError(t, s.Send(msg), mangos.ErrSendTimeout) MustBeError(t, s.Send(msg), mangos.ErrSendTimeout) + MustClose(t, s) } +func TestReqSendNonBlocking(t *testing.T) { + timeout := -time.Millisecond + msg := []byte{'0', '1', '2', '3'} + + s := GetSocket(t, NewSocket) + MustSucceed(t, s.SetOption(mangos.OptionSendDeadline, timeout)) + MustSucceed(t, s.Listen(AddrTestInp())) + MustBeError(t, s.Send(msg), mangos.ErrSendTimeout) + MustClose(t, s) +} + + // This test demonstrates cancellation before calling receive but after the // message is received causes the original message to be discarded. func TestReqRetry(t *testing.T) { diff --git a/protocol/respondent/respondent.go b/protocol/respondent/respondent.go index cf699cc..d400272 100644 --- a/protocol/respondent/respondent.go +++ b/protocol/respondent/respondent.go @@ -1,4 +1,4 @@ -// Copyright 2019 The Mangos Authors +// Copyright 2020 The Mangos Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use file except in compliance with the License. @@ -102,6 +102,8 @@ func (c *context) RecvMsg() (*protocol.Message, error) { if expTime > 0 { tq = time.After(expTime) + } else if expTime < 0 { + tq = closedQ } select { @@ -147,6 +149,8 @@ func (c *context) SendMsg(m *protocol.Message) error { tq = closedQ } else if c.sendExpire > 0 { tq = time.After(c.sendExpire) + } else if c.sendExpire < 0 { + tq = closedQ } m.Header = bt @@ -220,7 +224,7 @@ func (c *context) GetOption(name string) (interface{}, error) { func (c *context) SetOption(name string, v interface{}) error { switch name { case protocol.OptionSendDeadline: - if val, ok := v.(time.Duration); ok && val.Nanoseconds() > 0 { + if val, ok := v.(time.Duration); ok { c.s.Lock() c.sendExpire = val c.s.Unlock() @@ -229,7 +233,7 @@ func (c *context) SetOption(name string, v interface{}) error { return protocol.ErrBadValue case protocol.OptionRecvDeadline: - if val, ok := v.(time.Duration); ok && val.Nanoseconds() > 0 { + if val, ok := v.(time.Duration); ok { c.s.Lock() c.recvExpire = val c.s.Unlock() diff --git a/protocol/respondent/respondent_test.go b/protocol/respondent/respondent_test.go index c4653f7..e0e1969 100644 --- a/protocol/respondent/respondent_test.go +++ b/protocol/respondent/respondent_test.go @@ -1,4 +1,4 @@ -// Copyright 2019 The Mangos Authors +// Copyright 2020 The Mangos Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use file except in compliance with the License. @@ -264,6 +264,15 @@ func TestRespondentRecvExpire(t *testing.T) { MustSucceed(t, s.Close()) } +func TestRespondentRecvNonBlocking(t *testing.T) { + s := GetSocket(t, NewSocket) + MustSucceed(t, s.SetOption(mangos.OptionRecvDeadline, time.Duration(-1))) + v, e := s.RecvMsg() + MustBeError(t, e, mangos.ErrRecvTimeout) + MustBeNil(t, v) + MustSucceed(t, s.Close()) +} + func TestRespondentSendState(t *testing.T) { s := GetSocket(t, NewSocket) MustBeError(t, s.Send([]byte{}), mangos.ErrProtoState) @@ -297,6 +306,43 @@ func TestRespondentBestEffortSend(t *testing.T) { MustSucceed(t, p.Close()) } +func TestRespondentSendNonBlocking(t *testing.T) { + s := GetSocket(t, NewSocket) + p := GetSocket(t, xsurveyor.NewSocket) + MustSucceed(t, s.SetOption(mangos.OptionWriteQLen, 1)) + MustSucceed(t, p.SetOption(mangos.OptionReadQLen, 1)) + MustSucceed(t, s.SetOption(mangos.OptionSendDeadline, time.Duration(-1))) + + ConnectPair(t, s, p) + for i := 0; i < 100; i++ { + // We have to make a raw message when using xsurveyor. We + // use xsurveyor because normal surveyor will simply discard + // messages for surveys it doesn't have outstanding. + m := mangos.NewMessage(0) + m.Header = make([]byte, 4) + binary.BigEndian.PutUint32(m.Header, uint32(i)|0x80000000) + MustSucceed(t, p.SendMsg(m)) + m, e := s.RecvMsg() + MustSucceed(t, e) + MustNotBeNil(t, m) + start := time.Now() + e = s.SendMsg(m) + if e != nil { + MustBeError(t, e, mangos.ErrSendTimeout) + m.Free() + MustBeTrue(t, time.Since(start) < time.Second) + break + } else if i > 20 { + MustBeError(t, e, mangos.ErrSendTimeout) + } + + // NB: We never ask the peer to receive it -- this ensures we + // encounter backpressure. + } + MustSucceed(t, s.Close()) + MustSucceed(t, p.Close()) +} + func TestRespondentSendBackPressure(t *testing.T) { s := GetSocket(t, NewSocket) p := GetSocket(t, xsurveyor.NewSocket) diff --git a/protocol/sub/sub.go b/protocol/sub/sub.go index 1ad267f..754ad71 100644 --- a/protocol/sub/sub.go +++ b/protocol/sub/sub.go @@ -1,4 +1,4 @@ -// Copyright 2019 The Mangos Authors +// Copyright 2020 The Mangos Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use file except in compliance with the License. @@ -75,26 +75,32 @@ func (c *context) RecvMsg() (*protocol.Message, error) { recvQ := c.recvQ sizeQ := c.sizeQ closeQ := c.closeQ - if c.recvExpire > 0 { + if c.recvExpire != 0 { timeQ = time.After(c.recvExpire) } s.Unlock() for { select { - case <-timeQ: - return nil, protocol.ErrRecvTimeout - case <-closeQ: - return nil, protocol.ErrClosed - case <-sizeQ: - s.Lock() - sizeQ = c.sizeQ - recvQ = c.recvQ - s.Unlock() - continue case m := <-recvQ: m = m.MakeUnique() return m, nil + default: + select { + case <-timeQ: + return nil, protocol.ErrRecvTimeout + case <-closeQ: + return nil, protocol.ErrClosed + case <-sizeQ: + s.Lock() + sizeQ = c.sizeQ + recvQ = c.recvQ + s.Unlock() + continue + case m := <-recvQ: + m = m.MakeUnique() + return m, nil + } } } } diff --git a/protocol/sub/sub_test.go b/protocol/sub/sub_test.go index fb9315e..147ffbe 100644 --- a/protocol/sub/sub_test.go +++ b/protocol/sub/sub_test.go @@ -1,4 +1,4 @@ -// Copyright 2019 The Mangos Authors +// Copyright 2020 The Mangos Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use file except in compliance with the License. @@ -232,6 +232,25 @@ func TestSubRecvResizeContinue(t *testing.T) { MustBeTrue(t, pass) } +func TestSubRecvNonBlocking(t *testing.T) { + s := GetSocket(t, NewSocket) + defer MustClose(t, s) + p := GetSocket(t, pub.NewSocket) + defer MustClose(t, p) + + MustSucceed(t, s.SetOption(OptionRecvDeadline, time.Duration(-1))) + MustSucceed(t, s.SetOption(OptionReadQLen, 10)) + MustSucceed(t, s.SetOption(OptionSubscribe, []byte{})) + + ConnectPair(t, s, p) + + start := time.Now() + m, e := s.Recv() + MustBeNil(t, m) + MustBeError(t, e, mangos.ErrRecvTimeout) + MustBeTrue(t, time.Since(start) < time.Second) +} + func TestSubContextOpen(t *testing.T) { s, e := NewSocket() MustSucceed(t, e) diff --git a/protocol/surveyor/surveyor.go b/protocol/surveyor/surveyor.go index 03b8885..9727ef8 100644 --- a/protocol/surveyor/surveyor.go +++ b/protocol/surveyor/surveyor.go @@ -1,4 +1,4 @@ -// Copyright 2019 The Mangos Authors +// Copyright 2020 The Mangos Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use file except in compliance with the License. @@ -76,8 +76,14 @@ type socket struct { var ( nilQ <-chan time.Time + closedQ chan time.Time ) +func init() { + closedQ = make(chan time.Time) + close(closedQ) +} + const defaultQLen = 128 func (s *survey) cancel(err error) { @@ -170,6 +176,10 @@ func (c *context) RecvMsg() (*protocol.Message, error) { timeq := nilQ if c.recvExpire > 0 { timeq = time.After(c.recvExpire) + } else if c.recvExpire < 0 { + tq := make(chan time.Time) + close(tq) + timeq = tq } s.Unlock() @@ -177,9 +187,6 @@ func (c *context) RecvMsg() (*protocol.Message, error) { return nil, protocol.ErrProtoState } select { - case <-c.closeQ: - return nil, protocol.ErrClosed - case m := <-surv.recvQ: if m == nil { // Sometimes the recvQ can get closed ahead of @@ -187,9 +194,22 @@ func (c *context) RecvMsg() (*protocol.Message, error) { return nil, surv.err } return m, nil + default: + select { + case <-c.closeQ: + return nil, protocol.ErrClosed + + case m := <-surv.recvQ: + if m == nil { + // Sometimes the recvQ can get closed ahead of + // the closeQ, but the closeQ takes precedence. + return nil, surv.err + } + return m, nil - case <-timeq: - return nil, protocol.ErrRecvTimeout + case <-timeq: + return nil, protocol.ErrRecvTimeout + } } } diff --git a/protocol/surveyor/surveyor_test.go b/protocol/surveyor/surveyor_test.go index e74b207..b066131 100644 --- a/protocol/surveyor/surveyor_test.go +++ b/protocol/surveyor/surveyor_test.go @@ -1,4 +1,4 @@ -// Copyright 2019 The Mangos Authors +// Copyright 2020 The Mangos Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use file except in compliance with the License. @@ -177,6 +177,19 @@ func TestSurveyExpire(t *testing.T) { MustSucceed(t, s.Close()) } +// This test demonstrates that surveys expire on their own. +func TestSurveyRecvNonBlocking(t *testing.T) { + s := GetSocket(t, NewSocket) + MustSucceed(t, s.SetOption(mangos.OptionSurveyTime, time.Millisecond*10)) + MustSucceed(t, s.SetOption(mangos.OptionRecvDeadline, time.Duration(-1))) + MustSucceed(t, s.Send([]byte("first"))) + v, e := s.Recv() + MustBeError(t, e, mangos.ErrRecvTimeout) + MustBeNil(t, v) + MustClose(t, s) +} + + // This test demonstrates that we can keep sending even if the pipes are full. func TestSurveyorBestEffortSend(t *testing.T) { s := GetSocket(t, NewSocket) diff --git a/protocol/xbus/xbus.go b/protocol/xbus/xbus.go index 5c0aa2c..66a3a59 100644 --- a/protocol/xbus/xbus.go +++ b/protocol/xbus/xbus.go @@ -33,6 +33,13 @@ type pipe struct { sendQ chan *protocol.Message } +var closedQ chan time.Time + +func init() { + closedQ = make(chan time.Time) + close(closedQ) +} + type socket struct { closed bool closeQ chan struct{} @@ -109,18 +116,25 @@ func (s *socket) RecvMsg() (*protocol.Message, error) { tq := nilQ if s.recvExpire > 0 { tq = time.After(s.recvExpire) + } else if s.recvExpire < 0 { + tq = closedQ } s.Unlock() select { - case <-cq: - return nil, protocol.ErrClosed - case <-zq: - continue - case <-tq: - return nil, protocol.ErrRecvTimeout case m := <-rq: return m, nil + default: + select { + case <-cq: + return nil, protocol.ErrClosed + case <-zq: + continue + case <-tq: + return nil, protocol.ErrRecvTimeout + case m := <-rq: + return m, nil + } } } } diff --git a/protocol/xpair/xpair.go b/protocol/xpair/xpair.go index 82964de..2952f03 100644 --- a/protocol/xpair/xpair.go +++ b/protocol/xpair/xpair.go @@ -73,7 +73,7 @@ func (s *socket) SendMsg(m *protocol.Message) error { } if s.bestEffort { timeQ = closedQ - } else if s.sendExpire > 0 { + } else if s.sendExpire != 0 { timeQ = time.After(s.sendExpire) } sizeQ := s.sizeQ @@ -82,21 +82,26 @@ func (s *socket) SendMsg(m *protocol.Message) error { s.Unlock() select { - case <-closeQ: - return protocol.ErrClosed - case <-timeQ: - if timeQ == closedQ { + case sendQ <- m: + return nil + default: + select { + case <-closeQ: + return protocol.ErrClosed + case <-timeQ: + if timeQ == closedQ { + m.Free() + return nil + } + return protocol.ErrSendTimeout + + case <-sizeQ: m.Free() return nil - } - return protocol.ErrSendTimeout - case <-sizeQ: - m.Free() - return nil - - case sendQ <- m: - return nil + case sendQ <- m: + return nil + } } } @@ -104,7 +109,7 @@ func (s *socket) RecvMsg() (*protocol.Message, error) { for { timeQ := nilQ s.Lock() - if s.recvExpire > 0 { + if s.recvExpire != 0 { timeQ = time.After(s.recvExpire) } closeQ := s.closeQ @@ -112,13 +117,18 @@ func (s *socket) RecvMsg() (*protocol.Message, error) { sizeQ := s.sizeQ s.Unlock() select { - case <-closeQ: - return nil, protocol.ErrClosed - case <-timeQ: - return nil, protocol.ErrRecvTimeout case m := <-recvQ: return m, nil - case <-sizeQ: + default: + select { + case <-closeQ: + return nil, protocol.ErrClosed + case <-timeQ: + return nil, protocol.ErrRecvTimeout + case m := <-recvQ: + return m, nil + case <-sizeQ: + } } } } diff --git a/protocol/xpull/xpull.go b/protocol/xpull/xpull.go index d58383c..dcde757 100644 --- a/protocol/xpull/xpull.go +++ b/protocol/xpull/xpull.go @@ -65,7 +65,7 @@ func (s *socket) RecvMsg() (*protocol.Message, error) { tq := nilQ for { s.Lock() - if s.recvExpire > 0 { + if s.recvExpire != 0 { tq = time.After(s.recvExpire) } cq := s.closeQ @@ -73,14 +73,19 @@ func (s *socket) RecvMsg() (*protocol.Message, error) { zq := s.sizeQ s.Unlock() select { - case <-cq: - return nil, protocol.ErrClosed - case <-tq: - return nil, protocol.ErrRecvTimeout - case <-zq: - continue case m := <-rq: return m, nil + default: + select { + case <-cq: + return nil, protocol.ErrClosed + case <-tq: + return nil, protocol.ErrRecvTimeout + case <-zq: + continue + case m := <-rq: + return m, nil + } } } } diff --git a/protocol/xpush/xpush.go b/protocol/xpush/xpush.go index c11a9cb..6e24b0d 100644 --- a/protocol/xpush/xpush.go +++ b/protocol/xpush/xpush.go @@ -72,7 +72,7 @@ func (s *socket) SendMsg(m *protocol.Message) error { tq := nilQ if bestEffort { tq = closedQ - } else if s.sendExpire > 0 { + } else if s.sendExpire != 0 { tq = time.After(s.sendExpire) } if s.closed { @@ -83,14 +83,18 @@ func (s *socket) SendMsg(m *protocol.Message) error { select { case s.sendQ <- m: - case <-s.closeQ: - return protocol.ErrClosed - case <-tq: - if bestEffort { - m.Free() - return nil + default: + select { + case s.sendQ <- m: + case <-s.closeQ: + return protocol.ErrClosed + case <-tq: + if bestEffort { + m.Free() + return nil + } + return protocol.ErrSendTimeout } - return protocol.ErrSendTimeout } s.Lock() diff --git a/protocol/xrep/xrep.go b/protocol/xrep/xrep.go index 57ad385..4054b38 100644 --- a/protocol/xrep/xrep.go +++ b/protocol/xrep/xrep.go @@ -98,7 +98,7 @@ func (s *socket) SendMsg(m *protocol.Message) error { } if bestEffort { tq = closedQ - } else if s.sendExpire > 0 { + } else if s.sendExpire != 0 { tq = time.After(s.sendExpire) } s.Unlock() @@ -106,18 +106,23 @@ func (s *socket) SendMsg(m *protocol.Message) error { select { case p.sendQ <- m: return nil - case <-p.closeQ: - // restore the header - m.Header = hdr - return protocol.ErrClosed - case <-tq: - if bestEffort { - m.Free() + default: + select { + case p.sendQ <- m: return nil + case <-p.closeQ: + // restore the header + m.Header = hdr + return protocol.ErrClosed + case <-tq: + if bestEffort { + m.Free() + return nil + } + // restore the header + m.Header = hdr + return protocol.ErrSendTimeout } - // restore the header - m.Header = hdr - return protocol.ErrSendTimeout } } @@ -128,18 +133,24 @@ func (s *socket) RecvMsg() (*protocol.Message, error) { recvQ := s.recvQ sizeQ := s.sizeQ closeQ := s.closeQ - if s.recvExpire > 0 { + if s.recvExpire != 0 { timeQ = time.After(s.recvExpire) } s.Unlock() + select { - case <-closeQ: - return nil, protocol.ErrClosed - case <-timeQ: - return nil, protocol.ErrRecvTimeout case m := <-recvQ: return m, nil - case <-sizeQ: + default: + select { + case <-closeQ: + return nil, protocol.ErrClosed + case <-timeQ: + return nil, protocol.ErrRecvTimeout + case m := <-recvQ: + return m, nil + case <-sizeQ: + } } } } diff --git a/protocol/xreq/xreq.go b/protocol/xreq/xreq.go index 96a4e9d..b46518c 100644 --- a/protocol/xreq/xreq.go +++ b/protocol/xreq/xreq.go @@ -73,7 +73,7 @@ func (s *socket) SendMsg(m *protocol.Message) error { timeQ := nilQ if bestEffort { timeQ = closedQ - } else if s.sendExpire > 0 { + } else if s.sendExpire != 0 { timeQ = time.After(s.sendExpire) } sendQ := s.sendQ @@ -84,17 +84,22 @@ func (s *socket) SendMsg(m *protocol.Message) error { select { case sendQ <- m: return nil - case <-sizeQ: - m.Free() - return nil - case <-closeQ: - return protocol.ErrClosed - case <-timeQ: - if bestEffort { + default: + select { + case sendQ <- m: + return nil + case <-sizeQ: m.Free() return nil + case <-closeQ: + return protocol.ErrClosed + case <-timeQ: + if bestEffort { + m.Free() + return nil + } + return protocol.ErrSendTimeout } - return protocol.ErrSendTimeout } } @@ -102,7 +107,7 @@ func (s *socket) RecvMsg() (*protocol.Message, error) { for { timeQ := nilQ s.Lock() - if s.recvExpire > 0 { + if s.recvExpire != 0 { timeQ = time.After(s.recvExpire) } sizeQ := s.sizeQ @@ -110,14 +115,19 @@ func (s *socket) RecvMsg() (*protocol.Message, error) { closeQ := s.closeQ s.Unlock() select { - case <-closeQ: - return nil, protocol.ErrClosed - case <-timeQ: - return nil, protocol.ErrRecvTimeout case m := <-recvQ: return m, nil - case <-sizeQ: - continue + default: + select { + case <-closeQ: + return nil, protocol.ErrClosed + case <-timeQ: + return nil, protocol.ErrRecvTimeout + case m := <-recvQ: + return m, nil + case <-sizeQ: + continue + } } } } diff --git a/protocol/xrespondent/xrespondent.go b/protocol/xrespondent/xrespondent.go index 995d708..c269444 100644 --- a/protocol/xrespondent/xrespondent.go +++ b/protocol/xrespondent/xrespondent.go @@ -97,7 +97,7 @@ func (s *socket) SendMsg(m *protocol.Message) error { tq := nilQ if bestEffort { tq = closedQ - } else if s.sendExpire > 0 { + } else if s.sendExpire != 0 { tq = time.After(s.sendExpire) } s.Unlock() @@ -105,24 +105,29 @@ func (s *socket) SendMsg(m *protocol.Message) error { select { case p.sendQ <- m: return nil - case <-p.closeQ: - m.Free() - return nil // No way to return the message - case <-tq: - if bestEffort { - m.Free() + default: + select { + case p.sendQ <- m: return nil + case <-p.closeQ: + m.Free() + return nil // No way to return the message + case <-tq: + if bestEffort { + m.Free() + return nil + } + // restore the header + m.Header = hdr + return protocol.ErrSendTimeout } - // restore the header - m.Header = hdr - return protocol.ErrSendTimeout } } func (s *socket) RecvMsg() (*protocol.Message, error) { timeQ := nilQ s.Lock() - if s.recvExpire > 0 { + if s.recvExpire != 0 { timeQ = time.After(s.recvExpire) } recvQ := s.recvQ @@ -132,17 +137,22 @@ func (s *socket) RecvMsg() (*protocol.Message, error) { for { select { - case <-closeQ: - return nil, protocol.ErrClosed - case <-timeQ: - return nil, protocol.ErrRecvTimeout case m := <-recvQ: return m, nil - case <-sizeQ: - s.Lock() - recvQ = s.recvQ - sizeQ = s.sizeQ - s.Unlock() + default: + select { + case <-closeQ: + return nil, protocol.ErrClosed + case <-timeQ: + return nil, protocol.ErrRecvTimeout + case m := <-recvQ: + return m, nil + case <-sizeQ: + s.Lock() + recvQ = s.recvQ + sizeQ = s.sizeQ + s.Unlock() + } } } } diff --git a/protocol/xstar/xstar.go b/protocol/xstar/xstar.go index b84adbb..47511c4 100644 --- a/protocol/xstar/xstar.go +++ b/protocol/xstar/xstar.go @@ -93,17 +93,22 @@ func (s *socket) RecvMsg() (*protocol.Message, error) { // based on socket pipes. tq := nilQ s.Lock() - if s.recvExpire > 0 { + if s.recvExpire != 0 { tq = time.After(s.recvExpire) } s.Unlock() select { - case <-s.closeq: - return nil, protocol.ErrClosed - case <-tq: - return nil, protocol.ErrRecvTimeout case m := <-s.recvq: return m, nil + default: + select { + case <-s.closeq: + return nil, protocol.ErrClosed + case <-tq: + return nil, protocol.ErrRecvTimeout + case m := <-s.recvq: + return m, nil + } } } diff --git a/protocol/xsub/xsub.go b/protocol/xsub/xsub.go index c2bf4a6..685a8c4 100644 --- a/protocol/xsub/xsub.go +++ b/protocol/xsub/xsub.go @@ -63,22 +63,28 @@ func (s *socket) RecvMsg() (*protocol.Message, error) { timeQ := nilQ for { s.Lock() - if s.recvExpire > 0 { + if s.recvExpire != 0 { timeQ = time.After(s.recvExpire) } closeQ := s.closeQ sizeQ := s.sizeQ recvQ := s.recvQ s.Unlock() + select { - case <-closeQ: - return nil, protocol.ErrClosed - case <-timeQ: - return nil, protocol.ErrRecvTimeout - case <-sizeQ: - continue case m := <-recvQ: return m, nil + default: + select { + case <-closeQ: + return nil, protocol.ErrClosed + case <-timeQ: + return nil, protocol.ErrRecvTimeout + case <-sizeQ: + continue + case m := <-recvQ: + return m, nil + } } } } diff --git a/protocol/xsurveyor/xsurveyor.go b/protocol/xsurveyor/xsurveyor.go index be38fc8..50abc61 100644 --- a/protocol/xsurveyor/xsurveyor.go +++ b/protocol/xsurveyor/xsurveyor.go @@ -85,7 +85,7 @@ func (s *socket) RecvMsg() (*protocol.Message, error) { for { s.Lock() timeQ := nilQ - if s.recvExpire > 0 { + if s.recvExpire != 0 { timeQ = time.After(s.recvExpire) } recvQ := s.recvQ @@ -96,12 +96,17 @@ func (s *socket) RecvMsg() (*protocol.Message, error) { select { case m := <-recvQ: return m, nil - case <-closeQ: - return nil, protocol.ErrClosed - case <-timeQ: - return nil, protocol.ErrRecvTimeout - case <-sizeQ: - continue + default: + select { + case m := <-recvQ: + return m, nil + case <-closeQ: + return nil, protocol.ErrClosed + case <-timeQ: + return nil, protocol.ErrRecvTimeout + case <-sizeQ: + continue + } } } }