Skip to content

Commit

Permalink
WIP: P2PNetwork readme
Browse files Browse the repository at this point in the history
  • Loading branch information
algorandskiy committed Aug 9, 2024
1 parent c1cd118 commit 13e8d67
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 12 deletions.
86 changes: 86 additions & 0 deletions network/README-P2P.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# P2P Network implementation overview

Refer to [p2p sub-package overview](./p2p/README.md) for details about p2p sub-components.

`P2PNetwork` implements the `GossipNode` interface similarly to `WsNetwork`. Both use
the same peer connection management and message broadcast functions but different
transport: lip2p-managed connections and HTTP + WebSocket, respectively.
`P2PNetwork` and `WsNetwork` require `config.NetAddress` to be set in order to start a server.

In addition, `HybridNetwork` is an aggregate of `P2PNetwork` and `WsNetwork` allowing a node
to interact over both networks. In the case of hybrid operation, both `config.P2PNetAddress` and
`config.NetAddress` are used.

## HTTP Services

`P2PNetwork` uses libp2p's `http` submodule to handle HTTP traffic over libp2p-managed connection.
It is `http.Handler`-compatible so that service handlers are registered the same way as for `WsNetwork`.

## Phonebook and Peerstore and peer classes

## WsPeer

Peers are created in `wsStreamHandler` that is called for both incoming and outgoing connections (and streams). `incoming` flag is set to true for incoming connection.
At the very beginning of the `wsStreamHandler` one byte read/write happens in order to make sure:
- Stream is operable
- A placeholder for a handshake where some meta-data can be exchanged

Connected peers are maintained as a `wsPeers` map similarly to the `WsNetwork`.

<!--
core peer, http client
-->

## Broadcaster

`msgBroadcaster` encapsulates a shared broadcasting logic: priority vs bulk messages (and queues),
data preparation, peers retrieving. Broadcast requests eventually hits `peer.writeNonBlockMsgs` -> `peer.writeLoopSendMsg` -> `conn.WriteMessage`. See the diagram denoting the broadcast data flow.

```mermaid
graph LR
p2pnet[P2PNetwork]
wsnet[WsNetwork]
B[broadcaster]
p2pnet & wsnet --> B
subgraph "wsPeer"
direction TB
writeNonBlockMsgs
Conn[conn.WriteMessage]
subgraph "writeLoop"
writeLoopSendMsg
end
writeNonBlockMsgs --> writeLoop
writeLoopSendMsg --> Conn
end
B --> writeNonBlockMsgs
Conn --> WMP2P & WMWS
subgraph "wsPeerConnP2P"
WMP2P[WriteMessage]
end
subgraph "websocket"
WMWS[WriteMessage]
end
subgraph "libp2p"
stream.Write
end
WMP2P --> libp2p
```

## DHT and Capabilities discovery

## Net identity

<!--
Changes to support P2P
-->
6 changes: 5 additions & 1 deletion network/p2p/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ The underlying libp2p implementation abstracted as `p2p.Service` and initialized

`Host` is also used for p2p HTTP server and DHT Discovery service creation. It is also useful for unit testing. Note, `Host` is created with `NoListenAddrs` options that prevents automatic listening and networking until the `Service.Start()` is called. This follows others services design (including WsNetwork service).

### Connection and Stream direction

TBD

### Connection limiting

libp2p's `ResourceManager` is used to limit number of connections up tp `cfg.P2PIncomingConnectionsLimit`.
Expand Down Expand Up @@ -97,7 +101,7 @@ graph LR
end
P2P --> AdvCap
Cap --> P2P
Cap -.-> P2P
subgraph "libp2p"
Adv[Advertise]
Expand Down
4 changes: 2 additions & 2 deletions network/p2pNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,7 @@ func (n *P2PNetwork) wsStreamHandler(ctx context.Context, p2pPeer peer.ID, strea
peerCore := makePeerCore(ctx, n, n.log, n.handler.readBuffer, addr, client, addr)
wsp := &wsPeer{
wsPeerCore: peerCore,
conn: &wsPeerConnP2PImpl{stream: stream},
conn: &wsPeerConnP2P{stream: stream},
outgoing: !incoming,
identity: netIdentPeerID,
}
Expand Down Expand Up @@ -844,7 +844,7 @@ func (n *P2PNetwork) wsStreamHandler(ctx context.Context, p2pPeer peer.ID, strea

// peerRemoteClose called from wsPeer to report that it has closed
func (n *P2PNetwork) peerRemoteClose(peer *wsPeer, reason disconnectReason) {
remotePeerID := peer.conn.(*wsPeerConnP2PImpl).stream.Conn().RemotePeer()
remotePeerID := peer.conn.(*wsPeerConnP2P).stream.Conn().RemotePeer()
n.wsPeersLock.Lock()
n.identityTracker.removeIdentity(peer)
delete(n.wsPeers, remotePeerID)
Expand Down
18 changes: 9 additions & 9 deletions network/p2pPeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ import (
mnet "github.com/multiformats/go-multiaddr/net"
)

type wsPeerConnP2PImpl struct {
type wsPeerConnP2P struct {
stream network.Stream
}

func (c *wsPeerConnP2PImpl) RemoteAddrString() string {
func (c *wsPeerConnP2P) RemoteAddrString() string {
return c.stream.Conn().RemoteMultiaddr().String()
}

func (c *wsPeerConnP2PImpl) NextReader() (int, io.Reader, error) {
func (c *wsPeerConnP2P) NextReader() (int, io.Reader, error) {
// read length
var lenbuf [4]byte
_, err := io.ReadFull(c.stream, lenbuf[:])
Expand All @@ -54,7 +54,7 @@ func (c *wsPeerConnP2PImpl) NextReader() (int, io.Reader, error) {
return websocket.BinaryMessage, io.LimitReader(c.stream, int64(msglen)), nil
}

func (c *wsPeerConnP2PImpl) WriteMessage(_ int, buf []byte) error {
func (c *wsPeerConnP2P) WriteMessage(_ int, buf []byte) error {
// simple message framing:
// 1. write encoding of the length
var lenbuf [4]byte
Expand All @@ -69,23 +69,23 @@ func (c *wsPeerConnP2PImpl) WriteMessage(_ int, buf []byte) error {
}

// Do nothing for now since this doesn't actually close the connection just sends the close message
func (c *wsPeerConnP2PImpl) CloseWithMessage([]byte, time.Time) error {
func (c *wsPeerConnP2P) CloseWithMessage([]byte, time.Time) error {
return nil
}

func (c *wsPeerConnP2PImpl) SetReadLimit(int64) {}
func (c *wsPeerConnP2P) SetReadLimit(int64) {}

func (c *wsPeerConnP2PImpl) CloseWithoutFlush() error {
func (c *wsPeerConnP2P) CloseWithoutFlush() error {
err := c.stream.Close()
if err != nil && err != yamux.ErrStreamClosed && err != yamux.ErrSessionShutdown && err != yamux.ErrStreamReset {
return err
}
return nil
}

func (c *wsPeerConnP2PImpl) UnderlyingConn() net.Conn { return nil }
func (c *wsPeerConnP2P) UnderlyingConn() net.Conn { return nil }

func (c *wsPeerConnP2PImpl) RemoteAddr() net.Addr {
func (c *wsPeerConnP2P) RemoteAddr() net.Addr {
netaddr, err := mnet.ToNetAddr(c.stream.Conn().RemoteMultiaddr())
if err != nil {
logging.Base().Errorf("Error converting multiaddr to netaddr: %v", err)
Expand Down

0 comments on commit 13e8d67

Please sign in to comment.