Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Message to support larger number of peers #114

Merged
merged 10 commits into from
Sep 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 60 additions & 76 deletions messaging/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"io"
"time"

libp2p "github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/host"
Expand All @@ -15,20 +16,21 @@ import (
"github.com/statechannels/go-nitro-testground/peer"
"github.com/statechannels/go-nitro/client/engine/store/safesync"
"github.com/statechannels/go-nitro/protocols"
"github.com/statechannels/go-nitro/types"
"github.com/testground/sdk-go/runtime"
)

const (
MESSAGE_ADDRESS = "/messages/1.0.0"
DELIMETER = '\n'
BUFFER_SIZE = 1_000_000
MESSAGE_ADDRESS = "/messages/1.0.0"
DELIMITER = '\n'
BUFFER_SIZE = 1_000_000
NUM_CONNECT_ATTEMPTS = 20
RETRY_SLEEP_DURATION = 5 * time.Second
)

// P2PMessageService is a rudimentary message service that uses TCP to send and receive messages
type P2PMessageService struct {
out chan protocols.Message // for sending message to engine
in chan protocols.Message // for receiving messages from engine
out chan protocols.Message // for sending message to engine

peers *safesync.Map[peer.PeerInfo]

quit chan struct{} // quit is used to signal the goroutine to stop
Expand Down Expand Up @@ -56,106 +58,88 @@ func NewP2PMessageService(me peer.MyInfo, peers []peer.PeerInfo, metrics *runtim

safePeers := safesync.Map[peer.PeerInfo]{}
for _, p := range peers {
if p.Address == me.Address {
continue
}
safePeers.Store(p.Address.String(), p)

// Extract the peer ID from the multiaddr.
info, _ := p2ppeer.AddrInfoFromP2pAddr(p.MultiAddress())
// Add the destination's peer multiaddress in the peerstore.
// This will be used during connection and stream creation by libp2p.
host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL)
}
h := &P2PMessageService{
in: make(chan protocols.Message, BUFFER_SIZE),
out: make(chan protocols.Message, BUFFER_SIZE),
peers: &safePeers,
p2pHost: host,
quit: make(chan struct{}),
me: me,
metrics: metrics,
}

h.p2pHost.SetStreamHandler(MESSAGE_ADDRESS, func(stream network.Stream) {

reader := bufio.NewReader(stream)
for {
select {
case <-h.quit:
select {
case <-h.quit:
stream.Close()
return
default:

reader := bufio.NewReader(stream)
// Create a buffer stream for non blocking read and write.
raw, err := reader.ReadString(DELIMITER)

// An EOF means the stream has been closed by the other side.
if errors.Is(err, io.EOF) {
stream.Close()
return
default:

// Create a buffer stream for non blocking read and write.
raw, err := reader.ReadString(DELIMETER)
// TODO: If the stream has been closed we just bail for now
// TODO: Properly check for and handle stream reset error
if errors.Is(err, io.EOF) || fmt.Sprintf("%s", err) == "stream reset" {
stream.Close()
return
}
h.checkError(err)
m, err := protocols.DeserializeMessage(raw)

h.checkError(err)
h.out <- m
}
h.checkError(err)
m, err := protocols.DeserializeMessage(raw)

h.checkError(err)
h.out <- m
stream.Close()
}

})

return h

}

// DialPeers dials all peers in the peer list and establishs a connection with them.
// This should be called once all the message services are running.
// TODO: The message service should handle this internally
func (s *P2PMessageService) DialPeers() {
go s.connectToPeers()
}

// connectToPeers establishes a stream with all our peers and uses that stream to send messages
func (s *P2PMessageService) connectToPeers() {
// create a map with streams to all peers
peerStreams := make(map[types.Address]network.Stream)
s.peers.Range(func(key string, p peer.PeerInfo) bool {
// Send sends messages to other participants
func (ms *P2PMessageService) Send(msg protocols.Message) {
start := time.Now()
raw, err := msg.Serialize()
ms.checkError(err)

if p.Address == s.me.Address {
return false
}
// Extract the peer ID from the multiaddr.
info, err := p2ppeer.AddrInfoFromP2pAddr(p.MultiAddress())
s.checkError(err)

// Add the destination's peer multiaddress in the peerstore.
// This will be used during connection and stream creation by libp2p.
s.p2pHost.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL)
peer, ok := ms.peers.Load(msg.To.String())
if !ok {
panic(fmt.Errorf("could not load peer %s", msg.To.String()))
}

stream, err := s.p2pHost.NewStream(context.Background(), info.ID, MESSAGE_ADDRESS)
s.checkError(err)
peerStreams[p.Address] = stream
return true
})
for {
select {
case <-s.quit:
for i := 0; i < NUM_CONNECT_ATTEMPTS; i++ {
s, err := ms.p2pHost.NewStream(context.Background(), peer.Id, MESSAGE_ADDRESS)
if err == nil {
writer := bufio.NewWriter(s)
_, err = writer.WriteString(raw + string(DELIMITER))
ms.checkError(err)
ms.recordOutgoingMessageMetrics(msg, []byte(raw))

for _, writer := range peerStreams {
writer.Close()
}
return
case m := <-s.in:
raw, err := m.Serialize()
s.checkError(err)
s.recordOutgoingMessageMetrics(m, []byte(raw))
writer := bufio.NewWriter(peerStreams[m.To])
_, err = writer.WriteString(raw)
s.checkError(err)
err = writer.WriteByte(DELIMETER)
s.checkError(err)
writer.Flush()
s.Close()

return
}
}

}
fmt.Printf("attempt %d: could not open stream to %s, retrying in %s\n", i, peer.Address.String(), RETRY_SLEEP_DURATION.String())
time.Sleep(RETRY_SLEEP_DURATION)

// Send dispatches messages
func (h *P2PMessageService) Send(msg protocols.Message) {
}
ms.metrics.Timer(fmt.Sprintf("msg_send,sender=%s", ms.me.Address)).Update(time.Since(start))

// TODO: Now that the in chan has been deprecated from the API we should remove in from this message serviceß
h.in <- msg
}

// checkError panics if the SimpleTCPMessageService is running, otherwise it just returns
Expand Down Expand Up @@ -197,4 +181,4 @@ func (h *P2PMessageService) recordOutgoingMessageMetrics(msg protocols.Message,
h.metrics.Gauge(fmt.Sprintf("msg_payload_size,sender=%s,receiver=%s", h.me.Address, msg.To)).Update(float64(totalPayloadsSize))

h.metrics.Gauge(fmt.Sprintf("msg_size,sender=%s,receiver=%s", h.me.Address, msg.To)).Update(float64(len(raw)))
}
}
2 changes: 1 addition & 1 deletion peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

// START_PORT is the start of the port range we'll use to issue unique ports.
const START_PORT = 7000
const START_PORT = 49000

type Role = uint

Expand Down
1 change: 0 additions & 1 deletion tests/virtual-payment.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ func CreateVirtualPaymentTest(runEnv *runtime.RunEnv, init *run.InitContext) err
// We wait until everyone has chosen an address.
client.MustSignalAndWait(ctx, "client created", runEnv.TestInstanceCount)

ms.DialPeers()
client.MustSignalAndWait(ctx, "message service connected", runEnv.TestInstanceCount)

ledgerIds := []types.Destination{}
Expand Down