catchup: cleanup pipelinedFetch #5662

Merged · 9 commits · Aug 16, 2023
Showing changes from 4 commits
6 changes: 3 additions & 3 deletions catchup/fetcher_test.go
@@ -47,15 +47,15 @@ func buildTestLedger(t *testing.T, blk bookkeeping.Block) (ledger *data.Ledger,
 	proto := config.Consensus[protocol.ConsensusCurrentVersion]
 	genesis := make(map[basics.Address]basics.AccountData)
 	genesis[user] = basics.AccountData{
-		Status:     basics.Online,
+		Status:     basics.Offline,
 		MicroAlgos: basics.MicroAlgos{Raw: proto.MinBalance * 2000000},
 	}
 	genesis[sinkAddr] = basics.AccountData{
-		Status:     basics.Online,
+		Status:     basics.Offline,
 		MicroAlgos: basics.MicroAlgos{Raw: proto.MinBalance * 2000000},
 	}
 	genesis[poolAddr] = basics.AccountData{
-		Status:     basics.Online,
+		Status:     basics.Offline,
 		MicroAlgos: basics.MicroAlgos{Raw: proto.MinBalance * 2000000},
 	}

275 changes: 104 additions & 171 deletions catchup/service.go
@@ -64,6 +64,7 @@ type Ledger interface {
 	IsWritingCatchpointDataFile() bool
 	Validate(ctx context.Context, blk bookkeeping.Block, executionPool execpool.BacklogPool) (*ledgercore.ValidatedBlock, error)
 	AddValidatedBlock(vb ledgercore.ValidatedBlock, cert agreement.Certificate) error
+	WaitMem(r basics.Round) chan struct{}
 }
 
 // Service represents the catchup service. Once started and until it is stopped, it ensures that the ledger is up to date with network.
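The new `WaitMem` method is what the rewritten pipeline below builds on: it returns a channel that is closed once round `r` is available in the ledger's in-memory queue, so any number of goroutines can wait on it, before or after the fact. A minimal consumption sketch — the helper name is illustrative, not part of the PR:

```go
// waitForRound blocks until round r is in the ledger or ctx is canceled.
// Receiving from a closed channel never blocks, so late callers return
// immediately — the property the new pipelinedFetch relies on.
func waitForRound(ctx context.Context, l Ledger, r basics.Round) bool {
	select {
	case <-l.WaitMem(r): // closed once round r is available
		return true
	case <-ctx.Done():
		return false
	}
}
```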
@@ -244,17 +245,16 @@ func (s *Service) innerFetch(r basics.Round, peer network.Peer) (blk *bookkeepin
 // - If we couldn't fetch the block (e.g. if there are no peers available, or we've reached the catchupRetryLimit)
 // - If the block is already in the ledger (e.g. if agreement service has already written it)
 // - If the retrieval of the previous block was unsuccessful
-func (s *Service) fetchAndWrite(r basics.Round, prevFetchCompleteChan chan bool, lookbackComplete chan bool, peerSelector *peerSelector) bool {
+func (s *Service) fetchAndWrite(r basics.Round, prevFetchCompleteChan chan struct{}, lookbackComplete chan struct{}, peerSelector *peerSelector, ctx context.Context) bool {
 	// If sync-ing this round is not intended, don't fetch it
 	if dontSyncRound := s.GetDisableSyncRound(); dontSyncRound != 0 && r >= basics.Round(dontSyncRound) {
 		return false
 	}
 	i := 0
-	hasLookback := false
-	for true {
+	for {
 		i++
 		select {
-		case <-s.ctx.Done():
+		case <-ctx.Done():
 			s.log.Debugf("fetchAndWrite(%v): Aborted", r)
 			return false
 		default:
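Two things change in this signature: the completion channels become `chan struct{}` (pure signals; see the notes further down), and `fetchAndWrite` takes an explicit `context.Context` instead of always reaching for `s.ctx`, so one failed round can abort a single pipeline run without stopping the whole service. A sketch of that two-level cancellation, with illustrative names:

```go
// The service context outlives individual catchup attempts; each
// pipelinedFetch run derives its own cancelable child context.
runCtx, cancelRun := context.WithCancel(s.ctx) // s.ctx: service lifetime
defer cancelRun() // returning from the run aborts every fetcher it spawned

// A fetcher observes both levels through the one derived context:
// cancelRun() and a service shutdown both close runCtx.Done().
select {
case <-runCtx.Done():
	// abort this round's fetch
default:
	// keep working
}
```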
@@ -284,7 +284,7 @@ func (s *Service) fetchAndWrite(r basics.Round, prevFetchCompleteChan chan bool,
 		psp, getPeerErr := peerSelector.getNextPeer()
 		if getPeerErr != nil {
 			s.log.Debugf("fetchAndWrite: was unable to obtain a peer to retrieve the block from")
-			break
+			return false
 		}
 		peer := psp.Peer

@@ -300,20 +300,15 @@ func (s *Service) fetchAndWrite(r basics.Round, prevFetchCompleteChan chan bool,
 			}
 			s.log.Debugf("fetchAndWrite(%v): Could not fetch: %v (attempt %d)", r, err, i)
 			peerSelector.rankPeer(psp, peerRankDownloadFailed)
 
 			// we've just failed to retrieve a block; wait until the previous block is fetched before trying again
 			// to avoid the usecase where the first block doesn't exist, and we're making many requests down the chain
 			// for no reason.
-			if !hasLookback {
-				select {
-				case <-s.ctx.Done():
-					s.log.Infof("fetchAndWrite(%d): Aborted while waiting for lookback block to ledger after failing once : %v", r, err)
-					return false
-				case hasLookback = <-lookbackComplete:
-					if !hasLookback {
-						s.log.Infof("fetchAndWrite(%d): lookback block doesn't exist, won't try to retrieve block again : %v", r, err)
-						return false
-					}
-				}
+			select {
+			case <-ctx.Done():
+				s.log.Infof("fetchAndWrite(%d): Aborted while waiting for lookback block to ledger after failing once : %v", r, err)
+				return false
+			case <-lookbackComplete:
 			}
 			continue // retry the fetch
 		} else if block == nil || cert == nil {
@@ -338,18 +333,13 @@ func (s *Service) fetchAndWrite(r basics.Round, prevFetchCompleteChan chan bool,
 		}
 
 		// make sure that we have the lookBack block that's required for authenticating this block
-		if !hasLookback {
-			select {
-			case <-s.ctx.Done():
-				s.log.Debugf("fetchAndWrite(%v): Aborted while waiting for lookback block to ledger", r)
-				return false
-			case hasLookback = <-lookbackComplete:
-				if !hasLookback {
-					s.log.Warnf("fetchAndWrite(%v): lookback block doesn't exist, cannot authenticate new block", r)
-					return false
-				}
-			}
+		select {
+		case <-ctx.Done():
+			s.log.Debugf("fetchAndWrite(%v): Aborted while waiting for lookback block to ledger", r)
+			return false
+		case <-lookbackComplete:
 		}
 
 		if s.cfg.CatchupVerifyCertificate() {
 			err = s.auth.Authenticate(block, cert)
 			if err != nil {
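Both lookback waits shrink the same way: the old code received a `bool` and had to latch it in `hasLookback`, because a buffered `chan bool` yields a value only as many times as one was sent. The `WaitMem`-style `chan struct{}` is closed rather than sent to, which makes the latch unnecessary. The semantic difference in isolation:

```go
// Old shape: one value per send; a second receive would block forever,
// hence the hasLookback latch.
okCh := make(chan bool, 1)
okCh <- true
<-okCh // fine once; receiving again would block

// New shape: close-as-broadcast; every receive after close succeeds
// immediately, so callers can re-check as often as they like.
done := make(chan struct{})
close(done)
<-done // ok
<-done // still ok
```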
@@ -365,94 +355,71 @@ func (s *Service) fetchAndWrite(r basics.Round, prevFetchCompleteChan chan bool,
 
 		// Write to ledger, noting that ledger writes must be in order
 		select {
-		case <-s.ctx.Done():
+		case <-ctx.Done():
 			s.log.Debugf("fetchAndWrite(%v): Aborted while waiting to write to ledger", r)
 			return false
-		case prevFetchSuccess := <-prevFetchCompleteChan:
-			if prevFetchSuccess {
-				// make sure the ledger wrote enough of the account data to disk, since we don't want the ledger to hold a large amount of data in memory.
-				proto, err := s.ledger.ConsensusParams(r.SubSaturate(1))
-				if err != nil {
-					s.log.Errorf("fetchAndWrite(%d): Unable to determine consensus params for round %d: %v", r, r-1, err)
-					return false
-				}
-				ledgerBacklogRound := r.SubSaturate(basics.Round(proto.MaxBalLookback))
-				select {
-				case <-s.ledger.Wait(ledgerBacklogRound):
-					// i.e. round r-320 is no longer in the blockqueue, so it's account data is either being currently written, or it was already written.
-				case <-s.ctx.Done():
-					s.log.Debugf("fetchAndWrite(%d): Aborted while waiting for ledger to complete writing up to round %d", r, ledgerBacklogRound)
-					return false
-				}
-
-				if s.cfg.CatchupVerifyTransactionSignatures() || s.cfg.CatchupVerifyApplyData() {
-					var vb *ledgercore.ValidatedBlock
-					vb, err = s.ledger.Validate(s.ctx, *block, s.blockValidationPool)
-					if err != nil {
-						if s.ctx.Err() != nil {
-							// if the context expired, just exit.
-							return false
-						}
-						if errNSBE, ok := err.(ledgercore.ErrNonSequentialBlockEval); ok && errNSBE.EvaluatorRound <= errNSBE.LatestRound {
-							// the block was added to the ledger from elsewhere after fetching it here
-							// only the agreement could have added this block into the ledger, catchup is complete
-							s.log.Infof("fetchAndWrite(%d): after fetching the block, it is already in the ledger. The catchup is complete", r)
-							return false
-						}
-						s.log.Warnf("fetchAndWrite(%d): failed to validate block : %v", r, err)
-						return false
-					}
-					err = s.ledger.AddValidatedBlock(*vb, *cert)
-				} else {
-					err = s.ledger.AddBlock(*block, *cert)
-				}
-
-				if err != nil {
-					switch err.(type) {
-					case ledgercore.ErrNonSequentialBlockEval:
-						s.log.Infof("fetchAndWrite(%d): no need to re-evaluate historical block", r)
-						return true
-					case ledgercore.BlockInLedgerError:
-					case protocol.Error:
-						if !s.protocolErrorLogged {
-							logging.Base().Errorf("fetchAndWrite(%v): unrecoverable protocol error detected: %v", r, err)
-							s.protocolErrorLogged = true
-						}
-					default:
-						s.log.Errorf("fetchAndWrite(%v): ledger write failed: %v", r, err)
-					}
-
-					return false
-				}
-				s.log.Debugf("fetchAndWrite(%v): Wrote block to ledger", r)
-				return true
-			}
-			s.log.Warnf("fetchAndWrite(%v): previous block doesn't exist (perhaps fetching block %v failed)", r, r-1)
-			return false
-		}
-	}
-	return false
-}
-
-type task func() basics.Round
-
-func (s *Service) pipelineCallback(r basics.Round, thisFetchComplete chan bool, prevFetchCompleteChan chan bool, lookbackChan chan bool, peerSelector *peerSelector) func() basics.Round {
-	return func() basics.Round {
-		fetchResult := s.fetchAndWrite(r, prevFetchCompleteChan, lookbackChan, peerSelector)
-
-		// the fetch result will be read at most twice (once as the lookback block and once as the prev block, so we write the result twice)
-		thisFetchComplete <- fetchResult
-		thisFetchComplete <- fetchResult
-
-		if !fetchResult {
-			s.log.Infof("pipelineCallback(%d): did not fetch or write the block", r)
-			return 0
-		}
-		return r
-	}
-}
+		case <-prevFetchCompleteChan:
+			// make sure the ledger wrote enough of the account data to disk, since we don't want the ledger to hold a large amount of data in memory.
+			proto, err := s.ledger.ConsensusParams(r.SubSaturate(1))
+			if err != nil {
+				s.log.Errorf("fetchAndWrite(%d): Unable to determine consensus params for round %d: %v", r, r-1, err)
+				return false
+			}
+			ledgerBacklogRound := r.SubSaturate(basics.Round(proto.MaxBalLookback))
+			select {
+			case <-s.ledger.Wait(ledgerBacklogRound):
+				// i.e. round r-320 is no longer in the blockqueue, so it's account data is either being currently written, or it was already written.
+			case <-s.ctx.Done():
+				s.log.Debugf("fetchAndWrite(%d): Aborted while waiting for ledger to complete writing up to round %d", r, ledgerBacklogRound)
+				return false
+			}
+		}
+
+		if s.cfg.CatchupVerifyTransactionSignatures() || s.cfg.CatchupVerifyApplyData() {
+			var vb *ledgercore.ValidatedBlock
+			vb, err = s.ledger.Validate(s.ctx, *block, s.blockValidationPool)
+			if err != nil {
+				if s.ctx.Err() != nil {
+					// if the context expired, just exit.
+					return false
+				}
+				if errNSBE, ok := err.(ledgercore.ErrNonSequentialBlockEval); ok && errNSBE.EvaluatorRound <= errNSBE.LatestRound {
+					// the block was added to the ledger from elsewhere after fetching it here
+					// only the agreement could have added this block into the ledger, catchup is complete
+					s.log.Infof("fetchAndWrite(%d): after fetching the block, it is already in the ledger. The catchup is complete", r)
+					return false
+				}
+				s.log.Warnf("fetchAndWrite(%d): failed to validate block : %v", r, err)
+				return false
+			}
+			err = s.ledger.AddValidatedBlock(*vb, *cert)
+		} else {
+			err = s.ledger.AddBlock(*block, *cert)
+		}
+
+		if err != nil {
+			switch err.(type) {
+			case ledgercore.ErrNonSequentialBlockEval:
+				s.log.Infof("fetchAndWrite(%d): no need to re-evaluate historical block", r)
+				return true
+			case ledgercore.BlockInLedgerError:
+				// the block was added to the ledger from elsewhere after fetching it here
+				// only the agreement could have added this block into the ledger, catchup is complete
+				s.log.Infof("fetchAndWrite(%d): after fetching the block, it is already in the ledger. The catchup is complete", r)
+				return false
+			case protocol.Error:
+				if !s.protocolErrorLogged {
+					logging.Base().Errorf("fetchAndWrite(%v): unrecoverable protocol error detected: %v", r, err)
+					s.protocolErrorLogged = true
+				}
+			default:
+				s.log.Errorf("fetchAndWrite(%v): ledger write failed: %v", r, err)
+			}
+
+			return false
+		}
+		s.log.Debugf("fetchAndWrite(%v): Wrote block to ledger", r)
+		return true
+	}
+}
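This hunk is the heart of the cleanup: `pipelineCallback` existed only to fan the fetch result out to its two consumers (round r+1 waiting on its previous round, and round r+seedLookback waiting on its seed round), which is why it wrote `thisFetchComplete <- fetchResult` exactly twice — one token per reader, where miscounting meant a deadlock. With `WaitMem`, the ledger itself closes the per-round channel when the block lands, so the reader count stops mattering. The producer-side contrast, illustrative only:

```go
// Old: the producer must know how many readers exist.
res := make(chan bool, 2)
res <- ok
res <- ok // forget one send and a downstream round hangs forever

// New: the round's readiness channel is closed once, and any number of
// waiters (prev-round or seed-round) observe it.
close(roundReady) // conceptually what the ledger does behind WaitMem(r)
```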

@@ -463,105 +430,71 @@ func (s *Service) pipelinedFetch(seedLookback uint64) {
 		parallelRequests = seedLookback
 	}
 
-	completed := make(chan basics.Round, parallelRequests)
-	taskCh := make(chan task, parallelRequests)
+	completed := make(map[basics.Round]chan bool)
 	var wg sync.WaitGroup
 
 	defer func() {
-		close(taskCh)
 		wg.Wait()
-		close(completed)
+		for _, ch := range completed {
+			close(ch)
+		}
 	}()
 
 	peerSelector := createPeerSelector(s.net, s.cfg, true)
 
 	if _, err := peerSelector.getNextPeer(); err == errPeerSelectorNoPeerPoolsAvailable {
 		s.log.Debugf("pipelinedFetch: was unable to obtain a peer to retrieve the block from")
 		return
 	}
 
-	// Invariant: len(taskCh) + (# pending writes to completed) <= N
-	wg.Add(int(parallelRequests))
-	for i := uint64(0); i < parallelRequests; i++ {
-		go func() {
-			defer wg.Done()
-			for t := range taskCh {
-				completed <- t() // This write comes after a read from taskCh, so the invariant is preserved.
-			}
-		}()
-	}
+	// Create a new context for canceling the pipeline if some block
+	// fetch fails along the way.
+	ctx, cancelCtx := context.WithCancel(s.ctx)
+	defer cancelCtx()
 
-	recentReqs := make([]chan bool, 0)
-	for i := 0; i < int(seedLookback); i++ {
-		// the fetch result will be read at most twice (once as the lookback block and once as the prev block, so we write the result twice)
-		reqComplete := make(chan bool, 2)
-		reqComplete <- true
-		reqComplete <- true
-		recentReqs = append(recentReqs, reqComplete)
-	}
+	// firstRound is the first round we're waiting to fetch.
+	firstRound := s.ledger.NextRound()
+
+	// nextRound is the next round that we will issue a fetch for.
+	nextRound := firstRound
 
-	from := s.ledger.NextRound()
-	nextRound := from
-	for ; nextRound < from+basics.Round(parallelRequests); nextRound++ {
-		// If the next round is not supported
-		if s.nextRoundIsNotSupported(nextRound) {
-			// We may get here when (1) The service starts
-			// and gets to an unsupported round. Since in
-			// this loop we do not wait for the requests
-			// to be written to the ledger, there is no
-			// guarantee that the unsupported round will be
-			// stopped in this case.
-
-			// (2) The unsupported round is detected in the
-			// "the rest" loop, but did not cancel because
-			// the last supported round was not yet written
-			// to the ledger.
-
-			// It is sufficient to check only in the first
-			// iteration, however checking in all in favor
-			// of code simplicity.
-			s.handleUnsupportedRound(nextRound)
-			break
-		}
-
-		currentRoundComplete := make(chan bool, 2)
-		// len(taskCh) + (# pending writes to completed) increases by 1
-		taskCh <- s.pipelineCallback(nextRound, currentRoundComplete, recentReqs[len(recentReqs)-1], recentReqs[len(recentReqs)-int(seedLookback)], peerSelector)
-		recentReqs = append(recentReqs[1:], currentRoundComplete)
-	}
+	for {
+		for nextRound < firstRound+basics.Round(parallelRequests) {
+			if s.nextRoundIsNotSupported(nextRound) {
+				s.handleUnsupportedRound(nextRound)
+				return
+			}
 
-	completedRounds := make(map[basics.Round]bool)
-	// the rest
-	for {
+			done := make(chan bool, 1)
+			completed[nextRound] = done
+
+			wg.Add(1)
+			go func(r basics.Round) {
+				prev := s.ledger.WaitMem(r - 1)
+				seed := s.ledger.WaitMem(r.SubSaturate(basics.Round(seedLookback)))
+				done <- s.fetchAndWrite(r, prev, seed, peerSelector, ctx)
+				wg.Done()
+			}(nextRound)
+
+			nextRound++
+		}
+
 		select {
-		case round := <-completed:
-			if round == 0 {
-				// there was an error
+		case completedOK := <-completed[firstRound]:
+			delete(completed, firstRound)
+			firstRound++
+
+			if !completedOK {
+				// there was an error; defer will cancel the pipeline
 				return
 			}
 
 			// if we're writing a catchpoint file, stop catching up to reduce the memory pressure. Once we finish writing the file we
 			// could resume with the catchup.
 			if s.ledger.IsWritingCatchpointDataFile() {
 				s.log.Info("Catchup is stopping due to catchpoint file being written")
 				s.suspendForCatchpointWriting = true
 				return
 			}
-			completedRounds[round] = true
-			// fetch rounds we can validate
-			for completedRounds[nextRound-basics.Round(parallelRequests)] {
-				// If the next round is not supported
-				if s.nextRoundIsNotSupported(nextRound) {
-					s.handleUnsupportedRound(nextRound)
-					return
-				}
-				delete(completedRounds, nextRound)
-
-				currentRoundComplete := make(chan bool, 2)
-				// len(taskCh) + (# pending writes to completed) increases by 1
-				taskCh <- s.pipelineCallback(nextRound, currentRoundComplete, recentReqs[len(recentReqs)-1], recentReqs[0], peerSelector)
-				recentReqs = append(recentReqs[1:], currentRoundComplete)
-				nextRound++
-			}
 		case <-s.ctx.Done():
 			return
 		}
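Taken together, the new `pipelinedFetch` is a bounded sliding window: up to `parallelRequests` rounds fetch concurrently, each fetcher gates itself on `WaitMem` channels for its prev and seed rounds, and the coordinator consumes results strictly in order through the per-round `done` channels, with the deferred `cancelCtx` aborting everything on the first failure. A self-contained toy model of that shape (all names and the fixed bounds are illustrative, not the service's API):

```go
package main

import (
	"context"
	"fmt"
)

// fetchOne simulates fetchAndWrite: it may not "write" round r until the
// prev channel (round r-1's WaitMem analogue) has been closed.
func fetchOne(ctx context.Context, r int, prev <-chan struct{}) bool {
	select {
	case <-prev:
	case <-ctx.Done():
		return false
	}
	fmt.Println("wrote round", r)
	return true
}

func main() {
	const lastRound, window = 10, 4 // window plays the role of parallelRequests
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // first failure (or normal return) stops all fetchers

	written := map[int]chan struct{}{0: make(chan struct{})}
	close(written[0]) // round 0 is already "in the ledger"

	done := map[int]chan bool{} // per-round results, like completed[]
	first, next := 1, 1
	for first <= lastRound {
		// Top up the window: issue fetches for rounds [first, first+window).
		for next < first+window && next <= lastRound {
			written[next] = make(chan struct{})
			d, prev := make(chan bool, 1), written[next-1]
			done[next] = d
			go func(r int, prev <-chan struct{}, d chan<- bool) {
				d <- fetchOne(ctx, r, prev)
			}(next, prev, d)
			next++
		}
		// Consume strictly in order, like <-completed[firstRound].
		if ok := <-done[first]; !ok {
			return
		}
		close(written[first]) // the ledger-write signal for round first
		delete(done, first)
		first++
	}
}
```

The real code differs in that the ledger, not the coordinator, closes the `WaitMem` channels, and the seed-lookback gate adds a second prerequisite channel per round — but the window arithmetic (`first`/`next` here, `firstRound`/`nextRound` in the PR) is the same.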