From 1c0fbba704f368eab329858446cbd6c9476f2f96 Mon Sep 17 00:00:00 2001 From: Maelkum Date: Mon, 22 Jul 2024 11:49:41 +0200 Subject: [PATCH] Tolerate send errors on roll call (unless consesus is required) --- consensus/pbft/messaging.go | 36 +++++++++++-------------------- node/cluster.go | 4 ++-- node/head_execute.go | 6 +++++- node/message.go | 42 ++++++++++++++++++++++++++++++------- 4 files changed, 53 insertions(+), 35 deletions(-) diff --git a/consensus/pbft/messaging.go b/consensus/pbft/messaging.go index 31350bfd..e412629b 100644 --- a/consensus/pbft/messaging.go +++ b/consensus/pbft/messaging.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "sync" "github.com/hashicorp/go-multierror" @@ -45,48 +44,37 @@ func (r *Replica) broadcast(msg interface{}) error { ctx, cancel := context.WithTimeout(context.Background(), r.cfg.NetworkTimeout) defer cancel() - var ( - wg sync.WaitGroup - multierr *multierror.Error - lock sync.Mutex - ) - + var errGroup multierror.Group for _, target := range r.peers { + target := target + // Skip self. if target == r.id { continue } - wg.Add(1) - // Send concurrently to everyone. - go func(peer peer.ID) { - defer wg.Done() + errGroup.Go(func() error { // NOTE: We could potentially retry sending if we fail once. On the other hand, somewhat unlikely they're // back online split second later. - - err := r.host.SendMessageOnProtocol(ctx, peer, payload, r.protocolID) + err := r.host.SendMessageOnProtocol(ctx, target, payload, r.protocolID) if err != nil { - - lock.Lock() - defer lock.Unlock() - - multierr = multierror.Append(multierr, err) + return fmt.Errorf("peer send error (peer: %v): %w", target.String(), err) } - }(target) - } - wg.Wait() + return nil + }) + } // If all went well, just return. - sendErr := multierr.ErrorOrNil() - if sendErr == nil { + sendErr := errGroup.Wait() + if sendErr.ErrorOrNil() == nil { return nil } // Warn if we had more send errors than we bargained for. - errCount := uint(multierr.Len()) + errCount := uint(sendErr.Len()) if errCount > r.f { r.log.Warn().Uint("f", r.f).Uint("errors", errCount).Msg("broadcast error count higher than pBFT f value") } diff --git a/node/cluster.go b/node/cluster.go index fe0cb7b4..65a347aa 100644 --- a/node/cluster.go +++ b/node/cluster.go @@ -78,7 +78,7 @@ func (n *Node) formCluster(ctx context.Context, requestID string, replicas []pee } // Request execution from peers. - err := n.sendToMany(ctx, replicas, reqCluster) + err := n.sendToMany(ctx, replicas, reqCluster, true) if err != nil { return fmt.Errorf("could not send cluster formation request to peers: %w", err) } @@ -142,7 +142,7 @@ func (n *Node) disbandCluster(requestID string, replicas []peer.ID) error { ctx, cancel := context.WithTimeout(context.Background(), consensusClusterSendTimeout) defer cancel() - err := n.sendToMany(ctx, replicas, msgDisband) + err := n.sendToMany(ctx, replicas, msgDisband, true) if err != nil { return fmt.Errorf("could not send cluster disband request (request: %s): %w", requestID, err) } diff --git a/node/head_execute.go b/node/head_execute.go index c8329530..1116812e 100644 --- a/node/head_execute.go +++ b/node/head_execute.go @@ -129,7 +129,11 @@ func (n *Node) headExecute(ctx context.Context, requestID string, req execute.Re } } - err = n.sendToMany(ctx, reportingPeers, reqExecute) + err = n.sendToMany(ctx, + reportingPeers, + reqExecute, + consensusRequired(consensusAlgo), // If we're using consensus, try to reach all peers. + ) if err != nil { return codes.Error, nil, cluster, fmt.Errorf("could not send execution request to peers (function: %s, request: %s): %w", req.FunctionID, requestID, err) } diff --git a/node/message.go b/node/message.go index c94b2e41..655e9de7 100644 --- a/node/message.go +++ b/node/message.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" + "github.com/hashicorp/go-multierror" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/peer" @@ -63,8 +64,8 @@ func (n *Node) send(ctx context.Context, to peer.ID, msg blockless.Message) erro return nil } -// sendToMany serializes the message and sends it to a number of peers. It aborts on any error. -func (n *Node) sendToMany(ctx context.Context, peers []peer.ID, msg blockless.Message) error { +// sendToMany serializes the message and sends it to a number of peers. `requireAll` dictates how we treat partial errors. +func (n *Node) sendToMany(ctx context.Context, peers []peer.ID, msg blockless.Message, requireAll bool) error { // Serialize the message. payload, err := json.Marshal(msg) @@ -72,15 +73,40 @@ func (n *Node) sendToMany(ctx context.Context, peers []peer.ID, msg blockless.Me return fmt.Errorf("could not encode record: %w", err) } + var errGroup multierror.Group for i, peer := range peers { - // Send message. - err = n.host.SendMessage(ctx, peer, payload) - if err != nil { - return fmt.Errorf("could not send message to peer (id: %v, peer %d out of %d): %w", peer, i, len(peers), err) - } + i := i + peer := peer + + errGroup.Go(func() error { + err := n.host.SendMessage(ctx, peer, payload) + if err != nil { + return fmt.Errorf("peer %v/%v send error (peer: %v): %w", i, len(peers), peer.String(), err) + } + + return nil + }) } - return nil + retErr := errGroup.Wait() + if retErr == nil || len(retErr.Errors) == 0 { + // If everything succeeded => ok. + return nil + } + + switch len(retErr.Errors) { + case len(peers): + // If everything failed => error. + return fmt.Errorf("all sends failed: %w", retErr) + + default: + // Some sends failed - do as requested by `requireAll`. + if requireAll { + return fmt.Errorf("some sends failed: %w", retErr) + } + + return nil + } } func (n *Node) publish(ctx context.Context, msg blockless.Message) error {