
[Storage Refactor] Refactor Chunk Locators to badger updates #6947

Merged: 16 commits, Feb 25, 2025
36 changes: 31 additions & 5 deletions cmd/verification_builder.go
@@ -39,6 +39,7 @@ import (
"github.com/onflow/flow-go/state/protocol/blocktimer"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/storage/badger"
"github.com/onflow/flow-go/storage/dbops"
"github.com/onflow/flow-go/storage/operation/badgerimpl"
"github.com/onflow/flow-go/storage/store"
)
@@ -57,6 +58,10 @@ type VerificationConfig struct {
chunkWorkers uint64 // number of chunks processed in parallel.

stopAtHeight uint64 // height to stop the node on

// dbOps selects the database operations used for updates: "badger-transaction" for badger
// transactions, "badger-batch" for badger batch updates, "pebble-batch" for pebble batch updates.
dbOps string
}

type VerificationNodeBuilder struct {
@@ -84,6 +89,7 @@ func (v *VerificationNodeBuilder) LoadFlags() {
flags.Uint64Var(&v.verConf.blockWorkers, "block-workers", blockconsumer.DefaultBlockWorkers, "maximum number of blocks being processed in parallel")
flags.Uint64Var(&v.verConf.chunkWorkers, "chunk-workers", chunkconsumer.DefaultChunkWorkers, "maximum number of execution nodes a chunk data pack request is dispatched to")
flags.Uint64Var(&v.verConf.stopAtHeight, "stop-at-height", 0, "height to stop the node at (0 to disable)")
flags.StringVar(&v.verConf.dbOps, "db-ops", "badger-transaction", "database operations to use (badger-transaction, badger-batch, pebble-batch)")
})
}
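
For reference, the new storage/dbops package introduced below also exposes DB_OPS_DEFAULT and DB_OPS_MSG; a minimal sketch of the same registration reusing those constants instead of inlining the strings (the PR itself keeps the literals):

// Hypothetical alternative wiring using the dbops constants defined in this PR.
flags.StringVar(&v.verConf.dbOps, "db-ops", dbops.DB_OPS_DEFAULT, dbops.DB_OPS_MSG)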

@@ -95,7 +101,7 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {
chunkRequests *stdmap.ChunkRequests // used in requester engine
processedChunkIndex storage.ConsumerProgressInitializer // used in chunk consumer
processedBlockHeight storage.ConsumerProgressInitializer // used in block consumer
chunkQueue *badger.ChunksQueue // used in chunk consumer
chunkQueue storage.ChunksQueue // used in chunk consumer

syncCore *chainsync.Core // used in follower engine
assignerEngine *assigner.Engine // the assigner engine
@@ -166,10 +172,30 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {
return nil
}).
Module("chunks queue", func(node *NodeConfig) error {
chunkQueue = badger.NewChunkQueue(node.DB)
ok, err := chunkQueue.Init(chunkconsumer.DefaultJobIndex)
if err != nil {
return fmt.Errorf("could not initialize default index in chunks queue: %w", err)
var ok bool
var err error

switch dbops.DBOps(v.verConf.dbOps) {
case dbops.BadgerTransaction:
queue := badger.NewChunkQueue(node.DB)
ok, err = queue.Init(chunkconsumer.DefaultJobIndex)
if err != nil {
return fmt.Errorf("could not initialize default index in chunks queue: %w", err)
}

chunkQueue = queue
case dbops.BadgerBatch:
queue := store.NewChunkQueue(node.Metrics.Cache, badgerimpl.ToDB(node.DB))
ok, err = queue.Init(chunkconsumer.DefaultJobIndex)
if err != nil {
return fmt.Errorf("could not initialize default index in chunks queue: %w", err)
}

chunkQueue = queue
case dbops.PebbleBatch:
return fmt.Errorf("to be implemented")
default:
return fmt.Errorf("invalid db opts type: %v", v.verConf.dbOps)
}

node.Logger.Info().
3 changes: 1 addition & 2 deletions engine/testutil/mock/nodes.go
@@ -46,7 +46,6 @@ import (
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/state/protocol/events"
"github.com/onflow/flow-go/storage"
bstorage "github.com/onflow/flow-go/storage/badger"
"github.com/onflow/flow-go/utils/unittest"
)

@@ -300,7 +299,7 @@ type VerificationNode struct {

// chunk consumer and processor for fetcher engine
ProcessedChunkIndex storage.ConsumerProgressInitializer
ChunksQueue *bstorage.ChunksQueue
ChunksQueue storage.ChunksQueue
ChunkConsumer *chunkconsumer.ChunkConsumer

// block consumer for chunk consumer
5 changes: 3 additions & 2 deletions engine/testutil/nodes.go
@@ -993,10 +993,11 @@ func VerificationNode(t testing.TB,
}

if node.ChunksQueue == nil {
node.ChunksQueue = storage.NewChunkQueue(node.PublicDB)
ok, err := node.ChunksQueue.Init(chunkconsumer.DefaultJobIndex)
cq := store.NewChunkQueue(node.Metrics, badgerimpl.ToDB(node.PublicDB))
ok, err := cq.Init(chunkconsumer.DefaultJobIndex)
require.NoError(t, err)
require.True(t, ok)
node.ChunksQueue = cq
}

if node.ProcessedBlockHeight == nil {
1 change: 1 addition & 0 deletions module/metrics/labels.go
@@ -57,6 +57,7 @@ const (
ResourceGuarantee = "guarantee"
ResourceResult = "result"
ResourceResultApprovals = "result_approvals"
ResourceChunkLocators = "chunk_locators"
ResourceReceipt = "receipt"
ResourceQC = "qc"
ResourceMyReceipt = "my_receipt"
20 changes: 20 additions & 0 deletions storage/dbops/dbops.go
@@ -0,0 +1,20 @@
package dbops

// The dbops feature flag toggles between different database update operations.
// Currently, existing database update operations use badger transactions ("badger-transaction"),
// which is the default and is deprecated. As part of the refactoring process to eventually
// transition to pebble batch updates, an intermediate step is required to switch to badger
// batch updates. This is why the feature flag has three possible values to facilitate the transition.
const DB_OPS_MSG = "database operations to use (badger-transaction, badger-batch, pebble-batch)"
const DB_OPS_DEFAULT = string(BadgerTransaction)

type DBOps string

const (
// BadgerTransaction uses badger transactions (default and deprecated)
BadgerTransaction DBOps = "badger-transaction"
// BadgerBatch uses badger batch updates
BadgerBatch DBOps = "badger-batch"
// PebbleBatch uses pebble batch updates
PebbleBatch DBOps = "pebble-batch"
)
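
Callers switch on these values when wiring storage (see cmd/verification_builder.go above). A hypothetical validation helper, not part of this PR, sketching how a builder could reject unknown flag values up front:

// IsValidDBOps reports whether s names a supported database operations mode.
// Illustrative only; the PR handles unknown values in the builder's switch instead.
func IsValidDBOps(s string) bool {
	switch DBOps(s) {
	case BadgerTransaction, BadgerBatch, PebbleBatch:
		return true
	default:
		return false
	}
}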
19 changes: 19 additions & 0 deletions storage/operation/chunk_locators.go
@@ -0,0 +1,19 @@
package operation

import (
"github.com/onflow/flow-go/model/chunks"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/storage"
)

// InsertChunkLocator upserts the given chunk locator, keyed by its ID.
func InsertChunkLocator(w storage.Writer, locator *chunks.Locator) error {
	return UpsertByKey(w, MakePrefix(codeChunk, locator.ID()), locator)
}

// RetrieveChunkLocator retrieves the chunk locator with the given ID into locator.
func RetrieveChunkLocator(r storage.Reader, locatorID flow.Identifier, locator *chunks.Locator) error {
	return RetrieveByKey(r, MakePrefix(codeChunk, locatorID), locator)
}

// ExistChunkLocator returns whether a chunk locator with the given ID is stored.
func ExistChunkLocator(r storage.Reader, locatorID flow.Identifier) (bool, error) {
	return KeyExists(r, MakePrefix(codeChunk, locatorID))
}
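
These helpers compose with the generic reader and batch-writer interfaces. A minimal usage sketch, assuming a storage.DB handle such as one produced by badgerimpl.ToDB, written as if in the same operation package; the helper is illustrative, not part of the PR:

// storeAndLoadLocator stores a locator in a write batch and reads it back.
func storeAndLoadLocator(db storage.DB, locator *chunks.Locator) (*chunks.Locator, error) {
	err := db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
		return InsertChunkLocator(rw.Writer(), locator)
	})
	if err != nil {
		return nil, fmt.Errorf("could not store chunk locator: %w", err)
	}
	var got chunks.Locator
	if err := RetrieveChunkLocator(db.Reader(), locator.ID(), &got); err != nil {
		return nil, fmt.Errorf("could not retrieve chunk locator: %w", err)
	}
	return &got, nil
}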
24 changes: 24 additions & 0 deletions storage/operation/jobs.go
@@ -0,0 +1,24 @@
package operation

import (
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/storage"
)

// RetrieveJobLatestIndex reads the latest job index stored for the given queue.
func RetrieveJobLatestIndex(r storage.Reader, queue string, index *uint64) error {
	return RetrieveByKey(r, MakePrefix(codeJobQueuePointer, queue), index)
}

// SetJobLatestIndex upserts the latest job index for the given queue.
func SetJobLatestIndex(w storage.Writer, queue string, index uint64) error {
	return UpsertByKey(w, MakePrefix(codeJobQueuePointer, queue), index)
}

// RetrieveJobAtIndex returns the entity at the given index
func RetrieveJobAtIndex(r storage.Reader, queue string, index uint64, entity *flow.Identifier) error {
	return RetrieveByKey(r, MakePrefix(codeJobQueue, queue, index), entity)
}

// InsertJobAtIndex inserts an entity ID at the given index
func InsertJobAtIndex(w storage.Writer, queue string, index uint64, entity flow.Identifier) error {
	return UpsertByKey(w, MakePrefix(codeJobQueue, queue, index), entity)
}
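
Together these operations define the persistent job-queue layout: one latest-index pointer per queue, plus one index-to-entity-ID entry per job. An illustrative read loop over the stored IDs, written as if in the same package (hypothetical helper, not part of the PR):

// allJobIDs walks the job IDs stored for a queue, from index `from` through the
// queue's latest index.
func allJobIDs(r storage.Reader, queue string, from uint64) ([]flow.Identifier, error) {
	var latest uint64
	if err := RetrieveJobLatestIndex(r, queue, &latest); err != nil {
		return nil, err
	}
	var ids []flow.Identifier
	for i := from; i <= latest; i++ {
		var id flow.Identifier
		if err := RetrieveJobAtIndex(r, queue, i, &id); err != nil {
			return nil, err
		}
		ids = append(ids, id)
	}
	return ids, nil
}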
2 changes: 2 additions & 0 deletions storage/store/cache.go
@@ -19,6 +19,8 @@ func withLimit[K comparable, V any](limit uint) func(*Cache[K, V]) {

type storeFunc[K comparable, V any] func(rw storage.ReaderBatchWriter, key K, val V) error

const DefaultCacheSize = uint(1000)

// nolint:unused
func withStore[K comparable, V any](store storeFunc[K, V]) func(*Cache[K, V]) {
return func(c *Cache[K, V]) {
178 changes: 178 additions & 0 deletions storage/store/chunks_queue.go
@@ -0,0 +1,178 @@
package store

import (
"errors"
"fmt"
"sync"

"github.com/onflow/flow-go/model/chunks"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/storage/operation"
)

// ChunksQueue stores a queue of chunk locators that are assigned to me to verify.
// Job consumers can read the locators as jobs from the queue by index.
// Chunk locators stored in this queue are unique.
type ChunksQueue struct {
db storage.DB
chunkLocatorCache *Cache[uint64, *chunks.Locator] // cache for chunk locators, indexed by job index
Review comment (zhangchiqing, author): added a chunk locator cache for better performance.
storing *sync.Mutex
Review comment (contributor): It's not quite clear what this is locking. It doesn't look like it's locking the cache or the db, because AtIndex just accesses both without a lock.

Reply (zhangchiqing, Feb 20, 2025): AtIndex doesn't need to be protected. The storing lock is to prevent a dirty read of ExistChunkLocator. The Store method depends on the return value of this read. Without the lock, the returned value could be wrong if it's updated by another Store call on a different thread.
}

const JobQueueChunksQueue = "JobQueueChunksQueue"
const DefaultChunkQueuesCacheSize = uint(1000)

func newChunkLocatorCache(collector module.CacheMetrics) *Cache[uint64, *chunks.Locator] {
store := func(rw storage.ReaderBatchWriter, index uint64, locator *chunks.Locator) error {
// store the chunk locator (uniqueness is checked by StoreChunkLocator before this runs)
err := operation.InsertChunkLocator(rw.Writer(), locator)
if err != nil {
return fmt.Errorf("failed to insert chunk locator: %w", err)
}

err = operation.InsertJobAtIndex(rw.Writer(), JobQueueChunksQueue, index, locator.ID())
if err != nil {
return fmt.Errorf("failed to set job index for chunk locator queue at index %v: %w", index, err)
}

return nil
}

retrieve := func(r storage.Reader, index uint64) (*chunks.Locator, error) {
var locatorID flow.Identifier
err := operation.RetrieveJobAtIndex(r, JobQueueChunksQueue, index, &locatorID)
if err != nil {
return nil, fmt.Errorf("could not retrieve chunk locator in queue: %w", err)
}

var locator chunks.Locator
err = operation.RetrieveChunkLocator(r, locatorID, &locator)
if err != nil {
return nil, fmt.Errorf("could not retrieve locator for chunk id %v: %w", locatorID, err)
}

return &locator, nil
}
return newCache(collector, metrics.ResourceChunkLocators,
withLimit[uint64, *chunks.Locator](DefaultChunkQueuesCacheSize),
withStore(store),
withRetrieve(retrieve))
}

// NewChunkQueue creates a chunk locator queue backed by the given database.
func NewChunkQueue(collector module.CacheMetrics, db storage.DB) *ChunksQueue {
return &ChunksQueue{
db: db,
chunkLocatorCache: newChunkLocatorCache(collector),
storing: &sync.Mutex{},
}
}

// Init initializes chunk queue's latest index with the given default index.
// It returns (false, nil) if the chunk queue is already initialized.
// It returns (true, nil) if the chunk queue is successfully initialized.
func (q *ChunksQueue) Init(defaultIndex uint64) (bool, error) {
q.storing.Lock()
defer q.storing.Unlock()

_, err := q.LatestIndex()
if err == nil {
// the chunk queue is already initialized
return false, nil
}

if !errors.Is(err, storage.ErrNotFound) {
return false, fmt.Errorf("could not get latest index: %w", err)
}

// the latest index was not found, so initialize it with the default index;
// before doing so, double-check that no chunk locator exists at the default index
_, err = q.AtIndex(defaultIndex)
if err == nil {
return false, fmt.Errorf("chunk locator already exists at default index %v", defaultIndex)
}
if !errors.Is(err, storage.ErrNotFound) {
return false, fmt.Errorf("could not check chunk locator at default index %v: %w", defaultIndex, err)
}

// set the default index as the latest index
err = q.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
return operation.SetJobLatestIndex(rw.Writer(), JobQueueChunksQueue, defaultIndex)
})

if err != nil {
return false, fmt.Errorf("could not init chunk locator queue with default index %v: %w", defaultIndex, err)
}

return true, nil
}

// StoreChunkLocator stores a new chunk locator assigned to me in the job queue.
// It returns true if the locator was new, and false if it was a duplicate.
func (q *ChunksQueue) StoreChunkLocator(locator *chunks.Locator) (bool, error) {
// storing chunk locator requires reading the latest index and updating it,
// so we need to lock the storing operation
q.storing.Lock()
defer q.storing.Unlock()

// read the latest index
latest, err := q.LatestIndex()
if err != nil {
return false, err
}

// make sure the chunk locator is unique
exists, err := operation.ExistChunkLocator(q.db.Reader(), locator.ID())
if err != nil {
return false, fmt.Errorf("failed to check chunk locator existence: %w", err)
}

// if the locator already exists, return false
if exists {
return false, nil
}

// insert to the next index
next := latest + 1

err = q.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
// store and cache the chunk locator
err := q.chunkLocatorCache.PutTx(rw, next, locator)
if err != nil {
return fmt.Errorf("failed to store and cache chunk locator: %w", err)
}

// update the next index as the latest index
err = operation.SetJobLatestIndex(rw.Writer(), JobQueueChunksQueue, next)
if err != nil {
return fmt.Errorf("failed to update latest index %v: %w", next, err)
}

return nil
})

if err != nil {
return false, fmt.Errorf("failed to store chunk locator: %w", err)
}
return true, nil
}

// LatestIndex returns the index of the latest chunk locator stored in the queue.
func (q *ChunksQueue) LatestIndex() (uint64, error) {
var latest uint64
err := operation.RetrieveJobLatestIndex(q.db.Reader(), JobQueueChunksQueue, &latest)
if err != nil {
return 0, fmt.Errorf("could not retrieve latest index for chunks queue: %w", err)
}
return latest, nil
}

// AtIndex returns the chunk locator stored at the given index in the queue.
func (q *ChunksQueue) AtIndex(index uint64) (*chunks.Locator, error) {
return q.chunkLocatorCache.Get(q.db.Reader(), index)
}
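
End to end, the queue is exercised roughly as follows. A hedged sketch of the API added in this PR; the badger handle and the no-op metrics collector stand in for whatever the node builder wires up:

// demoChunksQueue is an illustrative sketch (not part of this PR) of the queue's API.
// bdb is the underlying badger database handle, i.e. what the builder passes as node.DB.
func demoChunksQueue(bdb *badger.DB, locator *chunks.Locator) error {
	q := store.NewChunkQueue(metrics.NewNoopCollector(), badgerimpl.ToDB(bdb))

	// returns true on first run; false if the queue was already initialized
	if _, err := q.Init(chunkconsumer.DefaultJobIndex); err != nil {
		return err
	}
	// returns true if the locator was new; false if it was a duplicate
	if _, err := q.StoreChunkLocator(locator); err != nil {
		return err
	}
	latest, err := q.LatestIndex()
	if err != nil {
		return err
	}
	_, err = q.AtIndex(latest) // read the locator back by job index
	return err
}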