From 269fa5419feb70398464414d29c6facf1984ad36 Mon Sep 17 00:00:00 2001 From: Alex Gap Date: Wed, 28 Sep 2022 14:17:23 -0700 Subject: [PATCH 01/10] p2p ms no longer uses long running streams The long running streams seem to reset eventually. --- messaging/service.go | 80 ++++++++++++++++++++++---------------------- 1 file changed, 40 insertions(+), 40 deletions(-) diff --git a/messaging/service.go b/messaging/service.go index 411fed6d..91ff3f0b 100644 --- a/messaging/service.go +++ b/messaging/service.go @@ -15,7 +15,6 @@ import ( "github.com/statechannels/go-nitro-testground/peer" "github.com/statechannels/go-nitro/client/engine/store/safesync" "github.com/statechannels/go-nitro/protocols" - "github.com/statechannels/go-nitro/types" "github.com/testground/sdk-go/runtime" ) @@ -27,8 +26,8 @@ const ( // P2PMessageService is a rudimentary message service that uses TCP to send and receive messages type P2PMessageService struct { - out chan protocols.Message // for sending message to engine - in chan protocols.Message // for receiving messages from engine + out chan protocols.Message // for sending message to engine + peers *safesync.Map[peer.PeerInfo] quit chan struct{} // quit is used to signal the goroutine to stop @@ -59,7 +58,6 @@ func NewP2PMessageService(me peer.MyInfo, peers []peer.PeerInfo, metrics *runtim safePeers.Store(p.Address.String(), p) } h := &P2PMessageService{ - in: make(chan protocols.Message, BUFFER_SIZE), out: make(chan protocols.Message, BUFFER_SIZE), peers: &safePeers, p2pHost: host, @@ -67,6 +65,21 @@ func NewP2PMessageService(me peer.MyInfo, peers []peer.PeerInfo, metrics *runtim me: me, metrics: metrics, } + + for _, p := range peers { + if p.Address == h.me.Address { + continue + } + // Extract the peer ID from the multiaddr. + info, err := p2ppeer.AddrInfoFromP2pAddr(p.MultiAddress()) + h.checkError(err) + + // Add the destination's peer multiaddress in the peerstore. + // This will be used during connection and stream creation by libp2p. + h.p2pHost.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL) + + } + h.p2pHost.SetStreamHandler(MESSAGE_ADDRESS, func(stream network.Stream) { reader := bufio.NewReader(stream) @@ -79,10 +92,8 @@ func NewP2PMessageService(me peer.MyInfo, peers []peer.PeerInfo, metrics *runtim // Create a buffer stream for non blocking read and write. raw, err := reader.ReadString(DELIMETER) - // TODO: If the stream has been closed we just bail for now - // TODO: Properly check for and handle stream reset error - if errors.Is(err, io.EOF) || fmt.Sprintf("%s", err) == "stream reset" { - stream.Close() + + if errors.Is(err, io.EOF) || raw == "" { return } h.checkError(err) @@ -102,13 +113,7 @@ func NewP2PMessageService(me peer.MyInfo, peers []peer.PeerInfo, metrics *runtim // This should be called once all the message services are running. // TODO: The message service should handle this internally func (s *P2PMessageService) DialPeers() { - go s.connectToPeers() -} -// connectToPeers establishes a stream with all our peers and uses that stream to send messages -func (s *P2PMessageService) connectToPeers() { - // create a map with streams to all peers - peerStreams := make(map[types.Address]network.Stream) s.peers.Range(func(key string, p peer.PeerInfo) bool { if p.Address == s.me.Address { @@ -122,40 +127,35 @@ func (s *P2PMessageService) connectToPeers() { // This will be used during connection and stream creation by libp2p. s.p2pHost.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL) - stream, err := s.p2pHost.NewStream(context.Background(), info.ID, MESSAGE_ADDRESS) + err = s.p2pHost.Connect(context.Background(), *info) s.checkError(err) - peerStreams[p.Address] = stream + return true }) - for { - select { - case <-s.quit: +} - for _, writer := range peerStreams { - writer.Close() - } - return - case m := <-s.in: - raw, err := m.Serialize() - s.checkError(err) - s.recordOutgoingMessageMetrics(m, []byte(raw)) - writer := bufio.NewWriter(peerStreams[m.To]) - _, err = writer.WriteString(raw) - s.checkError(err) - err = writer.WriteByte(DELIMETER) - s.checkError(err) - writer.Flush() +// Send sends messages to other participants +func (ms *P2PMessageService) Send(msg protocols.Message) { - } + raw, err := msg.Serialize() + ms.checkError(err) + + ms.recordOutgoingMessageMetrics(msg, []byte(raw)) + peer, ok := ms.peers.Load(msg.To.String()) + if !ok { + panic(fmt.Errorf("could not load peer %s", msg.To.String())) } -} + s, err := ms.p2pHost.NewStream(context.Background(), peer.Id, MESSAGE_ADDRESS) + ms.checkError(err) -// Send dispatches messages -func (h *P2PMessageService) Send(msg protocols.Message) { + writer := bufio.NewWriter(s) + _, err = writer.WriteString(raw + string(DELIMETER)) + ms.checkError(err) + + writer.Flush() + s.Close() - // TODO: Now that the in chan has been deprecated from the API we should remove in from this message serviceß - h.in <- msg } // checkError panics if the SimpleTCPMessageService is running, otherwise it just returns @@ -197,4 +197,4 @@ func (h *P2PMessageService) recordOutgoingMessageMetrics(msg protocols.Message, h.metrics.Gauge(fmt.Sprintf("msg_payload_size,sender=%s,receiver=%s", h.me.Address, msg.To)).Update(float64(totalPayloadsSize)) h.metrics.Gauge(fmt.Sprintf("msg_size,sender=%s,receiver=%s", h.me.Address, msg.To)).Update(float64(len(raw))) -} \ No newline at end of file +} From 293be65ebf41dbe4aefc31996d9bfa98b8fdd240 Mon Sep 17 00:00:00 2001 From: Alex Gap Date: Wed, 28 Sep 2022 14:32:16 -0700 Subject: [PATCH 02/10] ignore EOF from client close --- messaging/service.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/messaging/service.go b/messaging/service.go index 91ff3f0b..12c3e703 100644 --- a/messaging/service.go +++ b/messaging/service.go @@ -93,7 +93,8 @@ func NewP2PMessageService(me peer.MyInfo, peers []peer.PeerInfo, metrics *runtim // Create a buffer stream for non blocking read and write. raw, err := reader.ReadString(DELIMETER) - if errors.Is(err, io.EOF) || raw == "" { + // An EOF means the stream has been closed by the other side. + if errors.Is(err, io.EOF) { return } h.checkError(err) From 5be82ce7f4f6e44d078a8daafefeee505f383a9d Mon Sep 17 00:00:00 2001 From: Alex Gap Date: Wed, 28 Sep 2022 14:33:27 -0700 Subject: [PATCH 03/10] remove for loop --- messaging/service.go | 37 ++++++++++++++++++------------------- 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/messaging/service.go b/messaging/service.go index 12c3e703..aae0687e 100644 --- a/messaging/service.go +++ b/messaging/service.go @@ -82,28 +82,27 @@ func NewP2PMessageService(me peer.MyInfo, peers []peer.PeerInfo, metrics *runtim h.p2pHost.SetStreamHandler(MESSAGE_ADDRESS, func(stream network.Stream) { - reader := bufio.NewReader(stream) - for { - select { - case <-h.quit: - stream.Close() + select { + case <-h.quit: + stream.Close() + return + default: + + reader := bufio.NewReader(stream) + // Create a buffer stream for non blocking read and write. + raw, err := reader.ReadString(DELIMETER) + + // An EOF means the stream has been closed by the other side. + if errors.Is(err, io.EOF) { return - default: - - // Create a buffer stream for non blocking read and write. - raw, err := reader.ReadString(DELIMETER) - - // An EOF means the stream has been closed by the other side. - if errors.Is(err, io.EOF) { - return - } - h.checkError(err) - m, err := protocols.DeserializeMessage(raw) - - h.checkError(err) - h.out <- m } + h.checkError(err) + m, err := protocols.DeserializeMessage(raw) + + h.checkError(err) + h.out <- m } + }) return h From 08f6e231d3ab1c1f6eddedfca0995bb80d76af6f Mon Sep 17 00:00:00 2001 From: Alex Gap Date: Wed, 28 Sep 2022 14:45:32 -0700 Subject: [PATCH 04/10] close streams when done in handler --- messaging/service.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/messaging/service.go b/messaging/service.go index aae0687e..4549a767 100644 --- a/messaging/service.go +++ b/messaging/service.go @@ -94,6 +94,7 @@ func NewP2PMessageService(me peer.MyInfo, peers []peer.PeerInfo, metrics *runtim // An EOF means the stream has been closed by the other side. if errors.Is(err, io.EOF) { + stream.Close() return } h.checkError(err) @@ -101,6 +102,7 @@ func NewP2PMessageService(me peer.MyInfo, peers []peer.PeerInfo, metrics *runtim h.checkError(err) h.out <- m + stream.Close() } }) From 278a2273659c1b6acdc8e6f0b0eebf5071a03e39 Mon Sep 17 00:00:00 2001 From: Alex Gap Date: Wed, 28 Sep 2022 15:01:02 -0700 Subject: [PATCH 05/10] use more isolated port range --- peer/peer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/peer/peer.go b/peer/peer.go index 0f4ba249..2a24ba35 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -15,7 +15,7 @@ import ( ) // START_PORT is the start of the port range we'll use to issue unique ports. -const START_PORT = 7000 +const START_PORT = 49000 type Role = uint From b7183497836159f14bd2812c9d7f12851a4d59c6 Mon Sep 17 00:00:00 2001 From: Alex Gap Date: Thu, 29 Sep 2022 08:11:00 -0700 Subject: [PATCH 06/10] add a basic retry mechanism --- messaging/service.go | 57 +++++++++++++++------------------------- tests/virtual-payment.go | 1 - 2 files changed, 21 insertions(+), 37 deletions(-) diff --git a/messaging/service.go b/messaging/service.go index 4549a767..d52d2c9c 100644 --- a/messaging/service.go +++ b/messaging/service.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io" + "time" libp2p "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/core/host" @@ -19,9 +20,11 @@ import ( ) const ( - MESSAGE_ADDRESS = "/messages/1.0.0" - DELIMETER = '\n' - BUFFER_SIZE = 1_000_000 + MESSAGE_ADDRESS = "/messages/1.0.0" + DELIMETER = '\n' + BUFFER_SIZE = 1_000_000 + NUM_CONNECT_ATTEMPTS = 20 + RETRY_SLEEP_DURATION = 5 * time.Second ) // P2PMessageService is a rudimentary message service that uses TCP to send and receive messages @@ -111,52 +114,34 @@ func NewP2PMessageService(me peer.MyInfo, peers []peer.PeerInfo, metrics *runtim } -// DialPeers dials all peers in the peer list and establishs a connection with them. -// This should be called once all the message services are running. -// TODO: The message service should handle this internally -func (s *P2PMessageService) DialPeers() { - - s.peers.Range(func(key string, p peer.PeerInfo) bool { - - if p.Address == s.me.Address { - return false - } - // Extract the peer ID from the multiaddr. - info, err := p2ppeer.AddrInfoFromP2pAddr(p.MultiAddress()) - s.checkError(err) - - // Add the destination's peer multiaddress in the peerstore. - // This will be used during connection and stream creation by libp2p. - s.p2pHost.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL) - - err = s.p2pHost.Connect(context.Background(), *info) - s.checkError(err) - - return true - }) -} - // Send sends messages to other participants func (ms *P2PMessageService) Send(msg protocols.Message) { raw, err := msg.Serialize() ms.checkError(err) - ms.recordOutgoingMessageMetrics(msg, []byte(raw)) peer, ok := ms.peers.Load(msg.To.String()) if !ok { panic(fmt.Errorf("could not load peer %s", msg.To.String())) } - s, err := ms.p2pHost.NewStream(context.Background(), peer.Id, MESSAGE_ADDRESS) - ms.checkError(err) + for i := 0; i < NUM_CONNECT_ATTEMPTS; i++ { + s, err := ms.p2pHost.NewStream(context.Background(), peer.Id, MESSAGE_ADDRESS) + if err == nil { + writer := bufio.NewWriter(s) + _, err = writer.WriteString(raw + string(DELIMETER)) + ms.checkError(err) + ms.recordOutgoingMessageMetrics(msg, []byte(raw)) - writer := bufio.NewWriter(s) - _, err = writer.WriteString(raw + string(DELIMETER)) - ms.checkError(err) + writer.Flush() + s.Close() - writer.Flush() - s.Close() + return + } else { + fmt.Printf("attempt %d: could not open stream to %s, retrying in %s\n", i, peer.Address.String(), RETRY_SLEEP_DURATION.String()) + time.Sleep(RETRY_SLEEP_DURATION) + } + } } diff --git a/tests/virtual-payment.go b/tests/virtual-payment.go index 6d8449a0..6bc28ccc 100644 --- a/tests/virtual-payment.go +++ b/tests/virtual-payment.go @@ -76,7 +76,6 @@ func CreateVirtualPaymentTest(runEnv *runtime.RunEnv, init *run.InitContext) err // We wait until everyone has chosen an address. client.MustSignalAndWait(ctx, "client created", runEnv.TestInstanceCount) - ms.DialPeers() client.MustSignalAndWait(ctx, "message service connected", runEnv.TestInstanceCount) ledgerIds := []types.Destination{} From e67e7a77d36f82f74130c9fe2a0eb413854264ba Mon Sep 17 00:00:00 2001 From: Alex Gap Date: Thu, 29 Sep 2022 09:57:54 -0700 Subject: [PATCH 07/10] measure send duration --- messaging/service.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/messaging/service.go b/messaging/service.go index d52d2c9c..ae8718ca 100644 --- a/messaging/service.go +++ b/messaging/service.go @@ -116,7 +116,7 @@ func NewP2PMessageService(me peer.MyInfo, peers []peer.PeerInfo, metrics *runtim // Send sends messages to other participants func (ms *P2PMessageService) Send(msg protocols.Message) { - + start := time.Now() raw, err := msg.Serialize() ms.checkError(err) @@ -142,6 +142,7 @@ func (ms *P2PMessageService) Send(msg protocols.Message) { time.Sleep(RETRY_SLEEP_DURATION) } } + ms.metrics.Timer(fmt.Sprintf("msg_send,sender=%s", ms.me.Address)).Update(time.Since(start)) } From 51267d3ad749d6aa2a969ebb4cce39bd0b6163e3 Mon Sep 17 00:00:00 2001 From: Alex Gap Date: Fri, 30 Sep 2022 10:20:23 -0700 Subject: [PATCH 08/10] typo --- messaging/service.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/messaging/service.go b/messaging/service.go index ae8718ca..f9278b2b 100644 --- a/messaging/service.go +++ b/messaging/service.go @@ -21,7 +21,7 @@ import ( const ( MESSAGE_ADDRESS = "/messages/1.0.0" - DELIMETER = '\n' + DELIMITER = '\n' BUFFER_SIZE = 1_000_000 NUM_CONNECT_ATTEMPTS = 20 RETRY_SLEEP_DURATION = 5 * time.Second @@ -93,7 +93,7 @@ func NewP2PMessageService(me peer.MyInfo, peers []peer.PeerInfo, metrics *runtim reader := bufio.NewReader(stream) // Create a buffer stream for non blocking read and write. - raw, err := reader.ReadString(DELIMETER) + raw, err := reader.ReadString(DELIMITER) // An EOF means the stream has been closed by the other side. if errors.Is(err, io.EOF) { @@ -129,7 +129,7 @@ func (ms *P2PMessageService) Send(msg protocols.Message) { s, err := ms.p2pHost.NewStream(context.Background(), peer.Id, MESSAGE_ADDRESS) if err == nil { writer := bufio.NewWriter(s) - _, err = writer.WriteString(raw + string(DELIMETER)) + _, err = writer.WriteString(raw + string(DELIMITER)) ms.checkError(err) ms.recordOutgoingMessageMetrics(msg, []byte(raw)) From 53ff47c327fe7fb1d40015fbf5fad3314baa6f87 Mon Sep 17 00:00:00 2001 From: Alex Gap Date: Fri, 30 Sep 2022 12:45:41 -0700 Subject: [PATCH 09/10] combine loop --- messaging/service.go | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/messaging/service.go b/messaging/service.go index f9278b2b..acb191f3 100644 --- a/messaging/service.go +++ b/messaging/service.go @@ -58,7 +58,16 @@ func NewP2PMessageService(me peer.MyInfo, peers []peer.PeerInfo, metrics *runtim safePeers := safesync.Map[peer.PeerInfo]{} for _, p := range peers { + if p.Address == me.Address { + continue + } safePeers.Store(p.Address.String(), p) + + // Extract the peer ID from the multiaddr. + info, _ := p2ppeer.AddrInfoFromP2pAddr(p.MultiAddress()) + // Add the destination's peer multiaddress in the peerstore. + // This will be used during connection and stream creation by libp2p. + host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL) } h := &P2PMessageService{ out: make(chan protocols.Message, BUFFER_SIZE), @@ -69,20 +78,6 @@ func NewP2PMessageService(me peer.MyInfo, peers []peer.PeerInfo, metrics *runtim metrics: metrics, } - for _, p := range peers { - if p.Address == h.me.Address { - continue - } - // Extract the peer ID from the multiaddr. - info, err := p2ppeer.AddrInfoFromP2pAddr(p.MultiAddress()) - h.checkError(err) - - // Add the destination's peer multiaddress in the peerstore. - // This will be used during connection and stream creation by libp2p. - h.p2pHost.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL) - - } - h.p2pHost.SetStreamHandler(MESSAGE_ADDRESS, func(stream network.Stream) { select { From c0b17fb508c4e2f1cafa598f7fac51db838452be Mon Sep 17 00:00:00 2001 From: Alex Gap Date: Fri, 30 Sep 2022 12:46:41 -0700 Subject: [PATCH 10/10] remove redundant else --- messaging/service.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/messaging/service.go b/messaging/service.go index acb191f3..76cf2471 100644 --- a/messaging/service.go +++ b/messaging/service.go @@ -132,10 +132,11 @@ func (ms *P2PMessageService) Send(msg protocols.Message) { s.Close() return - } else { - fmt.Printf("attempt %d: could not open stream to %s, retrying in %s\n", i, peer.Address.String(), RETRY_SLEEP_DURATION.String()) - time.Sleep(RETRY_SLEEP_DURATION) } + + fmt.Printf("attempt %d: could not open stream to %s, retrying in %s\n", i, peer.Address.String(), RETRY_SLEEP_DURATION.String()) + time.Sleep(RETRY_SLEEP_DURATION) + } ms.metrics.Timer(fmt.Sprintf("msg_send,sender=%s", ms.me.Address)).Update(time.Since(start))