diff --git a/vms/platformvm/state/mock_state.go b/vms/platformvm/state/mock_state.go
index 9d7ba10eeee9..1dd577d1ebdf 100644
--- a/vms/platformvm/state/mock_state.go
+++ b/vms/platformvm/state/mock_state.go
@@ -12,12 +12,14 @@ package state
 import (
 	context "context"
 	reflect "reflect"
+	sync "sync"
 	time "time"
 
 	database "github.com/ava-labs/avalanchego/database"
 	ids "github.com/ava-labs/avalanchego/ids"
 	validators "github.com/ava-labs/avalanchego/snow/validators"
 	iterator "github.com/ava-labs/avalanchego/utils/iterator"
+	logging "github.com/ava-labs/avalanchego/utils/logging"
 	avax "github.com/ava-labs/avalanchego/vms/components/avax"
 	gas "github.com/ava-labs/avalanchego/vms/components/gas"
 	block "github.com/ava-labs/avalanchego/vms/platformvm/block"
@@ -860,6 +862,20 @@ func (mr *MockStateMockRecorder) PutPendingValidator(staker any) *gomock.Call {
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutPendingValidator", reflect.TypeOf((*MockState)(nil).PutPendingValidator), staker)
 }
 
+// ReindexBlocks mocks base method.
+func (m *MockState) ReindexBlocks(lock sync.Locker, log logging.Logger) error {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "ReindexBlocks", lock, log)
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// ReindexBlocks indicates an expected call of ReindexBlocks.
+func (mr *MockStateMockRecorder) ReindexBlocks(lock, log any) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReindexBlocks", reflect.TypeOf((*MockState)(nil).ReindexBlocks), lock, log)
+}
+
 // SetAccruedFees mocks base method.
 func (m *MockState) SetAccruedFees(f uint64) {
 	m.ctrl.T.Helper()
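Aside (not part of the patch): a minimal sketch of how a test could stub the newly mocked method. The test name and wiring are hypothetical; `NewMockState` and the `EXPECT()` pattern are standard gomock output.

```go
package state_test

import (
	"sync"
	"testing"

	"go.uber.org/mock/gomock"

	"github.com/ava-labs/avalanchego/utils/logging"
	"github.com/ava-labs/avalanchego/vms/platformvm/state"
)

func TestReindexBlocksStub(t *testing.T) {
	ctrl := gomock.NewController(t)
	mockState := state.NewMockState(ctrl)

	// Expect exactly one reindex call, with any lock and logger, and report
	// success.
	mockState.EXPECT().ReindexBlocks(gomock.Any(), gomock.Any()).Return(nil)

	var lock sync.Mutex
	if err := mockState.ReindexBlocks(&lock, logging.NoLog{}); err != nil {
		t.Fatal(err)
	}
}
```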
diff --git a/vms/platformvm/state/state.go b/vms/platformvm/state/state.go
index cc774922e3b9..fc6d8dca6b99 100644
--- a/vms/platformvm/state/state.go
+++ b/vms/platformvm/state/state.go
@@ -8,6 +8,8 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"math"
+	"sync"
 	"time"
 
 	"github.com/google/btree"
@@ -23,6 +25,7 @@ import (
 	"github.com/ava-labs/avalanchego/database/versiondb"
 	"github.com/ava-labs/avalanchego/ids"
 	"github.com/ava-labs/avalanchego/snow"
+	"github.com/ava-labs/avalanchego/snow/choices"
 	"github.com/ava-labs/avalanchego/snow/uptime"
 	"github.com/ava-labs/avalanchego/snow/validators"
 	"github.com/ava-labs/avalanchego/upgrade"
@@ -30,7 +33,9 @@ import (
 	"github.com/ava-labs/avalanchego/utils/crypto/bls"
 	"github.com/ava-labs/avalanchego/utils/hashing"
 	"github.com/ava-labs/avalanchego/utils/iterator"
+	"github.com/ava-labs/avalanchego/utils/logging"
 	"github.com/ava-labs/avalanchego/utils/maybe"
+	"github.com/ava-labs/avalanchego/utils/timer"
 	"github.com/ava-labs/avalanchego/utils/wrappers"
 	"github.com/ava-labs/avalanchego/vms/components/avax"
 	"github.com/ava-labs/avalanchego/vms/components/gas"
@@ -97,6 +102,7 @@ var (
 	LastAcceptedKey    = []byte("last accepted")
 	HeightsIndexedKey  = []byte("heights indexed")
 	InitializedKey     = []byte("initialized")
+	BlocksReindexedKey = []byte("blocks reindexed")
 
 	emptyL1ValidatorCache = &cache.Empty[ids.ID, maybe.Maybe[L1Validator]]{}
 )
@@ -215,6 +221,14 @@ type State interface {
 	// Discard uncommitted changes to the database.
 	Abort()
 
+	// ReindexBlocks converts any block indices using the legacy storage
+	// format to the new format. If this database has already updated the
+	// indices, this function will return immediately, without iterating
+	// over the database.
+	//
+	// TODO: Remove after v1.14.x is activated
+	ReindexBlocks(lock sync.Locker, log logging.Logger) error
+
 	// Commit changes to the base database.
 	Commit() error
@@ -227,6 +241,16 @@ type State interface {
 	Close() error
 }
 
+// Prior to https://github.com/ava-labs/avalanchego/pull/1719, blocks were
+// stored as a map from blkID to stateBlk. Nodes synced prior to this PR may
+// still have blocks partially stored using this legacy format.
+//
+// TODO: Remove after v1.14.x is activated
+type stateBlk struct {
+	Bytes  []byte         `serialize:"true"`
+	Status choices.Status `serialize:"true"`
+}
+
 /*
  * VMDB
  * |-. validators
@@ -296,6 +320,7 @@ type State interface {
  * | '-- timestamp + validationID -> nil
  * '-. singletons
  * |-- initializedKey -> nil
+ * |-- blocksReindexedKey -> nil
  * |-- timestampKey -> timestamp
  * |-- feeStateKey -> feeState
  * |-- l1ValidatorExcessKey -> l1ValidatorExcess
@@ -2258,7 +2283,7 @@ func (s *state) GetStatelessBlock(blockID ids.ID) (block.Block, error) {
 		return nil, err
 	}
 
-	blk, err := block.Parse(block.GenesisCodec, blkBytes)
+	blk, _, err := parseStoredBlock(blkBytes)
 	if err != nil {
 		return nil, err
 	}
@@ -3039,6 +3064,162 @@ func (s *state) writeMetadata() error {
 	return nil
 }
 
+// Returns the block and whether it is a [stateBlk].
+// Invariant: blkBytes is safe to parse with blocks.GenesisCodec
+//
+// TODO: Remove after v1.14.x is activated
+func parseStoredBlock(blkBytes []byte) (block.Block, bool, error) {
+	// Attempt to parse as blocks.Block
+	blk, err := block.Parse(block.GenesisCodec, blkBytes)
+	if err == nil {
+		return blk, false, nil
+	}
+
+	// Fallback to [stateBlk]
+	blkState := stateBlk{}
+	if _, err := block.GenesisCodec.Unmarshal(blkBytes, &blkState); err != nil {
+		return nil, false, err
+	}
+
+	blk, err = block.Parse(block.GenesisCodec, blkState.Bytes)
+	return blk, true, err
+}
+
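Aside (not part of the patch): the try-new-format-then-fall-back shape of parseStoredBlock can be illustrated with a self-contained sketch. `encoding/json` with `DisallowUnknownFields` stands in for the strict avalanchego codec, and all type and function names here are illustrative only.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// payload plays the role of a block stored in the new format.
type payload struct {
	Height uint64 `json:"height"`
}

// legacyEnvelope plays the role of stateBlk: payload bytes plus a status.
type legacyEnvelope struct {
	Bytes  []byte `json:"bytes"`
	Status uint32 `json:"status"`
}

// strictUnmarshal fails on unknown fields, mimicking a strict codec so the
// two formats are actually distinguishable.
func strictUnmarshal(b []byte, v any) error {
	dec := json.NewDecoder(bytes.NewReader(b))
	dec.DisallowUnknownFields()
	return dec.Decode(v)
}

// decodeStored tries the new format first and only then falls back to the
// legacy envelope, returning whether the fallback was taken.
func decodeStored(b []byte) (*payload, bool, error) {
	p := &payload{}
	if err := strictUnmarshal(b, p); err == nil {
		return p, false, nil
	}

	env := legacyEnvelope{}
	if err := strictUnmarshal(b, &env); err != nil {
		return nil, false, err
	}

	p = &payload{}
	err := strictUnmarshal(env.Bytes, p)
	return p, true, err
}

func main() {
	newFormat, _ := json.Marshal(payload{Height: 7})
	legacyFormat, _ := json.Marshal(legacyEnvelope{Bytes: newFormat, Status: 4})

	for _, stored := range [][]byte{newFormat, legacyFormat} {
		p, wasLegacy, err := decodeStored(stored)
		fmt.Println(p.Height, wasLegacy, err) // 7 false <nil>, then 7 true <nil>
	}
}
```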
+func (s *state) ReindexBlocks(lock sync.Locker, log logging.Logger) error {
+	has, err := s.singletonDB.Has(BlocksReindexedKey)
+	if err != nil {
+		return err
+	}
+	if has {
+		log.Info("blocks already reindexed")
+		return nil
+	}
+
+	// It is possible that new blocks are added after grabbing this iterator.
+	// New blocks are guaranteed to be persisted in the new format, so we
+	// don't need to check them.
+	blockIterator := s.blockDB.NewIterator()
+	// Releasing is done using a closure to ensure that updating
+	// blockIterator will result in having the most recent iterator released
+	// when executing the deferred function.
+	defer func() {
+		blockIterator.Release()
+	}()
+
+	log.Info("starting block reindexing")
+
+	var (
+		startTime         = time.Now()
+		lastCommit        = startTime
+		nextUpdate        = startTime.Add(indexLogFrequency)
+		numIndicesChecked = 0
+		numIndicesUpdated = 0
+	)
+
+	for blockIterator.Next() {
+		valueBytes := blockIterator.Value()
+		blk, isStateBlk, err := parseStoredBlock(valueBytes)
+		if err != nil {
+			return fmt.Errorf("failed to parse block: %w", err)
+		}
+
+		blkID := blk.ID()
+
+		// This block was previously stored using the legacy format, update
+		// the index to remove the usage of stateBlk.
+		if isStateBlk {
+			blkBytes := blk.Bytes()
+			if err := s.blockDB.Put(blkID[:], blkBytes); err != nil {
+				return fmt.Errorf("failed to write block: %w", err)
+			}
+
+			numIndicesUpdated++
+		}
+
+		numIndicesChecked++
+
+		now := time.Now()
+		if now.After(nextUpdate) {
+			nextUpdate = now.Add(indexLogFrequency)
+
+			progress := timer.ProgressFromHash(blkID[:])
+			eta := timer.EstimateETA(
+				startTime,
+				progress,
+				math.MaxUint64,
+			)
+
+			log.Info("reindexing blocks",
+				zap.Int("numIndicesUpdated", numIndicesUpdated),
+				zap.Int("numIndicesChecked", numIndicesChecked),
+				zap.Duration("eta", eta),
+			)
+		}
+
+		if numIndicesChecked%indexIterationLimit == 0 {
+			// We must hold the lock during committing to make sure we don't
+			// attempt to commit to disk while a block is concurrently being
+			// accepted.
+			lock.Lock()
+			err := errors.Join(
+				s.Commit(),
+				blockIterator.Error(),
+			)
+			lock.Unlock()
+			if err != nil {
+				return err
+			}
+
+			// We release the iterator here to allow the underlying database
+			// to clean up deleted state.
+			blockIterator.Release()
+
+			// We take the minimum here because it's possible that the node
+			// is currently bootstrapping. This would mean that grabbing the
+			// lock could take an extremely long period of time, which we
+			// should not delay processing for.
+			indexDuration := now.Sub(lastCommit)
+			sleepDuration := min(
+				indexIterationSleepMultiplier*indexDuration,
+				indexIterationSleepCap,
+			)
+			time.Sleep(sleepDuration)
+
+			// Make sure not to include the sleep duration into the next
+			// index duration.
+			lastCommit = time.Now()
+
+			blockIterator = s.blockDB.NewIteratorWithStart(blkID[:])
+		}
+	}
+
+	// Ensure we fully iterated over all blocks before writing that indexing
+	// has finished.
+	//
+	// Note: This is needed because a transient read error could cause the
+	// iterator to stop early.
+	if err := blockIterator.Error(); err != nil {
+		return fmt.Errorf("failed to iterate over historical blocks: %w", err)
+	}
+
+	if err := s.singletonDB.Put(BlocksReindexedKey, nil); err != nil {
+		return fmt.Errorf("failed to mark blocks as reindexed: %w", err)
+	}
+
+	// We must hold the lock during committing to make sure we don't attempt
+	// to commit to disk while a block is concurrently being accepted.
+	lock.Lock()
+	defer lock.Unlock()
+
+	log.Info("finished block reindexing",
+		zap.Int("numIndicesUpdated", numIndicesUpdated),
+		zap.Int("numIndicesChecked", numIndicesChecked),
+		zap.Duration("duration", time.Since(startTime)),
+	)
+
+	return s.Commit()
+}
+
 func (s *state) GetUptime(vdrID ids.NodeID) (time.Duration, time.Time, error) {
 	return s.validatorState.GetUptime(vdrID, constants.PrimaryNetworkID)
 }
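Aside (not part of the patch): the pacing in ReindexBlocks — commit every indexIterationLimit entries under the lock, release and reopen the iterator, then sleep proportionally to how long the batch took — generalizes to any online key-value migration. A condensed, self-contained sketch of just that pattern, with hypothetical interfaces and tuning constants (requires Go 1.21+ for the builtin min):

```go
package migrate

import (
	"sync"
	"time"
)

// Hypothetical knobs mirroring indexIterationLimit,
// indexIterationSleepMultiplier, and indexIterationSleepCap.
const (
	iterationLimit  = 4096
	sleepMultiplier = 10
	sleepCap        = 10 * time.Second
)

// Iterator and Store are minimal stand-ins for the database interfaces used
// by the real code.
type Iterator interface {
	Next() bool
	Key() []byte
	Release()
	Error() error
}

type Store interface {
	NewIteratorWithStart(start []byte) Iterator
	Commit() error
}

// Migrate walks the store, committing in batches while yielding to any
// foreground work that shares the lock.
func Migrate(db Store, lock sync.Locker) error {
	it := db.NewIteratorWithStart(nil)
	// Use a closure so reassigning it below still releases the live iterator.
	defer func() { it.Release() }()

	checked := 0
	lastCommit := time.Now()
	for it.Next() {
		key := it.Key()
		checked++
		if checked%iterationLimit != 0 {
			continue
		}

		// Hold the lock while flushing so we never commit concurrently with
		// a writer that shares the store.
		lock.Lock()
		err := db.Commit()
		lock.Unlock()
		if err != nil {
			return err
		}

		// Release so the database can clean up state, then sleep roughly
		// sleepMultiplier times as long as the batch took (capped), so a
		// busy node is not starved.
		it.Release()
		indexDuration := time.Since(lastCommit)
		time.Sleep(min(sleepMultiplier*indexDuration, sleepCap))
		lastCommit = time.Now()

		// Resume from the last key we saw.
		it = db.NewIteratorWithStart(key)
	}
	return it.Error()
}
```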
diff --git a/vms/platformvm/state/state_test.go b/vms/platformvm/state/state_test.go
index 05ada8d9a781..e3d72b7cab6c 100644
--- a/vms/platformvm/state/state_test.go
+++ b/vms/platformvm/state/state_test.go
@@ -9,6 +9,7 @@ import (
 	"maps"
 	"math"
 	"math/rand"
+	"sync"
 	"testing"
 	"time"
 
@@ -21,6 +22,7 @@ import (
 	"github.com/ava-labs/avalanchego/database/memdb"
 	"github.com/ava-labs/avalanchego/ids"
 	"github.com/ava-labs/avalanchego/snow"
+	"github.com/ava-labs/avalanchego/snow/choices"
 	"github.com/ava-labs/avalanchego/snow/validators"
 	"github.com/ava-labs/avalanchego/upgrade/upgradetest"
 	"github.com/ava-labs/avalanchego/utils"
@@ -34,6 +36,7 @@ import (
 	"github.com/ava-labs/avalanchego/utils/wrappers"
 	"github.com/ava-labs/avalanchego/vms/components/avax"
 	"github.com/ava-labs/avalanchego/vms/components/gas"
+	"github.com/ava-labs/avalanchego/vms/platformvm/block"
 	"github.com/ava-labs/avalanchego/vms/platformvm/config"
 	"github.com/ava-labs/avalanchego/vms/platformvm/fx/fxmock"
 	"github.com/ava-labs/avalanchego/vms/platformvm/genesis/genesistest"
@@ -1098,6 +1101,74 @@ func copyValidatorSet(
 	return result
 }
 
+func TestParsedStateBlock(t *testing.T) {
+	var (
+		require = require.New(t)
+		blks    = makeBlocks(require)
+	)
+
+	for _, blk := range blks {
+		stBlk := stateBlk{
+			Bytes:  blk.Bytes(),
+			Status: choices.Accepted,
+		}
+
+		stBlkBytes, err := block.GenesisCodec.Marshal(block.CodecVersion, &stBlk)
+		require.NoError(err)
+
+		gotBlk, isStateBlk, err := parseStoredBlock(stBlkBytes)
+		require.NoError(err)
+		require.True(isStateBlk)
+		require.Equal(blk.ID(), gotBlk.ID())
+
+		gotBlk, isStateBlk, err = parseStoredBlock(blk.Bytes())
+		require.NoError(err)
+		require.False(isStateBlk)
+		require.Equal(blk.ID(), gotBlk.ID())
+	}
+}
+
+func TestReindexBlocks(t *testing.T) {
+	var (
+		require = require.New(t)
+		s       = newTestState(t, memdb.New())
+		blks    = makeBlocks(require)
+	)
+
+	// Populate the blocks using the legacy format.
+	for _, blk := range blks {
+		stBlk := stateBlk{
+			Bytes:  blk.Bytes(),
+			Status: choices.Accepted,
+		}
+		stBlkBytes, err := block.GenesisCodec.Marshal(block.CodecVersion, &stBlk)
+		require.NoError(err)
+
+		blkID := blk.ID()
+		require.NoError(s.blockDB.Put(blkID[:], stBlkBytes))
+	}
+
+	// Convert the indices to the new format.
+	require.NoError(s.ReindexBlocks(&sync.Mutex{}, logging.NoLog{}))
+
+	// Verify that the blocks are stored in the new format.
+	for _, blk := range blks {
+		blkID := blk.ID()
+		blkBytes, err := s.blockDB.Get(blkID[:])
+		require.NoError(err)
+
+		parsedBlk, err := block.Parse(block.GenesisCodec, blkBytes)
+		require.NoError(err)
+		require.Equal(blkID, parsedBlk.ID())
+	}
+
+	// Verify that the flag has been written to disk so that future
+	// reindexing can be skipped.
+	reindexed, err := s.singletonDB.Has(BlocksReindexedKey)
+	require.NoError(err)
+	require.True(reindexed)
+}
+
 func TestStateSubnetOwner(t *testing.T) {
 	require := require.New(t)
 
@@ -1179,6 +1250,94 @@ func TestStateSubnetToL1Conversion(t *testing.T) {
 	}
 }
 
+func makeBlocks(require *require.Assertions) []block.Block {
+	var blks []block.Block
+	{
+		blk, err := block.NewApricotAbortBlock(ids.GenerateTestID(), 1000)
+		require.NoError(err)
+		blks = append(blks, blk)
+	}
+
+	{
+		blk, err := block.NewApricotAtomicBlock(ids.GenerateTestID(), 1000, &txs.Tx{
+			Unsigned: &txs.AdvanceTimeTx{
+				Time: 1000,
+			},
+		})
+		require.NoError(err)
+		blks = append(blks, blk)
+	}
+
+	{
+		blk, err := block.NewApricotCommitBlock(ids.GenerateTestID(), 1000)
+		require.NoError(err)
+		blks = append(blks, blk)
+	}
+
+	{
+		tx := &txs.Tx{
+			Unsigned: &txs.RewardValidatorTx{
+				TxID: ids.GenerateTestID(),
+			},
+		}
+		require.NoError(tx.Initialize(txs.Codec))
+		blk, err := block.NewApricotProposalBlock(ids.GenerateTestID(), 1000, tx)
+		require.NoError(err)
+		blks = append(blks, blk)
+	}
+
+	{
+		tx := &txs.Tx{
+			Unsigned: &txs.RewardValidatorTx{
+				TxID: ids.GenerateTestID(),
+			},
+		}
+		require.NoError(tx.Initialize(txs.Codec))
+		blk, err := block.NewApricotStandardBlock(ids.GenerateTestID(), 1000, []*txs.Tx{tx})
+		require.NoError(err)
+		blks = append(blks, blk)
+	}
+
+	{
+		blk, err := block.NewBanffAbortBlock(time.Now(), ids.GenerateTestID(), 1000)
+		require.NoError(err)
+		blks = append(blks, blk)
+	}
+
+	{
+		blk, err := block.NewBanffCommitBlock(time.Now(), ids.GenerateTestID(), 1000)
+		require.NoError(err)
+		blks = append(blks, blk)
+	}
+
+	{
+		tx := &txs.Tx{
+			Unsigned: &txs.RewardValidatorTx{
+				TxID: ids.GenerateTestID(),
+			},
+		}
+		require.NoError(tx.Initialize(txs.Codec))
+
+		blk, err := block.NewBanffProposalBlock(time.Now(), ids.GenerateTestID(), 1000, tx, []*txs.Tx{})
+		require.NoError(err)
+		blks = append(blks, blk)
+	}
+
+	{
+		tx := &txs.Tx{
+			Unsigned: &txs.RewardValidatorTx{
+				TxID: ids.GenerateTestID(),
+			},
+		}
+		require.NoError(tx.Initialize(txs.Codec))
+
+		blk, err := block.NewBanffStandardBlock(time.Now(), ids.GenerateTestID(), 1000, []*txs.Tx{tx})
+		require.NoError(err)
+		blks = append(blks, blk)
+	}
+	return blks
+}
+
 // Verify that committing the state writes the fee state to the database and
 // that loading the state fetches the fee state from the database.
 func TestStateFeeStateCommitAndLoad(t *testing.T) {
diff --git a/vms/platformvm/vm.go b/vms/platformvm/vm.go
index 824cb6065c3f..90540203e8d9 100644
--- a/vms/platformvm/vm.go
+++ b/vms/platformvm/vm.go
@@ -234,6 +234,16 @@ func (vm *VM) Initialize(
 	// Incrementing [awaitShutdown] would cause a deadlock since
 	// [periodicallyPruneMempool] grabs the context lock.
 	go vm.periodicallyPruneMempool(execConfig.MempoolPruneFrequency)
+
+	go func() {
+		err := vm.state.ReindexBlocks(&vm.ctx.Lock, vm.ctx.Log)
+		if err != nil {
+			vm.ctx.Log.Warn("reindexing blocks failed",
+				zap.Error(err),
+			)
+		}
+	}()
+
 	return nil
 }
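Aside (not part of the patch): the vm.go hook runs the migration fire-and-forget, and a failure is only logged as a warning rather than aborting startup — reads stay correct either way, since GetStatelessBlock now goes through parseStoredBlock and can parse both formats on demand. A condensed, hypothetical illustration of that launch pattern (the interface and names are stand-ins):

```go
package vmutil

import (
	"log"
	"sync"
)

// reindexer is a hypothetical stand-in for the State interface; only the
// launch pattern mirrors vm.go.
type reindexer interface {
	ReindexBlocks(lock sync.Locker) error
}

// startBackgroundReindex kicks off the migration without blocking startup.
// The shared lock is passed in so the migration's periodic commits never
// race with block acceptance, and errors are non-fatal because reads fall
// back to parsing the legacy format.
func startBackgroundReindex(s reindexer, lock sync.Locker) {
	go func() {
		if err := s.ReindexBlocks(lock); err != nil {
			log.Printf("reindexing blocks failed: %v", err)
		}
	}()
}
```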