Skip to content

Commit

Permalink
proto: Buffer messages when sending in an attempt to avoid data races. (
Browse files Browse the repository at this point in the history
#62)

It might make more sense to use a mutex for this. Not entirely sure
yet.
  • Loading branch information
DeedleFake authored Oct 11, 2019
1 parent 05ab2d0 commit 2e71cfd
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 2 deletions.
4 changes: 3 additions & 1 deletion proto/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ func Size(v interface{}) (uint32, error) {
return e.n, e.err
}

// Write encodes and writes a message to w.
// Write encodes and writes a message to w. It does not perform any
// buffering. It is the caller's responsibility to ensure that
// encoding does not interleave with other usages of w.
func Write(w io.Writer, v interface{}) error {
e := &encoder{w: w}
e.mode = e.write
Expand Down
26 changes: 25 additions & 1 deletion proto/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
package proto

import (
"bytes"
"errors"
"io"
"reflect"
"sync"

"github.com/DeedleFake/p9/internal/util"
)
Expand All @@ -25,6 +27,12 @@ const (
NoTag uint16 = 0xFFFF
)

var bufPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}

// Proto represents a protocol. It maps between message type IDs and
// the Go types that those IDs correspond to.
type Proto struct {
Expand Down Expand Up @@ -110,7 +118,23 @@ func (p Proto) Receive(r io.Reader, msize uint32) (msg interface{}, tag uint16,

// Send writes a message to w with the given tag. It returns any
// errors that occur.
//
// Encoded messages are buffered locally before sending to ensure that
// pieces of a message don't get mixed with another one.
func (p Proto) Send(w io.Writer, tag uint16, msg interface{}) (err error) {
buf := bufPool.Get().(*bytes.Buffer)
defer func() {
if err == nil {
_, err = io.Copy(w, buf)
if err != nil {
err = util.Errorf("send: %w", err)
}
}

buf.Reset()
bufPool.Put(buf)
}()

msgType, ok := p.IDFromType(reflect.Indirect(reflect.ValueOf(msg)).Type())
if !ok {
return util.Errorf("send: invalid message type: %T", msg)
Expand All @@ -121,7 +145,7 @@ func (p Proto) Send(w io.Writer, tag uint16, msg interface{}) (err error) {
return
}

err = Write(w, v)
err = Write(buf, v)
if err != nil {
err = util.Errorf("send %T: %w", msg, err)
}
Expand Down

0 comments on commit 2e71cfd

Please sign in to comment.