diff --git a/cache/fifo_cache.go b/cache/fifo_cache.go new file mode 100644 index 000000000000..5a26c830ccaf --- /dev/null +++ b/cache/fifo_cache.go @@ -0,0 +1,58 @@ +// Copyright (C) 2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package cache + +import ( + "sync" + + "github.com/ava-labs/avalanchego/utils/buffer" +) + +type FIFO[K comparable, V any] struct { + l sync.RWMutex + buffer buffer.Queue[K] + m map[K]V +} + +// NewFIFO creates a new First-In-First-Out cache of size [limit]. +// +// If a duplicate item is stored, it will not be requeued but its +// value will be changed. +func NewFIFO[K comparable, V any](limit int) (*FIFO[K, V], error) { + c := &FIFO[K, V]{ + m: make(map[K]V, limit), + } + buf, err := buffer.NewBoundedQueue(limit, c.remove) + if err != nil { + return nil, err + } + c.buffer = buf + return c, nil +} + +func (f *FIFO[K, V]) Put(key K, val V) bool { + f.l.Lock() + defer f.l.Unlock() + + _, exists := f.m[key] + if !exists { + f.buffer.Push(key) // Push removes the oldest [K] if we are at the [limit] + } + f.m[key] = val + return exists +} + +func (f *FIFO[K, V]) Get(key K) (V, bool) { + f.l.RLock() + defer f.l.RUnlock() + + v, ok := f.m[key] + return v, ok +} + +// remove is used as the callback in [BoundedBuffer]. It is assumed that the +// [WriteLock] is held when this is accessed. +func (f *FIFO[K, V]) remove(key K) { + delete(f.m, key) +} diff --git a/cache/fifo_cache_test.go b/cache/fifo_cache_test.go new file mode 100644 index 000000000000..75bcd33b9f6a --- /dev/null +++ b/cache/fifo_cache_test.go @@ -0,0 +1,157 @@ +// Copyright (C) 2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package cache + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestFIFOCacheInsertion(t *testing.T) { + type put struct { + kv int + exists bool + } + + type get struct { + k int + ok bool + } + + tests := []struct { + name string + ops []interface{} + }{ + { + name: "insert less than limit", + ops: []interface{}{ + put{ + kv: 0, + exists: false, + }, + get{ + k: 0, + ok: true, + }, + }, + }, + { + name: "insert limit", + ops: []interface{}{ + put{ + kv: 0, + exists: false, + }, + put{ + kv: 1, + exists: false, + }, + get{ + k: 0, + ok: true, + }, + get{ + k: 1, + ok: true, + }, + }, + }, + { + name: "exceed limit", + ops: []interface{}{ + put{ + kv: 0, + exists: false, + }, + put{ + kv: 1, + exists: false, + }, + put{ + kv: 2, + exists: false, + }, + get{ + k: 0, + ok: false, + }, + get{ + k: 1, + ok: true, + }, + get{ + k: 2, + ok: true, + }, + }, + }, + { + name: "exceed limit + get ops maintains FIFO removal order", + ops: []interface{}{ + put{ + kv: 0, + exists: false, + }, + put{ + kv: 1, + exists: false, + }, + get{ + k: 0, + ok: true, + }, + put{ + kv: 2, + exists: false, + }, + get{ + k: 0, + ok: false, + }, + get{ + k: 1, + ok: true, + }, + get{ + k: 2, + ok: true, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require := require.New(t) + + cache, err := NewFIFO[int, int](2) + require.NoError(err) + + for _, opIntf := range tt.ops { + switch op := opIntf.(type) { + case put: + exists := cache.Put(op.kv, op.kv) + require.Equal(op.exists, exists) + case get: + val, ok := cache.Get(op.k) + require.Equal(op.ok, ok) + if ok { + require.Equal(op.k, val) + } + default: + require.Fail("op can only be a put or a get") + } + } + }) + } +} + +func TestEmptyCacheSizeInvalid(t *testing.T) { + require := require.New(t) + 
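For readers skimming the diff, a minimal usage sketch of the new FIFO cache may help (illustrative only; it exercises the `NewFIFO`/`Put`/`Get` API added above and assumes the avalanchego `cache` import path):

```go
package main

import (
	"fmt"

	"github.com/ava-labs/avalanchego/cache"
)

func main() {
	// Two-entry FIFO cache; the constructor rejects a non-positive limit.
	c, err := cache.NewFIFO[string, int](2)
	if err != nil {
		panic(err)
	}

	c.Put("a", 1) // queue: [a]
	c.Put("b", 2) // queue: [a, b]
	c.Put("a", 3) // duplicate key: value updated, but "a" keeps its original queue position
	c.Put("c", 4) // at the limit: "a" (the oldest insertion) is evicted, queue: [b, c]

	if _, ok := c.Get("a"); !ok {
		fmt.Println(`"a" was evicted even though it was written most recently`)
	}
	v, _ := c.Get("b")
	fmt.Println(v) // prints 2
}
```

Eviction follows insertion order, not write recency, which is exactly the property the "exceed limit + get ops maintains FIFO removal order" test above pins down.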
_, err := NewFIFO[int, int](0) + expectedErr := errors.New("maxSize must be greater than 0") + require.Equal(expectedErr, err) +} diff --git a/snow/engine/snowman/block/state_syncable_vm.go b/snow/engine/snowman/block/state_syncable_vm.go index 0457505183e5..5fa67bdd8ad2 100644 --- a/snow/engine/snowman/block/state_syncable_vm.go +++ b/snow/engine/snowman/block/state_syncable_vm.go @@ -43,3 +43,23 @@ type StateSyncableVM interface { // [summaryHeight]. GetStateSummary(ctx context.Context, summaryHeight uint64) (StateSummary, error) } + +type StateSyncableVMDisabled struct{} + +func (StateSyncableVMDisabled) StateSyncEnabled(context.Context) (bool, error) { return false, nil } + +func (StateSyncableVMDisabled) GetOngoingSyncStateSummary(context.Context) (StateSummary, error) { + return nil, ErrStateSyncableVMNotImplemented +} + +func (StateSyncableVMDisabled) GetLastStateSummary(context.Context) (StateSummary, error) { + return nil, ErrStateSyncableVMNotImplemented +} + +func (StateSyncableVMDisabled) ParseStateSummary(ctx context.Context, summaryBytes []byte) (StateSummary, error) { + return nil, ErrStateSyncableVMNotImplemented +} + +func (StateSyncableVMDisabled) GetStateSummary(ctx context.Context, summaryHeight uint64) (StateSummary, error) { + return nil, ErrStateSyncableVMNotImplemented +} diff --git a/vms/sdk/chainindex/chain_index.go b/vms/sdk/chainindex/chain_index.go new file mode 100644 index 000000000000..3115c0f28bfd --- /dev/null +++ b/vms/sdk/chainindex/chain_index.go @@ -0,0 +1,224 @@ +// Copyright (C) 2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package chainindex + +import ( + "context" + "encoding/binary" + "errors" + "math/rand" + "time" + + "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/utils/wrappers" + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" +) + +const ( + blockPrefix byte = 0x0 // TODO: migrate to flat files + blockIDHeightPrefix byte = 0x1 // ID -> Height + blockHeightIDPrefix byte = 0x2 // Height -> ID (don't always need full block from disk) + lastAcceptedByte byte = 0x3 // lastAcceptedByte -> lastAcceptedHeight +) + +var ( + lastAcceptedKey = []byte{lastAcceptedByte} + + errBlockCompactionFrequencyZero = errors.New("block compaction frequency must be non-zero") +) + +// Config is the configuration of the chain index +type Config struct { + // AcceptedBlockWindow gives the number of blocks to store on-disk before deleting the oldest block. + // 0 disables block deletion. + AcceptedBlockWindow uint64 `json:"acceptedBlockWindow"` + // BlockCompactionFrequency sets the interval between indexing blocks that the chain index runs + // compaction manually. + BlockCompactionFrequency uint64 `json:"blockCompactionFrequency"` +} + +// NewDefaultConfig returns a default config for the chain index +func NewDefaultConfig() Config { + return Config{ + AcceptedBlockWindow: 50_000, // ~3.5hr with 250ms block time (100GB at 2MB) + BlockCompactionFrequency: 32, // 64 MB of deletion if 2 MB blocks + } +} + +// ChainIndex provides a simple block block index using a key-value store +// and providing a simple retention window and regular block compaction policy. 
+type ChainIndex[T Block] struct { + config Config + compactionOffset uint64 + metrics *metrics + log logging.Logger + db database.Database + parser Parser[T] +} + +// Block provides the minimium interface of a block type required by the ChainIndex +// to enable indexing by both height and ID. +type Block interface { + GetID() ids.ID + GetHeight() uint64 + GetBytes() []byte +} + +// Parser defines the injected dependency required by the ChainIndex so that it +// can parse blocks it reads from disk. +type Parser[T Block] interface { + ParseBlock(context.Context, []byte) (T, error) +} + +func New[T Block]( + log logging.Logger, + registry prometheus.Registerer, + config Config, + parser Parser[T], + db database.Database, +) (*ChainIndex[T], error) { + metrics, err := newMetrics(registry) + if err != nil { + return nil, err + } + if config.BlockCompactionFrequency == 0 { + return nil, errBlockCompactionFrequencyZero + } + + return &ChainIndex[T]{ + config: config, + // Offset by random number to ensure the network does not compact simultaneously + compactionOffset: rand.Uint64() % config.BlockCompactionFrequency, //nolint:gosec + metrics: metrics, + log: log, + db: db, + parser: parser, + }, nil +} + +// GetLastAcceptedHeight return the height of the last accepted block included in the +// index. +func (c *ChainIndex[T]) GetLastAcceptedHeight(_ context.Context) (uint64, error) { + lastAcceptedHeightBytes, err := c.db.Get(lastAcceptedKey) + if err != nil { + return 0, err + } + return database.ParseUInt64(lastAcceptedHeightBytes) +} + +// UpdateLastAccepted writes blk to disk and updates the last accepted height +func (c *ChainIndex[T]) UpdateLastAccepted(ctx context.Context, blk T) error { + c.metrics.indexedBlocks.Inc() + batch := c.db.NewBatch() + + var ( + blkID = blk.GetID() + height = blk.GetHeight() + blkBytes = blk.GetBytes() + ) + heightBytes := binary.BigEndian.AppendUint64(nil, height) + err := errors.Join( + batch.Put(lastAcceptedKey, heightBytes), + batch.Put(prefixBlockIDHeightKey(blkID), heightBytes), + batch.Put(prefixBlockHeightIDKey(height), blkID[:]), + batch.Put(prefixBlockKey(height), blkBytes), + ) + if err != nil { + return err + } + + expiryHeight := height - c.config.AcceptedBlockWindow + if c.config.AcceptedBlockWindow == 0 || expiryHeight == 0 || expiryHeight >= height { // ensure we don't free genesis + return batch.Write() + } + + if err := batch.Delete(prefixBlockKey(expiryHeight)); err != nil { + return err + } + deleteBlkID, err := c.GetBlockIDAtHeight(ctx, expiryHeight) + if err != nil { + return err + } + if err := batch.Delete(prefixBlockIDHeightKey(deleteBlkID)); err != nil { + return err + } + if err := batch.Delete(prefixBlockHeightIDKey(expiryHeight)); err != nil { + return err + } + c.metrics.deletedBlocks.Inc() + + if expiryHeight%c.config.BlockCompactionFrequency == c.compactionOffset { + go func() { + start := time.Now() + if err := c.db.Compact([]byte{blockPrefix}, prefixBlockKey(expiryHeight)); err != nil { + c.log.Error("failed to compact block store", zap.Error(err)) + return + } + c.log.Info("compacted disk blocks", zap.Uint64("end", expiryHeight), zap.Duration("t", time.Since(start))) + }() + } + + return batch.Write() +} + +// GetBlock retrieves the block from disk by the blkID +func (c *ChainIndex[T]) GetBlock(ctx context.Context, blkID ids.ID) (T, error) { + height, err := c.GetBlockIDHeight(ctx, blkID) + if err != nil { + return utils.Zero[T](), err + } + return c.GetBlockByHeight(ctx, height) +} + +// GetBlockIDAtHeight retrieves the blkID 
associated with the requested blkHeight +func (c *ChainIndex[T]) GetBlockIDAtHeight(_ context.Context, blkHeight uint64) (ids.ID, error) { + blkIDBytes, err := c.db.Get(prefixBlockHeightIDKey(blkHeight)) + if err != nil { + return ids.Empty, err + } + return ids.ID(blkIDBytes), nil +} + +// GetBlockIDHeight retrieves the blkHeight associated with the requested blkID +func (c *ChainIndex[T]) GetBlockIDHeight(_ context.Context, blkID ids.ID) (uint64, error) { + blkHeightBytes, err := c.db.Get(prefixBlockIDHeightKey(blkID)) + if err != nil { + return 0, err + } + return database.ParseUInt64(blkHeightBytes) +} + +// GetBlockByHeight returns the block at the requested height +func (c *ChainIndex[T]) GetBlockByHeight(ctx context.Context, blkHeight uint64) (T, error) { + blkBytes, err := c.db.Get(prefixBlockKey(blkHeight)) + if err != nil { + return utils.Zero[T](), err + } + return c.parser.ParseBlock(ctx, blkBytes) +} + +func prefixBlockKey(height uint64) []byte { + k := make([]byte, 1+wrappers.LongLen) + k[0] = blockPrefix + binary.BigEndian.PutUint64(k[1:], height) + return k +} + +func prefixBlockIDHeightKey(id ids.ID) []byte { + k := make([]byte, 1+ids.IDLen) + k[0] = blockIDHeightPrefix + copy(k[1:], id[:]) + return k +} + +func prefixBlockHeightIDKey(height uint64) []byte { + k := make([]byte, 1+wrappers.LongLen) + k[0] = blockHeightIDPrefix + binary.BigEndian.PutUint64(k[1:], height) + return k +} diff --git a/vms/sdk/chainindex/chain_index_test.go b/vms/sdk/chainindex/chain_index_test.go new file mode 100644 index 000000000000..6138c2d13d79 --- /dev/null +++ b/vms/sdk/chainindex/chain_index_test.go @@ -0,0 +1,129 @@ +// Copyright (C) 2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package chainindex + +import ( + "context" + "encoding/binary" + "fmt" + "testing" + + "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/avalanchego/database/memdb" + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/hashing" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/utils/wrappers" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" +) + +type testBlock struct { + height uint64 +} + +func (t *testBlock) GetID() ids.ID { return hashing.ComputeHash256Array(t.GetBytes()) } +func (t *testBlock) GetHeight() uint64 { return t.height } +func (t *testBlock) GetBytes() []byte { return binary.BigEndian.AppendUint64(nil, t.height) } + +type parser struct{} + +func (*parser) ParseBlock(_ context.Context, b []byte) (*testBlock, error) { + if len(b) != wrappers.LongLen { + return nil, fmt.Errorf("unexpected block length: %d", len(b)) + } + height := binary.BigEndian.Uint64(b) + return &testBlock{height: height}, nil +} + +func newTestChainIndex(config Config, db database.Database) (*ChainIndex[*testBlock], error) { + return New(logging.NoLog{}, prometheus.NewRegistry(), config, &parser{}, db) +} + +func confirmBlockIndexed(r *require.Assertions, ctx context.Context, chainIndex *ChainIndex[*testBlock], expectedBlk *testBlock, expectedErr error) { + blkByHeight, err := chainIndex.GetBlockByHeight(ctx, expectedBlk.height) + r.ErrorIs(err, expectedErr) + + blkIDAtHeight, err := chainIndex.GetBlockIDAtHeight(ctx, expectedBlk.height) + r.ErrorIs(err, expectedErr) + + blockIDHeight, err := chainIndex.GetBlockIDHeight(ctx, expectedBlk.GetID()) + r.ErrorIs(err, expectedErr) + + blk, err := chainIndex.GetBlock(ctx, expectedBlk.GetID()) + r.ErrorIs(err, expectedErr) 
+ + if expectedErr != nil { + return + } + + r.Equal(blkByHeight.GetID(), expectedBlk.GetID()) + r.Equal(blkIDAtHeight, expectedBlk.GetID()) + r.Equal(blockIDHeight, expectedBlk.GetHeight()) + r.Equal(blk.GetID(), expectedBlk.GetID()) +} + +func confirmLastAcceptedHeight(r *require.Assertions, ctx context.Context, chainIndex *ChainIndex[*testBlock], expectedHeight uint64) { + lastAcceptedHeight, err := chainIndex.GetLastAcceptedHeight(ctx) + r.NoError(err) + r.Equal(expectedHeight, lastAcceptedHeight) +} + +func TestChainIndex(t *testing.T) { + r := require.New(t) + ctx := context.Background() + chainIndex, err := newTestChainIndex(NewDefaultConfig(), memdb.New()) + r.NoError(err) + + genesisBlk := &testBlock{height: 0} + confirmBlockIndexed(r, ctx, chainIndex, genesisBlk, database.ErrNotFound) + _, err = chainIndex.GetLastAcceptedHeight(ctx) + r.ErrorIs(err, database.ErrNotFound) + + r.NoError(chainIndex.UpdateLastAccepted(ctx, genesisBlk)) + confirmBlockIndexed(r, ctx, chainIndex, genesisBlk, nil) + confirmLastAcceptedHeight(r, ctx, chainIndex, genesisBlk.GetHeight()) + + blk1 := &testBlock{height: 1} + r.NoError(chainIndex.UpdateLastAccepted(ctx, blk1)) + confirmBlockIndexed(r, ctx, chainIndex, blk1, nil) + confirmLastAcceptedHeight(r, ctx, chainIndex, blk1.GetHeight()) +} + +func TestChainIndexInvalidCompactionFrequency(t *testing.T) { + _, err := newTestChainIndex(Config{BlockCompactionFrequency: 0}, memdb.New()) + require.ErrorIs(t, err, errBlockCompactionFrequencyZero) +} + +func TestChainIndexExpiry(t *testing.T) { + r := require.New(t) + ctx := context.Background() + chainIndex, err := newTestChainIndex(Config{AcceptedBlockWindow: 1, BlockCompactionFrequency: 64}, memdb.New()) + r.NoError(err) + + genesisBlk := &testBlock{height: 0} + r.NoError(chainIndex.UpdateLastAccepted(ctx, genesisBlk)) + confirmBlockIndexed(r, ctx, chainIndex, genesisBlk, nil) + confirmLastAcceptedHeight(r, ctx, chainIndex, genesisBlk.GetHeight()) + + blk1 := &testBlock{height: 1} + r.NoError(chainIndex.UpdateLastAccepted(ctx, blk1)) + confirmBlockIndexed(r, ctx, chainIndex, genesisBlk, nil) // Confirm genesis is not un-indexed + confirmBlockIndexed(r, ctx, chainIndex, blk1, nil) + confirmLastAcceptedHeight(r, ctx, chainIndex, blk1.GetHeight()) + + blk2 := &testBlock{height: 2} + r.NoError(chainIndex.UpdateLastAccepted(ctx, blk2)) + confirmBlockIndexed(r, ctx, chainIndex, genesisBlk, nil) // Confirm genesis is not un-indexed + confirmBlockIndexed(r, ctx, chainIndex, blk2, nil) + confirmBlockIndexed(r, ctx, chainIndex, blk1, database.ErrNotFound) + confirmLastAcceptedHeight(r, ctx, chainIndex, blk2.GetHeight()) + + blk3 := &testBlock{height: 3} + r.NoError(chainIndex.UpdateLastAccepted(ctx, blk3)) + confirmBlockIndexed(r, ctx, chainIndex, genesisBlk, nil) // Confirm genesis is not un-indexed + confirmBlockIndexed(r, ctx, chainIndex, blk3, nil) + confirmBlockIndexed(r, ctx, chainIndex, blk2, database.ErrNotFound) + confirmLastAcceptedHeight(r, ctx, chainIndex, blk3.GetHeight()) +} diff --git a/vms/sdk/chainindex/metrics.go b/vms/sdk/chainindex/metrics.go new file mode 100644 index 000000000000..7eb8b173b158 --- /dev/null +++ b/vms/sdk/chainindex/metrics.go @@ -0,0 +1,39 @@ +// Copyright (C) 2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. 
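Stepping back to `UpdateLastAccepted` in `chain_index.go` above: the pruning guard is worth restating, because heights are `uint64` and `height - AcceptedBlockWindow` wraps around while the chain is still shorter than the window. The sketch below is just that guard pulled out for readability, not new behavior:

```go
// shouldPrune restates the retention check from UpdateLastAccepted: it reports
// whether any block should be deleted once the block at [height] is accepted,
// and if so, at which expiry height.
func shouldPrune(height, window uint64) (expiry uint64, ok bool) {
	if window == 0 {
		return 0, false // retention window disabled: keep every block
	}
	expiry = height - window // wraps around when height < window
	if expiry == 0 || expiry >= height {
		// expiry == 0 protects genesis; expiry >= height means the subtraction
		// wrapped, i.e. the chain is still shorter than the retention window.
		return 0, false
	}
	return expiry, true
}
```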
+ +package chainindex + +import ( + "errors" + + "github.com/prometheus/client_golang/prometheus" +) + +type metrics struct { + indexedBlocks prometheus.Counter + deletedBlocks prometheus.Counter +} + +func newMetrics(registry prometheus.Registerer) (*metrics, error) { + m := &metrics{ + indexedBlocks: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "chainindex", + Name: "num_indexed_blocks", + Help: "Number of blocks indexed", + }), + deletedBlocks: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "chainindex", + Name: "deleted_blocks", + Help: "Number of blocks deleted from the chain", + }), + } + + if err := errors.Join( + registry.Register(m.indexedBlocks), + registry.Register(m.deletedBlocks), + ); err != nil { + return nil, err + } + + return m, nil +} diff --git a/vms/sdk/context/config.go b/vms/sdk/context/config.go new file mode 100644 index 000000000000..e03722ce4735 --- /dev/null +++ b/vms/sdk/context/config.go @@ -0,0 +1,35 @@ +// Copyright (C) 2024, Ava Labs, Inv. All rights reserved. +// See the file LICENSE for licensing terms. + +package context + +import "encoding/json" + +type Config map[string]json.RawMessage + +func NewEmptyConfig() Config { + return make(Config) +} + +func NewConfig(b []byte) (Config, error) { + c := Config{} + if len(b) > 0 { + if err := json.Unmarshal(b, &c); err != nil { + return nil, err + } + } + return c, nil +} + +func GetConfig[T any](c Config, key string, defaultConfig T) (T, error) { + val, ok := c[key] + if !ok { + return defaultConfig, nil + } + + var emptyConfig T + if err := json.Unmarshal(val, &defaultConfig); err != nil { + return emptyConfig, err + } + return defaultConfig, nil +} diff --git a/vms/sdk/context/config_test.go b/vms/sdk/context/config_test.go new file mode 100644 index 000000000000..3ce61077ad8b --- /dev/null +++ b/vms/sdk/context/config_test.go @@ -0,0 +1,93 @@ +// Copyright (C) 2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. 
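The tests that follow exercise `GetConfig` against a single `"test"` key; the intended pattern is that each component reads its own namespaced sub-config out of one shared JSON blob, and fields missing from the JSON keep their default values because `GetConfig` unmarshals on top of `defaultConfig`. A small sketch (the `txIndexerConfig` type and the `"indexer"` key are hypothetical, not part of this diff; `context` here is the SDK package added above):

```go
type txIndexerConfig struct {
	Enabled     bool   `json:"enabled"`
	BlockWindow uint64 `json:"blockWindow"`
}

func loadIndexerConfig(raw []byte) (txIndexerConfig, error) {
	cfg, err := context.NewConfig(raw) // e.g. `{"indexer": {"enabled": true}}`
	if err != nil {
		return txIndexerConfig{}, err
	}
	// "blockWindow" is absent from the JSON above, so it keeps the default 1024.
	return context.GetConfig(cfg, "indexer", txIndexerConfig{BlockWindow: 1024})
}
```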
+ +package context + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +type testConfig struct { + TxFee uint64 `json:"txFee"` + MinFee uint64 `json:"minFee"` +} + +func TestGetConfig(t *testing.T) { + type test struct { + name string + providedStr string + defaultConfig testConfig + wantConfig testConfig + } + for _, test := range []test{ + { + name: "default want non-zero values", + providedStr: "", + defaultConfig: testConfig{TxFee: 100}, + wantConfig: testConfig{TxFee: 100}, + }, + { + name: "default want zero values", + providedStr: "", + defaultConfig: testConfig{}, + wantConfig: testConfig{}, + }, + { + name: "override default with zero values", + providedStr: `{ + "test": { + "txFee": 0, + "minFee": 0 + } + }`, + defaultConfig: testConfig{TxFee: 100, MinFee: 100}, + wantConfig: testConfig{TxFee: 0, MinFee: 0}, + }, + { + name: "override non-zero defaults", + providedStr: `{ + "test": { + "txFee": 1000, + "minFee": 1000 + } + }`, + defaultConfig: testConfig{TxFee: 100, MinFee: 100}, + wantConfig: testConfig{TxFee: 1000, MinFee: 1000}, + }, + { + name: "override one default value", + providedStr: `{ + "test": { + "txFee": 1000 + } + }`, + defaultConfig: testConfig{TxFee: 100, MinFee: 100}, + wantConfig: testConfig{TxFee: 1000, MinFee: 100}, + }, + } { + t.Run(test.name, func(t *testing.T) { + r := require.New(t) + c, err := NewConfig([]byte(test.providedStr)) + r.NoError(err) + testConfig, err := GetConfig(c, "test", test.defaultConfig) + r.NoError(err) + r.Equal(test.wantConfig, testConfig) + }) + } +} + +func TestInvalidConfig(t *testing.T) { + r := require.New(t) + _, err := NewConfig([]byte(`{`)) + r.Error(err) +} + +func TestInvalidConfigField(t *testing.T) { + r := require.New(t) + c, err := NewConfig([]byte(`{"test": {"txFee": "invalid"}}`)) + r.NoError(err) + _, err = GetConfig(c, "test", testConfig{}) + r.Error(err) +} diff --git a/vms/sdk/event/event.go b/vms/sdk/event/event.go new file mode 100644 index 000000000000..fe9e012da927 --- /dev/null +++ b/vms/sdk/event/event.go @@ -0,0 +1,93 @@ +// Copyright (C) 2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. 
+ +package event + +import ( + "context" + "errors" +) + +var ( + _ Subscription[struct{}] = (*SubscriptionFunc[struct{}])(nil) + _ SubscriptionFactory[struct{}] = (*SubscriptionFuncFactory[struct{}])(nil) +) + +// SubscriptionFactory returns an instance of a concrete Subscription +type SubscriptionFactory[T any] interface { + New() (Subscription[T], error) +} + +// Subscription defines how to consume events +type Subscription[T any] interface { + // Notify returns fatal errors + Notify(ctx context.Context, t T) error + // Close returns fatal errors + Close() error +} + +type SubscriptionFuncFactory[T any] SubscriptionFunc[T] + +func (s SubscriptionFuncFactory[T]) New() (Subscription[T], error) { + return SubscriptionFunc[T](s), nil +} + +// SubscriptionFunc implements Subscription[T] using anonymous functions +type SubscriptionFunc[T any] struct { + NotifyF func(ctx context.Context, t T) error + Closer func() error +} + +// Notify invokes the anonymous NotifyF function of the subscription +func (s SubscriptionFunc[T]) Notify(ctx context.Context, t T) error { + return s.NotifyF(ctx, t) +} + +// Close invokes the anonymous Closer function of the subscription if +// non-nil +func (s SubscriptionFunc[_]) Close() error { + if s.Closer == nil { + return nil + } + return s.Closer() +} + +// NotifyAll notifies all subs with the event e and joins any errors returned +// by the combined subs. +func NotifyAll[T any](ctx context.Context, e T, subs ...Subscription[T]) error { + var errs []error + for _, sub := range subs { + if err := sub.Notify(ctx, e); err != nil { + errs = append(errs, err) + } + } + return errors.Join(errs...) +} + +// Aggregate combines the subs into a single subscription instance +func Aggregate[T any](subs ...Subscription[T]) Subscription[T] { + return SubscriptionFunc[T]{ + NotifyF: func(ctx context.Context, t T) error { + return NotifyAll(ctx, t, subs...) + }, + Closer: func() error { + var errs []error + for _, sub := range subs { + if err := sub.Close(); err != nil { + errs = append(errs, err) + } + } + return errors.Join(errs...) + }, + } +} + +// Map transforms a subscription from an output sub to an input typed sub +func Map[Input any, Output any](mapF func(Input) Output, sub Subscription[Output]) Subscription[Input] { + return SubscriptionFunc[Input]{ + NotifyF: func(ctx context.Context, t Input) error { + return sub.Notify(ctx, mapF(t)) + }, + Closer: sub.Close, + } +} diff --git a/vms/sdk/event/event_test.go b/vms/sdk/event/event_test.go new file mode 100644 index 000000000000..671efe99d59d --- /dev/null +++ b/vms/sdk/event/event_test.go @@ -0,0 +1,78 @@ +// Copyright (C) 2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. 
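`Map` and `Aggregate` compose: a typical pattern is to fan a single accepted-block event out to several subscribers, some of which only care about a projection of the event. A hedged sketch (the `acceptedBlock` type and the subscriber bodies are invented for illustration):

```go
package main

import (
	"context"
	"fmt"

	"github.com/ava-labs/avalanchego/ids"
	"github.com/ava-labs/avalanchego/vms/sdk/event"
)

type acceptedBlock struct {
	id     ids.ID
	height uint64
}

func main() {
	// Subscriber that wants the whole event.
	full := event.SubscriptionFunc[acceptedBlock]{
		NotifyF: func(_ context.Context, b acceptedBlock) error {
			fmt.Println("accepted height", b.height)
			return nil
		},
	}
	// Subscriber that only cares about block IDs, adapted with Map.
	idOnly := event.Map(
		func(b acceptedBlock) ids.ID { return b.id },
		event.SubscriptionFunc[ids.ID]{
			NotifyF: func(_ context.Context, id ids.ID) error {
				fmt.Println("accepted block", id)
				return nil
			},
		},
	)
	// Callers see one subscription; Notify and Close fan out to both.
	combined := event.Aggregate[acceptedBlock](full, idOnly)
	_ = combined.Notify(context.Background(), acceptedBlock{id: ids.GenerateTestID(), height: 1})
	_ = combined.Close()
}
```

Note that `Close` on a `SubscriptionFunc` with a nil `Closer` is a no-op, so the subscribers above do not need to supply one.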
+ +package event + +import ( + "context" + "testing" + + "github.com/ava-labs/avalanchego/ids" + "github.com/stretchr/testify/require" +) + +func newTestSubscription() (*SubscriptionFunc[ids.ID], chan ids.ID) { + subCh := make(chan ids.ID, 1) + return &SubscriptionFunc[ids.ID]{ + NotifyF: func(_ context.Context, eventID ids.ID) error { + subCh <- eventID + return nil + }, + Closer: func() error { + close(subCh) + return nil + }, + }, subCh +} + +func TestSubscription(t *testing.T) { + r := require.New(t) + sub, subCh := newTestSubscription() + + eventID := ids.GenerateTestID() + r.NoError(sub.Notify(context.Background(), eventID)) + + r.Equal(eventID, <-subCh) + r.NoError(sub.Close()) + <-subCh +} + +func TestNotifyAll(t *testing.T) { + r := require.New(t) + sub1, sub1Ch := newTestSubscription() + sub2, sub2Ch := newTestSubscription() + + eventID := ids.GenerateTestID() + r.NoError(NotifyAll(context.Background(), eventID, sub1, sub2)) + r.Equal(eventID, <-sub1Ch) + r.Equal(eventID, <-sub2Ch) +} + +func TestAggregateSubs(t *testing.T) { + r := require.New(t) + sub1, sub1Ch := newTestSubscription() + sub2, sub2Ch := newTestSubscription() + + eventID := ids.GenerateTestID() + aggregateSub := Aggregate(sub1, sub2) + + r.NoError(aggregateSub.Notify(context.Background(), eventID)) + r.Equal(eventID, <-sub1Ch) + r.Equal(eventID, <-sub2Ch) + + r.NoError(aggregateSub.Close()) + <-sub1Ch + <-sub2Ch +} + +func TestMapSub(t *testing.T) { + r := require.New(t) + sub, subCh := newTestSubscription() + + mappedSub := Map(func(eventID string) ids.ID { return ids.FromStringOrPanic(eventID) }, sub) + eventID := ids.GenerateTestID() + r.NoError(mappedSub.Notify(context.Background(), eventID.String())) + r.Equal(eventID, <-subCh) + r.NoError(mappedSub.Close()) + <-subCh +} diff --git a/vms/sdk/snow/block.go b/vms/sdk/snow/block.go new file mode 100644 index 000000000000..0c4390844e71 --- /dev/null +++ b/vms/sdk/snow/block.go @@ -0,0 +1,350 @@ +// Copyright (C) 2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package snow + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/snow/consensus/snowman" + "github.com/ava-labs/avalanchego/snow/engine/snowman/block" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + "go.uber.org/zap" + + "github.com/ava-labs/avalanchego/vms/sdk/event" +) + +var ( + _ snowman.Block = (*Block[ConcreteBlock, ConcreteBlock, ConcreteBlock])(nil) + _ block.WithVerifyContext = (*Block[ConcreteBlock, ConcreteBlock, ConcreteBlock])(nil) + + errParentFailedVerification = errors.New("parent failed verification") + errMismatchedPChainContext = errors.New("mismatched P-Chain context") +) + +type ConcreteBlock interface { + fmt.Stringer + GetID() ids.ID + GetParent() ids.ID + GetTimestamp() int64 + GetBytes() []byte + GetHeight() uint64 + // GetContext returns the P-Chain context of the block. + // May return nil if there is no P-Chain context, which + // should only occur prior to ProposerVM activation. + // This will be verified from the snow package, so that the + // inner chain can simply use its embedded context. + GetContext() *block.Context +} + +// Block implements snowman.Block and abstracts away the caching +// and block pinning required by the AvalancheGo Consensus engine. 
+// This converts the VM DevX from implementing the consensus engine specific invariants +// to implementing an input/output/accepted block type and handling the state transitions +// between these types. +// In conjunction with the AvalancheGo Consensus engine, this code guarantees that +// 1. Verify is always called against a verified parent +// 2. Accept is always called against a verified block +// 3. Reject is always called against a verified block +// +// Block additionally handles DynamicStateSync where blocks are vacuously +// verified/accepted to update a moving state sync target. +// After FinishStateSync is called, the snow package guarantees the same invariants +// as applied during normal consensus. +type Block[Input ConcreteBlock, Output ConcreteBlock, Accepted ConcreteBlock] struct { + Input Input + Output Output + verified bool + Accepted Accepted + accepted bool + + vm *VM[Input, Output, Accepted] +} + +func NewInputBlock[Input ConcreteBlock, Output ConcreteBlock, Accepted ConcreteBlock]( + vm *VM[Input, Output, Accepted], + input Input, +) *Block[Input, Output, Accepted] { + return &Block[Input, Output, Accepted]{ + Input: input, + vm: vm, + } +} + +func NewVerifiedBlock[Input ConcreteBlock, Output ConcreteBlock, Accepted ConcreteBlock]( + vm *VM[Input, Output, Accepted], + input Input, + output Output, +) *Block[Input, Output, Accepted] { + return &Block[Input, Output, Accepted]{ + Input: input, + Output: output, + verified: true, + vm: vm, + } +} + +func NewAcceptedBlock[Input ConcreteBlock, Output ConcreteBlock, Accepted ConcreteBlock]( + vm *VM[Input, Output, Accepted], + input Input, + output Output, + accepted Accepted, +) *Block[Input, Output, Accepted] { + return &Block[Input, Output, Accepted]{ + Input: input, + Output: output, + verified: true, + Accepted: accepted, + accepted: true, + vm: vm, + } +} + +func (b *Block[I, O, A]) setAccepted(output O, accepted A) { + b.Output = output + b.verified = true + b.Accepted = accepted + b.accepted = true +} + +// verify the block against the provided parent output and set the +// required Output/verified fields. +func (b *Block[I, O, A]) verify(ctx context.Context, parentOutput O) error { + output, err := b.vm.chain.VerifyBlock(ctx, parentOutput, b.Input) + if err != nil { + return err + } + b.Output = output + b.verified = true + return nil +} + +// accept the block and set the required Accepted/accepted fields. +// Assumes verify has already been called. 
+func (b *Block[I, O, A]) accept(ctx context.Context, parentAccepted A) error { + acceptedBlk, err := b.vm.chain.AcceptBlock(ctx, parentAccepted, b.Output) + if err != nil { + return err + } + b.Accepted = acceptedBlk + b.accepted = true + return nil +} + +func (*Block[I, O, A]) ShouldVerifyWithContext(context.Context) (bool, error) { + return true, nil +} + +func (b *Block[I, O, A]) VerifyWithContext(ctx context.Context, pChainCtx *block.Context) error { + return b.verifyWithContext(ctx, pChainCtx) +} + +func (b *Block[I, O, A]) Verify(ctx context.Context) error { + return b.verifyWithContext(ctx, nil) +} + +func (b *Block[I, O, A]) verifyWithContext(ctx context.Context, pChainCtx *block.Context) error { + b.vm.chainLock.Lock() + defer b.vm.chainLock.Unlock() + + start := time.Now() + defer func() { + b.vm.metrics.blockVerify.Observe(float64(time.Since(start))) + }() + + ready := b.vm.ready + ctx, span := b.vm.tracer.Start( + ctx, "Block.Verify", + trace.WithAttributes( + attribute.Int("size", len(b.Input.GetBytes())), + attribute.Int64("height", int64(b.Input.GetHeight())), + attribute.Bool("ready", ready), + attribute.Bool("built", b.verified), + ), + ) + defer span.End() + + switch { + case !ready: + // If the VM is not ready (dynamic state sync), skip verifying the block. + b.vm.log.Info( + "skipping block verification in dynamic state sync", + zap.Stringer("blk", b.Input), + ) + case b.verified: + // Defensive: verify the inner and wrapper block contexts match to ensure + // we don't build a block with a mismatched P-Chain context that will be + // invalid to peers. + innerCtx := b.Input.GetContext() + if err := verifyPChainCtx(pChainCtx, innerCtx); err != nil { + return err + } + + // If we built the block, the state will already be populated and we don't + // need to compute it (we assume that we built a correct block and it isn't + // necessary to re-verify). + b.vm.log.Info( + "skipping verification of locally built block", + zap.Stringer("blk", b), + ) + default: + b.vm.log.Info("Verifying block", + zap.Stringer("block", b), + ) + // Fetch my parent to verify against + parent, err := b.vm.GetBlock(ctx, b.Parent()) + if err != nil { + return err + } + + // If my parent has not been verified and we're no longer in dynamic state sync, + // then my parent must have failed verification during the transition to normal operation. 
+ if !parent.verified { + return errParentFailedVerification + } + + // Verify the inner and wrapper block contexts match + innerCtx := b.Input.GetContext() + if err := verifyPChainCtx(pChainCtx, innerCtx); err != nil { + return err + } + if err := b.verify(ctx, parent.Output); err != nil { + return err + } + + if err := event.NotifyAll[O](ctx, b.Output, b.vm.verifiedSubs...); err != nil { + return err + } + } + + b.vm.verifiedL.Lock() + b.vm.verifiedBlocks[b.Input.GetID()] = b + b.vm.verifiedL.Unlock() + + return nil +} + +func verifyPChainCtx(providedCtx, innerCtx *block.Context) error { + switch { + case providedCtx == nil && innerCtx == nil: + return nil + case providedCtx == nil && innerCtx != nil: + return fmt.Errorf("%w: missing provided context != inner P-Chain height %d", errMismatchedPChainContext, innerCtx.PChainHeight) + case providedCtx != nil && innerCtx == nil: + return fmt.Errorf("%w: provided P-Chain height (%d) != missing inner context", errMismatchedPChainContext, providedCtx.PChainHeight) + case providedCtx.PChainHeight != innerCtx.PChainHeight: + return fmt.Errorf("%w: provided P-Chain height (%d) != inner P-Chain height %d", errMismatchedPChainContext, providedCtx.PChainHeight, innerCtx.PChainHeight) + default: + return nil + } +} + +// markAccepted marks the block and updates the required VM state. +// iff parent is non-nil, it will request the chain to Accept the block. +// The caller is responsible to provide the accepted parent if the VM is in a ready state. +func (b *Block[I, O, A]) markAccepted(ctx context.Context, parent *Block[I, O, A]) error { + if err := b.vm.inputChainIndex.UpdateLastAccepted(ctx, b.Input); err != nil { + return err + } + + if parent != nil { + if err := b.accept(ctx, parent.Accepted); err != nil { + return err + } + } + + b.vm.verifiedL.Lock() + delete(b.vm.verifiedBlocks, b.Input.GetID()) + b.vm.verifiedL.Unlock() + + b.vm.setLastAccepted(b) + + return b.notifyAccepted(ctx) +} + +func (b *Block[I, O, A]) notifyAccepted(ctx context.Context) error { + // If I was not actually marked accepted, notify pre ready subs + if !b.accepted { + return event.NotifyAll(ctx, b.Input, b.vm.preReadyAcceptedSubs...) + } + return event.NotifyAll(ctx, b.Accepted, b.vm.acceptedSubs...) +} + +// implements "snowman.Block.choices.Decidable" +func (b *Block[I, O, A]) Accept(ctx context.Context) error { + b.vm.chainLock.Lock() + defer b.vm.chainLock.Unlock() + + start := time.Now() + defer func() { + b.vm.metrics.blockAccept.Observe(float64(time.Since(start))) + }() + + ctx, span := b.vm.tracer.Start(ctx, "Block.Accept") + defer span.End() + + defer b.vm.log.Info("accepting block", zap.Stringer("block", b)) + + // If I'm not ready yet, mark myself as accepted, and return early. + isReady := b.vm.ready + if !isReady { + return b.markAccepted(ctx, nil) + } + + // If I'm ready and not verified, then I or my ancestor must have failed + // verification during the transition from dynamic state sync. This indicates + // an invalid block has been accepted, which should be prevented by consensus. + // If we hit this case, return a fatal error here. + if !b.verified { + return errParentFailedVerification + } + + // If I am verified and ready, fetch my parent and accept myself. I'm verified, which + // implies my parent is verified as well. 
+ parent, err := b.vm.GetBlock(ctx, b.Parent()) + if err != nil { + return fmt.Errorf("failed to fetch parent while accepting verified block %s: %w", b, err) + } + return b.markAccepted(ctx, parent) +} + +// implements "snowman.Block.choices.Decidable" +func (b *Block[I, O, A]) Reject(ctx context.Context) error { + ctx, span := b.vm.tracer.Start(ctx, "Block.Reject") + defer span.End() + + b.vm.verifiedL.Lock() + delete(b.vm.verifiedBlocks, b.Input.GetID()) + b.vm.verifiedL.Unlock() + + // Notify subscribers about the rejected blocks that were vacuously verified during dynamic state sync + if !b.verified { + return event.NotifyAll[I](ctx, b.Input, b.vm.preRejectedSubs...) + } + + return event.NotifyAll[O](ctx, b.Output, b.vm.rejectedSubs...) +} + +// implements "snowman.Block" +func (b *Block[I, O, A]) ID() ids.ID { return b.Input.GetID() } +func (b *Block[I, O, A]) Parent() ids.ID { return b.Input.GetParent() } +func (b *Block[I, O, A]) Height() uint64 { return b.Input.GetHeight() } +func (b *Block[I, O, A]) Timestamp() time.Time { return time.UnixMilli(b.Input.GetTimestamp()) } +func (b *Block[I, O, A]) Bytes() []byte { return b.Input.GetBytes() } + +// Implements GetXXX for internal consistency +func (b *Block[I, O, A]) GetID() ids.ID { return b.Input.GetID() } +func (b *Block[I, O, A]) GetParent() ids.ID { return b.Input.GetParent() } +func (b *Block[I, O, A]) GetHeight() uint64 { return b.Input.GetHeight() } +func (b *Block[I, O, A]) GetTimestamp() int64 { return b.Input.GetTimestamp() } +func (b *Block[I, O, A]) GetBytes() []byte { return b.Input.GetBytes() } + +// implements "fmt.Stringer" +func (b *Block[I, O, A]) String() string { + return fmt.Sprintf("Block(Input = %s, verified = %t, accepted = %t)", b.Input, b.verified, b.accepted) +} diff --git a/vms/sdk/snow/chain_index.go b/vms/sdk/snow/chain_index.go new file mode 100644 index 000000000000..5ecc9ed48468 --- /dev/null +++ b/vms/sdk/snow/chain_index.go @@ -0,0 +1,166 @@ +// Copyright (C) 2024, Ava Labs, Inv. All rights reserved. +// See the file LICENSE for licensing terms. + +package snow + +import ( + "context" + "fmt" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils" +) + +// ChainIndex defines the generic on-disk index for the Input block type required +// by the VM. +// ChainIndex must serve the last accepted block, it is up to the implementation +// how large of a window of accepted blocks to maintain in its index. +// The VM provides a consensus-aware caching layer on top of ChainIndex (FIFO to +// maintain cache of most useful data), so the implementation does not need to +// provide its own caching layer. 
+type ChainIndex[T ConcreteBlock] interface { + UpdateLastAccepted(ctx context.Context, blk T) error + GetLastAcceptedHeight(ctx context.Context) (uint64, error) + GetBlock(ctx context.Context, blkID ids.ID) (T, error) + GetBlockIDAtHeight(ctx context.Context, blkHeight uint64) (ids.ID, error) + GetBlockIDHeight(ctx context.Context, blkID ids.ID) (uint64, error) + GetBlockByHeight(ctx context.Context, blkHeight uint64) (T, error) +} + +func (v *VM[I, O, A]) makeConsensusIndex( + ctx context.Context, + chainIndex ChainIndex[I], + outputBlock O, + acceptedBlock A, + stateReady bool, +) error { + v.inputChainIndex = chainIndex + lastAcceptedHeight, err := v.inputChainIndex.GetLastAcceptedHeight(ctx) + if err != nil { + return err + } + inputBlock, err := v.inputChainIndex.GetBlockByHeight(ctx, lastAcceptedHeight) + if err != nil { + return err + } + + var lastAcceptedBlock *Block[I, O, A] + if stateReady { + v.ready = true + lastAcceptedBlock, err = v.reprocessFromOutputToInput(ctx, inputBlock, outputBlock, acceptedBlock) + if err != nil { + return err + } + } else { + v.ready = false + lastAcceptedBlock = NewInputBlock(v, inputBlock) + } + v.setLastAccepted(lastAcceptedBlock) + v.preferredBlkID = lastAcceptedBlock.ID() + v.consensusIndex = &ConsensusIndex[I, O, A]{v} + + return nil +} + +// GetConsensusIndex returns the consensus index exposed to the application. The consensus index is created during chain initialization +// and is exposed here for testing. +func (v *VM[I, O, A]) GetConsensusIndex() *ConsensusIndex[I, O, A] { + return v.consensusIndex +} + +// reprocessFromOutputToInput re-processes blocks from output/accepted to align with the supplied input block. +// assumes that outputBlock and acceptedBlock represent the same block and that all blocks in the range +// [output/accepted, input] have been added to the inputChainIndex. +func (v *VM[I, O, A]) reprocessFromOutputToInput(ctx context.Context, targetInputBlock I, outputBlock O, acceptedBlock A) (*Block[I, O, A], error) { + if targetInputBlock.GetHeight() < outputBlock.GetHeight() || outputBlock.GetID() != acceptedBlock.GetID() { + return nil, fmt.Errorf("invalid initial accepted state (Input = %s, Output = %s, Accepted = %s)", targetInputBlock, outputBlock, acceptedBlock) + } + + // Re-process from the last output block, to the last accepted input block + for targetInputBlock.GetHeight() > outputBlock.GetHeight() { + reprocessInputBlock, err := v.inputChainIndex.GetBlockByHeight(ctx, outputBlock.GetHeight()+1) + if err != nil { + return nil, err + } + + outputBlock, err = v.chain.VerifyBlock(ctx, outputBlock, reprocessInputBlock) + if err != nil { + return nil, err + } + acceptedBlock, err = v.chain.AcceptBlock(ctx, acceptedBlock, outputBlock) + if err != nil { + return nil, err + } + } + + return NewAcceptedBlock(v, targetInputBlock, outputBlock, acceptedBlock), nil +} + +// ConsensusIndex provides a wrapper around the VM, which enables the chain developer to share the +// caching layer provided by the VM and used in the consensus engine. +// The ConsensusIndex additionally provides access to the accepted/preferred frontier by providing +// accessors to the latest type of the frontier. +// ie. last accepted block is guaranteed to have Accepted type available, whereas the preferred block +// is only guaranteed to have the Output type available. 
+type ConsensusIndex[I ConcreteBlock, O ConcreteBlock, A ConcreteBlock] struct { + vm *VM[I, O, A] +} + +func (c *ConsensusIndex[I, O, A]) GetBlockByHeight(ctx context.Context, height uint64) (I, error) { + blk, err := c.vm.GetBlockByHeight(ctx, height) + if err != nil { + return utils.Zero[I](), err + } + return blk.Input, nil +} + +func (c *ConsensusIndex[I, O, A]) GetBlock(ctx context.Context, blkID ids.ID) (I, error) { + blk, err := c.vm.GetBlock(ctx, blkID) + if err != nil { + return utils.Zero[I](), err + } + return blk.Input, nil +} + +// GetPreferredBlock returns the output block of the current preference. +// +// Prior to dynamic state sync, GetPreferredBlock will return an error because the preference +// will not have been verified. +// After completing dynamic state sync, all outstanding processing blocks will be verified. +// However, there's an edge case where the node may have vacuously verified an invalid block +// during dynamic state sync, such that the preferred block is invalid and its output is +// empty. +// Consensus should guarantee that we do not accept such a block even if it's preferred as +// long as a majority of validators are correct. +// After outstanding processing blocks have been Accepted/Rejected, the preferred block +// will be verified and the output will be available. +func (c *ConsensusIndex[I, O, A]) GetPreferredBlock(ctx context.Context) (O, error) { + c.vm.metaLock.Lock() + preference := c.vm.preferredBlkID + c.vm.metaLock.Unlock() + + blk, err := c.vm.GetBlock(ctx, preference) + if err != nil { + return utils.Zero[O](), err + } + if !blk.verified { + return utils.Zero[O](), fmt.Errorf("preferred block %s has not been verified", blk) + } + return blk.Output, nil +} + +// GetLastAccepted returns the last accepted block of the chain. +// +// If the chain is mid dynamic state sync, GetLastAccepted will return an error +// because the last accepted block will not be populated. +func (c *ConsensusIndex[I, O, A]) GetLastAccepted(context.Context) (A, error) { + c.vm.metaLock.Lock() + defer c.vm.metaLock.Unlock() + + lastAccepted := c.vm.lastAcceptedBlock + + if !lastAccepted.accepted { + return utils.Zero[A](), fmt.Errorf("last accepted block %s has not been populated", lastAccepted) + } + return lastAccepted.Accepted, nil +} diff --git a/vms/sdk/snow/config.go b/vms/sdk/snow/config.go new file mode 100644 index 000000000000..1ec882d6ebe7 --- /dev/null +++ b/vms/sdk/snow/config.go @@ -0,0 +1,39 @@ +// Copyright (C) 2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. 
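One relationship worth calling out: the `chainindex.ChainIndex` added earlier in this PR implements exactly the `ChainIndex[T]` interface above (its `Block` constraint is a subset of `ConcreteBlock` and the method sets line up), so a chain can return it directly from `Initialize`. A compile-time assertion makes that explicit; `myBlock` is a hypothetical block type implementing `ConcreteBlock`, and the import paths are the ones used elsewhere in this diff:

```go
// Hypothetical wiring check: the chainindex implementation satisfies the
// snow package's ChainIndex interface for any ConcreteBlock type.
var _ snow.ChainIndex[*myBlock] = (*chainindex.ChainIndex[*myBlock])(nil)
```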
+ +package snow + +import ( + "github.com/ava-labs/avalanchego/trace" + "github.com/ava-labs/avalanchego/utils/profiler" + + "github.com/ava-labs/avalanchego/vms/sdk/context" +) + +const ( + SnowVMConfigKey = "snowvm" + TracerConfigKey = "tracer" + ContinuousProfilerKey = "continuousProfiler" +) + +type VMConfig struct { + AcceptedBlockWindowCache int `json:"acceptedBlockWindowCache"` +} + +func NewDefaultVMConfig() VMConfig { + return VMConfig{ + AcceptedBlockWindowCache: 128, + } +} + +func GetVMConfig(config context.Config) (VMConfig, error) { + return context.GetConfig(config, SnowVMConfigKey, NewDefaultVMConfig()) +} + +func GetProfilerConfig(config context.Config) (profiler.Config, error) { + return context.GetConfig(config, ContinuousProfilerKey, profiler.Config{Enabled: false}) +} + +func GetTracerConfig(config context.Config) (trace.Config, error) { + return context.GetConfig(config, TracerConfigKey, trace.Config{Enabled: false}) +} diff --git a/vms/sdk/snow/health.go b/vms/sdk/snow/health.go new file mode 100644 index 000000000000..7ba79022fc9c --- /dev/null +++ b/vms/sdk/snow/health.go @@ -0,0 +1,113 @@ +// Copyright (C) 2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package snow + +import ( + "context" + "errors" + "fmt" + "sync" + + "github.com/ava-labs/avalanchego/api/health" + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/set" +) + +const ( + vmReadinessHealthChecker = "snowVMReady" + unresolvedBlocksHealthChecker = "snowUnresolvedBlocks" +) + +var ( + errUnresolvedBlocks = errors.New("unresolved invalid blocks in processing") + errVMNotReady = errors.New("vm not ready") + + _ health.Checker = (*vmReadinessHealthCheck)(nil) + _ health.Checker = (*unresolvedBlockHealthCheck[ConcreteBlock])(nil) + _ health.Checker = (*VM[ConcreteBlock, ConcreteBlock, ConcreteBlock])(nil) +) + +func (v *VM[I, O, A]) HealthCheck(ctx context.Context) (any, error) { + var ( + details = make(map[string]any) + errs []error + ) + + v.healthCheckers.Range(func(k, v any) bool { + name := k.(string) + checker := v.(health.Checker) + checkerDetails, err := checker.HealthCheck(ctx) + + details[name] = checkerDetails + errs = append(errs, err) + return true + }) + + return details, errors.Join(errs...) +} + +func (v *VM[I, O, A]) RegisterHealthChecker(name string, healthChecker health.Checker) error { + if _, ok := v.healthCheckers.LoadOrStore(name, healthChecker); ok { + return fmt.Errorf("duplicate health checker for %s", name) + } + + return nil +} + +func (v *VM[I, O, A]) initHealthCheckers() error { + vmReadiness := newVMReadinessHealthCheck(func() bool { + return v.ready + }) + return v.RegisterHealthChecker(vmReadinessHealthChecker, vmReadiness) +} + +// vmReadinessHealthCheck marks itself as ready iff the VM is in normal operation. +// ie. has the full state required to process new blocks from tip. 
+type vmReadinessHealthCheck struct { + isReady func() bool +} + +func newVMReadinessHealthCheck(isReady func() bool) *vmReadinessHealthCheck { + return &vmReadinessHealthCheck{isReady: isReady} +} + +func (v *vmReadinessHealthCheck) HealthCheck(_ context.Context) (any, error) { + ready := v.isReady() + if !ready { + return ready, errVMNotReady + } + return ready, nil +} + +// unresolvedBlockHealthCheck implements [health.Checker] by providing a health check which reports healthy +// after any blocks that failed verification during the transition from dynamic state sync to normal operation +// have been cleared from the processing set. +type unresolvedBlockHealthCheck[I ConcreteBlock] struct { + lock sync.RWMutex + unresolvedBlocks set.Set[ids.ID] +} + +func newUnresolvedBlocksHealthCheck[I ConcreteBlock](unresolvedBlkIDs set.Set[ids.ID]) *unresolvedBlockHealthCheck[I] { + return &unresolvedBlockHealthCheck[I]{ + unresolvedBlocks: unresolvedBlkIDs, + } +} + +func (u *unresolvedBlockHealthCheck[I]) Resolve(blkID ids.ID) { + u.lock.Lock() + defer u.lock.Unlock() + + u.unresolvedBlocks.Remove(blkID) +} + +func (u *unresolvedBlockHealthCheck[I]) HealthCheck(_ context.Context) (any, error) { + u.lock.RLock() + defer u.lock.RUnlock() + + unresolvedBlocks := u.unresolvedBlocks.Len() + if unresolvedBlocks > 0 { + return unresolvedBlocks, fmt.Errorf("%w: %d", errUnresolvedBlocks, unresolvedBlocks) + } + return unresolvedBlocks, nil +} diff --git a/vms/sdk/snow/input_covariant_vm.go b/vms/sdk/snow/input_covariant_vm.go new file mode 100644 index 000000000000..646972b4bc48 --- /dev/null +++ b/vms/sdk/snow/input_covariant_vm.go @@ -0,0 +1,48 @@ +// Copyright (C) 2024, Ava Labs, Inv. All rights reserved. +// See the file LICENSE for licensing terms. + +package snow + +import ( + "context" + + "github.com/ava-labs/avalanchego/ids" +) + +// InputCovariantVM provides basic VM functionality that only returns the input block type +// Since Input is the only field guaranteed to be populated for each consensus block, we provide +// this wrapper for convenience. +type InputCovariantVM[I ConcreteBlock, O ConcreteBlock, A ConcreteBlock] struct { + vm *VM[I, O, A] +} + +func (v *InputCovariantVM[I, O, A]) GetBlock(ctx context.Context, blkID ids.ID) (I, error) { + blk, err := v.vm.GetBlock(ctx, blkID) + if err != nil { + var emptyI I + return emptyI, err + } + return blk.Input, nil +} + +func (v *InputCovariantVM[I, O, A]) GetBlockByHeight(ctx context.Context, height uint64) (I, error) { + blk, err := v.vm.GetBlockByHeight(ctx, height) + if err != nil { + var emptyI I + return emptyI, err + } + return blk.Input, nil +} + +func (v *InputCovariantVM[I, O, A]) ParseBlock(ctx context.Context, bytes []byte) (I, error) { + blk, err := v.vm.ParseBlock(ctx, bytes) + if err != nil { + var emptyI I + return emptyI, err + } + return blk.Input, nil +} + +func (v *InputCovariantVM[I, O, A]) LastAcceptedBlock(ctx context.Context) I { + return v.vm.LastAcceptedBlock(ctx).Input +} diff --git a/vms/sdk/snow/metrics.go b/vms/sdk/snow/metrics.go new file mode 100644 index 000000000000..eff2eaead6f2 --- /dev/null +++ b/vms/sdk/snow/metrics.go @@ -0,0 +1,63 @@ +// Copyright (C) 2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. 
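Returning to `RegisterHealthChecker` above: beyond the two built-in checks, the chain can register its own `health.Checker`s, and each one shows up as a named entry in the aggregated report returned by `HealthCheck`. A hedged sketch (the mempool checker, its threshold, and the registration site are hypothetical):

```go
// mempoolHealthCheck reports unhealthy when the mempool grows past a threshold.
type mempoolHealthCheck struct {
	size    func() int
	maxSize int
}

func (m *mempoolHealthCheck) HealthCheck(context.Context) (any, error) {
	size := m.size()
	if size > m.maxSize {
		return size, fmt.Errorf("mempool above healthy threshold: %d > %d", size, m.maxSize)
	}
	return size, nil
}

// During chain initialization (vm is the *snow.VM handed to Initialize):
//   err := vm.RegisterHealthChecker("mempool", &mempoolHealthCheck{size: mempool.Len, maxSize: 10_000})
```

Registering the same name twice returns an error, so component names should be unique per chain.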
+ +package snow + +import ( + "github.com/ava-labs/avalanchego/utils/metric" + "github.com/prometheus/client_golang/prometheus" +) + +type metrics struct { + blockBuild metric.Averager + blockParse metric.Averager + blockVerify metric.Averager + blockAccept metric.Averager +} + +func newMetrics(r *prometheus.Registry) (*metrics, error) { + blockBuild, err := metric.NewAverager( + "block_build", + "time spent building blocks", + r, + ) + if err != nil { + return nil, err + } + blockParse, err := metric.NewAverager( + "block_parse", + "time spent parsing blocks", + r, + ) + if err != nil { + return nil, err + } + blockVerify, err := metric.NewAverager( + "block_verify", + "time spent verifying blocks", + r, + ) + if err != nil { + return nil, err + } + blockAccept, err := metric.NewAverager( + "block_accept", + "time spent accepting blocks", + r, + ) + if err != nil { + return nil, err + } + if err != nil { + return nil, err + } + + m := &metrics{ + blockBuild: blockBuild, + blockParse: blockParse, + blockVerify: blockVerify, + blockAccept: blockAccept, + } + + return m, nil +} diff --git a/vms/sdk/snow/network.go b/vms/sdk/snow/network.go new file mode 100644 index 000000000000..5fa4b7b29a73 --- /dev/null +++ b/vms/sdk/snow/network.go @@ -0,0 +1,37 @@ +// Copyright (C) 2024, Ava Labs, Inv. All rights reserved. +// See the file LICENSE for licensing terms. + +package snow + +import ( + "context" + "time" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/version" +) + +func (v *VM[I, O, A]) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID uint32, deadline time.Time, request []byte) error { + return v.network.AppRequest(ctx, nodeID, requestID, deadline, request) +} + +func (v *VM[I, O, A]) AppResponse(ctx context.Context, nodeID ids.NodeID, requestID uint32, response []byte) error { + return v.network.AppResponse(ctx, nodeID, requestID, response) +} + +func (v *VM[I, O, A]) AppRequestFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32, appErr *common.AppError) error { + return v.network.AppRequestFailed(ctx, nodeID, requestID, appErr) +} + +func (v *VM[I, O, A]) AppGossip(ctx context.Context, nodeID ids.NodeID, msg []byte) error { + return v.network.AppGossip(ctx, nodeID, msg) +} + +func (v *VM[I, O, A]) Connected(ctx context.Context, nodeID ids.NodeID, nodeVersion *version.Application) error { + return v.network.Connected(ctx, nodeID, nodeVersion) +} + +func (v *VM[I, O, A]) Disconnected(ctx context.Context, nodeID ids.NodeID) error { + return v.network.Disconnected(ctx, nodeID) +} diff --git a/vms/sdk/snow/snow_vm.go b/vms/sdk/snow/snow_vm.go new file mode 100644 index 000000000000..aa5a88653613 --- /dev/null +++ b/vms/sdk/snow/snow_vm.go @@ -0,0 +1,45 @@ +// Copyright (C) 2024, Ava Labs, Inv. All rights reserved. +// See the file LICENSE for licensing terms. 
+ +package snow + +import ( + "context" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/snow/consensus/snowman" + "github.com/ava-labs/avalanchego/snow/engine/snowman/block" +) + +var ( + _ block.ChainVM = (*SnowVM[ConcreteBlock, ConcreteBlock, ConcreteBlock])(nil) + _ block.StateSyncableVM = (*SnowVM[ConcreteBlock, ConcreteBlock, ConcreteBlock])(nil) + _ block.BuildBlockWithContextChainVM = (*SnowVM[ConcreteBlock, ConcreteBlock, ConcreteBlock])(nil) +) + +// SnowVM wraps the VM and completes the implementation of block.ChainVM by providing +// alternative block handler functions that provide the snowman.Block type to the +// consensus engine. +type SnowVM[I ConcreteBlock, O ConcreteBlock, A ConcreteBlock] struct { + *VM[I, O, A] +} + +func NewSnowVM[I ConcreteBlock, O ConcreteBlock, A ConcreteBlock](version string, chain ConcreteVM[I, O, A]) *SnowVM[I, O, A] { + return &SnowVM[I, O, A]{VM: NewVM(version, chain)} +} + +func (v *SnowVM[I, O, A]) GetBlock(ctx context.Context, blkID ids.ID) (snowman.Block, error) { + return v.VM.GetBlock(ctx, blkID) +} + +func (v *SnowVM[I, O, A]) ParseBlock(ctx context.Context, bytes []byte) (snowman.Block, error) { + return v.VM.ParseBlock(ctx, bytes) +} + +func (v *SnowVM[I, O, A]) BuildBlock(ctx context.Context) (snowman.Block, error) { + return v.VM.BuildBlock(ctx) +} + +func (v *SnowVM[I, O, A]) BuildBlockWithContext(ctx context.Context, blockContext *block.Context) (snowman.Block, error) { + return v.VM.BuildBlockWithContext(ctx, blockContext) +} diff --git a/vms/sdk/snow/statesync.go b/vms/sdk/snow/statesync.go new file mode 100644 index 000000000000..0e313f9033b3 --- /dev/null +++ b/vms/sdk/snow/statesync.go @@ -0,0 +1,156 @@ +// Copyright (C) 2024, Ava Labs, Inv. All rights reserved. +// See the file LICENSE for licensing terms. + +package snow + +import ( + "context" + "fmt" + "slices" + "time" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/snow/engine/snowman/block" + "github.com/ava-labs/avalanchego/utils/set" + "go.uber.org/zap" + + "github.com/ava-labs/avalanchego/vms/sdk/event" +) + +var _ block.StateSyncableVM = (*VM[ConcreteBlock, ConcreteBlock, ConcreteBlock])(nil) + +func (v *VM[I, O, A]) SetStateSyncableVM(stateSyncableVM block.StateSyncableVM) { + v.stateSyncableVM = stateSyncableVM +} + +// StartStateSync notifies the VM to enter DynamicStateSync mode. +// The caller is responsible to eventually call FinishStateSync with a fully populated +// last accepted state. +func (v *VM[I, O, A]) StartStateSync(ctx context.Context, block I) error { + if err := v.inputChainIndex.UpdateLastAccepted(ctx, block); err != nil { + return err + } + v.ready = false + v.setLastAccepted(NewInputBlock(v, block)) + return nil +} + +// FinishStateSync completes dynamic state sync mode and sets the last accepted block to +// the given input/output/accepted value. 
+func (v *VM[I, O, A]) FinishStateSync(ctx context.Context, input I, output O, accepted A) error { + v.chainLock.Lock() + defer v.chainLock.Unlock() + + // Cannot call FinishStateSync if already marked as ready and in normal operation + if v.ready { + return fmt.Errorf("can't finish dynamic state sync from normal operation: %s", input) + } + + // If the block is already the last accepted block, update the fields and return + if input.GetID() == v.lastAcceptedBlock.GetID() { + v.lastAcceptedBlock.setAccepted(output, accepted) + v.log.Info("Finishing state sync with original target", zap.Stringer("lastAcceptedBlock", v.lastAcceptedBlock)) + } else { + v.log.Info("Finishing state sync with target behind last accepted tip", + zap.Stringer("target", input), + zap.Stringer("lastAcceptedBlock", v.lastAcceptedBlock.Input), + ) + start := time.Now() + // Dynamic state sync notifies completion async, so the engine may continue to process/accept new blocks + // before we grab chainLock. + // This means we must reprocess blocks from the target state sync finished on to the updated last + // accepted block. + updatedLastAccepted, err := v.reprocessFromOutputToInput(ctx, v.lastAcceptedBlock.Input, output, accepted) + if err != nil { + return fmt.Errorf("failed to finish state sync while reprocessing to last accepted tip: %w", err) + } + v.setLastAccepted(updatedLastAccepted) + v.log.Info("Finished reprocessing blocks", zap.Duration("duration", time.Since(start))) + } + + if err := v.verifyProcessingBlocks(ctx); err != nil { + return err + } + + v.ready = true + return nil +} + +func (v *VM[I, O, A]) verifyProcessingBlocks(ctx context.Context) error { + // Sort processing blocks by height + v.verifiedL.Lock() + v.log.Info("Verifying processing blocks after state sync", zap.Int("numBlocks", len(v.verifiedBlocks))) + processingBlocks := make([]*Block[I, O, A], 0, len(v.verifiedBlocks)) + for _, blk := range v.verifiedBlocks { + processingBlocks = append(processingBlocks, blk) + } + v.verifiedL.Unlock() + slices.SortFunc(processingBlocks, func(a *Block[I, O, A], b *Block[I, O, A]) int { + switch { + case a.Height() < b.Height(): + return -1 + case a.Height() == b.Height(): + return 0 + default: + return 1 + } + }) + + // Verify each block in order. An error here is not fatal because we may have vacuously verified blocks. + // Therefore, if a block's parent has not already been verified, it invalidates all subsequent children + // and we can safely drop the error here. 
+ invalidBlkIDs := set.NewSet[ids.ID](0) + for _, blk := range processingBlocks { + parent, err := v.GetBlock(ctx, blk.Parent()) + if err != nil { + return fmt.Errorf("failed to fetch parent block %s while verifying processing block %s after state sync: %w", blk.Parent(), blk, err) + } + // the parent failed verification and this block is transitively invalid, + // we are marking this block as unresolved + if !parent.verified { + v.log.Warn("Parent block not verified, skipping verification of processing block", + zap.Stringer("parent", parent), + zap.Stringer("block", blk), + ) + invalidBlkIDs.Add(blk.ID()) + continue + } + if err := blk.verify(ctx, parent.Output); err != nil { + invalidBlkIDs.Add(blk.ID()) + v.log.Warn("Failed to verify processing block after state sync", zap.Stringer("block", blk), zap.Error(err)) + } + } + + unresolvedBlkCheck := newUnresolvedBlocksHealthCheck[I](invalidBlkIDs) + v.AddPreRejectedSub(event.SubscriptionFunc[I]{ + NotifyF: func(_ context.Context, input I) error { + unresolvedBlkCheck.Resolve(input.GetID()) + return nil + }, + }) + if err := v.RegisterHealthChecker(unresolvedBlocksHealthChecker, unresolvedBlkCheck); err != nil { + return err + } + + return nil +} + +func (v *VM[I, O, A]) StateSyncEnabled(ctx context.Context) (bool, error) { + return v.stateSyncableVM.StateSyncEnabled(ctx) +} + +func (v *VM[I, O, A]) GetOngoingSyncStateSummary(ctx context.Context) (block.StateSummary, error) { + return v.stateSyncableVM.GetOngoingSyncStateSummary(ctx) +} + +func (v *VM[I, O, A]) GetLastStateSummary(ctx context.Context) (block.StateSummary, error) { + return v.stateSyncableVM.GetLastStateSummary(ctx) +} + +func (v *VM[I, O, A]) ParseStateSummary(ctx context.Context, summaryBytes []byte) (block.StateSummary, error) { + return v.stateSyncableVM.ParseStateSummary(ctx, summaryBytes) +} + +func (v *VM[I, O, A]) GetStateSummary(ctx context.Context, summaryHeight uint64) (block.StateSummary, error) { + return v.stateSyncableVM.GetStateSummary(ctx, summaryHeight) +} diff --git a/vms/sdk/snow/vm.go b/vms/sdk/snow/vm.go new file mode 100644 index 000000000000..548c9013dd93 --- /dev/null +++ b/vms/sdk/snow/vm.go @@ -0,0 +1,541 @@ +// Copyright (C) 2024, Ava Labs, Inv. All rights reserved. +// See the file LICENSE for licensing terms. 
+
+package snow
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"net/http"
+	"sync"
+	"time"
+
+	apimetrics "github.com/ava-labs/avalanchego/api/metrics"
+	"github.com/ava-labs/avalanchego/database"
+	"github.com/ava-labs/avalanchego/ids"
+	"github.com/ava-labs/avalanchego/network/p2p"
+	"github.com/ava-labs/avalanchego/snow"
+	"github.com/ava-labs/avalanchego/snow/engine/common"
+	"github.com/ava-labs/avalanchego/snow/engine/snowman/block"
+	"github.com/ava-labs/avalanchego/trace"
+	"github.com/ava-labs/avalanchego/utils/logging"
+	"github.com/ava-labs/avalanchego/utils/profiler"
+	"go.uber.org/zap"
+
+	"github.com/ava-labs/avalanchego/vms/sdk/event"
+
+	"github.com/ava-labs/avalanchego/cache"
+	hcontext "github.com/ava-labs/avalanchego/vms/sdk/context"
+)
+
+var _ block.StateSyncableVM = (*VM[ConcreteBlock, ConcreteBlock, ConcreteBlock])(nil)
+
+type ChainInput struct {
+	SnowCtx                    *snow.Context
+	GenesisBytes, UpgradeBytes []byte
+	ToEngine                   chan<- common.Message
+	Shutdown                   <-chan struct{}
+	Tracer                     trace.Tracer
+	Config                     hcontext.Config
+}
+
+// ConcreteVM provides a reduced interface for VM developers. It uses Go generics and the
+// exact AvalancheGo consensus invariants to give VM developers exactly the information
+// that is guaranteed to be available during each state transition they need to handle.
+// The consensus engine can see blocks in four different states:
+// - bytes
+// - Input - parsed, but not verified in consensus, no guarantee of state availability
+// - Output - parsed and verified successfully, will eventually be either Accepted or Rejected
+// - Accepted - verified and accepted by the consensus engine
+//
+// Finally, the snow package breaks the single conglomerate VM interface exposed to
+// AvalancheGo down into setters, so that the ConcreteVM can override the defaults where
+// desired, but does not need to supply boilerplate where it can rely on a default
+// behavior.
+type ConcreteVM[I ConcreteBlock, O ConcreteBlock, A ConcreteBlock] interface {
+	// Initialize initializes the chain, optionally configures the VM via vm, and returns
+	// a persistent index of the chain's input block type, the last output and accepted block,
+	// and whether or not the VM currently has a valid state.
+	// If stateReady is false, the VM must be mid-state sync, such that it does not have a valid
+	// last output or accepted block.
+	Initialize(
+		ctx context.Context,
+		chainInput ChainInput,
+		vm *VM[I, O, A],
+	) (inputChainIndex ChainIndex[I], lastOutput O, lastAccepted A, stateReady bool, err error)
+	// SetConsensusIndex sets the ConsensusIndex[I, O, A] on the VM to provide the
+	// VM with:
+	// 1. A cached index of the chain
+	// 2. The ability to fetch the latest consensus state (preferred output block and last accepted block)
+	SetConsensusIndex(consensusIndex *ConsensusIndex[I, O, A])
+	// BuildBlock returns a new input and output block built on top of the provided parent.
+	// The provided parent will be the current preference of the consensus engine.
+	BuildBlock(ctx context.Context, blockContext *block.Context, parent O) (I, O, error)
+	// ParseBlock parses the provided bytes into an input block.
+	ParseBlock(ctx context.Context, bytes []byte) (I, error)
+	// VerifyBlock verifies the provided block is valid given its already verified parent
+	// and returns the resulting output of executing the block.
+	VerifyBlock(
+		ctx context.Context,
+		parent O,
+		block I,
+	) (O, error)
+	// AcceptBlock marks block as accepted and returns the resulting Accepted block type.
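+	// The acceptedParent argument is the already-Accepted result of block's parent.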
+ // AcceptBlock is guaranteed to be called after the input block has been persisted + // to disk. + AcceptBlock(ctx context.Context, acceptedParent A, block O) (A, error) +} + +type namedCloser struct { + name string + close func() error +} + +type VM[I ConcreteBlock, O ConcreteBlock, A ConcreteBlock] struct { + handlers map[string]http.Handler + network *p2p.Network + stateSyncableVM block.StateSyncableVM + closers []namedCloser + + // onNormalOperationsStarted contains callbacks that execute when the VM transitions + // to normal operation, either after bootstrapping or state sync completion. + onNormalOperationsStarted []func(context.Context) error + + // onBootstrapStarted contains callbacks that execute when the VM begins + // bootstrapping state from the network. + onBootstrapStarted []func(context.Context) error + + // onStateSyncStarted contains callbacks that execute when the VM begins + // state sync to synchronize with the network's state. + onStateSyncStarted []func(context.Context) error + + verifiedSubs []event.Subscription[O] + rejectedSubs []event.Subscription[O] + acceptedSubs []event.Subscription[A] + preReadyAcceptedSubs []event.Subscription[I] + // preRejectedSubs handles rejections of I (Input) during/after state sync, before they reach O (Output) state + preRejectedSubs []event.Subscription[I] + version string + + // chainLock provides a synchronization point between state sync and normal operation. + // To complete dynamic state sync, we must: + // 1. Accept a sequence of blocks from the final state sync target to the last accepted block + // 2. Re-process all outstanding processing blocks + // 3. Mark the VM as ready for normal operation + // + // During this time, we must not allow any new blocks to be verified/accepted. + chainLock sync.Mutex + chain ConcreteVM[I, O, A] + ready bool + + inputChainIndex ChainIndex[I] + consensusIndex *ConsensusIndex[I, O, A] + + snowCtx *snow.Context + hconfig hcontext.Config + vmConfig VMConfig + + // Each element is a block that passed verification but + // hasn't yet been accepted/rejected + verifiedL sync.RWMutex + verifiedBlocks map[ids.ID]*Block[I, O, A] + + // We store the last [AcceptedBlockWindowCache] blocks in memory + // to avoid reading blocks from disk. 
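+	// acceptedBlocksByID maps block ID -> block and acceptedBlocksByHeight maps
+	// height -> block ID, so a height lookup costs at most two cache reads before
+	// falling back to the chain index.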
+ acceptedBlocksByID *cache.FIFO[ids.ID, *Block[I, O, A]] + acceptedBlocksByHeight *cache.FIFO[uint64, ids.ID] + + metaLock sync.Mutex + lastAcceptedBlock *Block[I, O, A] + preferredBlkID ids.ID + + metrics *metrics + log logging.Logger + tracer trace.Tracer + + shutdownChan chan struct{} + + healthCheckers sync.Map +} + +func NewVM[I ConcreteBlock, O ConcreteBlock, A ConcreteBlock](version string, chain ConcreteVM[I, O, A]) *VM[I, O, A] { + return &VM[I, O, A]{ + handlers: make(map[string]http.Handler), + stateSyncableVM: block.StateSyncableVMDisabled{}, + version: version, + chain: chain, + } +} + +func (v *VM[I, O, A]) Initialize( + ctx context.Context, + chainCtx *snow.Context, + _ database.Database, + genesisBytes []byte, + upgradeBytes []byte, + configBytes []byte, + toEngine chan<- common.Message, + _ []*common.Fx, + appSender common.AppSender, +) error { + v.snowCtx = chainCtx + v.shutdownChan = make(chan struct{}) + + hconfig, err := hcontext.NewConfig(configBytes) + if err != nil { + return fmt.Errorf("failed to create hypersdk config: %w", err) + } + v.hconfig = hconfig + tracerConfig, err := GetTracerConfig(hconfig) + if err != nil { + return fmt.Errorf("failed to fetch tracer config: %w", err) + } + tracer, err := trace.New(tracerConfig) + if err != nil { + return err + } + v.tracer = tracer + ctx, span := v.tracer.Start(ctx, "VM.Initialize") + defer span.End() + + v.vmConfig, err = GetVMConfig(hconfig) + if err != nil { + return fmt.Errorf("failed to parse vm config: %w", err) + } + + defaultRegistry, err := apimetrics.MakeAndRegister(v.snowCtx.Metrics, "snow") + if err != nil { + return err + } + metrics, err := newMetrics(defaultRegistry) + if err != nil { + return err + } + v.metrics = metrics + v.log = chainCtx.Log + + continuousProfilerConfig, err := GetProfilerConfig(hconfig) + if err != nil { + return fmt.Errorf("failed to parse continuous profiler config: %w", err) + } + if continuousProfilerConfig.Enabled { + continuousProfiler := profiler.NewContinuous( + continuousProfilerConfig.Dir, + continuousProfilerConfig.Freq, + continuousProfilerConfig.MaxNumFiles, + ) + v.addCloser("continuous profiler", func() error { + continuousProfiler.Shutdown() + return nil + }) + go continuousProfiler.Dispatch() //nolint:errcheck + } + + v.network, err = p2p.NewNetwork(v.log, appSender, defaultRegistry, "p2p") + if err != nil { + return fmt.Errorf("failed to initialize p2p: %w", err) + } + + acceptedBlocksByIDCache, err := cache.NewFIFO[ids.ID, *Block[I, O, A]](v.vmConfig.AcceptedBlockWindowCache) + if err != nil { + return err + } + v.acceptedBlocksByID = acceptedBlocksByIDCache + acceptedBlocksByHeightCache, err := cache.NewFIFO[uint64, ids.ID](v.vmConfig.AcceptedBlockWindowCache) + if err != nil { + return err + } + v.acceptedBlocksByHeight = acceptedBlocksByHeightCache + v.verifiedBlocks = make(map[ids.ID]*Block[I, O, A]) + + chainInput := ChainInput{ + SnowCtx: chainCtx, + GenesisBytes: genesisBytes, + UpgradeBytes: upgradeBytes, + ToEngine: toEngine, + Shutdown: v.shutdownChan, + Tracer: v.tracer, + Config: v.hconfig, + } + + inputChainIndex, lastOutput, lastAccepted, stateReady, err := v.chain.Initialize( + ctx, + chainInput, + v, + ) + if err != nil { + return err + } + v.inputChainIndex = inputChainIndex + if err := v.makeConsensusIndex(ctx, v.inputChainIndex, lastOutput, lastAccepted, stateReady); err != nil { + return err + } + v.chain.SetConsensusIndex(v.consensusIndex) + if err := v.lastAcceptedBlock.notifyAccepted(ctx); err != nil { + return fmt.Errorf("failed to notify 
last accepted on startup: %w", err) + } + + if err := v.initHealthCheckers(); err != nil { + return err + } + + return nil +} + +func (v *VM[I, O, A]) setLastAccepted(lastAcceptedBlock *Block[I, O, A]) { + v.metaLock.Lock() + defer v.metaLock.Unlock() + + v.lastAcceptedBlock = lastAcceptedBlock + v.acceptedBlocksByHeight.Put(v.lastAcceptedBlock.Height(), v.lastAcceptedBlock.ID()) + v.acceptedBlocksByID.Put(v.lastAcceptedBlock.ID(), v.lastAcceptedBlock) +} + +func (v *VM[I, O, A]) getBlockFromCache(blkID ids.ID) (*Block[I, O, A], bool) { + if blk, ok := v.acceptedBlocksByID.Get(blkID); ok { + return blk, true + } + + v.verifiedL.RLock() + defer v.verifiedL.RUnlock() + if blk, exists := v.verifiedBlocks[blkID]; exists { + return blk, true + } + return nil, false +} + +func (v *VM[I, O, A]) GetBlock(ctx context.Context, blkID ids.ID) (*Block[I, O, A], error) { + ctx, span := v.tracer.Start(ctx, "VM.GetBlock") + defer span.End() + + if blk, ok := v.getBlockFromCache(blkID); ok { + return blk, nil + } + + // Retrieve and parse from disk + // Note: this returns an accepted block with only the input block set. + // The consensus engine guarantees that: + // 1. Verify is only called on a block whose parent is lastAcceptedBlock or in verifiedBlocks + // 2. Accept is only called on a block whose parent is lastAcceptedBlock + blk, err := v.inputChainIndex.GetBlock(ctx, blkID) + if err != nil { + return nil, err + } + return NewInputBlock(v, blk), nil +} + +func (v *VM[I, O, A]) GetBlockByHeight(ctx context.Context, height uint64) (*Block[I, O, A], error) { + ctx, span := v.tracer.Start(ctx, "VM.GetBlockByHeight") + defer span.End() + + if v.lastAcceptedBlock.Height() == height { + return v.lastAcceptedBlock, nil + } + var blkID ids.ID + if fetchedBlkID, ok := v.acceptedBlocksByHeight.Get(height); ok { + blkID = fetchedBlkID + } else { + fetchedBlkID, err := v.inputChainIndex.GetBlockIDAtHeight(ctx, height) + if err != nil { + return nil, err + } + blkID = fetchedBlkID + } + + if blk, ok := v.acceptedBlocksByID.Get(blkID); ok { + return blk, nil + } + + return v.GetBlock(ctx, blkID) +} + +func (v *VM[I, O, A]) ParseBlock(ctx context.Context, bytes []byte) (*Block[I, O, A], error) { + ctx, span := v.tracer.Start(ctx, "VM.ParseBlock") + defer span.End() + + start := time.Now() + defer func() { + v.metrics.blockParse.Observe(float64(time.Since(start))) + }() + + inputBlk, err := v.chain.ParseBlock(ctx, bytes) + if err != nil { + return nil, err + } + + // If the block is pinned/cached, return the uniquified block + if blk, ok := v.getBlockFromCache(inputBlk.GetID()); ok { + return blk, nil + } + + return NewInputBlock(v, inputBlk), nil +} + +func (v *VM[I, O, A]) BuildBlockWithContext(ctx context.Context, blockCtx *block.Context) (*Block[I, O, A], error) { + return v.buildBlock(ctx, blockCtx) +} + +func (v *VM[I, O, A]) BuildBlock(ctx context.Context) (*Block[I, O, A], error) { + return v.buildBlock(ctx, nil) +} + +func (v *VM[I, O, A]) buildBlock(ctx context.Context, blockCtx *block.Context) (*Block[I, O, A], error) { + v.chainLock.Lock() + defer v.chainLock.Unlock() + + ctx, span := v.tracer.Start(ctx, "VM.BuildBlock") + defer span.End() + + start := time.Now() + defer func() { + v.metrics.blockBuild.Observe(float64(time.Since(start))) + }() + + preferredBlk, err := v.GetBlock(ctx, v.preferredBlkID) + if err != nil { + return nil, fmt.Errorf("failed to get preferred block %s to build: %w", v.preferredBlkID, err) + } + // This is the only place where we call in to the chain while holding a lock + // and 
it is unclear whether holding the lock here is strictly necessary.
+	inputBlock, outputBlock, err := v.chain.BuildBlock(ctx, blockCtx, preferredBlk.Output)
+	if err != nil {
+		return nil, err
+	}
+	sb := NewVerifiedBlock[I, O, A](v, inputBlock, outputBlock)
+	return sb, nil
+}
+
+func (v *VM[I, O, A]) LastAcceptedBlock(_ context.Context) *Block[I, O, A] {
+	return v.lastAcceptedBlock
+}
+
+func (v *VM[I, O, A]) GetBlockIDAtHeight(ctx context.Context, blkHeight uint64) (ids.ID, error) {
+	ctx, span := v.tracer.Start(ctx, "VM.GetBlockIDAtHeight")
+	defer span.End()
+
+	if blkHeight == v.lastAcceptedBlock.Height() {
+		return v.lastAcceptedBlock.ID(), nil
+	}
+	if blkID, ok := v.acceptedBlocksByHeight.Get(blkHeight); ok {
+		return blkID, nil
+	}
+	return v.inputChainIndex.GetBlockIDAtHeight(ctx, blkHeight)
+}
+
+func (v *VM[I, O, A]) SetPreference(_ context.Context, blkID ids.ID) error {
+	v.metaLock.Lock()
+	defer v.metaLock.Unlock()
+
+	v.preferredBlkID = blkID
+	return nil
+}
+
+func (v *VM[I, O, A]) LastAccepted(context.Context) (ids.ID, error) {
+	return v.lastAcceptedBlock.ID(), nil
+}
+
+func (v *VM[I, O, A]) SetState(ctx context.Context, state snow.State) error {
+	v.log.Info("Setting state", zap.Stringer("state", state))
+	switch state {
+	case snow.StateSyncing:
+		for _, startStateSyncF := range v.onStateSyncStarted {
+			if err := startStateSyncF(ctx); err != nil {
+				return err
+			}
+		}
+		return nil
+	case snow.Bootstrapping:
+		for _, startBootstrappingF := range v.onBootstrapStarted {
+			if err := startBootstrappingF(ctx); err != nil {
+				return err
+			}
+		}
+		return nil
+	case snow.NormalOp:
+		for _, startNormalOpF := range v.onNormalOperationsStarted {
+			if err := startNormalOpF(ctx); err != nil {
+				return err
+			}
+		}
+		return nil
+	default:
+		return snow.ErrUnknownState
+	}
+}
+
+func (v *VM[I, O, A]) CreateHandlers(_ context.Context) (map[string]http.Handler, error) {
+	return v.handlers, nil
+}
+
+func (v *VM[I, O, A]) Shutdown(context.Context) error {
+	v.log.Info("Shutting down VM")
+	close(v.shutdownChan)
+
+	errs := make([]error, len(v.closers))
+	for i, closer := range v.closers {
+		v.log.Info("Shutting down service", zap.String("service", closer.name))
+		start := time.Now()
+		errs[i] = closer.close()
+		v.log.Info("Finished shutting down service", zap.String("service", closer.name), zap.Duration("duration", time.Since(start)))
+	}
+	return errors.Join(errs...)
+}
+
+func (v *VM[I, O, A]) Version(context.Context) (string, error) {
+	return v.version, nil
+}
+
+func (v *VM[I, O, A]) addCloser(name string, closer func() error) {
+	v.closers = append(v.closers, namedCloser{name, closer})
+}
+
+func (v *VM[I, O, A]) GetInputCovariantVM() *InputCovariantVM[I, O, A] {
+	return &InputCovariantVM[I, O, A]{vm: v}
+}
+
+func (v *VM[I, O, A]) GetNetwork() *p2p.Network {
+	return v.network
+}
+
+func (v *VM[I, O, A]) AddAcceptedSub(sub ...event.Subscription[A]) {
+	v.acceptedSubs = append(v.acceptedSubs, sub...)
+}
+
+func (v *VM[I, O, A]) AddRejectedSub(sub ...event.Subscription[O]) {
+	v.rejectedSubs = append(v.rejectedSubs, sub...)
+}
+
+func (v *VM[I, O, A]) AddVerifiedSub(sub ...event.Subscription[O]) {
+	v.verifiedSubs = append(v.verifiedSubs, sub...)
+}
+
+func (v *VM[I, O, A]) AddPreReadyAcceptedSub(sub ...event.Subscription[I]) {
+	v.preReadyAcceptedSubs = append(v.preReadyAcceptedSubs, sub...)
+} + +// AddPreRejectedSub adds subscriptions tracking rejected blocks that were +// vacuously verified during state sync before the VM had the state to verify them +func (v *VM[I, O, A]) AddPreRejectedSub(sub ...event.Subscription[I]) { + v.preRejectedSubs = append(v.preRejectedSubs, sub...) +} + +func (v *VM[I, O, A]) AddHandler(name string, handler http.Handler) { + v.handlers[name] = handler +} + +func (v *VM[I, O, A]) AddCloser(name string, closer func() error) { + v.addCloser(name, closer) +} + +// AddStateSyncStarter registers a callback that will be executed when the engine invokes SetState(snow.StateSyncing) +// i.e., when it's in state sync operation +func (v *VM[I, O, A]) AddStateSyncStarter(onStateSyncStarted ...func(context.Context) error) { + v.onStateSyncStarted = append(v.onStateSyncStarted, onStateSyncStarted...) +} + +// AddNormalOpStarter registers a callback that will be executed when the engine invokes SetState(snow.NormalOp) +// i.e., transitioning from state sync / bootstrapping to normal operation. +func (v *VM[I, O, A]) AddNormalOpStarter(onNormalOpStartedF ...func(context.Context) error) { + v.onNormalOperationsStarted = append(v.onNormalOperationsStarted, onNormalOpStartedF...) +} diff --git a/vms/sdk/snow/vm_test.go b/vms/sdk/snow/vm_test.go new file mode 100644 index 000000000000..71ed9da1ea0d --- /dev/null +++ b/vms/sdk/snow/vm_test.go @@ -0,0 +1,1101 @@ +// Copyright (C) 2024, Ava Labs, Inv. All rights reserved. +// See the file LICENSE for licensing terms. + +package snow + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "math" + "math/rand" + "testing" + "time" + + "github.com/ava-labs/avalanchego/database/memdb" + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/snow/engine/enginetest" + "github.com/ava-labs/avalanchego/snow/engine/snowman/block" + "github.com/ava-labs/avalanchego/snow/snowtest" + "github.com/ava-labs/avalanchego/utils" + "github.com/ava-labs/avalanchego/utils/hashing" + "github.com/ava-labs/avalanchego/utils/set" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + "golang.org/x/exp/slices" + + "github.com/ava-labs/avalanchego/vms/sdk/chainindex" +) + +var ( + _ ConcreteBlock = (*TestBlock)(nil) + _ ConcreteVM[*TestBlock, *TestBlock, *TestBlock] = (*TestChain)(nil) +) + +var errVerifyInvalidBlock = errors.New("verified invalid block") + +const ( + testVersion = "v0.0.1" + blockStringerF = "(BlockID = %s, ParentID = %s, Timestamp = %d, Height = %d, RandomData = %x, Invalid = %t)" +) + +type TestBlock struct { + PrntID ids.ID `json:"parentID"` + Tmstmp int64 `json:"timestamp"` + Hght uint64 `json:"height"` + // RandomData is used to uniquify blocks given there's no state or application data + // included in the tests otherwise. + RandomData []byte `json:"randomData"` + + // Invalid marks a block that should return an error during execution. + // This should make it easy to construct a block that should fail execution. 
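+	// TestChain.VerifyBlock returns errVerifyInvalidBlock for any block with Invalid set.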
+ Invalid bool `json:"invalid"` + + BlockContext *block.Context `json:"pChainHeight"` + + outputPopulated bool + acceptedPopulated bool +} + +func NewTestBlockFromParent(parent *TestBlock) *TestBlock { + return &TestBlock{ + PrntID: parent.GetID(), + Tmstmp: parent.GetTimestamp() + 1, + Hght: parent.GetHeight() + 1, + RandomData: utils.RandomBytes(32), + } +} + +func NewTestBlockFromParentWithContext(parent *TestBlock, ctx *block.Context) *TestBlock { + blk := NewTestBlockFromParent(parent) + blk.BlockContext = ctx + return blk +} + +func (t *TestBlock) GetID() ids.ID { + return hashing.ComputeHash256Array(t.GetBytes()) +} + +func (t *TestBlock) GetParent() ids.ID { + return t.PrntID +} + +func (t *TestBlock) GetTimestamp() int64 { + return t.Tmstmp +} + +func (t *TestBlock) GetBytes() []byte { + b, err := json.Marshal(t) + if err != nil { + panic(err) + } + return b +} + +func (t *TestBlock) GetHeight() uint64 { + return t.Hght +} + +func (t *TestBlock) GetContext() *block.Context { + return t.BlockContext +} + +func (t *TestBlock) String() string { + return fmt.Sprintf(blockStringerF, t.GetID(), t.PrntID, t.Tmstmp, t.Hght, t.RandomData, t.Invalid) +} + +func NewTestBlockFromBytes(b []byte) (*TestBlock, error) { + blk := &TestBlock{} + if err := json.Unmarshal(b, blk); err != nil { + return nil, err + } + return blk, nil +} + +type TestChain struct { + t *testing.T + require *require.Assertions + initLastAcceptedBlock *TestBlock +} + +func NewTestChain( + t *testing.T, + require *require.Assertions, + initLastAcceptedBlock *TestBlock, +) *TestChain { + return &TestChain{ + t: t, + require: require, + initLastAcceptedBlock: initLastAcceptedBlock, + } +} + +func (t *TestChain) Initialize( + ctx context.Context, + chainInput ChainInput, + _ *VM[*TestBlock, *TestBlock, *TestBlock], +) (ChainIndex[*TestBlock], *TestBlock, *TestBlock, bool, error) { + chainIndex, err := chainindex.New[*TestBlock](chainInput.SnowCtx.Log, prometheus.NewRegistry(), chainindex.NewDefaultConfig(), t, memdb.New()) + if err != nil { + return nil, nil, nil, false, err + } + if err := chainIndex.UpdateLastAccepted(ctx, t.initLastAcceptedBlock); err != nil { + return nil, nil, nil, false, err + } + return chainIndex, t.initLastAcceptedBlock, t.initLastAcceptedBlock, t.initLastAcceptedBlock.acceptedPopulated, nil +} + +func (*TestChain) SetConsensusIndex(_ *ConsensusIndex[*TestBlock, *TestBlock, *TestBlock]) {} + +func (t *TestChain) BuildBlock(_ context.Context, blkContext *block.Context, parent *TestBlock) (*TestBlock, *TestBlock, error) { + t.require.True(parent.outputPopulated) + builtBlock := NewTestBlockFromParentWithContext(parent, blkContext) + builtBlock.outputPopulated = true + return builtBlock, builtBlock, nil +} + +func (*TestChain) ParseBlock(_ context.Context, bytes []byte) (*TestBlock, error) { + return NewTestBlockFromBytes(bytes) +} + +func (t *TestChain) VerifyBlock(_ context.Context, parent *TestBlock, block *TestBlock) (*TestBlock, error) { + // The parent must have been executed before we execute the block + t.require.True(parent.outputPopulated) + if block.Invalid { + return nil, fmt.Errorf("%w: %s", errVerifyInvalidBlock, block) + } + + // A block should only be executed once + t.require.False(block.outputPopulated) + block.outputPopulated = true + + return block, nil +} + +func (t *TestChain) AcceptBlock(_ context.Context, acceptedParent *TestBlock, verifiedBlock *TestBlock) (*TestBlock, error) { + // This block must be executed before calling accept + 
t.require.True(acceptedParent.outputPopulated) + t.require.True(acceptedParent.acceptedPopulated) + t.require.True(verifiedBlock.outputPopulated) + + // The block should only be accepted once + t.require.False(verifiedBlock.acceptedPopulated) + verifiedBlock.acceptedPopulated = true + + return verifiedBlock, nil +} + +type TestConsensusEngine struct { + t *testing.T + require *require.Assertions + rand *rand.Rand + chain *TestChain + vm *SnowVM[*TestBlock, *TestBlock, *TestBlock] + + lastAccepted *Block[*TestBlock, *TestBlock, *TestBlock] + preferred *Block[*TestBlock, *TestBlock, *TestBlock] + verified map[ids.ID]*Block[*TestBlock, *TestBlock, *TestBlock] + children map[ids.ID]set.Set[ids.ID] + accepted []*Block[*TestBlock, *TestBlock, *TestBlock] +} + +func NewTestConsensusEngine(t *testing.T, initLastAcceptedBlock *TestBlock) *TestConsensusEngine { + rand := rand.New(rand.NewSource(0)) //nolint:gosec + return NewTestConsensusEngineWithRand(t, rand, initLastAcceptedBlock) +} + +func NewTestConsensusEngineWithRand(t *testing.T, rand *rand.Rand, initLastAcceptedBlock *TestBlock) *TestConsensusEngine { + r := require.New(t) + ctx := context.Background() + chain := NewTestChain(t, r, initLastAcceptedBlock) + vm := NewSnowVM[*TestBlock, *TestBlock, *TestBlock](testVersion, chain) + toEngine := make(chan common.Message, 1) + ce := &TestConsensusEngine{ + t: t, + require: r, + rand: rand, + chain: chain, + vm: vm, + verified: make(map[ids.ID]*Block[*TestBlock, *TestBlock, *TestBlock]), + children: make(map[ids.ID]set.Set[ids.ID]), + } + snowCtx := snowtest.Context(t, ids.GenerateTestID()) + snowCtx.ChainDataDir = t.TempDir() + config := map[string]interface{}{ + SnowVMConfigKey: VMConfig{ + AcceptedBlockWindowCache: 2, + }, + } + configBytes, err := json.Marshal(config) + r.NoError(err) + r.NoError(vm.Initialize(ctx, snowCtx, nil, nil, configBytes, nil, toEngine, nil, &enginetest.Sender{T: t})) + ce.lastAccepted = vm.LastAcceptedBlock(ctx) + ce.preferred = ce.lastAccepted + t.Cleanup(func() { + r.NoError(vm.Shutdown(ctx)) + }) + return ce +} + +// BuildBlock copies the expected behavior of the consensus engine when building a block +// and assumes the VM always builds a correct block. +func (ce *TestConsensusEngine) BuildBlock(ctx context.Context) (*Block[*TestBlock, *TestBlock, *TestBlock], bool) { + preferredID := ce.preferred.ID() + blk, err := ce.vm.VM.BuildBlock(ctx) + + ce.require.NoError(err) + ce.require.Equal(preferredID, blk.Parent()) + // Skip if we built a block identical to one we've already verified + if _, ok := ce.verified[blk.ID()]; ok { + return blk, false + } + + ce.verifyValidBlock(ctx, blk) + ce.require.NoError(blk.Verify(ctx)) + ce.verified[blk.ID()] = blk + + // Note: there is technically a case in the engine where building a block can enable issuance of + // pending blocks that are missing an ancestor. We ignore this edge case for simplicity here. 
+ ce.require.NoError(ce.vm.SetPreference(ctx, blk.ID())) + ce.preferred = blk + return blk, true +} + +func (ce *TestConsensusEngine) verifyValidBlock(ctx context.Context, blk *Block[*TestBlock, *TestBlock, *TestBlock]) { + ce.require.NoError(blk.Verify(ctx)) + ce.verified[blk.ID()] = blk + + children, ok := ce.children[blk.Parent()] + if !ok { + children = set.NewSet[ids.ID](1) + ce.children[blk.Parent()] = children + } + children.Add(blk.ID()) +} + +func (ce *TestConsensusEngine) selectRandomVerifiedBlock() (*Block[*TestBlock, *TestBlock, *TestBlock], bool) { + for _, blk := range ce.verified { + return blk, true + } + return nil, false +} + +// getLastAcceptedToBlk returns the chain of blocks in the range (lastAcceptedBlk, blk] +// If lastAcceptedBlk == blk, this returns an empty chain +// Assumes that blk and its ancestors tracing back to lastAcceptedBlk are in verified +func (ce *TestConsensusEngine) getLastAcceptedToBlk(_ context.Context, blk *Block[*TestBlock, *TestBlock, *TestBlock]) []*Block[*TestBlock, *TestBlock, *TestBlock] { + if blk.ID() == ce.lastAccepted.ID() { + return nil + } + + chain := make([]*Block[*TestBlock, *TestBlock, *TestBlock], 0) + for { + // Add the block to the chain and check for the next block + chain = append(chain, blk) + if parentBlk, ok := ce.verified[blk.Parent()]; ok { + blk = parentBlk + continue + } + + if blk.Parent() == ce.lastAccepted.ID() { + break + } + ce.require.FailNow("could not find parent tracing to last accepted block") + } + slices.Reverse(chain) + return chain +} + +// acceptChain should mimic the accept behavior of acceptPreferredChild in the Snow consensus engine +// Ref. https://github.com/ava-labs/avalanchego/blob/f6a5c1cd9e0fce911fb2367d1e69b8bb9af1fceb/snow/consensus/snowman/topological.go#L578 +func (ce *TestConsensusEngine) acceptChain(ctx context.Context, chain []*Block[*TestBlock, *TestBlock, *TestBlock]) { + for _, blk := range chain { + _, ok := ce.verified[blk.ID()] + ce.require.True(ok) + + parent := ce.lastAccepted + ce.require.Equal(parent.ID(), blk.Parent()) + + ce.require.NoError(blk.Accept(ctx)) + delete(ce.verified, blk.ID()) + ce.lastAccepted = blk + ce.accepted = append(ce.accepted, blk) + + children := ce.children[parent.ID()] + children.Remove(blk.ID()) + delete(ce.children, parent.ID()) + + ce.rejectTransitively(ctx, children) + } +} + +func (ce *TestConsensusEngine) rejectTransitively(ctx context.Context, toReject set.Set[ids.ID]) { + for child := range toReject { + childBlk, ok := ce.verified[child] + ce.require.True(ok) + ce.require.NoError(childBlk.Reject(ctx)) + delete(ce.verified, child) + + ce.rejectTransitively(ctx, ce.children[child]) + } +} + +func (ce *TestConsensusEngine) AcceptPreferredChain(ctx context.Context) (*Block[*TestBlock, *TestBlock, *TestBlock], bool) { + preferredChain := ce.getLastAcceptedToBlk(ctx, ce.preferred) + + if len(preferredChain) == 0 { + return nil, false + } + + ce.acceptChain(ctx, preferredChain) + return preferredChain[len(preferredChain)-1], true +} + +func (ce *TestConsensusEngine) GetAcceptedBlock(ctx context.Context) { + if len(ce.accepted) == 0 { + return + } + + selectedBlk := ce.accepted[ce.rand.Intn(len(ce.accepted))] + retrievedBlk, err := ce.vm.GetBlock(ctx, selectedBlk.ID()) + ce.require.NoError(err) + ce.require.Equal(retrievedBlk.ID(), selectedBlk.ID()) + + retrievedBlkID, err := ce.vm.GetBlockIDAtHeight(ctx, selectedBlk.Height()) + ce.require.NoError(err) + ce.require.Equal(selectedBlk.ID(), retrievedBlkID) + + retrievedBlk, err = 
ce.vm.GetBlockByHeight(ctx, selectedBlk.GetHeight()) + ce.require.NoError(err) + ce.require.Equal(retrievedBlk.ID(), selectedBlk.ID()) +} + +func (ce *TestConsensusEngine) ParseFutureBlock(ctx context.Context) { + tBlk := &TestBlock{ + PrntID: ids.GenerateTestID(), + Tmstmp: math.MaxInt64, + Hght: math.MaxUint64, + } + blk, err := ce.vm.ParseBlock(ctx, tBlk.GetBytes()) + ce.require.NoError(err) + ce.require.Equal(tBlk.GetID(), blk.ID()) + ce.require.Equal(tBlk.GetParent(), blk.Parent()) + ce.require.Equal(time.UnixMilli(tBlk.GetTimestamp()), blk.Timestamp()) + ce.require.Equal(tBlk.GetHeight(), blk.Height()) +} + +func (ce *TestConsensusEngine) ParseAndVerifyNewBlock(ctx context.Context, parent *Block[*TestBlock, *TestBlock, *TestBlock]) *Block[*TestBlock, *TestBlock, *TestBlock] { + newBlk := NewTestBlockFromParent(parent.Input) + parsedBlk, err := ce.vm.VM.ParseBlock(ctx, newBlk.GetBytes()) + ce.require.NoError(err) + ce.require.Equal(newBlk.GetID(), parsedBlk.ID()) + ce.verifyValidBlock(ctx, parsedBlk) + return parsedBlk +} + +func (ce *TestConsensusEngine) ParseAndVerifyNewRandomBlock(ctx context.Context) *Block[*TestBlock, *TestBlock, *TestBlock] { + blk, ok := ce.selectRandomVerifiedBlock() + if !ok { + blk = ce.lastAccepted + } + + return ce.ParseAndVerifyNewBlock(ctx, blk) +} + +func (ce *TestConsensusEngine) ParseVerifiedBlk(ctx context.Context) (*Block[*TestBlock, *TestBlock, *TestBlock], bool) { + blk, ok := ce.selectRandomVerifiedBlock() + if !ok { + return nil, false + } + + parsedBlk, err := ce.vm.ParseBlock(ctx, blk.Bytes()) + ce.require.NoError(err) + ce.require.Equal(blk.ID(), parsedBlk.ID()) + ce.require.Equal(blk, parsedBlk) + return blk, true +} + +func (ce *TestConsensusEngine) ParseAndVerifyInvalidBlock(ctx context.Context) { + blk, ok := ce.selectRandomVerifiedBlock() + if !ok { + blk = ce.lastAccepted + } + + newBlk := NewTestBlockFromParent(blk.Input) + newBlk.Invalid = true + parsedBlk, err := ce.vm.ParseBlock(ctx, newBlk.GetBytes()) + ce.require.NoError(err) + ce.require.Equal(newBlk.GetID(), parsedBlk.ID()) + ce.require.ErrorIs(parsedBlk.Verify(ctx), errVerifyInvalidBlock) + _, ok = ce.verified[parsedBlk.ID()] + ce.require.False(ok) +} + +func (ce *TestConsensusEngine) SwapRandomPreference(ctx context.Context) (*Block[*TestBlock, *TestBlock, *TestBlock], *Block[*TestBlock, *TestBlock, *TestBlock], bool) { + selectedBlk, ok := ce.selectRandomVerifiedBlock() + if !ok { + selectedBlk = ce.lastAccepted + } + oldPreference := ce.preferred + newPreference := selectedBlk + changed := ce.preferred.ID() != selectedBlk.ID() + ce.preferred = selectedBlk + ce.require.NoError(ce.vm.SetPreference(ctx, selectedBlk.ID())) + return oldPreference, newPreference, changed +} + +func (ce *TestConsensusEngine) SetPreference(ctx context.Context, blkID ids.ID) (*Block[*TestBlock, *TestBlock, *TestBlock], *Block[*TestBlock, *TestBlock, *TestBlock], bool) { + selectedBlk, ok := ce.verified[blkID] + if !ok && ce.lastAccepted.ID() == blkID { + selectedBlk = ce.lastAccepted + ok = true + } + ce.require.True(ok) + + oldPreference := ce.preferred + newPreference := selectedBlk + changed := ce.preferred.ID() != selectedBlk.ID() + ce.preferred = selectedBlk + ce.require.NoError(ce.vm.SetPreference(ctx, selectedBlk.ID())) + return oldPreference, newPreference, changed +} + +func (ce *TestConsensusEngine) AcceptNonPreferredBlock(ctx context.Context) { + preferredChain := ce.getLastAcceptedToBlk(ctx, ce.preferred) + preferredSet := set.NewSet[ids.ID](len(preferredChain)) + for _, blk := range 
preferredChain { + preferredSet.Add(blk.ID()) + } + + var selectedBlk *Block[*TestBlock, *TestBlock, *TestBlock] + for _, blk := range ce.verified { + if !preferredSet.Contains(blk.ID()) { + selectedBlk = blk + break + } + } + if selectedBlk == nil { + ce.t.Log("no non-preferred block to accept") + return + } + + nonPreferredChain := ce.getLastAcceptedToBlk(ctx, selectedBlk) + ce.acceptChain(ctx, nonPreferredChain) + _, _, changedPref := ce.SetPreference(ctx, nonPreferredChain[len(nonPreferredChain)-1].ID()) + ce.require.True(changedPref) +} + +func (ce *TestConsensusEngine) GetVerifiedBlock(ctx context.Context) (*Block[*TestBlock, *TestBlock, *TestBlock], bool) { + selectedBlk, ok := ce.selectRandomVerifiedBlock() + if !ok { + return nil, false + } + + blk, err := ce.vm.GetBlock(ctx, selectedBlk.ID()) + ce.require.NoError(err) + ce.require.Equal(blk, selectedBlk) + return selectedBlk, true +} + +func (ce *TestConsensusEngine) GetLastAcceptedBlock(ctx context.Context) { + blk, err := ce.vm.GetBlock(ctx, ce.lastAccepted.ID()) + ce.require.NoError(err) + ce.require.Equal(blk.ID(), ce.lastAccepted.ID()) + ce.require.Equal(blk, ce.lastAccepted) +} + +func (ce *TestConsensusEngine) StartStateSync(ctx context.Context, target *TestBlock) { + ce.require.NoError(ce.vm.StartStateSync(ctx, target)) +} + +func (ce *TestConsensusEngine) FinishStateSync(ctx context.Context, blk *Block[*TestBlock, *TestBlock, *TestBlock]) { + ce.vm.snowCtx.Lock.Lock() + defer ce.vm.snowCtx.Lock.Unlock() + + blk.Input.outputPopulated = true + blk.Input.acceptedPopulated = true + blk.setAccepted(blk.Input, blk.Input) + ce.require.NoError(ce.vm.FinishStateSync(ctx, blk.Input, blk.Output, blk.Accepted)) +} + +type step int + +const ( + buildBlock step = iota + acceptPreferredChain + getAcceptedBlock + parseFutureBlock + parseAndVerifyNewRandomBlock + parseVerifiedBlock + parseAndVerifyInvalidBlock + swapRandomPreference + acceptNonPreferredBlock + getVerifiedBlock + getLastAcceptedBlock + maxStepValue +) + +func (s step) String() string { + switch s { + case buildBlock: + return "buildBlock" + case acceptPreferredChain: + return "acceptPreferredChain" + case getAcceptedBlock: + return "getAcceptedBlock" + case parseFutureBlock: + return "parseFutureBlock" + case parseAndVerifyNewRandomBlock: + return "parseAndVerifyNewRandomBlock" + case parseVerifiedBlock: + return "parseVerifiedBlock" + case parseAndVerifyInvalidBlock: + return "parseAndVerifyInvalidBlock" + case swapRandomPreference: + return "swapRandomPreference" + case acceptNonPreferredBlock: + return "acceptNonPreferredBlock" + case getVerifiedBlock: + return "getVerifiedBlock" + case getLastAcceptedBlock: + return "getLastAcceptedBlock" + default: + panic("invalid step") + } +} + +func (ce *TestConsensusEngine) Step(ctx context.Context, s step) { + switch s { + case buildBlock: + ce.BuildBlock(ctx) + case acceptPreferredChain: + ce.AcceptPreferredChain(ctx) + case getAcceptedBlock: + ce.GetAcceptedBlock(ctx) + case parseFutureBlock: + ce.ParseFutureBlock(ctx) + case parseAndVerifyNewRandomBlock: + ce.ParseAndVerifyNewRandomBlock(ctx) + case parseVerifiedBlock: + ce.ParseVerifiedBlk(ctx) + case parseAndVerifyInvalidBlock: + ce.ParseAndVerifyInvalidBlock(ctx) + case swapRandomPreference: + ce.SwapRandomPreference(ctx) + case acceptNonPreferredBlock: + ce.AcceptNonPreferredBlock(ctx) + case getVerifiedBlock: + ce.GetVerifiedBlock(ctx) + case getLastAcceptedBlock: + ce.GetLastAcceptedBlock(ctx) + default: + // No such step, leave to fuzzer to realize this. 
+ } +} + +func TestBuildAndAcceptBlock(t *testing.T) { + ctx := context.Background() + + ce := NewTestConsensusEngine(t, &TestBlock{outputPopulated: true, acceptedPopulated: true}) + + blk1, ok := ce.BuildBlock(ctx) + ce.require.True(ok) + ce.require.Equal(uint64(1), blk1.Height()) + ce.require.Equal(uint64(1), ce.preferred.Height()) + + acceptedTip, ok := ce.AcceptPreferredChain(ctx) + ce.require.True(ok) + ce.require.Equal(acceptedTip.ID(), blk1.ID()) + + ce.require.Empty(ce.verified) +} + +func TestBuildAndAcceptChain(t *testing.T) { + ctx := context.Background() + + ce := NewTestConsensusEngine(t, &TestBlock{outputPopulated: true, acceptedPopulated: true}) + + blk1, ok := ce.BuildBlock(ctx) + ce.require.True(ok) + ce.require.Equal(uint64(1), blk1.Height()) + blk2, ok := ce.BuildBlock(ctx) + ce.require.True(ok) + ce.require.Equal(uint64(2), blk2.Height()) + ce.require.Equal(blk2.ID(), ce.preferred.ID()) + acceptedTip, ok := ce.AcceptPreferredChain(ctx) + ce.require.True(ok) + ce.require.Equal(acceptedTip.ID(), blk2.ID()) + + ce.require.Empty(ce.verified) +} + +func TestParseAndAcceptBlock(t *testing.T) { + ctx := context.Background() + + ce := NewTestConsensusEngine(t, &TestBlock{outputPopulated: true, acceptedPopulated: true}) + + blk1 := ce.ParseAndVerifyNewRandomBlock(ctx) + ce.require.Equal(uint64(1), blk1.Height()) + + oldPref, newPref, changed := ce.SetPreference(ctx, blk1.ID()) + ce.require.True(changed) + ce.require.Equal(ce.lastAccepted, oldPref) + ce.require.Equal(blk1, newPref) + + acceptedTip, ok := ce.AcceptPreferredChain(ctx) + ce.require.True(ok) + ce.require.Equal(acceptedTip.ID(), blk1.ID()) + + ce.require.Empty(ce.verified) +} + +func TestParseAndAcceptChain(t *testing.T) { + ctx := context.Background() + + ce := NewTestConsensusEngine(t, &TestBlock{outputPopulated: true, acceptedPopulated: true}) + + blk0 := ce.lastAccepted + blk1 := ce.ParseAndVerifyNewRandomBlock(ctx) + ce.require.Equal(uint64(1), blk1.Height()) + + oldPref, newPref, changed := ce.SetPreference(ctx, blk1.ID()) + ce.require.True(changed) + ce.require.Equal(blk0, oldPref) + ce.require.Equal(blk1, newPref) + ce.require.Equal(uint64(1), ce.preferred.Height()) + + blk2 := ce.ParseAndVerifyNewRandomBlock(ctx) + ce.require.Equal(uint64(2), blk2.Height()) + + oldPref, newPref, changed = ce.SetPreference(ctx, blk2.ID()) + ce.require.True(changed) + ce.require.Equal(blk1, oldPref) + ce.require.Equal(blk2, newPref) + + acceptedTip, ok := ce.AcceptPreferredChain(ctx) + ce.require.True(ok) + ce.require.Equal(acceptedTip.ID(), blk2.ID()) + + ce.require.Empty(ce.verified) +} + +func TestBuild_Parse_Get_Accept(t *testing.T) { + ctx := context.Background() + + ce := NewTestConsensusEngine(t, &TestBlock{outputPopulated: true, acceptedPopulated: true}) + + blk1, ok := ce.BuildBlock(ctx) + ce.require.True(ok) + + parsedBlk, ok := ce.ParseVerifiedBlk(ctx) + ce.require.True(ok) + ce.require.Equal(blk1.ID(), parsedBlk.ID()) + + gotBlk, ok := ce.GetVerifiedBlock(ctx) + ce.require.True(ok) + ce.require.Equal(blk1.ID(), gotBlk.ID()) + + acceptedTip, ok := ce.AcceptPreferredChain(ctx) + ce.require.True(ok) + ce.require.Equal(acceptedTip.ID(), blk1.ID()) + + ce.require.Empty(ce.verified) +} + +func TestBuild_ParseAndExtendPreferredChain_Accept(t *testing.T) { + ctx := context.Background() + + ce := NewTestConsensusEngine(t, &TestBlock{outputPopulated: true, acceptedPopulated: true}) + + blk1, ok := ce.BuildBlock(ctx) + ce.require.True(ok) + + blk2 := ce.ParseAndVerifyNewBlock(ctx, blk1) + ce.require.Equal(uint64(2), 
blk2.Height()) + ce.SetPreference(ctx, blk2.ID()) + + acceptedTip, ok := ce.AcceptPreferredChain(ctx) + ce.require.True(ok) + ce.require.Equal(acceptedTip.ID(), blk2.ID()) + + ce.require.Empty(ce.verified) +} + +func TestConflictingChains(t *testing.T) { + ctx := context.Background() + + ce := NewTestConsensusEngine(t, &TestBlock{outputPopulated: true, acceptedPopulated: true}) + genesis := ce.lastAccepted + + builtBlk1, ok := ce.BuildBlock(ctx) + ce.require.True(ok) + ce.require.Equal(uint64(1), builtBlk1.Height()) + + builtBlk2, ok := ce.BuildBlock(ctx) + ce.require.True(ok) + ce.require.Equal(uint64(2), builtBlk2.Height()) + + parsedBlk1 := ce.ParseAndVerifyNewBlock(ctx, genesis) + ce.require.Equal(uint64(1), parsedBlk1.Height()) + parsedBlk2 := ce.ParseAndVerifyNewBlock(ctx, parsedBlk1) + ce.require.Equal(uint64(2), parsedBlk2.Height()) + ce.SetPreference(ctx, parsedBlk2.ID()) + + acceptedTip, ok := ce.AcceptPreferredChain(ctx) + ce.require.True(ok) + ce.require.Equal(acceptedTip.ID(), parsedBlk2.ID()) + + ce.require.Empty(ce.verified) +} + +func TestBuildBlockWithContext(t *testing.T) { + tests := []struct { + name string + buildContext *block.Context + verifyContext *block.Context + expectedErr error + }{ + { + name: "build = nil, verify = nil", + }, + { + name: "build = 1, verify = nil", + verifyContext: &block.Context{PChainHeight: 1}, + expectedErr: errMismatchedPChainContext, + }, + { + name: "build = 1, verify = 2", + buildContext: &block.Context{PChainHeight: 1}, + verifyContext: &block.Context{PChainHeight: 2}, + expectedErr: errMismatchedPChainContext, + }, + { + name: "build = 1, verify = 1", + buildContext: &block.Context{PChainHeight: 1}, + verifyContext: &block.Context{PChainHeight: 1}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ctx := context.Background() + + ce := NewTestConsensusEngine(t, &TestBlock{outputPopulated: true, acceptedPopulated: true}) + + blk, err := ce.vm.VM.BuildBlockWithContext(ctx, test.buildContext) + ce.require.NoError(err) + ce.require.Equal(test.buildContext, blk.Input.GetContext()) + + ce.require.ErrorIs(blk.VerifyWithContext(ctx, test.verifyContext), test.expectedErr) + }) + } +} + +func TestVerifyBlockWithContext(t *testing.T) { + tests := []struct { + name string + suppliedContext *block.Context + verifyContext *block.Context + expectedErr error + }{ + { + name: "build = nil, verify = nil", + }, + { + name: "build = 1, verify = nil", + verifyContext: &block.Context{PChainHeight: 1}, + expectedErr: errMismatchedPChainContext, + }, + { + name: "build = 1, verify = 2", + suppliedContext: &block.Context{PChainHeight: 1}, + verifyContext: &block.Context{PChainHeight: 2}, + expectedErr: errMismatchedPChainContext, + }, + { + name: "build = 1, verify = 1", + suppliedContext: &block.Context{PChainHeight: 1}, + verifyContext: &block.Context{PChainHeight: 1}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ctx := context.Background() + + ce := NewTestConsensusEngine(t, &TestBlock{outputPopulated: true, acceptedPopulated: true}) + + testBlk := NewTestBlockFromParentWithContext(ce.lastAccepted.Input, test.suppliedContext) + + blk, err := ce.vm.VM.ParseBlock(ctx, testBlk.GetBytes()) + ce.require.NoError(err) + ce.require.Equal(test.suppliedContext, blk.Input.GetContext()) + + ce.require.ErrorIs(blk.VerifyWithContext(ctx, test.verifyContext), test.expectedErr) + }) + } +} + +func TestDynamicStateSyncTransition_NoPending(t *testing.T) { + ctx := context.Background() + + // Create 
consensus engine in dynamic state sync mode. + ce := NewTestConsensusEngine(t, &TestBlock{}) + ce.StartStateSync(ctx, ce.lastAccepted.Input) + + // Parse and verify a new block, which should be a pass through. + blk1 := ce.ParseAndVerifyNewRandomBlock(ctx) + ce.SetPreference(ctx, blk1.ID()) + ce.require.Equal(uint64(1), blk1.Height()) + + acceptedTip, ok := ce.AcceptPreferredChain(ctx) + ce.require.True(ok) + ce.require.Equal(acceptedTip.ID(), blk1.ID()) + + // Tip should not be verified/accepted, since it was handled prior + // to the VM being marked ready. + ce.require.False(acceptedTip.verified) + ce.require.False(acceptedTip.accepted) + + // Mark the VM ready and fully populate the last accepted block. + ce.FinishStateSync(ctx, acceptedTip) + + ce.ParseAndVerifyInvalidBlock(ctx) + + blk2 := ce.ParseAndVerifyNewRandomBlock(ctx) + ce.require.Equal(uint64(2), blk2.Height()) + ce.SetPreference(ctx, blk2.ID()) + + ce.ParseAndVerifyInvalidBlock(ctx) + updatedAcceptedTip, ok := ce.AcceptPreferredChain(ctx) + ce.require.True(ok) + ce.require.Equal(updatedAcceptedTip.ID(), blk2.ID()) +} + +func TestDynamicStateSyncTransition_PendingTree_AcceptSingleBlock(t *testing.T) { + ctx := context.Background() + + // Create consensus engine in dynamic state sync mode. + ce := NewTestConsensusEngine(t, &TestBlock{}) + ce.StartStateSync(ctx, ce.lastAccepted.Input) + + parent := ce.lastAccepted + // Parse and verify a new block, which should be a pass through. + blk1 := ce.ParseAndVerifyNewBlock(ctx, parent) + ce.SetPreference(ctx, blk1.ID()) + ce.require.Equal(uint64(1), blk1.Height()) + + ce.FinishStateSync(ctx, ce.lastAccepted) + + acceptedTip, ok := ce.AcceptPreferredChain(ctx) + ce.require.True(ok) + ce.require.Equal(acceptedTip.ID(), blk1.ID()) +} + +func TestDynamicStateSyncTransition_PendingTree_AcceptChain(t *testing.T) { + ctx := context.Background() + + ce := NewTestConsensusEngine(t, &TestBlock{}) + ce.StartStateSync(ctx, ce.lastAccepted.Input) + + parent := ce.lastAccepted + // Parse and verify a new block, which should be a pass through. + blk1 := ce.ParseAndVerifyNewBlock(ctx, parent) + ce.SetPreference(ctx, blk1.ID()) + ce.require.Equal(uint64(1), blk1.Height()) + + blk2 := ce.ParseAndVerifyNewBlock(ctx, blk1) + ce.SetPreference(ctx, blk2.ID()) + ce.require.Equal(uint64(2), blk2.Height()) + + ce.FinishStateSync(ctx, ce.lastAccepted) + + acceptedTip, ok := ce.AcceptPreferredChain(ctx) + ce.require.True(ok) + ce.require.Equal(acceptedTip.ID(), blk2.ID()) +} + +func TestDynamicStateSyncTransition_PendingTree_VerifySingleBlock(t *testing.T) { + ctx := context.Background() + + ce := NewTestConsensusEngine(t, &TestBlock{}) + ce.StartStateSync(ctx, ce.lastAccepted.Input) + + parent := ce.lastAccepted + // Parse and verify a new block, which should be a pass through. + blk1 := ce.ParseAndVerifyNewBlock(ctx, parent) + ce.SetPreference(ctx, blk1.ID()) + ce.require.Equal(uint64(1), blk1.Height()) + + ce.FinishStateSync(ctx, ce.lastAccepted) + + blk2 := ce.ParseAndVerifyNewBlock(ctx, blk1) + ce.SetPreference(ctx, blk2.ID()) + + acceptedTip, ok := ce.AcceptPreferredChain(ctx) + ce.require.True(ok) + ce.require.Equal(acceptedTip.ID(), blk2.ID()) +} + +func TestDynamicStateSyncTransition_PendingTree_VerifyChain(t *testing.T) { + ctx := context.Background() + + ce := NewTestConsensusEngine(t, &TestBlock{}) + ce.StartStateSync(ctx, ce.lastAccepted.Input) + + parent := ce.lastAccepted + // Parse and verify a new block, which should be a pass through. 
+ blk1 := ce.ParseAndVerifyNewBlock(ctx, parent) + ce.SetPreference(ctx, blk1.ID()) + ce.require.Equal(uint64(1), blk1.Height()) + + ce.FinishStateSync(ctx, ce.lastAccepted) + + blk2 := ce.ParseAndVerifyNewBlock(ctx, blk1) + ce.SetPreference(ctx, blk2.ID()) + + blk3 := ce.ParseAndVerifyNewBlock(ctx, blk2) + ce.SetPreference(ctx, blk3.ID()) + + acceptedTip, ok := ce.AcceptPreferredChain(ctx) + ce.require.True(ok) + ce.require.Equal(acceptedTip.ID(), blk3.ID()) +} + +func TestDynamicStateSyncTransition_PendingTree_VerifyBlockWithInvalidAncestor(t *testing.T) { + ctx := context.Background() + + ce := NewTestConsensusEngine(t, &TestBlock{}) + ce.StartStateSync(ctx, ce.lastAccepted.Input) + + // Check health - should be unhealthy during state sync + details, err := ce.vm.HealthCheck(ctx) + ce.require.ErrorIs(err, errVMNotReady) + ce.require.Equal(map[string]interface{}{ + vmReadinessHealthChecker: false, + }, details) + + parent := ce.lastAccepted + invalidTestBlock1 := NewTestBlockFromParent(parent.Input) + invalidTestBlock1.Invalid = true + + parsedBlk1, err := ce.vm.VM.ParseBlock(ctx, invalidTestBlock1.GetBytes()) + ce.require.NoError(err) + ce.verifyValidBlock(ctx, parsedBlk1) + + invalidTestBlock2 := NewTestBlockFromParent(invalidTestBlock1) + invalidTestBlock2.Invalid = true + + parsedBlk2, err := ce.vm.VM.ParseBlock(ctx, invalidTestBlock2.GetBytes()) + ce.require.NoError(err) + ce.verifyValidBlock(ctx, parsedBlk2) + + ce.FinishStateSync(ctx, ce.lastAccepted) + + // Check health - should be unhealthy due to unresolved blocks + details, err = ce.vm.HealthCheck(ctx) + ce.require.ErrorIs(err, errUnresolvedBlocks) + ce.require.Equal(map[string]any{ + vmReadinessHealthChecker: true, + unresolvedBlocksHealthChecker: 2, + }, details) + + // Construct a new child of the invalid block at depth 1 marked as processing + invalidatedChildTestBlock1 := NewTestBlockFromParent(invalidTestBlock1) + invalidatedChildBlock1, err := ce.vm.ParseBlock(ctx, invalidatedChildTestBlock1.GetBytes()) + ce.require.NoError(err) + + invalidatedChildBlock1Err := invalidatedChildBlock1.Verify(ctx) + ce.require.ErrorIs(invalidatedChildBlock1Err, errParentFailedVerification) + + // Construct a new child of the invalid block at depth 2 marked as processing + // This tests that if a parent block fails verification, a re-processing child + // will also fail verification after transitioning out of state sync. + invalidatedChildTestBlock2 := NewTestBlockFromParent(invalidTestBlock2) + invalidatedChildBlock2, err := ce.vm.ParseBlock(ctx, invalidatedChildTestBlock2.GetBytes()) + ce.require.NoError(err) + + invalidatedChildBlk2 := invalidatedChildBlock2.Verify(ctx) + ce.require.ErrorIs(invalidatedChildBlk2, errParentFailedVerification) + + // Accept a new block to reject the invalid chain + // Note: consensus only rejects blocks after accepting a conflict, so we + // mimic this behavior here. + validBlk1 := ce.ParseAndVerifyNewBlock(ctx, ce.lastAccepted) + ce.SetPreference(ctx, validBlk1.ID()) + + acceptedTip, ok := ce.AcceptPreferredChain(ctx) + ce.require.True(ok) + ce.require.Equal(acceptedTip.ID(), validBlk1.ID()) + + details, err = ce.vm.HealthCheck(ctx) + ce.require.NoError(err) + ce.require.Equal(map[string]any{ + vmReadinessHealthChecker: true, + unresolvedBlocksHealthChecker: 0, + }, details) +} + +func TestDynamicStateSync_FinishOnAcceptedAncestor(t *testing.T) { + ctx := context.Background() + + // Create consensus engine in dynamic state sync mode. 
+ ce := NewTestConsensusEngine(t, &TestBlock{}) + ce.StartStateSync(ctx, ce.lastAccepted.Input) + + notReadyLastAccepted := ce.lastAccepted + + // Parse and verify a new block, which should be a pass through. + blk1 := ce.ParseAndVerifyNewRandomBlock(ctx) + ce.SetPreference(ctx, blk1.ID()) + ce.require.Equal(uint64(1), blk1.Height()) + + acceptedTip, ok := ce.AcceptPreferredChain(ctx) + ce.require.True(ok) + ce.require.Equal(acceptedTip.ID(), blk1.ID()) + + // Tip should not be verified/accepted, since it was handled prior + // to the VM being marked ready. + ce.require.False(acceptedTip.verified) + ce.require.False(acceptedTip.accepted) + + // Mark the VM ready and set the last accepted block to an ancestor + // of the current last accepted block + ce.FinishStateSync(ctx, notReadyLastAccepted) + + ce.ParseAndVerifyInvalidBlock(ctx) + + blk2 := ce.ParseAndVerifyNewRandomBlock(ctx) + ce.require.Equal(uint64(2), blk2.Height()) + ce.SetPreference(ctx, blk2.ID()) + + ce.ParseAndVerifyInvalidBlock(ctx) + updatedAcceptedTip, ok := ce.AcceptPreferredChain(ctx) + ce.require.True(ok) + ce.require.Equal(updatedAcceptedTip.ID(), blk2.ID()) +} + +func FuzzSnowVM(f *testing.F) { + for i := byte(0); i < 100; i++ { + randomSteps := hashing.ComputeHash256([]byte{i}) + f.Add(int64(i), randomSteps) + } + + maxFuzzSteps := 50 + // Cap the number of steps to take by using byte as the type + f.Fuzz(func(t *testing.T, randSource int64, byteSteps []byte) { + rand := rand.New(rand.NewSource(randSource)) //nolint:gosec + + ctx := context.Background() + ce := NewTestConsensusEngineWithRand(t, rand, &TestBlock{outputPopulated: true, acceptedPopulated: true}) + + byteSteps = byteSteps[:min(maxFuzzSteps, len(byteSteps))] + for _, byteStep := range byteSteps { + selectedStep := step(byteStep % byte(maxStepValue)) + t.Logf("Step: %s", selectedStep) + ce.Step(ctx, selectedStep) + } + }) +}
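
The tests above double as a wiring reference. As an illustrative sketch only (not part of the diff), a VM developer integrates with the consensus engine by implementing ConcreteVM and wrapping it with NewSnowVM, mirroring what NewTestConsensusEngineWithRand does with the TestChain/TestBlock types:

	// chain satisfies ConcreteVM[*TestBlock, *TestBlock, *TestBlock] (see TestChain above).
	chain := NewTestChain(t, require.New(t), &TestBlock{outputPopulated: true, acceptedPopulated: true})
	// NewSnowVM wraps the VM so GetBlock/ParseBlock/BuildBlock return snowman.Block values and the
	// wrapper satisfies block.ChainVM, block.StateSyncableVM, and block.BuildBlockWithContextChainVM.
	vm := NewSnowVM[*TestBlock, *TestBlock, *TestBlock](testVersion, chain)
	// The engine then calls vm.Initialize(...) before issuing any block operations.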