Skip to content

Commit 63ffba6

Browse files
authored
Unix conn (#4)
* factor out serve helper and readSender interface * factor out workerLoop * added unix socket benchmark, test coverage for unix sockets
1 parent ad23b52 commit 63ffba6

File tree

6 files changed

+427
-85
lines changed

6 files changed

+427
-85
lines changed

lolatency_test.go

+41-2
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515
//
1616
// Update 3/11/2017 [briansorahan]
1717
// This benchmarks around 100us on my Dell Latitude E6510 with a single-core Core i7 @ 2.8GHz
18-
func BenchmarkMessageSend(b *testing.B) {
18+
func BenchmarkUDPSend(b *testing.B) {
1919
laddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
2020
if err != nil {
2121
b.Fatal(err)
@@ -53,7 +53,7 @@ func BenchmarkMessageSend(b *testing.B) {
5353
}
5454

5555
// Including a single argument does not seem to have much effect on latency [briansorahan].
56-
func BenchmarkMessageSendOneArgument(b *testing.B) {
56+
func BenchmarkUDPSendOneArgument(b *testing.B) {
5757
laddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
5858
if err != nil {
5959
b.Fatal(err)
@@ -93,3 +93,42 @@ func BenchmarkMessageSendOneArgument(b *testing.B) {
9393
<-ch
9494
}
9595
}
96+
97+
func BenchmarkUnixSend(b *testing.B) {
98+
const network = "unixgram"
99+
100+
laddr, err := net.ResolveUnixAddr(network, osc.TempSocket())
101+
if err != nil {
102+
b.Fatal(err)
103+
}
104+
srv, err := osc.ListenUnix("unixgram", laddr)
105+
if err != nil {
106+
b.Fatal(err)
107+
}
108+
raddr, err := net.ResolveUnixAddr(network, srv.LocalAddr().String())
109+
if err != nil {
110+
b.Fatal(err)
111+
}
112+
conn, err := osc.DialUnix(network, nil, raddr)
113+
if err != nil {
114+
b.Fatal(err)
115+
}
116+
var (
117+
ch = make(chan struct{})
118+
val = struct{}{}
119+
)
120+
go srv.Serve(8, osc.Dispatcher{
121+
"/ping": osc.Method(func(m osc.Message) error {
122+
ch <- val
123+
return nil
124+
}),
125+
})
126+
msg := osc.Message{Address: "/ping"}
127+
128+
b.ResetTimer()
129+
130+
for i := 0; i < b.N; i++ {
131+
conn.Send(msg)
132+
<-ch
133+
}
134+
}

osc.go

+89
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,12 @@ package osc
22

33
import (
44
"bytes"
5+
"context"
56
"encoding/binary"
7+
"net"
8+
"strings"
9+
10+
"github.com/pkg/errors"
611
)
712

813
const (
@@ -91,3 +96,87 @@ func ReadBlob(length int32, data []byte) ([]byte, int64) {
9196
}
9297
return data[:idx], int64(idx)
9398
}
99+
100+
// Incoming represents incoming data.
101+
type Incoming struct {
102+
Data []byte
103+
Sender net.Addr
104+
}
105+
106+
type netWriter interface {
107+
SetWriteBuffer(bytes int) error
108+
WriteTo([]byte, net.Addr) (int, error)
109+
}
110+
111+
var errClosed = errors.New("conn is closed")
112+
113+
func checkDispatcher(dispatcher Dispatcher) error {
114+
if dispatcher == nil {
115+
return ErrNilDispatcher
116+
}
117+
for addr := range dispatcher {
118+
if err := ValidateAddress(addr); err != nil {
119+
return err
120+
}
121+
}
122+
return nil
123+
}
124+
125+
// readSender knows how to read bytes and return the net.Addr
126+
// of the sender of the bytes.
127+
type readSender interface {
128+
CloseChan() <-chan struct{}
129+
Context() context.Context
130+
read([]byte) (int, net.Addr, error)
131+
}
132+
133+
func serve(r readSender, numWorkers int, dispatcher Dispatcher) error {
134+
if err := checkDispatcher(dispatcher); err != nil {
135+
return err
136+
}
137+
var (
138+
errChan = make(chan error)
139+
ready = make(chan Worker, numWorkers)
140+
)
141+
for i := 0; i < numWorkers; i++ {
142+
go Worker{
143+
DataChan: make(chan Incoming),
144+
Dispatcher: dispatcher,
145+
ErrChan: errChan,
146+
Ready: ready,
147+
}.Run()
148+
}
149+
go workerLoop(r, ready, errChan)
150+
151+
// If the connection is closed or the context is canceled then stop serving.
152+
select {
153+
case err := <-errChan:
154+
return errors.Wrap(err, "error serving udp")
155+
case <-r.CloseChan():
156+
case <-r.Context().Done():
157+
return r.Context().Err()
158+
}
159+
return nil
160+
}
161+
162+
func workerLoop(r readSender, ready chan Worker, errChan chan error) {
163+
for {
164+
data := make([]byte, bufSize)
165+
_, sender, err := r.read(data)
166+
if err != nil {
167+
// Tried non-blocking select on closeChan right before ReadFromUDP
168+
// but that didn't stop us from reading a closed connection. [briansorahan]
169+
if strings.Contains(err.Error(), "use of closed network connection") {
170+
return
171+
}
172+
errChan <- err
173+
return
174+
}
175+
176+
// Get the next worker.
177+
worker := <-ready
178+
179+
// Assign them the data we just read.
180+
worker.DataChan <- Incoming{Data: data, Sender: sender}
181+
}
182+
}

udp.go

+23-76
Original file line numberDiff line numberDiff line change
@@ -3,23 +3,22 @@ package osc
33
import (
44
"context"
55
"net"
6-
"strings"
76

87
"github.com/pkg/errors"
98
)
109

1110
// udpConn includes exactly the methods we need from *net.UDPConn
1211
type udpConn interface {
1312
net.Conn
13+
netWriter
1414

1515
ReadFromUDP([]byte) (int, *net.UDPAddr, error)
16-
SetWriteBuffer(bytes int) error
17-
WriteTo([]byte, net.Addr) (int, error)
1816
}
1917

2018
// UDPConn is an OSC connection over UDP.
2119
type UDPConn struct {
2220
udpConn
21+
2322
closeChan chan struct{}
2423
ctx context.Context
2524
errChan chan error
@@ -65,17 +64,34 @@ func ListenUDPContext(ctx context.Context, network string, laddr *net.UDPAddr) (
6564
return uc.initialize()
6665
}
6766

67+
// Close closes the udp conn.
68+
func (conn *UDPConn) Close() error {
69+
close(conn.closeChan)
70+
return conn.udpConn.Close()
71+
}
72+
73+
// CloseChan returns a channel that is closed when the connection gets closed.
74+
func (conn *UDPConn) CloseChan() <-chan struct{} {
75+
return conn.closeChan
76+
}
77+
78+
// Context returns the context associated with the conn.
79+
func (conn *UDPConn) Context() context.Context {
80+
return conn.ctx
81+
}
82+
6883
// initialize initializes a UDP connection.
6984
func (conn *UDPConn) initialize() (*UDPConn, error) {
7085
if err := conn.udpConn.SetWriteBuffer(bufSize); err != nil {
7186
return nil, errors.Wrap(err, "setting write buffer size")
7287
}
7388
return conn, nil
89+
7490
}
7591

76-
// Context returns the context associated with the conn.
77-
func (conn *UDPConn) Context() context.Context {
78-
return conn.ctx
92+
// read reads bytes and returns the net.Addr of the sender.
93+
func (conn *UDPConn) read(data []byte) (int, net.Addr, error) {
94+
return conn.ReadFromUDP(data)
7995
}
8096

8197
// Send sends an OSC message over UDP.
@@ -95,79 +111,10 @@ func (conn *UDPConn) SendTo(addr net.Addr, p Packet) error {
95111
// Note that this means that errors returned from a dispatcher method will kill your server.
96112
// If context.Canceled or context.DeadlineExceeded are encountered they will be returned directly.
97113
func (conn *UDPConn) Serve(numWorkers int, dispatcher Dispatcher) error {
98-
if dispatcher == nil {
99-
return ErrNilDispatcher
100-
}
101-
for addr := range dispatcher {
102-
if err := ValidateAddress(addr); err != nil {
103-
return err
104-
}
105-
}
106-
var (
107-
errChan = make(chan error)
108-
ready = make(chan Worker, numWorkers)
109-
)
110-
for i := 0; i < numWorkers; i++ {
111-
go Worker{
112-
DataChan: make(chan Incoming),
113-
Dispatcher: dispatcher,
114-
ErrChan: errChan,
115-
Ready: ready,
116-
}.Run()
117-
}
118-
go func() {
119-
for {
120-
if err := conn.serve(ready); err != nil {
121-
if err == errClosed {
122-
return
123-
}
124-
errChan <- err
125-
}
126-
}
127-
}()
128-
// If the connection is closed or the context is canceled then stop serving.
129-
select {
130-
case err := <-errChan:
131-
return errors.Wrap(err, "error serving udp")
132-
case <-conn.closeChan:
133-
case <-conn.ctx.Done():
134-
return conn.ctx.Err()
135-
}
136-
return nil
137-
}
138-
139-
// serve retrieves OSC packets.
140-
func (conn *UDPConn) serve(ready <-chan Worker) error {
141-
data := make([]byte, bufSize)
142-
_, sender, err := conn.ReadFromUDP(data)
143-
if err != nil {
144-
// Tried non-blocking select on closeChan right before ReadFromUDP
145-
// but that didn't stop us from reading a closed connection. [briansorahan]
146-
if strings.Contains(err.Error(), "use of closed network connection") {
147-
return errClosed
148-
}
149-
return err
150-
}
151-
worker := <-ready
152-
worker.DataChan <- Incoming{Data: data, Sender: sender}
153-
return nil
114+
return serve(conn, numWorkers, dispatcher)
154115
}
155116

156117
// SetContext sets the context associated with the conn.
157118
func (conn *UDPConn) SetContext(ctx context.Context) {
158119
conn.ctx = ctx
159120
}
160-
161-
// Close closes the udp conn.
162-
func (conn *UDPConn) Close() error {
163-
close(conn.closeChan)
164-
return conn.udpConn.Close()
165-
}
166-
167-
// Incoming represents incoming data.
168-
type Incoming struct {
169-
Data []byte
170-
Sender net.Addr
171-
}
172-
173-
var errClosed = errors.New("conn is closed")

udp_test.go

+10-7
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func TestDialUDP(t *testing.T) {
3737
}
3838

3939
func TestDialUDPSetWriteBufferError(t *testing.T) {
40-
uc := &UDPConn{udpConn: errConn{}}
40+
uc := &UDPConn{udpConn: errUDPConn{}}
4141
_, err := uc.initialize()
4242
if err == nil {
4343
t.Fatal("expected error, got nil")
@@ -101,7 +101,10 @@ func testUDPServer(t *testing.T, dispatcher Dispatcher) (*UDPConn, *UDPConn, cha
101101
errChan := make(chan error)
102102

103103
go func() {
104-
errChan <- server.Serve(1, dispatcher)
104+
if err := server.Serve(1, dispatcher); err != nil {
105+
errChan <- err
106+
}
107+
close(errChan)
105108
}()
106109

107110
raddr, err := net.ResolveUDPAddr("udp", server.LocalAddr().String())
@@ -125,16 +128,16 @@ func TestUDPConnSend_OK(t *testing.T) {
125128
}
126129
}
127130

128-
// errConn is an implementation of the udpConn interface that returns errors from all it's methods.
129-
type errConn struct {
131+
// errUDPConn is an implementation of the udpConn interface that returns errors from all it's methods.
132+
type errUDPConn struct {
130133
udpConn
131134
}
132135

133-
func (e errConn) ReadFromUDP(b []byte) (int, *net.UDPAddr, error) {
136+
func (e errUDPConn) ReadFromUDP(b []byte) (int, *net.UDPAddr, error) {
134137
return 0, nil, errors.New("oops")
135138
}
136139

137-
func (e errConn) SetWriteBuffer(bytes int) error {
140+
func (e errUDPConn) SetWriteBuffer(bytes int) error {
138141
return errors.New("derp")
139142
}
140143

@@ -176,7 +179,7 @@ func TestUDPConnServe_ReadError(t *testing.T) {
176179
t.Fatal(err)
177180
}
178181
server := &UDPConn{
179-
udpConn: errConn{udpConn: serverConn},
182+
udpConn: errUDPConn{udpConn: serverConn},
180183
ctx: context.Background(),
181184
}
182185
go func() {

0 commit comments

Comments
 (0)