Skip to content

Commit ad23b52

Browse files
authored
Workers (#3)
* use worker pool to serve * workers have made the benchmark slower somehow * avoid returning an error when reading from a closed connection
1 parent b7dcece commit ad23b52

6 files changed

+94
-46
lines changed

conn.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ type Conn interface {
2424
net.Conn
2525

2626
Context() context.Context
27-
Serve(Dispatcher) error
27+
Serve(int, Dispatcher) error
2828
Send(Packet) error
2929
SendTo(net.Addr, Packet) error
3030
}

lolatency_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func BenchmarkMessageSend(b *testing.B) {
3636
ch = make(chan struct{})
3737
val = struct{}{}
3838
)
39-
go srv.Serve(osc.Dispatcher{
39+
go srv.Serve(8, osc.Dispatcher{
4040
"/ping": osc.Method(func(m osc.Message) error {
4141
ch <- val
4242
return nil
@@ -74,7 +74,7 @@ func BenchmarkMessageSendOneArgument(b *testing.B) {
7474
ch = make(chan struct{})
7575
val = struct{}{}
7676
)
77-
go srv.Serve(osc.Dispatcher{
77+
go srv.Serve(1, osc.Dispatcher{
7878
"/ping": osc.Method(func(m osc.Message) error {
7979
if _, err := m.Arguments[0].ReadInt32(); err != nil {
8080
return err

udp.go

+38-35
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package osc
33
import (
44
"context"
55
"net"
6+
"strings"
67

78
"github.com/pkg/errors"
89
)
@@ -93,25 +94,37 @@ func (conn *UDPConn) SendTo(addr net.Addr, p Packet) error {
9394
// Any errors returned from a dispatched method will be returned.
9495
// Note that this means that errors returned from a dispatcher method will kill your server.
9596
// If context.Canceled or context.DeadlineExceeded are encountered they will be returned directly.
96-
func (conn *UDPConn) Serve(dispatcher Dispatcher) error {
97+
func (conn *UDPConn) Serve(numWorkers int, dispatcher Dispatcher) error {
9798
if dispatcher == nil {
9899
return ErrNilDispatcher
99100
}
100-
101101
for addr := range dispatcher {
102102
if err := ValidateAddress(addr); err != nil {
103103
return err
104104
}
105105
}
106-
107-
errChan := make(chan error)
108-
106+
var (
107+
errChan = make(chan error)
108+
ready = make(chan Worker, numWorkers)
109+
)
110+
for i := 0; i < numWorkers; i++ {
111+
go Worker{
112+
DataChan: make(chan Incoming),
113+
Dispatcher: dispatcher,
114+
ErrChan: errChan,
115+
Ready: ready,
116+
}.Run()
117+
}
109118
go func() {
110119
for {
111-
conn.serve(dispatcher, errChan)
120+
if err := conn.serve(ready); err != nil {
121+
if err == errClosed {
122+
return
123+
}
124+
errChan <- err
125+
}
112126
}
113127
}()
114-
115128
// If the connection is closed or the context is canceled then stop serving.
116129
select {
117130
case err := <-errChan:
@@ -124,38 +137,20 @@ func (conn *UDPConn) Serve(dispatcher Dispatcher) error {
124137
}
125138

126139
// serve retrieves OSC packets.
127-
func (conn *UDPConn) serve(dispatcher Dispatcher, errChan chan error) {
140+
func (conn *UDPConn) serve(ready <-chan Worker) error {
128141
data := make([]byte, bufSize)
129-
130142
_, sender, err := conn.ReadFromUDP(data)
131143
if err != nil {
132-
errChan <- err
133-
}
134-
135-
switch data[0] {
136-
case BundleTag[0]:
137-
go func() {
138-
bundle, err := ParseBundle(data, sender)
139-
if err != nil {
140-
errChan <- err
141-
}
142-
if err := dispatcher.Dispatch(bundle); err != nil {
143-
errChan <- errors.Wrap(err, "dispatch bundle")
144-
}
145-
}()
146-
case MessageChar:
147-
go func() {
148-
msg, err := ParseMessage(data, sender)
149-
if err != nil {
150-
errChan <- err
151-
}
152-
if err := dispatcher.Invoke(msg); err != nil {
153-
errChan <- errors.Wrap(err, "dispatch message")
154-
}
155-
}()
156-
default:
157-
errChan <- ErrParse
144+
// Tried non-blocking select on closeChan right before ReadFromUDP
145+
// but that didn't stop us from reading a closed connection. [briansorahan]
146+
if strings.Contains(err.Error(), "use of closed network connection") {
147+
return errClosed
148+
}
149+
return err
158150
}
151+
worker := <-ready
152+
worker.DataChan <- Incoming{Data: data, Sender: sender}
153+
return nil
159154
}
160155

161156
// SetContext sets the context associated with the conn.
@@ -168,3 +163,11 @@ func (conn *UDPConn) Close() error {
168163
close(conn.closeChan)
169164
return conn.udpConn.Close()
170165
}
166+
167+
// Incoming represents incoming data.
168+
type Incoming struct {
169+
Data []byte
170+
Sender net.Addr
171+
}
172+
173+
var errClosed = errors.New("conn is closed")

udp_example_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func ExampleUDPConn_pingpong() {
6666
}
6767

6868
func serverDispatch(server *UDPConn, errChan chan error) {
69-
errChan <- server.Serve(Dispatcher{
69+
errChan <- server.Serve(1, Dispatcher{
7070
"/ping": Method(func(msg Message) error {
7171
fmt.Println("Server received ping.")
7272
return server.SendTo(msg.Sender, Message{Address: "/pong"})
@@ -83,7 +83,7 @@ func serverDispatch(server *UDPConn, errChan chan error) {
8383
}
8484

8585
func clientDispatch(client *UDPConn, errChan chan error, pongChan chan struct{}, closeChan chan struct{}) {
86-
errChan <- client.Serve(Dispatcher{
86+
errChan <- client.Serve(1, Dispatcher{
8787
"/pong": Method(func(msg Message) error {
8888
fmt.Println("Client received pong.")
8989
close(pongChan)

udp_test.go

+8-6
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ func TestInvalidAddress(t *testing.T) {
2121
}
2222
defer func() { _ = server.Close() }() // Best effort.
2323

24-
if err := server.Serve(Dispatcher{
24+
if err := server.Serve(1, Dispatcher{
2525
"/[": Method(func(msg Message) error {
2626
return nil
2727
}),
@@ -72,7 +72,7 @@ func TestDialUDPContext(t *testing.T) {
7272
if c.Context() != ctxTimeout {
7373
t.Fatalf("expected %+v to be %+v", ctxTimeout, c.Context())
7474
}
75-
if err := c.Serve(Dispatcher{}); err != context.DeadlineExceeded {
75+
if err := c.Serve(1, Dispatcher{}); err != context.DeadlineExceeded {
7676
t.Fatalf("expected context.DeadlineExceeded, got %+v", err)
7777
}
7878
}
@@ -101,7 +101,7 @@ func testUDPServer(t *testing.T, dispatcher Dispatcher) (*UDPConn, *UDPConn, cha
101101
errChan := make(chan error)
102102

103103
go func() {
104-
errChan <- server.Serve(dispatcher)
104+
errChan <- server.Serve(1, dispatcher)
105105
}()
106106

107107
raddr, err := net.ResolveUDPAddr("udp", server.LocalAddr().String())
@@ -151,7 +151,7 @@ func TestUDPConnServe_ContextTimeout(t *testing.T) {
151151
}
152152
errChan := make(chan error)
153153
go func() {
154-
errChan <- server.Serve(Dispatcher{})
154+
errChan <- server.Serve(1, Dispatcher{})
155155
}()
156156
select {
157157
case <-time.After(200 * time.Millisecond):
@@ -180,7 +180,7 @@ func TestUDPConnServe_ReadError(t *testing.T) {
180180
ctx: context.Background(),
181181
}
182182
go func() {
183-
errChan <- server.Serve(Dispatcher{
183+
errChan <- server.Serve(1, Dispatcher{
184184
"/close": Method(func(msg Message) error {
185185
return server.Close()
186186
}),
@@ -214,7 +214,7 @@ func TestUDPConnServe_NilDispatcher(t *testing.T) {
214214
if err != nil {
215215
t.Fatal(err)
216216
}
217-
if err := server.Serve(nil); err == nil {
217+
if err := server.Serve(1, nil); err == nil {
218218
t.Fatal("expected error, got nil")
219219
}
220220
}
@@ -234,6 +234,8 @@ func TestUDPConnServe_BadInboundAddr(t *testing.T) {
234234
if err := conn.Send(packet); err != nil {
235235
t.Fatal(err)
236236
}
237+
t.Logf("sent message %s", string(packet.Bytes()))
238+
237239
if err := <-errChan; err == nil {
238240
t.Fatalf("(packet %d) expected error, got nil", i)
239241
}

worker.go

+43
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package osc
2+
3+
import (
4+
"github.com/pkg/errors"
5+
)
6+
7+
type Worker struct {
8+
DataChan chan Incoming
9+
Dispatcher Dispatcher
10+
ErrChan chan error
11+
Ready chan<- Worker
12+
}
13+
14+
func (w Worker) Run() {
15+
w.Ready <- w
16+
17+
for incoming := range w.DataChan {
18+
data := incoming.Data
19+
20+
switch data[0] {
21+
case BundleTag[0]:
22+
bundle, err := ParseBundle(data, incoming.Sender)
23+
if err != nil {
24+
w.ErrChan <- err
25+
}
26+
if err := w.Dispatcher.Dispatch(bundle); err != nil {
27+
w.ErrChan <- errors.Wrap(err, "dispatch bundle")
28+
}
29+
case MessageChar:
30+
msg, err := ParseMessage(data, incoming.Sender)
31+
if err != nil {
32+
w.ErrChan <- err
33+
}
34+
if err := w.Dispatcher.Invoke(msg); err != nil {
35+
w.ErrChan <- errors.Wrap(err, "dispatch message")
36+
}
37+
default:
38+
w.ErrChan <- ErrParse
39+
}
40+
// Announce the worker is ready again.
41+
w.Ready <- w
42+
}
43+
}

0 commit comments

Comments
 (0)