Skip to content

Commit

Permalink
Merge pull request #114 from statechannels/fix-ms
Browse files Browse the repository at this point in the history
Update Message to support larger number of peers
  • Loading branch information
lalexgap authored Sep 30, 2022
2 parents a63ca40 + c0b17fb commit d763d15
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 78 deletions.
136 changes: 60 additions & 76 deletions messaging/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"io"
"time"

libp2p "github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/host"
Expand All @@ -15,20 +16,21 @@ import (
"github.com/statechannels/go-nitro-testground/peer"
"github.com/statechannels/go-nitro/client/engine/store/safesync"
"github.com/statechannels/go-nitro/protocols"
"github.com/statechannels/go-nitro/types"
"github.com/testground/sdk-go/runtime"
)

const (
MESSAGE_ADDRESS = "/messages/1.0.0"
DELIMETER = '\n'
BUFFER_SIZE = 1_000_000
MESSAGE_ADDRESS = "/messages/1.0.0"
DELIMITER = '\n'
BUFFER_SIZE = 1_000_000
NUM_CONNECT_ATTEMPTS = 20
RETRY_SLEEP_DURATION = 5 * time.Second
)

// P2PMessageService is a rudimentary message service that uses TCP to send and receive messages
type P2PMessageService struct {
out chan protocols.Message // for sending message to engine
in chan protocols.Message // for receiving messages from engine
out chan protocols.Message // for sending message to engine

peers *safesync.Map[peer.PeerInfo]

quit chan struct{} // quit is used to signal the goroutine to stop
Expand Down Expand Up @@ -56,106 +58,88 @@ func NewP2PMessageService(me peer.MyInfo, peers []peer.PeerInfo, metrics *runtim

safePeers := safesync.Map[peer.PeerInfo]{}
for _, p := range peers {
if p.Address == me.Address {
continue
}
safePeers.Store(p.Address.String(), p)

// Extract the peer ID from the multiaddr.
info, _ := p2ppeer.AddrInfoFromP2pAddr(p.MultiAddress())
// Add the destination's peer multiaddress in the peerstore.
// This will be used during connection and stream creation by libp2p.
host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL)
}
h := &P2PMessageService{
in: make(chan protocols.Message, BUFFER_SIZE),
out: make(chan protocols.Message, BUFFER_SIZE),
peers: &safePeers,
p2pHost: host,
quit: make(chan struct{}),
me: me,
metrics: metrics,
}

h.p2pHost.SetStreamHandler(MESSAGE_ADDRESS, func(stream network.Stream) {

reader := bufio.NewReader(stream)
for {
select {
case <-h.quit:
select {
case <-h.quit:
stream.Close()
return
default:

reader := bufio.NewReader(stream)
// Create a buffer stream for non blocking read and write.
raw, err := reader.ReadString(DELIMITER)

// An EOF means the stream has been closed by the other side.
if errors.Is(err, io.EOF) {
stream.Close()
return
default:

// Create a buffer stream for non blocking read and write.
raw, err := reader.ReadString(DELIMETER)
// TODO: If the stream has been closed we just bail for now
// TODO: Properly check for and handle stream reset error
if errors.Is(err, io.EOF) || fmt.Sprintf("%s", err) == "stream reset" {
stream.Close()
return
}
h.checkError(err)
m, err := protocols.DeserializeMessage(raw)

h.checkError(err)
h.out <- m
}
h.checkError(err)
m, err := protocols.DeserializeMessage(raw)

h.checkError(err)
h.out <- m
stream.Close()
}

})

return h

}

// DialPeers dials all peers in the peer list and establishs a connection with them.
// This should be called once all the message services are running.
// TODO: The message service should handle this internally
func (s *P2PMessageService) DialPeers() {
go s.connectToPeers()
}

// connectToPeers establishes a stream with all our peers and uses that stream to send messages
func (s *P2PMessageService) connectToPeers() {
// create a map with streams to all peers
peerStreams := make(map[types.Address]network.Stream)
s.peers.Range(func(key string, p peer.PeerInfo) bool {
// Send sends messages to other participants
func (ms *P2PMessageService) Send(msg protocols.Message) {
start := time.Now()
raw, err := msg.Serialize()
ms.checkError(err)

if p.Address == s.me.Address {
return false
}
// Extract the peer ID from the multiaddr.
info, err := p2ppeer.AddrInfoFromP2pAddr(p.MultiAddress())
s.checkError(err)

// Add the destination's peer multiaddress in the peerstore.
// This will be used during connection and stream creation by libp2p.
s.p2pHost.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL)
peer, ok := ms.peers.Load(msg.To.String())
if !ok {
panic(fmt.Errorf("could not load peer %s", msg.To.String()))
}

stream, err := s.p2pHost.NewStream(context.Background(), info.ID, MESSAGE_ADDRESS)
s.checkError(err)
peerStreams[p.Address] = stream
return true
})
for {
select {
case <-s.quit:
for i := 0; i < NUM_CONNECT_ATTEMPTS; i++ {
s, err := ms.p2pHost.NewStream(context.Background(), peer.Id, MESSAGE_ADDRESS)
if err == nil {
writer := bufio.NewWriter(s)
_, err = writer.WriteString(raw + string(DELIMITER))
ms.checkError(err)
ms.recordOutgoingMessageMetrics(msg, []byte(raw))

for _, writer := range peerStreams {
writer.Close()
}
return
case m := <-s.in:
raw, err := m.Serialize()
s.checkError(err)
s.recordOutgoingMessageMetrics(m, []byte(raw))
writer := bufio.NewWriter(peerStreams[m.To])
_, err = writer.WriteString(raw)
s.checkError(err)
err = writer.WriteByte(DELIMETER)
s.checkError(err)
writer.Flush()
s.Close()

return
}
}

}
fmt.Printf("attempt %d: could not open stream to %s, retrying in %s\n", i, peer.Address.String(), RETRY_SLEEP_DURATION.String())
time.Sleep(RETRY_SLEEP_DURATION)

// Send dispatches messages
func (h *P2PMessageService) Send(msg protocols.Message) {
}
ms.metrics.Timer(fmt.Sprintf("msg_send,sender=%s", ms.me.Address)).Update(time.Since(start))

// TODO: Now that the in chan has been deprecated from the API we should remove in from this message serviceß
h.in <- msg
}

// checkError panics if the SimpleTCPMessageService is running, otherwise it just returns
Expand Down Expand Up @@ -197,4 +181,4 @@ func (h *P2PMessageService) recordOutgoingMessageMetrics(msg protocols.Message,
h.metrics.Gauge(fmt.Sprintf("msg_payload_size,sender=%s,receiver=%s", h.me.Address, msg.To)).Update(float64(totalPayloadsSize))

h.metrics.Gauge(fmt.Sprintf("msg_size,sender=%s,receiver=%s", h.me.Address, msg.To)).Update(float64(len(raw)))
}
}
2 changes: 1 addition & 1 deletion peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

// START_PORT is the start of the port range we'll use to issue unique ports.
const START_PORT = 7000
const START_PORT = 49000

type Role = uint

Expand Down
1 change: 0 additions & 1 deletion tests/virtual-payment.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ func CreateVirtualPaymentTest(runEnv *runtime.RunEnv, init *run.InitContext) err
// We wait until everyone has chosen an address.
client.MustSignalAndWait(ctx, "client created", runEnv.TestInstanceCount)

ms.DialPeers()
client.MustSignalAndWait(ctx, "message service connected", runEnv.TestInstanceCount)

ledgerIds := []types.Destination{}
Expand Down

0 comments on commit d763d15

Please sign in to comment.