diff --git a/catchup/fetcher_test.go b/catchup/fetcher_test.go
index e24286e8df..983de01475 100644
--- a/catchup/fetcher_test.go
+++ b/catchup/fetcher_test.go
@@ -47,15 +47,15 @@ func buildTestLedger(t *testing.T, blk bookkeeping.Block) (ledger *data.Ledger,
 	proto := config.Consensus[protocol.ConsensusCurrentVersion]
 	genesis := make(map[basics.Address]basics.AccountData)
 	genesis[user] = basics.AccountData{
-		Status:     basics.Online,
+		Status:     basics.Offline,
 		MicroAlgos: basics.MicroAlgos{Raw: proto.MinBalance * 2000000},
 	}
 	genesis[sinkAddr] = basics.AccountData{
-		Status:     basics.Online,
+		Status:     basics.Offline,
 		MicroAlgos: basics.MicroAlgos{Raw: proto.MinBalance * 2000000},
 	}
 	genesis[poolAddr] = basics.AccountData{
-		Status:     basics.Online,
+		Status:     basics.Offline,
 		MicroAlgos: basics.MicroAlgos{Raw: proto.MinBalance * 2000000},
 	}
 
diff --git a/catchup/service.go b/catchup/service.go
index 5c08e60e39..7870f0ded8 100644
--- a/catchup/service.go
+++ b/catchup/service.go
@@ -64,6 +64,7 @@ type Ledger interface {
 	IsWritingCatchpointDataFile() bool
 	Validate(ctx context.Context, blk bookkeeping.Block, executionPool execpool.BacklogPool) (*ledgercore.ValidatedBlock, error)
 	AddValidatedBlock(vb ledgercore.ValidatedBlock, cert agreement.Certificate) error
+	WaitMem(r basics.Round) chan struct{}
 }
 
 // Service represents the catchup service. Once started and until it is stopped, it ensures that the ledger is up to date with network.
@@ -204,8 +205,8 @@ func (s *Service) SynchronizingTime() time.Duration {
 var errLedgerAlreadyHasBlock = errors.New("ledger already has block")
 
 // function scope to make a bunch of defer statements better
-func (s *Service) innerFetch(r basics.Round, peer network.Peer) (blk *bookkeeping.Block, cert *agreement.Certificate, ddur time.Duration, err error) {
-	ledgerWaitCh := s.ledger.Wait(r)
+func (s *Service) innerFetch(ctx context.Context, r basics.Round, peer network.Peer) (blk *bookkeeping.Block, cert *agreement.Certificate, ddur time.Duration, err error) {
+	ledgerWaitCh := s.ledger.WaitMem(r)
 	select {
 	case <-ledgerWaitCh:
 		// if our ledger already have this block, no need to attempt to fetch it.
@@ -213,14 +214,12 @@ func (s *Service) innerFetch(r basics.Round, peer network.Peer) (blk *bookkeepin
 	default:
 	}
 
-	ctx, cf := context.WithCancel(s.ctx)
+	ctx, cf := context.WithCancel(ctx)
 	fetcher := makeUniversalBlockFetcher(s.log, s.net, s.cfg)
 	defer cf()
-	stopWaitingForLedgerRound := make(chan struct{})
-	defer close(stopWaitingForLedgerRound)
 	go func() {
 		select {
-		case <-stopWaitingForLedgerRound:
+		case <-ctx.Done():
 		case <-ledgerWaitCh:
 			cf()
 		}
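Note: the rewrite above removes the dedicated stopWaitingForLedgerRound channel. The fetch context becomes the single lifetime signal: defer cf() fires on every return path, so the watcher goroutine always exits via ctx.Done(), and the watcher in turn cancels the fetch as soon as WaitMem reports that the round is already in memory. Below is a minimal, self-contained sketch of that cancel-on-wait pattern; fetchWithLedgerWatch and waitCh are illustrative names, not the production API.

package main

import (
	"context"
	"fmt"
	"time"
)

// fetchWithLedgerWatch mirrors the innerFetch pattern: cancel the fetch as
// soon as waitCh closes (the ledger already holds the round) or when the
// parent context is canceled. The watcher goroutine needs no dedicated
// shutdown channel because it also selects on ctx.Done().
func fetchWithLedgerWatch(parent context.Context, waitCh <-chan struct{}, fetch func(context.Context) error) error {
	ctx, cancel := context.WithCancel(parent)
	defer cancel() // stops the watcher on every return path, like defer cf()

	go func() {
		select {
		case <-ctx.Done(): // fetch finished or parent canceled
		case <-waitCh: // the ledger got the block some other way
			cancel()
		}
	}()
	return fetch(ctx)
}

func main() {
	waitCh := make(chan struct{})
	go func() { time.Sleep(10 * time.Millisecond); close(waitCh) }()

	err := fetchWithLedgerWatch(context.Background(), waitCh, func(ctx context.Context) error {
		<-ctx.Done() // a slow fetch, interrupted by the watcher
		return ctx.Err()
	})
	fmt.Println("fetch result:", err) // context canceled
}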
@@ -244,17 +243,16 @@ func (s *Service) innerFetch(r basics.Round, peer network.Peer) (blk *bookkeepin
 // - If we couldn't fetch the block (e.g. if there are no peers available, or we've reached the catchupRetryLimit)
 // - If the block is already in the ledger (e.g. if agreement service has already written it)
 // - If the retrieval of the previous block was unsuccessful
-func (s *Service) fetchAndWrite(r basics.Round, prevFetchCompleteChan chan bool, lookbackComplete chan bool, peerSelector *peerSelector) bool {
+func (s *Service) fetchAndWrite(ctx context.Context, r basics.Round, prevFetchCompleteChan chan struct{}, lookbackComplete chan struct{}, peerSelector *peerSelector) bool {
 	// If sync-ing this round is not intended, don't fetch it
 	if dontSyncRound := s.GetDisableSyncRound(); dontSyncRound != 0 && r >= basics.Round(dontSyncRound) {
 		return false
 	}
 	i := 0
-	hasLookback := false
-	for true {
+	for {
 		i++
 		select {
-		case <-s.ctx.Done():
+		case <-ctx.Done():
 			s.log.Debugf("fetchAndWrite(%v): Aborted", r)
 			return false
 		default:
 		}
@@ -284,12 +282,12 @@ func (s *Service) fetchAndWrite(r basics.Round, prevFetchCompleteChan chan bool,
 		psp, getPeerErr := peerSelector.getNextPeer()
 		if getPeerErr != nil {
 			s.log.Debugf("fetchAndWrite: was unable to obtain a peer to retrieve the block from")
-			break
+			return false
 		}
 		peer := psp.Peer
 
 		// Try to fetch, timing out after retryInterval
-		block, cert, blockDownloadDuration, err := s.innerFetch(r, peer)
+		block, cert, blockDownloadDuration, err := s.innerFetch(ctx, r, peer)
 
 		if err != nil {
 			if err == errLedgerAlreadyHasBlock {
@@ -300,20 +298,15 @@ func (s *Service) fetchAndWrite(r basics.Round, prevFetchCompleteChan chan bool,
 			}
 			s.log.Debugf("fetchAndWrite(%v): Could not fetch: %v (attempt %d)", r, err, i)
 			peerSelector.rankPeer(psp, peerRankDownloadFailed)
+
 			// we've just failed to retrieve a block; wait until the previous block is fetched before trying again
 			// to avoid the usecase where the first block doesn't exist, and we're making many requests down the chain
 			// for no reason.
-			if !hasLookback {
-				select {
-				case <-s.ctx.Done():
-					s.log.Infof("fetchAndWrite(%d): Aborted while waiting for lookback block to ledger after failing once : %v", r, err)
-					return false
-				case hasLookback = <-lookbackComplete:
-					if !hasLookback {
-						s.log.Infof("fetchAndWrite(%d): lookback block doesn't exist, won't try to retrieve block again : %v", r, err)
-						return false
-					}
-				}
+			select {
+			case <-ctx.Done():
+				s.log.Infof("fetchAndWrite(%d): Aborted while waiting for lookback block to ledger after failing once : %v", r, err)
+				return false
+			case <-lookbackComplete:
 			}
 			continue // retry the fetch
 		} else if block == nil || cert == nil {
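Note: fetchAndWrite's channel parameters change type here from chan bool to chan struct{} because they are now WaitMem channels, and the bare case <-lookbackComplete: works only because of closed-channel semantics: a closed channel releases any number of receivers, immediately and repeatedly, so the hasLookback bookkeeping and the old double-write handshake can go away. A toy demonstration of the property (names are illustrative):

package main

import (
	"fmt"
	"sync"
)

func main() {
	// A WaitMem-style signal: the channel is closed once the round is ready.
	ready := make(chan struct{})

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-ready // every waiter is released by the single close
			fmt.Println("waiter", id, "released")
		}(i)
	}

	close(ready) // one close serves all current and future receivers
	<-ready      // and it can be received again immediately
	wg.Wait()
}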
@@ -338,18 +331,13 @@ func (s *Service) fetchAndWrite(r basics.Round, prevFetchCompleteChan chan bool,
 		}
 
 		// make sure that we have the lookBack block that's required for authenticating this block
-		if !hasLookback {
-			select {
-			case <-s.ctx.Done():
-				s.log.Debugf("fetchAndWrite(%v): Aborted while waiting for lookback block to ledger", r)
-				return false
-			case hasLookback = <-lookbackComplete:
-				if !hasLookback {
-					s.log.Warnf("fetchAndWrite(%v): lookback block doesn't exist, cannot authenticate new block", r)
-					return false
-				}
-			}
+		select {
+		case <-ctx.Done():
+			s.log.Debugf("fetchAndWrite(%v): Aborted while waiting for lookback block to ledger", r)
+			return false
+		case <-lookbackComplete:
 		}
+
 		if s.cfg.CatchupVerifyCertificate() {
 			err = s.auth.Authenticate(block, cert)
 			if err != nil {
@@ -365,94 +353,71 @@ func (s *Service) fetchAndWrite(r basics.Round, prevFetchCompleteChan chan bool,
 
 		// Write to ledger, noting that ledger writes must be in order
 		select {
-		case <-s.ctx.Done():
+		case <-ctx.Done():
 			s.log.Debugf("fetchAndWrite(%v): Aborted while waiting to write to ledger", r)
 			return false
-		case prevFetchSuccess := <-prevFetchCompleteChan:
-			if prevFetchSuccess {
-				// make sure the ledger wrote enough of the account data to disk, since we don't want the ledger to hold a large amount of data in memory.
-				proto, err := s.ledger.ConsensusParams(r.SubSaturate(1))
-				if err != nil {
-					s.log.Errorf("fetchAndWrite(%d): Unable to determine consensus params for round %d: %v", r, r-1, err)
-					return false
-				}
-				ledgerBacklogRound := r.SubSaturate(basics.Round(proto.MaxBalLookback))
-				select {
-				case <-s.ledger.Wait(ledgerBacklogRound):
-					// i.e. round r-320 is no longer in the blockqueue, so it's account data is either being currently written, or it was already written.
-				case <-s.ctx.Done():
-					s.log.Debugf("fetchAndWrite(%d): Aborted while waiting for ledger to complete writing up to round %d", r, ledgerBacklogRound)
-					return false
-				}
+		case <-prevFetchCompleteChan:
+			// make sure the ledger wrote enough of the account data to disk, since we don't want the ledger to hold a large amount of data in memory.
+			proto, err := s.ledger.ConsensusParams(r.SubSaturate(1))
+			if err != nil {
+				s.log.Errorf("fetchAndWrite(%d): Unable to determine consensus params for round %d: %v", r, r-1, err)
+				return false
+			}
+			ledgerBacklogRound := r.SubSaturate(basics.Round(proto.MaxBalLookback))
+			select {
+			case <-s.ledger.Wait(ledgerBacklogRound):
+				// i.e. round r-320 is no longer in the blockqueue, so it's account data is either being currently written, or it was already written.
+			case <-s.ctx.Done():
+				s.log.Debugf("fetchAndWrite(%d): Aborted while waiting for ledger to complete writing up to round %d", r, ledgerBacklogRound)
+				return false
+			}
 
-			if s.cfg.CatchupVerifyTransactionSignatures() || s.cfg.CatchupVerifyApplyData() {
-				var vb *ledgercore.ValidatedBlock
-				vb, err = s.ledger.Validate(s.ctx, *block, s.blockValidationPool)
-				if err != nil {
-					if s.ctx.Err() != nil {
-						// if the context expired, just exit.
-						return false
-					}
-					if errNSBE, ok := err.(ledgercore.ErrNonSequentialBlockEval); ok && errNSBE.EvaluatorRound <= errNSBE.LatestRound {
-						// the block was added to the ledger from elsewhere after fetching it here
-						// only the agreement could have added this block into the ledger, catchup is complete
-						s.log.Infof("fetchAndWrite(%d): after fetching the block, it is already in the ledger. The catchup is complete", r)
-						return false
-					}
-					s.log.Warnf("fetchAndWrite(%d): failed to validate block : %v", r, err)
+			if s.cfg.CatchupVerifyTransactionSignatures() || s.cfg.CatchupVerifyApplyData() {
+				var vb *ledgercore.ValidatedBlock
+				vb, err = s.ledger.Validate(s.ctx, *block, s.blockValidationPool)
+				if err != nil {
+					if s.ctx.Err() != nil {
+						// if the context expired, just exit.
 						return false
 					}
-					err = s.ledger.AddValidatedBlock(*vb, *cert)
-				} else {
-					err = s.ledger.AddBlock(*block, *cert)
-				}
-
-				if err != nil {
-					switch err.(type) {
-					case ledgercore.ErrNonSequentialBlockEval:
-						s.log.Infof("fetchAndWrite(%d): no need to re-evaluate historical block", r)
-						return true
-					case ledgercore.BlockInLedgerError:
+					if errNSBE, ok := err.(ledgercore.ErrNonSequentialBlockEval); ok && errNSBE.EvaluatorRound <= errNSBE.LatestRound {
 						// the block was added to the ledger from elsewhere after fetching it here
 						// only the agreement could have added this block into the ledger, catchup is complete
 						s.log.Infof("fetchAndWrite(%d): after fetching the block, it is already in the ledger. The catchup is complete", r)
 						return false
-					case protocol.Error:
-						if !s.protocolErrorLogged {
-							logging.Base().Errorf("fetchAndWrite(%v): unrecoverable protocol error detected: %v", r, err)
-							s.protocolErrorLogged = true
-						}
-					default:
-						s.log.Errorf("fetchAndWrite(%v): ledger write failed: %v", r, err)
 					}
-
+					s.log.Warnf("fetchAndWrite(%d): failed to validate block : %v", r, err)
 					return false
 				}
-				s.log.Debugf("fetchAndWrite(%v): Wrote block to ledger", r)
-				return true
+				err = s.ledger.AddValidatedBlock(*vb, *cert)
+			} else {
+				err = s.ledger.AddBlock(*block, *cert)
 			}
-			s.log.Warnf("fetchAndWrite(%v): previous block doesn't exist (perhaps fetching block %v failed)", r, r-1)
-			return false
-		}
-	}
-	return false
-}
-
-type task func() basics.Round
-
-func (s *Service) pipelineCallback(r basics.Round, thisFetchComplete chan bool, prevFetchCompleteChan chan bool, lookbackChan chan bool, peerSelector *peerSelector) func() basics.Round {
-	return func() basics.Round {
-		fetchResult := s.fetchAndWrite(r, prevFetchCompleteChan, lookbackChan, peerSelector)
-
-		// the fetch result will be read at most twice (once as the lookback block and once as the prev block, so we write the result twice)
-		thisFetchComplete <- fetchResult
-		thisFetchComplete <- fetchResult
+			if err != nil {
+				switch err.(type) {
+				case ledgercore.ErrNonSequentialBlockEval:
+					s.log.Infof("fetchAndWrite(%d): no need to re-evaluate historical block", r)
+					return true
+				case ledgercore.BlockInLedgerError:
+					// the block was added to the ledger from elsewhere after fetching it here
+					// only the agreement could have added this block into the ledger, catchup is complete
+					s.log.Infof("fetchAndWrite(%d): after fetching the block, it is already in the ledger. The catchup is complete", r)
+					return false
+				case protocol.Error:
+					if !s.protocolErrorLogged {
+						logging.Base().Errorf("fetchAndWrite(%v): unrecoverable protocol error detected: %v", r, err)
+						s.protocolErrorLogged = true
+					}
+				default:
+					s.log.Errorf("fetchAndWrite(%v): ledger write failed: %v", r, err)
+				}
 
-		if !fetchResult {
-			s.log.Infof("pipelineCallback(%d): did not fetch or write the block", r)
-			return 0
+				return false
+			}
+			s.log.Debugf("fetchAndWrite(%v): Wrote block to ledger", r)
+			return true
 		}
-		return r
 	}
 }
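Note: with pipelineCallback and the task type gone, write ordering is carried entirely by the channels handed to fetchAndWrite: each round blocks on the previous round's channel before writing, and each completed write releases the next round. Below is a self-contained sketch of that ordering chain; the written slice of channels is a hypothetical stand-in for the ledger's WaitMem signals.

package main

import (
	"fmt"
	"sync"
)

func main() {
	const rounds = 5
	// written[r] closes once round r is in the (toy) ledger, playing the
	// role of s.ledger.WaitMem(r) in the real pipeline.
	written := make([]chan struct{}, rounds+1)
	for i := range written {
		written[i] = make(chan struct{})
	}
	close(written[0]) // round 0 is already in the ledger

	var mu sync.Mutex
	var ledger []int

	var wg sync.WaitGroup
	for r := 1; r <= rounds; r++ {
		wg.Add(1)
		go func(r int) {
			defer wg.Done()
			// the fetch itself may finish in any order...
			<-written[r-1] // ...but the write waits for round r-1
			mu.Lock()
			ledger = append(ledger, r)
			mu.Unlock()
			close(written[r]) // release round r+1
		}(r)
	}
	wg.Wait()
	fmt.Println(ledger) // always [1 2 3 4 5]
}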
@@ -463,82 +428,69 @@ func (s *Service) pipelinedFetch(seedLookback uint64) {
 		parallelRequests = seedLookback
 	}
 
-	completed := make(chan basics.Round, parallelRequests)
-	taskCh := make(chan task, parallelRequests)
+	completed := make(map[basics.Round]chan bool)
 	var wg sync.WaitGroup
-
 	defer func() {
-		close(taskCh)
 		wg.Wait()
-		close(completed)
+		for _, ch := range completed {
+			close(ch)
+		}
 	}()
 
 	peerSelector := createPeerSelector(s.net, s.cfg, true)
-
 	if _, err := peerSelector.getNextPeer(); err == errPeerSelectorNoPeerPoolsAvailable {
 		s.log.Debugf("pipelinedFetch: was unable to obtain a peer to retrieve the block from")
 		return
 	}
 
-	// Invariant: len(taskCh) + (# pending writes to completed) <= N
-	wg.Add(int(parallelRequests))
-	for i := uint64(0); i < parallelRequests; i++ {
-		go func() {
-			defer wg.Done()
-			for t := range taskCh {
-				completed <- t() // This write comes after a read from taskCh, so the invariant is preserved.
+	// Create a new context for canceling the pipeline if some block
+	// fetch fails along the way.
+	ctx, cancelCtx := context.WithCancel(s.ctx)
+	defer cancelCtx()
+
+	// firstRound is the first round we're waiting to fetch.
+	firstRound := s.ledger.NextRound()
+
+	// nextRound is the next round that we will issue a fetch for.
+	nextRound := firstRound
+
+	for {
+		for nextRound < firstRound+basics.Round(parallelRequests) {
+			if s.nextRoundIsNotSupported(nextRound) {
+				// We may get here when the service starts
+				// and gets to an unsupported round. Since in
+				// this loop we do not wait for the requests
+				// to be written to the ledger, there is no
+				// guarantee that the unsupported round will be
+				// stopped in this case.
+				s.handleUnsupportedRound(nextRound)
+				return
 			}
-		}()
-	}
 
-	recentReqs := make([]chan bool, 0)
-	for i := 0; i < int(seedLookback); i++ {
-		// the fetch result will be read at most twice (once as the lookback block and once as the prev block, so we write the result twice)
-		reqComplete := make(chan bool, 2)
-		reqComplete <- true
-		reqComplete <- true
-		recentReqs = append(recentReqs, reqComplete)
-	}
+			done := make(chan bool, 1)
+			completed[nextRound] = done
 
-	from := s.ledger.NextRound()
-	nextRound := from
-	for ; nextRound < from+basics.Round(parallelRequests); nextRound++ {
-		// If the next round is not supported
-		if s.nextRoundIsNotSupported(nextRound) {
-			// We may get here when (1) The service starts
-			// and gets to an unsupported round. Since in
-			// this loop we do not wait for the requests
-			// to be written to the ledger, there is no
-			// guarantee that the unsupported round will be
-			// stopped in this case.
-
-			// (2) The unsupported round is detected in the
-			// "the rest" loop, but did not cancel because
-			// the last supported round was not yet written
-			// to the ledger.
-
-			// It is sufficient to check only in the first
-			// iteration, however checking in all in favor
-			// of code simplicity.
-			s.handleUnsupportedRound(nextRound)
-			break
-		}
+			wg.Add(1)
+			go func(r basics.Round) {
+				prev := s.ledger.WaitMem(r - 1)
+				seed := s.ledger.WaitMem(r.SubSaturate(basics.Round(seedLookback)))
+				done <- s.fetchAndWrite(ctx, r, prev, seed, peerSelector)
+				wg.Done()
+			}(nextRound)
 
-		currentRoundComplete := make(chan bool, 2)
-		// len(taskCh) + (# pending writes to completed) increases by 1
-		taskCh <- s.pipelineCallback(nextRound, currentRoundComplete, recentReqs[len(recentReqs)-1], recentReqs[len(recentReqs)-int(seedLookback)], peerSelector)
-		recentReqs = append(recentReqs[1:], currentRoundComplete)
-	}
+			nextRound++
+		}
 
-	completedRounds := make(map[basics.Round]bool)
-	// the rest
-	for {
 		select {
-		case round := <-completed:
-			if round == 0 {
-				// there was an error
+		case completedOK := <-completed[firstRound]:
+			delete(completed, firstRound)
+			firstRound++
+
+			if !completedOK {
+				// there was an error; defer will cancel the pipeline
 				return
 			}
+
 			// if we're writing a catchpoint file, stop catching up to reduce the memory pressure. Once we finish writing the file we
 			// could resume with the catchup.
 			if s.ledger.IsWritingCatchpointDataFile() {
@@ -546,22 +498,7 @@ func (s *Service) pipelinedFetch(seedLookback uint64) {
 				s.suspendForCatchpointWriting = true
 				return
 			}
-			completedRounds[round] = true
-			// fetch rounds we can validate
-			for completedRounds[nextRound-basics.Round(parallelRequests)] {
-				// If the next round is not supported
-				if s.nextRoundIsNotSupported(nextRound) {
-					s.handleUnsupportedRound(nextRound)
-					return
-				}
-				delete(completedRounds, nextRound)
-				currentRoundComplete := make(chan bool, 2)
-				// len(taskCh) + (# pending writes to completed) increases by 1
-				taskCh <- s.pipelineCallback(nextRound, currentRoundComplete, recentReqs[len(recentReqs)-1], recentReqs[0], peerSelector)
-				recentReqs = append(recentReqs[1:], currentRoundComplete)
-				nextRound++
-			}
 		case <-s.ctx.Done():
 			return
 		}
 	}
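Note: the reworked pipelinedFetch drops the fixed worker pool (taskCh plus parallelRequests goroutines) in favor of one goroutine per in-flight round, with a map of per-round result channels that is drained strictly in round order; a failed round simply returns, and the deferred cancel tears down everything behind it. The sketch below reproduces that shape under stated assumptions (int rounds, a stubbed fetch function) rather than the real Service:

package main

import (
	"context"
	"fmt"
	"sync"
)

// pipeline keeps up to window fetches in flight and consumes results in
// round order from per-round channels, mirroring the new pipelinedFetch.
func pipeline(parent context.Context, first, window int, fetch func(context.Context, int) bool) {
	completed := make(map[int]chan bool)
	var wg sync.WaitGroup
	defer wg.Wait() // runs after cancel below, so no fetch is left blocked

	ctx, cancel := context.WithCancel(parent)
	defer cancel() // cancels the remaining fetches on any return

	next := first
	for {
		for next < first+window {
			done := make(chan bool, 1) // buffered: a late result never blocks
			completed[next] = done
			wg.Add(1)
			go func(r int) {
				defer wg.Done()
				done <- fetch(ctx, r)
			}(next)
			next++
		}
		select {
		case ok := <-completed[first]:
			delete(completed, first)
			first++
			if !ok {
				return // error: the deferred cancel stops the pipeline
			}
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	pipeline(context.Background(), 1, 4, func(ctx context.Context, r int) bool {
		fmt.Println("fetched round", r)
		return r < 10 // pretend round 10 fails, stopping the pipeline
	})
}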
@@ -719,7 +656,7 @@ func (s *Service) fetchRound(cert agreement.Certificate, verifier *agreement.Asy
 		peer := psp.Peer
 
 		// Ask the fetcher to get the block somehow
-		block, fetchedCert, _, err := s.innerFetch(cert.Round, peer)
+		block, fetchedCert, _, err := s.innerFetch(s.ctx, cert.Round, peer)
 
 		if err != nil {
 			select {
diff --git a/catchup/service_test.go b/catchup/service_test.go
index b0a6501328..4e5c907512 100644
--- a/catchup/service_test.go
+++ b/catchup/service_test.go
@@ -784,6 +784,10 @@ func (m *mockedLedger) Wait(r basics.Round) chan struct{} {
 	return m.chans[r]
 }
 
+func (m *mockedLedger) WaitMem(r basics.Round) chan struct{} {
+	return m.Wait(r)
+}
+
 func (m *mockedLedger) Block(r basics.Round) (bookkeeping.Block, error) {
 	m.mu.Lock()
 	defer m.mu.Unlock()
diff --git a/ledger/bulletin.go b/ledger/bulletin.go
index 4465a52381..8114fefa14 100644
--- a/ledger/bulletin.go
+++ b/ledger/bulletin.go
@@ -47,13 +47,19 @@ func (notifier *notifier) notify() {
 }
 
 // bulletin provides an easy way to wait on a round to be written to the ledger.
-// To use it, call <-Wait(round)
+// To use it, call <-Wait(round).
 type bulletin struct {
 	mu                          deadlock.Mutex
 	pendingNotificationRequests map[basics.Round]notifier
 	latestRound                 basics.Round
 }
 
+// bulletinMem is a variant of bulletin that notifies when blocks
+// are available in-memory (but might not be stored durably on disk).
+type bulletinMem struct {
+	bulletin
+}
+
 func makeBulletin() *bulletin {
 	b := new(bulletin)
 	b.pendingNotificationRequests = make(map[basics.Round]notifier)
@@ -89,10 +95,7 @@ func (b *bulletin) loadFromDisk(l ledgerForTracker, _ basics.Round) error {
 func (b *bulletin) close() {
 }
 
-func (b *bulletin) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) {
-}
-
-func (b *bulletin) committedUpTo(rnd basics.Round) (retRound, lookback basics.Round) {
+func (b *bulletin) notifyRound(rnd basics.Round) {
 	b.mu.Lock()
 	defer b.mu.Unlock()
 
@@ -106,6 +109,19 @@ func (b *bulletin) committedUpTo(rnd basics.Round) (retRound, lookback basics.Ro
 	}
 	b.latestRound = rnd
+}
+
+func (b *bulletin) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) {
+}
+
+func (b *bulletinMem) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) {
+	b.notifyRound(blk.Round())
+}
+
+func (b *bulletin) committedUpTo(rnd basics.Round) (retRound, lookback basics.Round) {
+	// We notify for rnd for both bulletinMem and bulletinDisk, for simplicity.
+	// It's always safe to notify when block hits disk.
+	b.notifyRound(rnd)
 	return rnd, basics.Round(0)
 }
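Note: bulletinMem acquires all of bulletin's state and methods through Go struct embedding and overrides only newBlock, so the in-memory variant notifies as soon as the tracker sees a block, while the embedded base keeps notifying on disk commit via committedUpTo. A minimal illustration of the override mechanics follows; base and memVariant are hypothetical stand-ins for bulletin and bulletinMem.

package main

import "fmt"

// base mirrors bulletin: it notifies only when a round is durably on disk.
type base struct{ name string }

func (b *base) newBlock(r int) {} // no-op: in-memory arrival is ignored
func (b *base) committedUpTo(r int) {
	fmt.Println(b.name, "notifies at disk commit of round", r)
}

// memVariant mirrors bulletinMem: it embeds base and overrides newBlock,
// releasing waiters as soon as the block exists in memory.
type memVariant struct{ base }

func (m *memVariant) newBlock(r int) {
	fmt.Println(m.name, "notifies at in-memory arrival of round", r)
}

type tracker interface {
	newBlock(r int)
	committedUpTo(r int)
}

func main() {
	trackers := []tracker{
		&base{name: "bulletinDisk"},
		&memVariant{base{name: "bulletinMem"}},
	}
	for _, t := range trackers {
		t.newBlock(7)      // only the mem variant reacts here
		t.committedUpTo(7) // both react here; notifying at disk is always safe
	}
}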
diff --git a/ledger/ledger.go b/ledger/ledger.go
index 350b649240..337072f128 100644
--- a/ledger/ledger.go
+++ b/ledger/ledger.go
@@ -84,7 +84,8 @@ type Ledger struct {
 	acctsOnline    onlineAccounts
 	catchpoint     catchpointTracker
 	txTail         txTail
-	bulletin       bulletin
+	bulletinDisk   bulletin
+	bulletinMem    bulletinMem
 	notifier       blockNotifier
 	metrics        metricsTracker
 	spVerification spVerificationTracker
@@ -212,7 +213,8 @@ func (l *Ledger) reloadLedger() error {
 		&l.catchpoint,     // catchpoints tracker : update catchpoint labels, create catchpoint files
 		&l.acctsOnline,    // update online account balances history
 		&l.txTail,         // update the transaction tail, tracking the recent 1000 txn
-		&l.bulletin,       // provide closed channel signaling support for completed rounds
+		&l.bulletinDisk,   // provide closed channel signaling support for completed rounds on disk
+		&l.bulletinMem,    // provide closed channel signaling support for completed rounds in memory
 		&l.notifier,       // send OnNewBlocks to subscribers
 		&l.metrics,        // provides metrics reporting support
 		&l.spVerification, // provides state proof verification support
@@ -745,7 +747,16 @@ func (l *Ledger) WaitForCommit(r basics.Round) {
 func (l *Ledger) Wait(r basics.Round) chan struct{} {
 	l.trackerMu.RLock()
 	defer l.trackerMu.RUnlock()
-	return l.bulletin.Wait(r)
+	return l.bulletinDisk.Wait(r)
+}
+
+// WaitMem returns a channel that closes once a given round is
+// available in memory in the ledger, but might not be stored
+// durably on disk yet.
+func (l *Ledger) WaitMem(r basics.Round) chan struct{} {
+	l.trackerMu.RLock()
+	defer l.trackerMu.RUnlock()
+	return l.bulletinMem.Wait(r)
 }
 
 // GenesisHash returns the genesis hash for this ledger.
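Note: the ledger now exposes two signals with the same shape: Wait(r) closes once round r is durably on disk, while WaitMem(r) closes as soon as r is in memory, which is all the catchup service needs in order to skip or cancel a fetch. A hedged usage sketch against a stand-in ledger; fakeLedger and blockAvailable are illustrative, not the real tracker API.

package main

import "fmt"

// waiter captures the two signals: Wait gates on durable disk commit,
// WaitMem on in-memory availability.
type waiter interface {
	Wait(r uint64) chan struct{}
	WaitMem(r uint64) chan struct{}
}

// blockAvailable mirrors how innerFetch probes WaitMem: a non-blocking
// check for whether fetching round r is still worthwhile.
func blockAvailable(l waiter, r uint64) bool {
	select {
	case <-l.WaitMem(r):
		return true
	default:
		return false
	}
}

// fakeLedger has round 5 in memory but only round 3 flushed to disk.
type fakeLedger struct{ mem, disk uint64 }

func (f *fakeLedger) ch(have bool) chan struct{} {
	c := make(chan struct{})
	if have {
		close(c) // a closed channel means the round is already available
	}
	return c
}
func (f *fakeLedger) Wait(r uint64) chan struct{}    { return f.ch(r <= f.disk) }
func (f *fakeLedger) WaitMem(r uint64) chan struct{} { return f.ch(r <= f.mem) }

func main() {
	l := &fakeLedger{mem: 5, disk: 3}
	fmt.Println(blockAvailable(l, 4)) // true: in memory, though not yet on disk
	fmt.Println(blockAvailable(l, 6)) // false: not fetched yet
}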