-
Notifications
You must be signed in to change notification settings - Fork 190
[Storage Refactor] Refactor Chunk Locators to badger updates #6947
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
10eec33
3f66e50
c3faf8b
fec3f4b
08f9271
5acba2a
4493e14
9a0ea01
e53b07b
b58358e
3a08dff
1dfa32c
8a637ca
17264a8
c0be640
3d092d8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
package dbops | ||
|
||
// DBOps selects which database update mechanism the node uses.
type DBOps string

const (
	// BadgerTransaction uses badger transactions (default and deprecated).
	BadgerTransaction DBOps = "badger-transaction"
	// BadgerBatch uses badger batch updates.
	BadgerBatch DBOps = "badger-batch"
	// PebbleBatch uses pebble batch updates.
	PebbleBatch DBOps = "pebble-batch"
)

// The dbops feature flag toggles between the database update mechanisms above.
// Existing update operations use the (deprecated) badger transactions, which is
// the default. As part of the refactoring towards pebble batch updates, an
// intermediate switch to badger batch updates is required — hence the three
// possible values.
const (
	DB_OPS_MSG     = "database operations to use (badger-transaction, badger-batch, pebble-batch)"
	DB_OPS_DEFAULT = string(BadgerTransaction)
)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
package operation | ||
|
||
import ( | ||
"github.com/onflow/flow-go/model/chunks" | ||
"github.com/onflow/flow-go/model/flow" | ||
"github.com/onflow/flow-go/storage" | ||
) | ||
|
||
func InsertChunkLocator(w storage.Writer, locator *chunks.Locator) error { | ||
return UpsertByKey(w, MakePrefix(codeChunk, locator.ID()), locator) | ||
} | ||
|
||
func RetrieveChunkLocator(r storage.Reader, locatorID flow.Identifier, locator *chunks.Locator) error { | ||
return RetrieveByKey(r, MakePrefix(codeChunk, locatorID), locator) | ||
} | ||
|
||
func ExistChunkLocator(r storage.Reader, locatorID flow.Identifier) (bool, error) { | ||
return KeyExists(r, MakePrefix(codeChunk, locatorID)) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
package operation | ||
|
||
import ( | ||
"github.com/onflow/flow-go/model/flow" | ||
"github.com/onflow/flow-go/storage" | ||
) | ||
|
||
func RetrieveJobLatestIndex(r storage.Reader, queue string, index *uint64) error { | ||
return RetrieveByKey(r, MakePrefix(codeJobQueuePointer, queue), index) | ||
} | ||
|
||
func SetJobLatestIndex(w storage.Writer, queue string, index uint64) error { | ||
return UpsertByKey(w, MakePrefix(codeJobQueuePointer, queue), index) | ||
} | ||
|
||
// RetrieveJobAtIndex returns the entity at the given index | ||
func RetrieveJobAtIndex(r storage.Reader, queue string, index uint64, entity *flow.Identifier) error { | ||
return RetrieveByKey(r, MakePrefix(codeJobQueue, queue, index), entity) | ||
} | ||
|
||
// InsertJobAtIndex insert an entity ID at the given index | ||
func InsertJobAtIndex(w storage.Writer, queue string, index uint64, entity flow.Identifier) error { | ||
return UpsertByKey(w, MakePrefix(codeJobQueue, queue, index), entity) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,178 @@ | ||
package store | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
"sync" | ||
|
||
"github.com/onflow/flow-go/model/chunks" | ||
"github.com/onflow/flow-go/model/flow" | ||
"github.com/onflow/flow-go/module" | ||
"github.com/onflow/flow-go/module/metrics" | ||
"github.com/onflow/flow-go/storage" | ||
"github.com/onflow/flow-go/storage/operation" | ||
) | ||
|
||
// ChunksQueue stores a queue of chunk locators that are assigned to me to verify.
// Job consumers can read the locators as jobs from the queue by index.
// Chunk locators stored in this queue are unique.
type ChunksQueue struct {
	db                storage.DB
	chunkLocatorCache *Cache[uint64, *chunks.Locator] // cache for chunk locators, indexed by job index

	// storing serializes Init and StoreChunkLocator: both read the latest
	// index and then write based on that read, so the read-check-write
	// sequence must not interleave across callers. It does NOT guard the
	// read-only paths (LatestIndex, AtIndex), which go straight to the
	// db/cache.
	storing *sync.Mutex
}
|
||
// JobQueueChunksQueue is the queue name under which the job indices and the
// latest-index pointer for chunk locators are stored.
const JobQueueChunksQueue = "JobQueueChunksQueue"

// DefaultChunkQueuesCacheSize bounds the in-memory chunk locator cache.
const DefaultChunkQueuesCacheSize = uint(1000)
|
||
// newChunkLocatorCache builds the read-through/write-through cache backing
// ChunksQueue, mapping a job index to the chunk locator stored at that index.
func newChunkLocatorCache(collector module.CacheMetrics) *Cache[uint64, *chunks.Locator] {
	// store persists the locator and the index -> locator-ID job entry in the
	// same batch. Note: InsertChunkLocator is an upsert, so uniqueness is NOT
	// enforced here — StoreChunkLocator performs the existence check before
	// this store func runs.
	store := func(rw storage.ReaderBatchWriter, index uint64, locator *chunks.Locator) error {
		err := operation.InsertChunkLocator(rw.Writer(), locator)
		if err != nil {
			return fmt.Errorf("failed to insert chunk locator: %w", err)
		}

		err = operation.InsertJobAtIndex(rw.Writer(), JobQueueChunksQueue, index, locator.ID())
		if err != nil {
			return fmt.Errorf("failed to set job index for chunk locator queue at index %v: %w", index, err)
		}

		return nil
	}

	// retrieve resolves index -> locator ID -> locator with two reads.
	retrieve := func(r storage.Reader, index uint64) (*chunks.Locator, error) {
		var locatorID flow.Identifier
		err := operation.RetrieveJobAtIndex(r, JobQueueChunksQueue, index, &locatorID)
		if err != nil {
			return nil, fmt.Errorf("could not retrieve chunk locator in queue: %w", err)
		}

		var locator chunks.Locator
		err = operation.RetrieveChunkLocator(r, locatorID, &locator)
		if err != nil {
			return nil, fmt.Errorf("could not retrieve locator for chunk id %v: %w", locatorID, err)
		}

		return &locator, nil
	}
	return newCache(collector, metrics.ResourceChunkLocators,
		withLimit[uint64, *chunks.Locator](DefaultChunkQueuesCacheSize),
		withStore(store),
		withRetrieve(retrieve))
}
|
||
// NewChunkQueue creates a chunk locator queue on top of the given storage
// backend. Any storage.DB implementation works (badger or pebble — see the
// dbops feature flag), not just badger.
func NewChunkQueue(collector module.CacheMetrics, db storage.DB) *ChunksQueue {
	return &ChunksQueue{
		db:                db,
		chunkLocatorCache: newChunkLocatorCache(collector),
		storing:           &sync.Mutex{},
	}
}
|
||
// Init initializes the chunk queue's latest index with the given default index.
// It returns (false, nil) if the chunk queue is already initialized.
// It returns (true, nil) if the chunk queue is successfully initialized.
func (q *ChunksQueue) Init(defaultIndex uint64) (bool, error) {
	// hold the lock so StoreChunkLocator cannot interleave with the
	// check-then-write sequence below
	q.storing.Lock()
	defer q.storing.Unlock()

	_, err := q.LatestIndex()
	if err == nil {
		// the chunk queue is already initialized
		return false, nil
	}

	// any failure other than "not found" is a real storage error
	if !errors.Is(err, storage.ErrNotFound) {
		return false, fmt.Errorf("could not get latest index: %w", err)
	}

	// the latest index does not exist yet, so initialize it with the default
	// index; first sanity-check that no chunk locator already occupies the
	// default index, which would indicate an inconsistent database state
	_, err = q.AtIndex(defaultIndex)
	if err == nil {
		return false, fmt.Errorf("chunk locator already exists at default index %v", defaultIndex)
	}
	if !errors.Is(err, storage.ErrNotFound) {
		return false, fmt.Errorf("could not check chunk locator at default index %v: %w", defaultIndex, err)
	}

	// set the default index as the latest index
	err = q.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
		return operation.SetJobLatestIndex(rw.Writer(), JobQueueChunksQueue, defaultIndex)
	})

	if err != nil {
		return false, fmt.Errorf("could not init chunk locator queue with default index %v: %w", defaultIndex, err)
	}

	return true, nil
}
|
||
// StoreChunkLocator stores a new chunk locator that is assigned to me to the job queue.
// A true will be returned, if the locator was new.
// A false will be returned, if the locator was duplicate.
func (q *ChunksQueue) StoreChunkLocator(locator *chunks.Locator) (bool, error) {
	// storing a chunk locator requires reading the latest index and updating it,
	// so the whole read-check-write sequence must run under the lock
	q.storing.Lock()
	defer q.storing.Unlock()

	// read the latest index
	latest, err := q.LatestIndex()
	if err != nil {
		return false, err
	}

	// deduplicate here: the underlying insert is an upsert and does not
	// enforce uniqueness on its own
	exists, err := operation.ExistChunkLocator(q.db.Reader(), locator.ID())
	if err != nil {
		return false, fmt.Errorf("failed to check chunk locator existence: %w", err)
	}

	// if the locator already exists, return false
	if exists {
		return false, nil
	}

	// insert to the next index
	next := latest + 1

	// persist the locator, the index->locator mapping, and the advanced
	// latest index in a single write batch (PutTx also populates the cache)
	err = q.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
		// store and cache the chunk locator
		err := q.chunkLocatorCache.PutTx(rw, next, locator)
		if err != nil {
			return fmt.Errorf("failed to store and cache chunk locator: %w", err)
		}

		// update the next index as the latest index
		err = operation.SetJobLatestIndex(rw.Writer(), JobQueueChunksQueue, next)
		if err != nil {
			return fmt.Errorf("failed to update latest index %v: %w", next, err)
		}

		return nil
	})

	if err != nil {
		return false, fmt.Errorf("failed to store chunk locator: %w", err)
	}
	return true, nil
}
|
||
// LatestIndex returns the index of the latest chunk locator stored in the queue. | ||
func (q *ChunksQueue) LatestIndex() (uint64, error) { | ||
var latest uint64 | ||
err := operation.RetrieveJobLatestIndex(q.db.Reader(), JobQueueChunksQueue, &latest) | ||
if err != nil { | ||
return 0, fmt.Errorf("could not retrieve latest index for chunks queue: %w", err) | ||
} | ||
return latest, nil | ||
} | ||
|
||
// AtIndex returns the chunk locator stored at the given index in the queue.
// Reads go through the chunk locator cache; on a miss the locator is loaded
// from the database. A missing index surfaces as an error satisfying
// errors.Is(err, storage.ErrNotFound) — Init depends on this.
func (q *ChunksQueue) AtIndex(index uint64) (*chunks.Locator, error) {
	return q.chunkLocatorCache.Get(q.db.Reader(), index)
}
Uh oh!
There was an error while loading. Please reload this page.