Skip to content

Commit

Permalink
Tolerate send errors on roll call (unless consesus is required)
Browse files Browse the repository at this point in the history
  • Loading branch information
Maelkum committed Jul 22, 2024
1 parent 75fcc19 commit 1c0fbba
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 35 deletions.
36 changes: 12 additions & 24 deletions consensus/pbft/messaging.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"sync"

"github.com/hashicorp/go-multierror"

Expand Down Expand Up @@ -45,48 +44,37 @@ func (r *Replica) broadcast(msg interface{}) error {
ctx, cancel := context.WithTimeout(context.Background(), r.cfg.NetworkTimeout)
defer cancel()

var (
wg sync.WaitGroup
multierr *multierror.Error
lock sync.Mutex
)

var errGroup multierror.Group
for _, target := range r.peers {
target := target

// Skip self.
if target == r.id {
continue
}

wg.Add(1)

// Send concurrently to everyone.
go func(peer peer.ID) {
defer wg.Done()
errGroup.Go(func() error {

// NOTE: We could potentially retry sending if we fail once. On the other hand, somewhat unlikely they're
// back online split second later.

err := r.host.SendMessageOnProtocol(ctx, peer, payload, r.protocolID)
err := r.host.SendMessageOnProtocol(ctx, target, payload, r.protocolID)
if err != nil {

lock.Lock()
defer lock.Unlock()

multierr = multierror.Append(multierr, err)
return fmt.Errorf("peer send error (peer: %v): %w", target.String(), err)
}
}(target)
}

wg.Wait()
return nil
})
}

// If all went well, just return.
sendErr := multierr.ErrorOrNil()
if sendErr == nil {
sendErr := errGroup.Wait()
if sendErr.ErrorOrNil() == nil {
return nil
}

// Warn if we had more send errors than we bargained for.
errCount := uint(multierr.Len())
errCount := uint(sendErr.Len())
if errCount > r.f {
r.log.Warn().Uint("f", r.f).Uint("errors", errCount).Msg("broadcast error count higher than pBFT f value")
}
Expand Down
4 changes: 2 additions & 2 deletions node/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (n *Node) formCluster(ctx context.Context, requestID string, replicas []pee
}

// Request execution from peers.
err := n.sendToMany(ctx, replicas, reqCluster)
err := n.sendToMany(ctx, replicas, reqCluster, true)
if err != nil {
return fmt.Errorf("could not send cluster formation request to peers: %w", err)
}
Expand Down Expand Up @@ -142,7 +142,7 @@ func (n *Node) disbandCluster(requestID string, replicas []peer.ID) error {
ctx, cancel := context.WithTimeout(context.Background(), consensusClusterSendTimeout)
defer cancel()

err := n.sendToMany(ctx, replicas, msgDisband)
err := n.sendToMany(ctx, replicas, msgDisband, true)
if err != nil {
return fmt.Errorf("could not send cluster disband request (request: %s): %w", requestID, err)
}
Expand Down
6 changes: 5 additions & 1 deletion node/head_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,11 @@ func (n *Node) headExecute(ctx context.Context, requestID string, req execute.Re
}
}

err = n.sendToMany(ctx, reportingPeers, reqExecute)
err = n.sendToMany(ctx,
reportingPeers,
reqExecute,
consensusRequired(consensusAlgo), // If we're using consensus, try to reach all peers.
)
if err != nil {
return codes.Error, nil, cluster, fmt.Errorf("could not send execution request to peers (function: %s, request: %s): %w", req.FunctionID, requestID, err)
}
Expand Down
42 changes: 34 additions & 8 deletions node/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"

"github.com/hashicorp/go-multierror"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"

Expand Down Expand Up @@ -63,24 +64,49 @@ func (n *Node) send(ctx context.Context, to peer.ID, msg blockless.Message) erro
return nil
}

// sendToMany serializes the message and sends it to a number of peers. It aborts on any error.
func (n *Node) sendToMany(ctx context.Context, peers []peer.ID, msg blockless.Message) error {
// sendToMany serializes the message and sends it to a number of peers. `requireAll` dictates how we treat partial errors.
func (n *Node) sendToMany(ctx context.Context, peers []peer.ID, msg blockless.Message, requireAll bool) error {

// Serialize the message.
payload, err := json.Marshal(msg)
if err != nil {
return fmt.Errorf("could not encode record: %w", err)
}

var errGroup multierror.Group
for i, peer := range peers {
// Send message.
err = n.host.SendMessage(ctx, peer, payload)
if err != nil {
return fmt.Errorf("could not send message to peer (id: %v, peer %d out of %d): %w", peer, i, len(peers), err)
}
i := i
peer := peer

errGroup.Go(func() error {
err := n.host.SendMessage(ctx, peer, payload)
if err != nil {
return fmt.Errorf("peer %v/%v send error (peer: %v): %w", i, len(peers), peer.String(), err)
}

return nil
})
}

return nil
retErr := errGroup.Wait()
if retErr == nil || len(retErr.Errors) == 0 {
// If everything succeeded => ok.
return nil
}

switch len(retErr.Errors) {
case len(peers):
// If everything failed => error.
return fmt.Errorf("all sends failed: %w", retErr)

default:
// Some sends failed - do as requested by `requireAll`.
if requireAll {
return fmt.Errorf("some sends failed: %w", retErr)
}

return nil
}
}

func (n *Node) publish(ctx context.Context, msg blockless.Message) error {
Expand Down

0 comments on commit 1c0fbba

Please sign in to comment.