From 191ec7531cb06435b54c733162fb481ac2dc4d39 Mon Sep 17 00:00:00 2001 From: Lee Bousfield Date: Mon, 9 Sep 2024 11:48:30 -0500 Subject: [PATCH] Fix the delayed sequencer missing messages in tests --- arbnode/delayed_sequencer.go | 56 ++++++++++++++++++++-------- system_tests/block_validator_test.go | 2 - 2 files changed, 40 insertions(+), 18 deletions(-) diff --git a/arbnode/delayed_sequencer.go b/arbnode/delayed_sequencer.go index 4f18531a76..cdae4d9e06 100644 --- a/arbnode/delayed_sequencer.go +++ b/arbnode/delayed_sequencer.go @@ -9,6 +9,7 @@ import ( "fmt" "math/big" "sync" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -29,16 +30,17 @@ type DelayedSequencer struct { reader *InboxReader exec execution.ExecutionSequencer coordinator *SeqCoordinator - waitingForFinalizedBlock uint64 + waitingForFinalizedBlock *uint64 mutex sync.Mutex config DelayedSequencerConfigFetcher } type DelayedSequencerConfig struct { - Enable bool `koanf:"enable" reload:"hot"` - FinalizeDistance int64 `koanf:"finalize-distance" reload:"hot"` - RequireFullFinality bool `koanf:"require-full-finality" reload:"hot"` - UseMergeFinality bool `koanf:"use-merge-finality" reload:"hot"` + Enable bool `koanf:"enable" reload:"hot"` + FinalizeDistance int64 `koanf:"finalize-distance" reload:"hot"` + RequireFullFinality bool `koanf:"require-full-finality" reload:"hot"` + UseMergeFinality bool `koanf:"use-merge-finality" reload:"hot"` + RescanInterval time.Duration `koanf:"rescan-interval" reload:"hot"` } type DelayedSequencerConfigFetcher func() *DelayedSequencerConfig @@ -48,6 +50,7 @@ func DelayedSequencerConfigAddOptions(prefix string, f *flag.FlagSet) { f.Int64(prefix+".finalize-distance", DefaultDelayedSequencerConfig.FinalizeDistance, "how many blocks in the past L1 block is considered final (ignored when using Merge finality)") f.Bool(prefix+".require-full-finality", DefaultDelayedSequencerConfig.RequireFullFinality, "whether to wait for full finality before sequencing delayed messages") f.Bool(prefix+".use-merge-finality", DefaultDelayedSequencerConfig.UseMergeFinality, "whether to use The Merge's notion of finality before sequencing delayed messages") + f.Duration(prefix+".rescan-interval", DefaultDelayedSequencerConfig.RescanInterval, "frequency to rescan for new delayed messages (the parent chain reader's poll-interval config is more important than this)") } var DefaultDelayedSequencerConfig = DelayedSequencerConfig{ @@ -55,6 +58,7 @@ var DefaultDelayedSequencerConfig = DelayedSequencerConfig{ FinalizeDistance: 20, RequireFullFinality: false, UseMergeFinality: true, + RescanInterval: time.Second, } var TestDelayedSequencerConfig = DelayedSequencerConfig{ @@ -62,6 +66,7 @@ var TestDelayedSequencerConfig = DelayedSequencerConfig{ FinalizeDistance: 20, RequireFullFinality: false, UseMergeFinality: false, + RescanInterval: time.Millisecond * 100, } func NewDelayedSequencer(l1Reader *headerreader.HeaderReader, reader *InboxReader, exec execution.ExecutionSequencer, coordinator *SeqCoordinator, config DelayedSequencerConfigFetcher) (*DelayedSequencer, error) { @@ -124,13 +129,12 @@ func (d *DelayedSequencer) sequenceWithoutLockout(ctx context.Context, lastBlock finalized = uint64(currentNum - config.FinalizeDistance) } - if d.waitingForFinalizedBlock > finalized { + if d.waitingForFinalizedBlock != nil && *d.waitingForFinalizedBlock > finalized { return nil } - // Unless we find an unfinalized message (which sets waitingForBlock), - // we won't find a new finalized message until FinalizeDistance blocks in the future. - d.waitingForFinalizedBlock = lastBlockHeader.Number.Uint64() + 1 + // Reset what block we're waiting for if we've caught up + d.waitingForFinalizedBlock = nil dbDelayedCount, err := d.inbox.GetDelayedCount() if err != nil { @@ -151,8 +155,8 @@ func (d *DelayedSequencer) sequenceWithoutLockout(ctx context.Context, lastBlock return err } if parentChainBlockNumber > finalized { - // Message isn't finalized yet; stop here - d.waitingForFinalizedBlock = parentChainBlockNumber + // Message isn't finalized yet; wait for it to be + d.waitingForFinalizedBlock = &parentChainBlockNumber break } if lastDelayedAcc != (common.Hash{}) { @@ -213,20 +217,40 @@ func (d *DelayedSequencer) run(ctx context.Context) { headerChan, cancel := d.l1Reader.Subscribe(false) defer cancel() + latestHeader, err := d.l1Reader.LastHeader(ctx) + if err != nil { + log.Warn("delayed sequencer: failed to get latest header", "err", err) + latestHeader = nil + } + rescanTimer := time.NewTimer(d.config().RescanInterval) for { + if !rescanTimer.Stop() { + select { + case <-rescanTimer.C: + default: + } + } + if latestHeader != nil { + rescanTimer.Reset(d.config().RescanInterval) + } + var ok bool select { - case nextHeader, ok := <-headerChan: + case latestHeader, ok = <-headerChan: if !ok { - log.Info("delayed sequencer: header channel close") + log.Debug("delayed sequencer: header channel close") return } - if err := d.trySequence(ctx, nextHeader); err != nil { - log.Error("Delayed sequencer error", "err", err) + case <-rescanTimer.C: + if latestHeader == nil { + continue } case <-ctx.Done(): - log.Info("delayed sequencer: context done", "err", ctx.Err()) + log.Debug("delayed sequencer: context done", "err", ctx.Err()) return } + if err := d.trySequence(ctx, latestHeader); err != nil { + log.Error("Delayed sequencer error", "err", err) + } } } diff --git a/system_tests/block_validator_test.go b/system_tests/block_validator_test.go index eef6c29b7a..c3b68871cc 100644 --- a/system_tests/block_validator_test.go +++ b/system_tests/block_validator_test.go @@ -203,8 +203,6 @@ func testBlockValidatorSimple(t *testing.T, opts Options) { builder.L1.SendWaitTestTransactions(t, []*types.Transaction{ WrapL2ForDelayed(t, delayedTx, builder.L1Info, "User", 100000), }) - // give the inbox reader a bit of time to pick up the delayed message - time.Sleep(time.Millisecond * 500) // sending l1 messages creates l1 blocks.. make enough to get that delayed inbox message in for i := 0; i < 30; i++ {