From 13e8d6782b11759149c718f14d361c5d3fd5ab31 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 9 Aug 2024 17:11:17 -0400 Subject: [PATCH] WIP: P2PNetwork readme --- network/README-P2P.md | 86 +++++++++++++++++++++++++++++++++++++++++++ network/p2p/README.md | 6 ++- network/p2pNetwork.go | 4 +- network/p2pPeer.go | 18 ++++----- 4 files changed, 102 insertions(+), 12 deletions(-) create mode 100644 network/README-P2P.md diff --git a/network/README-P2P.md b/network/README-P2P.md new file mode 100644 index 0000000000..f89c2393a3 --- /dev/null +++ b/network/README-P2P.md @@ -0,0 +1,86 @@ +# P2P Network implementation overview + +Refer to [p2p sub-package overview](./p2p/README.md) for details about p2p sub-components. + +`P2PNetwork` implements the `GossipNode` interface similarly to `WsNetwork`. Both use +the same peer connection management and message broadcast functions but different +transport: lip2p-managed connections and HTTP + WebSocket, respectively. +`P2PNetwork` and `WsNetwork` require `config.NetAddress` to be set in order to start a server. + +In addition, `HybridNetwork` is an aggregate of `P2PNetwork` and `WsNetwork` allowing a node +to interact over both networks. In the case of hybrid operation, both `config.P2PNetAddress` and +`config.NetAddress` are used. + +## HTTP Services + +`P2PNetwork` uses libp2p's `http` submodule to handle HTTP traffic over libp2p-managed connection. +It is `http.Handler`-compatible so that service handlers are registered the same way as for `WsNetwork`. + +## Phonebook and Peerstore and peer classes + +## WsPeer + +Peers are created in `wsStreamHandler` that is called for both incoming and outgoing connections (and streams). `incoming` flag is set to true for incoming connection. +At the very beginning of the `wsStreamHandler` one byte read/write happens in order to make sure: + - Stream is operable + - A placeholder for a handshake where some meta-data can be exchanged + +Connected peers are maintained as a `wsPeers` map similarly to the `WsNetwork`. + + + +## Broadcaster + +`msgBroadcaster` encapsulates a shared broadcasting logic: priority vs bulk messages (and queues), +data preparation, peers retrieving. Broadcast requests eventually hits `peer.writeNonBlockMsgs` -> `peer.writeLoopSendMsg` -> `conn.WriteMessage`. See the diagram denoting the broadcast data flow. + +```mermaid +graph LR + + p2pnet[P2PNetwork] + wsnet[WsNetwork] + B[broadcaster] + + p2pnet & wsnet --> B + + subgraph "wsPeer" + direction TB + writeNonBlockMsgs + Conn[conn.WriteMessage] + + subgraph "writeLoop" + writeLoopSendMsg + end + + writeNonBlockMsgs --> writeLoop + writeLoopSendMsg --> Conn + end + + B --> writeNonBlockMsgs + + Conn --> WMP2P & WMWS + + subgraph "wsPeerConnP2P" + WMP2P[WriteMessage] + end + + subgraph "websocket" + WMWS[WriteMessage] + end + + subgraph "libp2p" + stream.Write + end + + WMP2P --> libp2p +``` + +## DHT and Capabilities discovery + +## Net identity + + \ No newline at end of file diff --git a/network/p2p/README.md b/network/p2p/README.md index 9361578c20..8778eb67e8 100644 --- a/network/p2p/README.md +++ b/network/p2p/README.md @@ -70,6 +70,10 @@ The underlying libp2p implementation abstracted as `p2p.Service` and initialized `Host` is also used for p2p HTTP server and DHT Discovery service creation. It is also useful for unit testing. Note, `Host` is created with `NoListenAddrs` options that prevents automatic listening and networking until the `Service.Start()` is called. This follows others services design (including WsNetwork service). +### Connection and Stream direction + +TBD + ### Connection limiting libp2p's `ResourceManager` is used to limit number of connections up tp `cfg.P2PIncomingConnectionsLimit`. @@ -97,7 +101,7 @@ graph LR end P2P --> AdvCap - Cap --> P2P + Cap -.-> P2P subgraph "libp2p" Adv[Advertise] diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index 37c6cfcd52..2c2c885b38 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -782,7 +782,7 @@ func (n *P2PNetwork) wsStreamHandler(ctx context.Context, p2pPeer peer.ID, strea peerCore := makePeerCore(ctx, n, n.log, n.handler.readBuffer, addr, client, addr) wsp := &wsPeer{ wsPeerCore: peerCore, - conn: &wsPeerConnP2PImpl{stream: stream}, + conn: &wsPeerConnP2P{stream: stream}, outgoing: !incoming, identity: netIdentPeerID, } @@ -844,7 +844,7 @@ func (n *P2PNetwork) wsStreamHandler(ctx context.Context, p2pPeer peer.ID, strea // peerRemoteClose called from wsPeer to report that it has closed func (n *P2PNetwork) peerRemoteClose(peer *wsPeer, reason disconnectReason) { - remotePeerID := peer.conn.(*wsPeerConnP2PImpl).stream.Conn().RemotePeer() + remotePeerID := peer.conn.(*wsPeerConnP2P).stream.Conn().RemotePeer() n.wsPeersLock.Lock() n.identityTracker.removeIdentity(peer) delete(n.wsPeers, remotePeerID) diff --git a/network/p2pPeer.go b/network/p2pPeer.go index a5065f01bb..9a0ce2699d 100644 --- a/network/p2pPeer.go +++ b/network/p2pPeer.go @@ -31,15 +31,15 @@ import ( mnet "github.com/multiformats/go-multiaddr/net" ) -type wsPeerConnP2PImpl struct { +type wsPeerConnP2P struct { stream network.Stream } -func (c *wsPeerConnP2PImpl) RemoteAddrString() string { +func (c *wsPeerConnP2P) RemoteAddrString() string { return c.stream.Conn().RemoteMultiaddr().String() } -func (c *wsPeerConnP2PImpl) NextReader() (int, io.Reader, error) { +func (c *wsPeerConnP2P) NextReader() (int, io.Reader, error) { // read length var lenbuf [4]byte _, err := io.ReadFull(c.stream, lenbuf[:]) @@ -54,7 +54,7 @@ func (c *wsPeerConnP2PImpl) NextReader() (int, io.Reader, error) { return websocket.BinaryMessage, io.LimitReader(c.stream, int64(msglen)), nil } -func (c *wsPeerConnP2PImpl) WriteMessage(_ int, buf []byte) error { +func (c *wsPeerConnP2P) WriteMessage(_ int, buf []byte) error { // simple message framing: // 1. write encoding of the length var lenbuf [4]byte @@ -69,13 +69,13 @@ func (c *wsPeerConnP2PImpl) WriteMessage(_ int, buf []byte) error { } // Do nothing for now since this doesn't actually close the connection just sends the close message -func (c *wsPeerConnP2PImpl) CloseWithMessage([]byte, time.Time) error { +func (c *wsPeerConnP2P) CloseWithMessage([]byte, time.Time) error { return nil } -func (c *wsPeerConnP2PImpl) SetReadLimit(int64) {} +func (c *wsPeerConnP2P) SetReadLimit(int64) {} -func (c *wsPeerConnP2PImpl) CloseWithoutFlush() error { +func (c *wsPeerConnP2P) CloseWithoutFlush() error { err := c.stream.Close() if err != nil && err != yamux.ErrStreamClosed && err != yamux.ErrSessionShutdown && err != yamux.ErrStreamReset { return err @@ -83,9 +83,9 @@ func (c *wsPeerConnP2PImpl) CloseWithoutFlush() error { return nil } -func (c *wsPeerConnP2PImpl) UnderlyingConn() net.Conn { return nil } +func (c *wsPeerConnP2P) UnderlyingConn() net.Conn { return nil } -func (c *wsPeerConnP2PImpl) RemoteAddr() net.Addr { +func (c *wsPeerConnP2P) RemoteAddr() net.Addr { netaddr, err := mnet.ToNetAddr(c.stream.Conn().RemoteMultiaddr()) if err != nil { logging.Base().Errorf("Error converting multiaddr to netaddr: %v", err)