diff --git a/messaging/service.go b/messaging/service.go index 411fed6d..76cf2471 100644 --- a/messaging/service.go +++ b/messaging/service.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io" + "time" libp2p "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/core/host" @@ -15,20 +16,21 @@ import ( "github.com/statechannels/go-nitro-testground/peer" "github.com/statechannels/go-nitro/client/engine/store/safesync" "github.com/statechannels/go-nitro/protocols" - "github.com/statechannels/go-nitro/types" "github.com/testground/sdk-go/runtime" ) const ( - MESSAGE_ADDRESS = "/messages/1.0.0" - DELIMETER = '\n' - BUFFER_SIZE = 1_000_000 + MESSAGE_ADDRESS = "/messages/1.0.0" + DELIMITER = '\n' + BUFFER_SIZE = 1_000_000 + NUM_CONNECT_ATTEMPTS = 20 + RETRY_SLEEP_DURATION = 5 * time.Second ) // P2PMessageService is a rudimentary message service that uses TCP to send and receive messages type P2PMessageService struct { - out chan protocols.Message // for sending message to engine - in chan protocols.Message // for receiving messages from engine + out chan protocols.Message // for sending message to engine + peers *safesync.Map[peer.PeerInfo] quit chan struct{} // quit is used to signal the goroutine to stop @@ -56,10 +58,18 @@ func NewP2PMessageService(me peer.MyInfo, peers []peer.PeerInfo, metrics *runtim safePeers := safesync.Map[peer.PeerInfo]{} for _, p := range peers { + if p.Address == me.Address { + continue + } safePeers.Store(p.Address.String(), p) + + // Extract the peer ID from the multiaddr. + info, _ := p2ppeer.AddrInfoFromP2pAddr(p.MultiAddress()) + // Add the destination's peer multiaddress in the peerstore. + // This will be used during connection and stream creation by libp2p. + host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL) } h := &P2PMessageService{ - in: make(chan protocols.Message, BUFFER_SIZE), out: make(chan protocols.Message, BUFFER_SIZE), peers: &safePeers, p2pHost: host, @@ -67,95 +77,69 @@ func NewP2PMessageService(me peer.MyInfo, peers []peer.PeerInfo, metrics *runtim me: me, metrics: metrics, } + h.p2pHost.SetStreamHandler(MESSAGE_ADDRESS, func(stream network.Stream) { - reader := bufio.NewReader(stream) - for { - select { - case <-h.quit: + select { + case <-h.quit: + stream.Close() + return + default: + + reader := bufio.NewReader(stream) + // Create a buffer stream for non blocking read and write. + raw, err := reader.ReadString(DELIMITER) + + // An EOF means the stream has been closed by the other side. + if errors.Is(err, io.EOF) { stream.Close() return - default: - - // Create a buffer stream for non blocking read and write. - raw, err := reader.ReadString(DELIMETER) - // TODO: If the stream has been closed we just bail for now - // TODO: Properly check for and handle stream reset error - if errors.Is(err, io.EOF) || fmt.Sprintf("%s", err) == "stream reset" { - stream.Close() - return - } - h.checkError(err) - m, err := protocols.DeserializeMessage(raw) - - h.checkError(err) - h.out <- m } + h.checkError(err) + m, err := protocols.DeserializeMessage(raw) + + h.checkError(err) + h.out <- m + stream.Close() } + }) return h } -// DialPeers dials all peers in the peer list and establishs a connection with them. -// This should be called once all the message services are running. -// TODO: The message service should handle this internally -func (s *P2PMessageService) DialPeers() { - go s.connectToPeers() -} - -// connectToPeers establishes a stream with all our peers and uses that stream to send messages -func (s *P2PMessageService) connectToPeers() { - // create a map with streams to all peers - peerStreams := make(map[types.Address]network.Stream) - s.peers.Range(func(key string, p peer.PeerInfo) bool { +// Send sends messages to other participants +func (ms *P2PMessageService) Send(msg protocols.Message) { + start := time.Now() + raw, err := msg.Serialize() + ms.checkError(err) - if p.Address == s.me.Address { - return false - } - // Extract the peer ID from the multiaddr. - info, err := p2ppeer.AddrInfoFromP2pAddr(p.MultiAddress()) - s.checkError(err) - - // Add the destination's peer multiaddress in the peerstore. - // This will be used during connection and stream creation by libp2p. - s.p2pHost.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL) + peer, ok := ms.peers.Load(msg.To.String()) + if !ok { + panic(fmt.Errorf("could not load peer %s", msg.To.String())) + } - stream, err := s.p2pHost.NewStream(context.Background(), info.ID, MESSAGE_ADDRESS) - s.checkError(err) - peerStreams[p.Address] = stream - return true - }) - for { - select { - case <-s.quit: + for i := 0; i < NUM_CONNECT_ATTEMPTS; i++ { + s, err := ms.p2pHost.NewStream(context.Background(), peer.Id, MESSAGE_ADDRESS) + if err == nil { + writer := bufio.NewWriter(s) + _, err = writer.WriteString(raw + string(DELIMITER)) + ms.checkError(err) + ms.recordOutgoingMessageMetrics(msg, []byte(raw)) - for _, writer := range peerStreams { - writer.Close() - } - return - case m := <-s.in: - raw, err := m.Serialize() - s.checkError(err) - s.recordOutgoingMessageMetrics(m, []byte(raw)) - writer := bufio.NewWriter(peerStreams[m.To]) - _, err = writer.WriteString(raw) - s.checkError(err) - err = writer.WriteByte(DELIMETER) - s.checkError(err) writer.Flush() + s.Close() + return } - } -} + fmt.Printf("attempt %d: could not open stream to %s, retrying in %s\n", i, peer.Address.String(), RETRY_SLEEP_DURATION.String()) + time.Sleep(RETRY_SLEEP_DURATION) -// Send dispatches messages -func (h *P2PMessageService) Send(msg protocols.Message) { + } + ms.metrics.Timer(fmt.Sprintf("msg_send,sender=%s", ms.me.Address)).Update(time.Since(start)) - // TODO: Now that the in chan has been deprecated from the API we should remove in from this message serviceß - h.in <- msg } // checkError panics if the SimpleTCPMessageService is running, otherwise it just returns @@ -197,4 +181,4 @@ func (h *P2PMessageService) recordOutgoingMessageMetrics(msg protocols.Message, h.metrics.Gauge(fmt.Sprintf("msg_payload_size,sender=%s,receiver=%s", h.me.Address, msg.To)).Update(float64(totalPayloadsSize)) h.metrics.Gauge(fmt.Sprintf("msg_size,sender=%s,receiver=%s", h.me.Address, msg.To)).Update(float64(len(raw))) -} \ No newline at end of file +} diff --git a/peer/peer.go b/peer/peer.go index 0f4ba249..2a24ba35 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -15,7 +15,7 @@ import ( ) // START_PORT is the start of the port range we'll use to issue unique ports. -const START_PORT = 7000 +const START_PORT = 49000 type Role = uint diff --git a/tests/virtual-payment.go b/tests/virtual-payment.go index 6d8449a0..6bc28ccc 100644 --- a/tests/virtual-payment.go +++ b/tests/virtual-payment.go @@ -76,7 +76,6 @@ func CreateVirtualPaymentTest(runEnv *runtime.RunEnv, init *run.InitContext) err // We wait until everyone has chosen an address. client.MustSignalAndWait(ctx, "client created", runEnv.TestInstanceCount) - ms.DialPeers() client.MustSignalAndWait(ctx, "message service connected", runEnv.TestInstanceCount) ledgerIds := []types.Destination{}