Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into track-incentive-c…
Browse files Browse the repository at this point in the history
…andidates
  • Loading branch information
cce committed Aug 28, 2024
2 parents 97d0bcf + 4990077 commit 55d5068
Show file tree
Hide file tree
Showing 18 changed files with 371 additions and 107 deletions.
6 changes: 5 additions & 1 deletion catchup/ledgerFetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,11 @@ func (lf *ledgerFetcher) requestLedger(ctx context.Context, peer network.HTTPPee
}

network.SetUserAgentHeader(request.Header)
return peer.GetHTTPClient().Do(request)
httpClient := peer.GetHTTPClient()
if httpClient == nil {
return nil, fmt.Errorf("requestLedger: HTTPPeer %s has no http client", peer.GetAddress())
}
return httpClient.Do(request)
}

func (lf *ledgerFetcher) headLedger(ctx context.Context, peer network.Peer, round basics.Round) error {
Expand Down
6 changes: 5 additions & 1 deletion catchup/universalFetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,15 @@ func (uf *universalBlockFetcher) fetchBlock(ctx context.Context, round basics.Ro
}
address = fetcherClient.address()
} else if httpPeer, validHTTPPeer := peer.(network.HTTPPeer); validHTTPPeer {
httpClient := httpPeer.GetHTTPClient()
if httpClient == nil {
return nil, nil, time.Duration(0), fmt.Errorf("fetchBlock: HTTPPeer %s has no http client", httpPeer.GetAddress())
}
fetcherClient := &HTTPFetcher{
peer: httpPeer,
rootURL: httpPeer.GetAddress(),
net: uf.net,
client: httpPeer.GetHTTPClient(),
client: httpClient,
log: uf.log,
config: &uf.config}
fetchedBuf, err = fetcherClient.getBlockBytes(ctx, round)
Expand Down
18 changes: 18 additions & 0 deletions cmd/updater/systemd-setup-user.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,27 @@ setup_user() {
sed -e s,@@BINDIR@@,"$bindir", "${SCRIPTPATH}/[email protected]" \
> "$homedir/.config/systemd/user/[email protected]"

if [[ ${HOSTMODE} == true ]]; then
echo "[INFO] Hosted mode - replacing algod with algoh"
sed -i 's/algod/algoh/g' "$homedir/.config/systemd/user/[email protected]"
fi

systemctl --user daemon-reload
}

HOSTMODE=false
while getopts H opt; do
case $opt in
H)
HOSTMODE=true
;;
?)
echo "Invalid option: -${OPTARG}"
exit 1
;;
esac
done
shift $((OPTIND-1))

if [ "$#" != 1 ]; then
echo "Usage: $0 username"
Expand Down
19 changes: 19 additions & 0 deletions cmd/updater/systemd-setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,28 @@ setup_root() {
sed ${sedargs} "${SCRIPTPATH}/[email protected]" \
> /lib/systemd/system/[email protected]

if [[ ${HOSTMODE} == true ]]; then
echo "[INFO] Hosted mode - replacing algod with algoh"
sed -i 's/algod/algoh/g' /lib/systemd/system/[email protected]
fi

systemctl daemon-reload
}

HOSTMODE=false
while getopts H opt; do
case $opt in
H)
HOSTMODE=true
;;
?)
echo "Invalid option: -${OPTARG}"
exit 1
;;
esac
done
shift $((OPTIND-1))

if [ "$#" != 2 ] && [ "$#" != 3 ]; then
echo "Usage: $0 username group [bindir]"
exit 1
Expand Down
149 changes: 149 additions & 0 deletions network/README-P2P.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
# P2P Network implementation overview

Refer to [p2p sub-package overview](./p2p/README.md) for details about p2p sub-components.

`P2PNetwork` implements the `GossipNode` interface similarly to `WsNetwork`. Both use
the same peer connection management and message broadcast functions but different
transport: lip2p-managed connections and HTTP + WebSocket, respectively.
`P2PNetwork` and `WsNetwork` require `config.NetAddress` to be set in order to start a server.

In addition, `HybridNetwork` is an aggregate of `P2PNetwork` and `WsNetwork` allowing a node
to interact over both networks. In the case of hybrid operation, both `config.P2PNetAddress` and
`config.NetAddress` are used.

## General design

`P2PNetwork` follows the `WsNetwork` approach for peers management and message handling:
- `msgHandler` used process or route the network protocol messages to external handlers
(for example, transaction handler or agreement service)
- `broadcaster` implementing the broadcast functionality (see below)
- mesh thread to maintain `GossipFanout` number of outgoing peers
- HTTP Server for external HTTP services (block, catchpoints)
- `OnNetworkAdvance` listener to react on round advancing

A key difference is that `P2PNetwork` uses `go-libp2p-pubsub` for TX message handling.
Upon start it subscribes to `/algo/tx/0.1.0` topic and publishes TX messages as needed.
The `pubsub` library divides message handling into two stages: validation and processing. Based on
the validation result, a message is either discarded or accepted for further
broadcasting to other peers. This necessitates having separate handlers for TX messages
in `TxHandler`, as we must synchronously determine whether a transaction group is valid:
- can't ignore fast and broadcast later - will be rejected as a seen message
- can't accept fast to prevent invalid/expired transactions broadcasting

## Major Components

### HTTP Services

`P2PNetwork` uses libp2p's `http` submodule to handle HTTP traffic over libp2p-managed connection.
It is `http.Handler`-compatible so that service handlers are registered the same way as for `WsNetwork`.

### Phonebook and Peerstore and peer classes

Originally phonebook was designed as an address registry holding permanent (`-p` cli option
or `phonebook.json` extra configuration file) and dynamic (SRV DNS records) entries.
These entries later can be later retrieved by a peer role
(`PhoneBookEntryRelayRole` or `PhoneBookEntryArchivalRole`).
A new `PeerStore` (built on top of `libp2p.Peerstore`) resembles the original `Phonebook`
by strictly implementing some of its methods and has the remaining `Phonebook`'s methods
with a slightly different signature - `string` vs `peer.AddrInfo` for address representation.
The main issue is that entries in `PeerStore` are identified by `PeerID`
and each peer might have multiple addresses (versus the original WS peers with the only one
`host:port` connectivity option.)

Both P2PNetwork and WsNetwork have an extra level of peer classification on top of two phonebook's
classes: `PeersConnectedOut`, `PeersConnectedIn`, `PeersPhonebookRelays`, `PeersPhonebookArchivalNodes`.
This allows network clients to be more precise on peers set they want to work with. For example,
ledger service wants `PeersPhonebookArchivalNodes`, and transaction syncer - `PeersConnectedOut`.


### wsPeer

Peers are created in `wsStreamHandler` that is called for both incoming and outgoing connections
(and streams). `incoming` flag is set to true for incoming connection.
At the very beginning of the `wsStreamHandler` one byte read/write happens in order to make sure:
- Stream is operable
- A placeholder for a handshake where some meta-data can be exchanged

Each peer gets a read channel `handler.readBuffer` where it enqueues incoming messages for routing
to appropriate handler.

Connected peers are maintained as a `wsPeers` map similarly to the `WsNetwork`.
The main difference between `P2PNetwork` and `WsNetwork` is `http.Client`. Because wsPeers operate
over the multiplexed streams in libp2p-managed connection, a plain `http.Client` would not be able
to connect to a p2p HTTP server. This requires the `wsPeer` constructed in `P2PNetwork` to have a special
libp2p-streams compatible `http.Client` produced by `MakeHTTPClientWithRateLimit` helper method.
It implements a rate-limiting approach similar to the regular http clients from `WsNetwork`.

### Broadcaster

`msgBroadcaster` encapsulates a shared broadcasting logic: priority vs bulk messages (and queues),
data preparation, peers retrieving. Broadcast requests eventually hits
`peer.writeNonBlockMsgs` -> `peer.writeLoopSendMsg` -> `conn.WriteMessage`.
See the diagram denoting the broadcast data flow.

```mermaid
graph LR
p2pnet[P2PNetwork]
wsnet[WsNetwork]
B[broadcaster]
p2pnet & wsnet --> B
subgraph "wsPeer"
direction LR
writeNonBlockMsgs
Conn[conn.WriteMessage]
subgraph "writeLoop"
writeLoopSendMsg
end
writeNonBlockMsgs --> writeLoop
writeLoopSendMsg --> Conn
end
B --> writeNonBlockMsgs
Conn --> WMP2P & WMWS
subgraph "wsPeerConnP2P"
WMP2P[WriteMessage]
end
subgraph "websocket"
WMWS[WriteMessage]
end
subgraph "libp2p"
stream.Write
end
WMP2P --> libp2p
```

### DHT and Capabilities discovery

DHT is controlled by the `EnableDHTProviders` configuration option and the capabilities
exposed by a node. These capabilities include:
- `archival`: a listening node with `Archival` config flag set
- `catchpointStoring`: a listening node configured to store catchpoints
- `gossip`: a listening node with `EnableGossipService` config flag set

When the `P2PNetwork` starts, the node begins advertising its capabilities by running
a background goroutine. By default, the underlying DHT implementation pulls bootstrap nodes from
a peer store and attempts to connect immediately, which is not how go-algorand services operate.
To address this, a new `bootstrapper` abstraction has been added to control bootstrap peer
access using the DHT's `BootstrapFunc` mechanism. The callback function returns empty bootstrap
peers until the `P2PNetwork` starts.

### Net identity based peers deduplication

`WsNetwork` net identity was slightly extended to allow ws and p2p nodes cross-check
when running in a hybrid mode:
- `identityTracker` instance is shared between `WsNetwork` and `P2PNetwork`
- identity schema supplied to the `WsNetwork` uses a p2p-node private key based message signer
- `PublicAddress` must be set for hybrid nodes in order to operate properly

Using the changes above `identityTracker` is able to deduplicate `WsNetwork` peer if it ends up
to be hybrid node already connected to via `P2PNetwork` and other way around.
21 changes: 9 additions & 12 deletions network/limitcaller/rateLimitingTransport.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,16 @@ var ErrConnectionQueueingTimeout = errors.New("rateLimitingTransport: queueing t
// according to the entries in the phonebook.
func MakeRateLimitingTransport(phonebook ConnectionTimeStore, queueingTimeout time.Duration, dialer *Dialer, maxIdleConnsPerHost int) RateLimitingTransport {
defaultTransport := http.DefaultTransport.(*http.Transport)
return RateLimitingTransport{
phonebook: phonebook,
innerTransport: &http.Transport{
Proxy: defaultTransport.Proxy,
DialContext: dialer.innerDialContext,
MaxIdleConns: defaultTransport.MaxIdleConns,
IdleConnTimeout: defaultTransport.IdleConnTimeout,
TLSHandshakeTimeout: defaultTransport.TLSHandshakeTimeout,
ExpectContinueTimeout: defaultTransport.ExpectContinueTimeout,
MaxIdleConnsPerHost: maxIdleConnsPerHost,
},
queueingTimeout: queueingTimeout,
innerTransport := &http.Transport{
Proxy: defaultTransport.Proxy,
DialContext: dialer.innerDialContext,
MaxIdleConns: defaultTransport.MaxIdleConns,
IdleConnTimeout: defaultTransport.IdleConnTimeout,
TLSHandshakeTimeout: defaultTransport.TLSHandshakeTimeout,
ExpectContinueTimeout: defaultTransport.ExpectContinueTimeout,
MaxIdleConnsPerHost: maxIdleConnsPerHost,
}
return MakeRateLimitingTransportWithRoundTripper(phonebook, queueingTimeout, innerTransport, nil, maxIdleConnsPerHost)
}

// MakeRateLimitingTransportWithRoundTripper creates a rate limiting http transport that would limit the requests rate
Expand Down
7 changes: 7 additions & 0 deletions network/netidentity.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-deadlock"
)

// netidentity.go implements functionality to participate in an "Identity Challenge Exchange"
Expand Down Expand Up @@ -461,12 +462,14 @@ func (noopIdentityTracker) removeIdentity(p *wsPeer) {}
// mapping from PublicKeys exchanged in identity challenges to a peer
// this structure is not thread-safe; it is protected by wn.peersLock or p2p.wsPeersLock
type publicKeyIdentTracker struct {
mu deadlock.Mutex
peersByID map[crypto.PublicKey]*wsPeer
}

// NewIdentityTracker returns a new publicKeyIdentTracker
func NewIdentityTracker() *publicKeyIdentTracker {
return &publicKeyIdentTracker{
mu: deadlock.Mutex{},
peersByID: make(map[crypto.PublicKey]*wsPeer),
}
}
Expand All @@ -475,6 +478,8 @@ func NewIdentityTracker() *publicKeyIdentTracker {
// returns false if it was unable to load the peer into the given identity
// or true otherwise (if the peer was already there, or if it was added)
func (t *publicKeyIdentTracker) setIdentity(p *wsPeer) bool {
t.mu.Lock()
defer t.mu.Unlock()
existingPeer, exists := t.peersByID[p.identity]
if !exists {
// the identity is not occupied, so set it and return true
Expand All @@ -489,6 +494,8 @@ func (t *publicKeyIdentTracker) setIdentity(p *wsPeer) bool {
// removeIdentity removes the entry in the peersByID map if it exists
// and is occupied by the given peer
func (t *publicKeyIdentTracker) removeIdentity(p *wsPeer) {
t.mu.Lock()
defer t.mu.Unlock()
if t.peersByID[p.identity] == p {
delete(t.peersByID, p.identity)
}
Expand Down
Loading

0 comments on commit 55d5068

Please sign in to comment.