diff --git a/network/udp/net.go b/network/udp/net.go index faf8a0a..3cc5d39 100644 --- a/network/udp/net.go +++ b/network/udp/net.go @@ -8,15 +8,23 @@ import ( "log" "net" "sync" + "time" "github.com/ConsenSys/handel" h "github.com/ConsenSys/handel" "github.com/ConsenSys/handel/network" ) -// Network is a handel.Network implementation using UDP as its transport layer +type Network interface { + Send(identities []h.Identity, packet *h.Packet) + RegisterListener(listener h.Listener) + getListeners() []handel.Listener + Stop() +} + +// UDPNetwork is a handel.Network implementation using UDP as its transport layer // listens on 0.0.0.0 -type Network struct { +type UDPNetwork struct { sync.RWMutex udpSock *net.UDPConn listeners []h.Listener @@ -31,8 +39,65 @@ type Network struct { rcvd int } +type delayedPacket struct { + toSendAt time.Time + identities []h.Identity + data h.Packet +} + +type DelayedUDPNetwork struct { + network Network + delay time.Duration + in chan *delayedPacket +} + +func (n *DelayedUDPNetwork) Send(identities []h.Identity, packet *h.Packet) { + n.in <- &delayedPacket{time.Now().Add(n.delay), identities,*packet} +} + +func (n *DelayedUDPNetwork) backgroundSend() { + for dp := range n.in { + delta := dp.toSendAt.Sub(time.Now()) + if delta.Nanoseconds() > time.Millisecond.Nanoseconds() { + // Not really useful to sleep for less than 1ms + time.Sleep(delta) + } + n.network.Send(dp.identities, &dp.data) + } +} + +func (n *DelayedUDPNetwork) Stop() { + n.network.Stop() + close(n.in) +} + +func (n *DelayedUDPNetwork) RegisterListener(listener h.Listener) { + n.network.RegisterListener(listener) +} + +func (n *DelayedUDPNetwork) getListeners() []handel.Listener{ + return n.network.getListeners() +} + +func NewDelayedUDPNetwork(delay time.Duration, addr string, enc network.Encoding) (*DelayedUDPNetwork, error) { + n, err := NewNetwork(addr, enc) + if err != nil { + return nil, err + } + + res := &DelayedUDPNetwork{ + n, + delay, + make(chan *delayedPacket, 10000), + } + + go res.backgroundSend() + + return res, nil +} + // NewNetwork creates Network baked by udp protocol -func NewNetwork(addr string, enc network.Encoding) (*Network, error) { +func NewNetwork(addr string, enc network.Encoding) (Network, error) { _, port, err := net.SplitHostPort(addr) if err != nil { return nil, err @@ -50,7 +115,7 @@ func NewNetwork(addr string, enc network.Encoding) (*Network, error) { return nil, err } - udpNet := &Network{ + udpNet := &UDPNetwork{ udpSock: udpSock, enc: enc, newPacket: make(chan *handel.Packet, 20000), @@ -58,6 +123,7 @@ func NewNetwork(addr string, enc network.Encoding) (*Network, error) { ready: make(chan bool, 1), done: make(chan bool, 1), } + go udpNet.handler() go udpNet.loop() go udpNet.dispatchLoop() @@ -65,7 +131,7 @@ func NewNetwork(addr string, enc network.Encoding) (*Network, error) { } // Stop closes -func (udpNet *Network) Stop() { +func (udpNet *UDPNetwork) Stop() { udpNet.Lock() defer udpNet.Unlock() if udpNet.quit { @@ -77,14 +143,14 @@ func (udpNet *Network) Stop() { } //RegisterListener registers listener for processing incoming packets -func (udpNet *Network) RegisterListener(listener h.Listener) { +func (udpNet *UDPNetwork) RegisterListener(listener h.Listener) { udpNet.Lock() defer udpNet.Unlock() udpNet.listeners = append(udpNet.listeners, listener) } //Send sends a packet to supplied identities -func (udpNet *Network) Send(identities []h.Identity, packet *h.Packet) { +func (udpNet *UDPNetwork) Send(identities []h.Identity, packet *h.Packet) { udpNet.Lock() udpNet.sent += len(identities) udpNet.Unlock() @@ -93,7 +159,7 @@ func (udpNet *Network) Send(identities []h.Identity, packet *h.Packet) { } } -func (udpNet *Network) send(identity h.Identity, packet *h.Packet) { +func (udpNet *UDPNetwork) send(identity h.Identity, packet *h.Packet) { addr := identity.Address() udpAddr, err := net.ResolveUDPAddr("udp4", addr) if err != nil { @@ -121,7 +187,7 @@ func (udpNet *Network) send(identity h.Identity, packet *h.Packet) { //fmt.Printf("%s -> sending packet to %s\n", udpSock.LocalAddr().String(), addr) } -func (udpNet *Network) handler() { +func (udpNet *UDPNetwork) handler() { enc := udpNet.enc for { //udpNet.quit and udpNet.listeners have to be guarded by a read lock @@ -145,7 +211,7 @@ func (udpNet *Network) handler() { } } -func (udpNet *Network) loop() { +func (udpNet *UDPNetwork) loop() { pendings := list.New() var ready = false send := func() { @@ -179,14 +245,14 @@ func (udpNet *Network) loop() { } } -func (udpNet *Network) getListeners() []handel.Listener { +func (udpNet *UDPNetwork) getListeners() []handel.Listener { udpNet.RLock() defer udpNet.RUnlock() udpNet.rcvd++ return udpNet.listeners } -func (udpNet *Network) dispatchLoop() { +func (udpNet *UDPNetwork) dispatchLoop() { dispatch := func(p *handel.Packet) { listeners := udpNet.getListeners() for _, listener := range listeners { diff --git a/network/udp/net_test.go b/network/udp/net_test.go index b663a0c..9ab6f1a 100644 --- a/network/udp/net_test.go +++ b/network/udp/net_test.go @@ -10,7 +10,7 @@ import ( ) func TestUDPNetwork(t *testing.T) { - n1, err := NewNetwork("127.0.0.1:3000", network.NewGOBEncoding()) + n1, err := NewDelayedUDPNetwork(1*time.Millisecond, "127.0.0.1:3000", network.NewGOBEncoding()) require.NoError(t, err) n2, err := NewNetwork("127.0.0.1:3001", network.NewGOBEncoding()) require.NoError(t, err) diff --git a/simul/bad.toml b/simul/bad.toml index 4a311ff..68032d8 100644 --- a/simul/bad.toml +++ b/simul/bad.toml @@ -1,4 +1,4 @@ -Network = "udp" +Network = "delayed_udp" Curve = "bn256/cf" Encoding = "gob" MonitorPort = 10000 diff --git a/simul/config_example.toml b/simul/config_example.toml index 76f4d19..0f43603 100644 --- a/simul/config_example.toml +++ b/simul/config_example.toml @@ -1,4 +1,4 @@ -Network = "udp" +Network = "delayed_udp" Curve = "bn256/cf" Encoding = "gob" MonitorPort = 9980 diff --git a/simul/lib/config.go b/simul/lib/config.go index 767aab4..156b33c 100644 --- a/simul/lib/config.go +++ b/simul/lib/config.go @@ -174,6 +174,8 @@ func (c *Config) selectNetwork(id handel.Identity) (handel.Network, error) { switch c.Network { case "udp": return udp.NewNetwork(id.Address(), encoding) + case "delayed_udp": + return udp.NewDelayedUDPNetwork(2000* time.Millisecond, id.Address(), encoding) case "quic-test-insecure": cfg := quic.NewInsecureTestConfig() return quic.NewNetwork(id.Address(), encoding, cfg) diff --git a/simul/lib/sync.go b/simul/lib/sync.go index d63b850..366a9a1 100644 --- a/simul/lib/sync.go +++ b/simul/lib/sync.go @@ -30,7 +30,7 @@ type SyncMaster struct { exp int probExp int // probabilistically expected nb,i.e. 95% of exp total int - n *udp.Network + n udp.Network states map[int]*state } @@ -193,7 +193,7 @@ type SyncSlave struct { sync.Mutex own string master string - net *udp.Network + net udp.Network ids []int states map[int]*slaveState }