Skip to content

Commit

Permalink
Merge pull request #91 from orbs-network/bugfix/node-sync-committee-p…
Browse files Browse the repository at this point in the history
…olling

polling request committee on leanHelix side
  • Loading branch information
gadcl authored Mar 22, 2020
2 parents 9d97f05 + 3f6b23c commit ba90b6f
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 17 deletions.
58 changes: 44 additions & 14 deletions services/leanhelixterm/leanhelix_term.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@ import (
"github.com/orbs-network/lean-helix-go/spec/types/go/protocol"
"github.com/orbs-network/lean-helix-go/state"
"github.com/orbs-network/scribe/log"
"github.com/pkg/errors"
"math"
"time"
)

const CallCommitteeContractInterval = 200 * time.Millisecond

type LeanHelixTerm struct {
*ConsensusMessagesFilter
termInCommittee *termincommittee.TermInCommittee
Expand All @@ -35,41 +39,66 @@ func NewLeanHelixTerm(ctx context.Context, logger logger.LHLogger, config *inter
myMemberId := config.Membership.MyMemberId()
messageFactory := messagesfactory.NewMessageFactory(config.InstanceId, config.KeyManager, myMemberId, randomSeed)

committeeMembers, err := requestOrderedCommittee(state, blockHeight, randomSeed, config)
committeeMembers, err := requestOrderedCommitteePersist(state, blockHeight, randomSeed, config, logger)
if err != nil {
logger.Info("OUT OF COMMITTEE WITH ERROR RECEIVING COMMITTEE: H=%d, prevBlockProof=%s, randomSeed=%d, error=%s", blockHeight, printShortBlockProofBytes(prevBlockProofBytes), randomSeed, err)
return termNotInCommittee(randomSeed, config)
logger.Info("ERROR RECEIVING COMMITTEE: H=%d, error=%s", blockHeight, err)
}

isParticipating := isParticipatingInCommittee(myMemberId, committeeMembers)
logger.Debug("RECEIVED COMMITTEE: H=%d, prevBlockProof=%s, randomSeed=%d, members=%s, isParticipating=%t", blockHeight, printShortBlockProofBytes(prevBlockProofBytes), randomSeed, termincommittee.ToCommitteeMembersStr(committeeMembers), isParticipating)
logger.ConsensusTrace("got committee for the current consensus round", nil, log.StringableSlice("committee", committeeMembers))
// on ctx terminated requestOrderedCommitteePersist returns nil committee
isParticipating := isParticipatingInTerm(myMemberId, committeeMembers)

if !isParticipating {
logger.Info("OUT OF COMMITTEE: H=%d, prevBlockProof=%s, randomSeed=%d, members=%s, isParticipating=%t", blockHeight, printShortBlockProofBytes(prevBlockProofBytes), randomSeed, termincommittee.ToCommitteeMembersStr(committeeMembers), isParticipating)
logger.Debug("OUT OF COMMITTEE: H=%d, prevBlockProof=%s, randomSeed=%d, members=%s, isParticipating=%t", blockHeight, printShortBlockProofBytes(prevBlockProofBytes), randomSeed, termincommittee.ToCommitteeMembersStr(committeeMembers), isParticipating)
return termNotInCommittee(randomSeed, config)
}

logger.Debug("RECEIVED COMMITTEE: H=%d, prevBlockProof=%s, randomSeed=%d, members=%s, isParticipating=%t", blockHeight, printShortBlockProofBytes(prevBlockProofBytes), randomSeed, termincommittee.ToCommitteeMembersStr(committeeMembers), isParticipating)
logger.ConsensusTrace("got committee for the current consensus round", nil, log.StringableSlice("committee", committeeMembers))

termInCommittee := termincommittee.NewTermInCommittee(logger, config, state, messageFactory, electionTrigger, committeeMembers, prevBlock, canBeFirstLeader, CommitsToProof(logger, config.KeyManager, onCommit))
return &LeanHelixTerm{
ConsensusMessagesFilter: NewConsensusMessagesFilter(termInCommittee, config.KeyManager, randomSeed),
termInCommittee: termInCommittee,
}
}

func requestOrderedCommittee(s *state.State, blockHeight primitives.BlockHeight, randomSeed uint64, config *interfaces.Config) ([]primitives.MemberId, error) {
func requestOrderedCommitteePersist(s *state.State, blockHeight primitives.BlockHeight, randomSeed uint64, config *interfaces.Config, logger logger.LHLogger) ([]primitives.MemberId, error) {
const maxView = primitives.View(math.MaxUint64)
ctx, err := s.Contexts.For(state.NewHeightView(blockHeight, maxView)) // term-level context
if err != nil {
return nil, err
}
committeeMembers, err := config.Membership.RequestOrderedCommittee(ctx, blockHeight, randomSeed)
if err != nil {
return nil, err
logger.Debug("Polling RequestOrderedCommittee: H=%d, interval-between-attempts=%d", blockHeight, CallCommitteeContractInterval)

attempts := 1
for {

// exit on term update (node sync) or system shutdown
if ctx.Err() != nil {
return nil, errors.Wrap(ctx.Err(), "requestOrderedCommitteePersist: context terminated")
}

committeeMembers, err := config.Membership.RequestOrderedCommittee(ctx, blockHeight, randomSeed)
if err == nil {
return committeeMembers, nil
}

// log every 500 failures
if attempts%500 == 1 {
if ctx.Err() == nil { // this may fail rightfully on graceful shutdown (ctx.Done), we don't want to report an error in this case
logger.Info("requestOrderedCommitteePersist: cannot get ordered committee #attempts=%d, error=%s", attempts, err)
}
}

// sleep or wait for ctx done, whichever comes first
sleepOrShutdown, cancel := context.WithTimeout(ctx, CallCommitteeContractInterval)
<-sleepOrShutdown.Done()
cancel()

attempts++
}
return committeeMembers, nil
}


func termNotInCommittee(randomSeed uint64, config *interfaces.Config) *LeanHelixTerm {
return &LeanHelixTerm{
ConsensusMessagesFilter: NewConsensusMessagesFilter(nil, config.KeyManager, randomSeed),
Expand All @@ -84,7 +113,7 @@ func (lht *LeanHelixTerm) Dispose() {
}
}

func isParticipatingInCommittee(myMemberId primitives.MemberId, committeeMembers []primitives.MemberId) bool {
func isParticipatingInTerm(myMemberId primitives.MemberId, committeeMembers []primitives.MemberId) bool {
for _, committeeMember := range committeeMembers {
if myMemberId.Equal(committeeMember) {
return true
Expand All @@ -99,3 +128,4 @@ func printShortBlockProofBytes(b []byte) string {
}
return fmt.Sprintf("%s..%s", hex.EncodeToString(b[:6]), hex.EncodeToString(b[len(b)-6:]))
}

6 changes: 3 additions & 3 deletions services/leanhelixterm/leanhelix_term_participating_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestParticipating(t *testing.T) {
memberId2 := primitives.MemberId("Member 2")
memberId3 := primitives.MemberId("Member 3")
committeeMembers := []primitives.MemberId{myMemberId, memberId1, memberId2, memberId3}
actual := isParticipatingInCommittee(myMemberId, committeeMembers)
actual := isParticipatingInTerm(myMemberId, committeeMembers)
require.True(t, actual)
}

Expand All @@ -28,7 +28,7 @@ func TestParticipatingLastInList(t *testing.T) {
memberId2 := primitives.MemberId("Member 2")
memberId3 := primitives.MemberId("Member 3")
committeeMembers := []primitives.MemberId{memberId1, memberId2, memberId3, myMemberId}
actual := isParticipatingInCommittee(myMemberId, committeeMembers)
actual := isParticipatingInTerm(myMemberId, committeeMembers)
require.True(t, actual)
}

Expand All @@ -38,6 +38,6 @@ func TestNotParticipating(t *testing.T) {
memberId2 := primitives.MemberId("Member 2")
memberId3 := primitives.MemberId("Member 3")
committeeMembers := []primitives.MemberId{memberId1, memberId2, memberId3}
actual := isParticipatingInCommittee(myMemberId, committeeMembers)
actual := isParticipatingInTerm(myMemberId, committeeMembers)
require.False(t, actual)
}

0 comments on commit ba90b6f

Please sign in to comment.