Skip to content

Commit

Permalink
Merge branch 'master' into connection
Browse files Browse the repository at this point in the history
  • Loading branch information
meling committed Mar 26, 2024
2 parents 3ded667 + 0c2686d commit ebe8380
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 7 deletions.
15 changes: 13 additions & 2 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ type request struct {
opts callOptions
}

// waitForSend returns true if the WithNoSendWaiting call option is not set.
func (req request) waitForSend() bool {
return req.opts.callType != nil && !req.opts.noSendWaiting
}

type response struct {
nid uint32
msg protoreflect.ProtoMessage
Expand Down Expand Up @@ -154,9 +159,15 @@ func (c *channel) deleteRouter(msgID uint64) {
}

func (c *channel) sendMsg(req request) (err error) {
// unblock the waiting caller unless noSendWaiting is enabled
defer func() {
if req.opts.callType == E_Multicast || req.opts.callType == E_Unicast && !req.opts.noSendWaiting {
// While the default is to block the caller until the message has been sent, we
// can provide the WithNoSendWaiting call option to more quickly unblock the caller.
// Hence, after sending, we unblock the waiting caller if the call option is not set;
// that is, waitForSend is true. Conversely, if the call option is set, the call type
// will not block on the response channel, and the "receiver" goroutine below will
// eventually clean up the responseRouter map by calling routeResponse.
if req.waitForSend() {
// unblock the caller and clean up the responseRouter map
c.routeResponse(req.msg.Metadata.MessageID, response{})
}
}()
Expand Down
3 changes: 1 addition & 2 deletions multicast.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ func (c RawConfiguration) Multicast(ctx context.Context, d QuorumCallData, opts

// nodeStream sends an empty reply on replyChan when the message has been sent
// wait until the message has been sent
for sentMsgs > 0 {
for ; sentMsgs > 0; sentMsgs-- {
<-replyChan
sentMsgs--
}
}
7 changes: 4 additions & 3 deletions tests/oneway/oneway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,17 +232,18 @@ func BenchmarkUnicast(b *testing.B) {
for _, srv := range srvs {
srv.benchmark = true
}
node := cfg.Nodes()[0]
in := &oneway.Request{Num: 0}
b.Run("UnicastSendWaiting__", func(b *testing.B) {
for c := 1; c <= b.N; c++ {
in.Num = uint64(c)
cfg.Nodes()[0].Unicast(context.Background(), in)
node.Unicast(context.Background(), in)
}
})
b.Run("UnicastNoSendWaiting", func(b *testing.B) {
for c := 1; c <= b.N; c++ {
in.Num = uint64(c)
cfg.Nodes()[0].Unicast(context.Background(), in, gorums.WithNoSendWaiting())
node.Unicast(context.Background(), in, gorums.WithNoSendWaiting())
}
})
teardown()
Expand All @@ -263,7 +264,7 @@ func BenchmarkMulticast(b *testing.B) {
b.Run("MulticastNoSendWaiting", func(b *testing.B) {
for c := 1; c <= b.N; c++ {
in.Num = uint64(c)
cfg.Nodes()[0].Unicast(context.Background(), in, gorums.WithNoSendWaiting())
cfg.Multicast(context.Background(), in, gorums.WithNoSendWaiting())
}
})
teardown()
Expand Down

0 comments on commit ebe8380

Please sign in to comment.