Skip to content

Commit

Permalink
node: clear new p2p net handlers on fast catchup (algorand#6127)
Browse files Browse the repository at this point in the history
  • Loading branch information
algorandskiy authored Sep 13, 2024
1 parent 90353e5 commit 2b6e018
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 25 deletions.
10 changes: 1 addition & 9 deletions data/txHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,15 +257,7 @@ func (handler *TxHandler) Start() {

// libp2p pubsub validator and handler abstracted as TaggedMessageProcessor
handler.net.RegisterValidatorHandlers([]network.TaggedMessageValidatorHandler{
{
Tag: protocol.TxnTag,
// create anonymous struct to hold the two functions and satisfy the network.MessageProcessor interface
MessageHandler: struct {
network.ValidateHandleFunc
}{
network.ValidateHandleFunc(handler.validateIncomingTxMessage),
},
},
{Tag: protocol.TxnTag, MessageHandler: network.ValidateHandleFunc(handler.validateIncomingTxMessage)},
})

handler.backlogWg.Add(2)
Expand Down
4 changes: 2 additions & 2 deletions network/gossipNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ type GossipNode interface {
// Currently used as p2p pubsub topic validators.
RegisterValidatorHandlers(dispatch []TaggedMessageValidatorHandler)

// ClearProcessors deregisters all the existing message processors.
ClearProcessors()
// ClearValidatorHandlers deregisters all the existing message processors.
ClearValidatorHandlers()

// GetHTTPClient returns a http.Client with a suitable for the network Transport
// that would also limit the number of outgoing connections.
Expand Down
8 changes: 4 additions & 4 deletions network/hybridNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,10 @@ func (n *HybridP2PNetwork) RegisterValidatorHandlers(dispatch []TaggedMessageVal
n.wsNetwork.RegisterValidatorHandlers(dispatch)
}

// ClearProcessors deregisters all the existing message processors.
func (n *HybridP2PNetwork) ClearProcessors() {
n.p2pNetwork.ClearProcessors()
n.wsNetwork.ClearProcessors()
// ClearValidatorHandlers deregisters all the existing message processors.
func (n *HybridP2PNetwork) ClearValidatorHandlers() {
n.p2pNetwork.ClearValidatorHandlers()
n.wsNetwork.ClearValidatorHandlers()
}

// GetHTTPClient returns a http.Client with a suitable for the network Transport
Expand Down
9 changes: 6 additions & 3 deletions network/p2p/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,14 @@ func MakeHTTPServer(streamHost host.Host) *HTTPServer {
p2phttpMux: mux.NewRouter(),
}
// libp2phttp server requires either explicit ListenAddrs or streamHost.Addrs() to be non-empty.
// If streamHost.Addrs() is empty, we will listen on all interfaces
// If streamHost.Addrs() is empty (that happens when NetAddress is set to ":0" and private address filtering is automatically enabled),
// we will listen on localhost to satisfy libp2phttp.Host.Serve() requirements.
// A side effect is it actually starts listening on interfaces listed in ListenAddrs and as go-libp2p v0.33.2
// there is no other way to have libp2phttp server running AND to have streamHost.Addrs() filtered.
if len(streamHost.Addrs()) == 0 {
logging.Base().Debugf("MakeHTTPServer: no addresses for %s, asking to listen all interfaces", streamHost.ID())
logging.Base().Debugf("MakeHTTPServer: no addresses for %s, asking to listen localhost interface to satisfy libp2phttp.Host.Serve ", streamHost.ID())
httpServer.ListenAddrs = []multiaddr.Multiaddr{
multiaddr.StringCast("/ip4/0.0.0.0/tcp/0/http"),
multiaddr.StringCast("/ip4/127.0.0.1/tcp/0/http"),
}
httpServer.InsecureAllowHTTP = true
}
Expand Down
4 changes: 2 additions & 2 deletions network/p2pNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,8 +706,8 @@ func (n *P2PNetwork) RegisterValidatorHandlers(dispatch []TaggedMessageValidator
n.handler.RegisterValidatorHandlers(dispatch)
}

// ClearProcessors deregisters all the existing message handlers.
func (n *P2PNetwork) ClearProcessors() {
// ClearValidatorHandlers deregisters all the existing message handlers.
func (n *P2PNetwork) ClearValidatorHandlers() {
n.handler.ClearValidatorHandlers([]Tag{})
}

Expand Down
2 changes: 1 addition & 1 deletion network/p2pNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -963,7 +963,7 @@ func TestP2PRelay(t *testing.T) {
counter.Store(0)
var loggedMsgs [][]byte
counterHandler, counterDone = makeCounterHandler(expectedMsgs, &counter, &loggedMsgs)
netA.ClearProcessors()
netA.ClearValidatorHandlers()
netA.RegisterValidatorHandlers(counterHandler)

for i := 0; i < expectedMsgs/2; i++ {
Expand Down
4 changes: 2 additions & 2 deletions network/wsNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,8 +825,8 @@ func (wn *WebsocketNetwork) ClearHandlers() {
func (wn *WebsocketNetwork) RegisterValidatorHandlers(dispatch []TaggedMessageValidatorHandler) {
}

// ClearProcessors deregisters all the existing message handlers.
func (wn *WebsocketNetwork) ClearProcessors() {
// ClearValidatorHandlers deregisters all the existing message handlers.
func (wn *WebsocketNetwork) ClearValidatorHandlers() {
}

func (wn *WebsocketNetwork) setHeaders(header http.Header) {
Expand Down
2 changes: 2 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@ func (node *AlgorandFullNode) Stop() {
}()

node.net.ClearHandlers()
node.net.ClearValidatorHandlers()
if !node.config.DisableNetworking {
node.net.Stop()
}
Expand Down Expand Up @@ -1218,6 +1219,7 @@ func (node *AlgorandFullNode) SetCatchpointCatchupMode(catchpointCatchupMode boo
node.waitMonitoringRoutines()
}()
node.net.ClearHandlers()
node.net.ClearValidatorHandlers()
node.stateProofWorker.Stop()
node.txHandler.Stop()
node.agreementService.Shutdown()
Expand Down
47 changes: 45 additions & 2 deletions node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,12 +606,11 @@ func TestDefaultResourcePaths(t *testing.T) {
log := logging.Base()

n, err := MakeFull(log, testDirectory, cfg, []string{}, genesis)
require.NoError(t, err)

n.Start()
defer n.Stop()

require.NoError(t, err)

// confirm genesis dir exists in the data dir, and that resources exist in the expected locations
require.DirExists(t, filepath.Join(testDirectory, genesis.ID()))

Expand Down Expand Up @@ -1073,3 +1072,47 @@ func TestNodeP2PRelays(t *testing.T) {
return len(nodes[2].net.GetPeers(network.PeersPhonebookRelays)) == 2
}, 80*time.Second, 1*time.Second)
}

// TestNodeSetCatchpointCatchupMode checks node can handle services restart for fast catchup correctly
func TestNodeSetCatchpointCatchupMode(t *testing.T) {
partitiontest.PartitionTest(t)

testDirectory := t.TempDir()

genesis := bookkeeping.Genesis{
SchemaID: "gen",
Proto: protocol.ConsensusCurrentVersion,
Network: config.Devtestnet,
FeeSink: sinkAddr.String(),
RewardsPool: poolAddr.String(),
}
log := logging.TestingLog(t)
cfg := config.GetDefaultLocal()

tests := []struct {
name string
enableP2P bool
}{
{"WS node", false},
{"P2P node", true},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
cfg.EnableP2P = test.enableP2P

n, err := MakeFull(log, testDirectory, cfg, []string{}, genesis)
require.NoError(t, err)
err = n.Start()
require.NoError(t, err)
defer n.Stop()

// "start" catchpoint catchup => close services
outCh := n.SetCatchpointCatchupMode(true)
<-outCh
// "stop" catchpoint catchup => resume services
outCh = n.SetCatchpointCatchupMode(false)
<-outCh
})
}
}

0 comments on commit 2b6e018

Please sign in to comment.