feat: upgrade dependencies, add persistence and other fixes #22
Conversation
Walkthrough: The changes in this pull request introduce several enhancements, including the addition of a database path command-line flag.
Actionable comments posted: 13
🧹 Outside diff range and nitpick comments (8)
main.go (2)
`43-43`: **Enhance the `db_path` flag definition**

While the addition of the `db_path` flag is correct and consistent with other flag definitions, consider the following improvements:

- Add a usage description to provide clarity for users.
- Consider providing a default value, or checking for an empty string in the code, to ensure a valid path is always used.

Here's a suggested improvement:

```diff
-flag.StringVar(&db_path, "db_path", "", "path to the database")
+flag.StringVar(&db_path, "db_path", "/path/to/default/db", "path to the persistence database (required)")
```

Also, consider adding a check for an empty `db_path` value after `flag.Parse()` to ensure a valid path is provided.
Line range hint `1-80`: **Summary of changes in main.go**

The changes in this file successfully implement the groundwork for adding persistence as outlined in issue #17. The addition of the `db_path` flag and its integration into the `NewSequencer` function call are appropriate.

However, there are a few areas where the implementation could be improved:

- Providing a more informative usage description for the `db_path` flag.
- Implementing error handling to ensure a valid database path is provided.
- Considering a default value for the `db_path` flag.

These improvements would enhance the robustness and user-friendliness of the application.

As the project moves forward with implementing persistence, consider the following architectural advice:

- Ensure that the `NewSequencer` function in the `sequencing` package properly initializes the database connection using the provided path.
- Implement proper error handling and recovery mechanisms for database operations throughout the sequencer logic.
- Consider implementing a configuration-file approach in addition to command-line flags for more complex setups.
- Plan for future scalability by considering how the persistence layer might need to evolve (e.g., supporting different database backends).
go.mod (1)
`8-8`: **Approve BadgerDB addition with version consideration.**

The addition of BadgerDB aligns with the PR objective to implement persistence. This is a good choice for a key-value store with good performance characteristics.
Consider using the latest stable version of BadgerDB (v4.2.0 as of April 2024) for potential performance improvements and bug fixes, unless there's a specific reason to use v3.2103.5.
da/da.go (1)

Line range hint `202-212`: **Approve the change with a minor suggestion for error handling**

The modification improves clarity by distinguishing between the result of `GetIDs` and the actual IDs. This change aligns well with the PR objective of upgrading dependencies, as it adapts to what appears to be an API change in the `go-da` library.

Consider adding a nil check for `idsResult.IDs` before assigning it to `ids`. This would provide more robust error handling:

```diff
 idsResult, err := dac.DA.GetIDs(ctx, dataLayerHeight, dac.Namespace)
 if err != nil {
 	return ResultRetrieveBatch{
 		BaseResult: BaseResult{
 			Code:     StatusError,
 			Message:  fmt.Sprintf("failed to get IDs: %s", err.Error()),
 			DAHeight: dataLayerHeight,
 		},
 	}
 }
+if idsResult.IDs == nil {
+	return ResultRetrieveBatch{
+		BaseResult: BaseResult{
+			Code:     StatusError,
+			Message:  "GetIDs returned nil IDs",
+			DAHeight: dataLayerHeight,
+		},
+	}
+}
 ids := idsResult.IDs
```

This addition would catch potential issues where `GetIDs` succeeds but returns a nil `IDs` field.

sequencing/sequencer_test.go (1)
`112-112`: **Avoid potential flakiness in time-based assertion.**

Asserting `assert.Equal(t, time.Now().Day(), res.Timestamp.Day())` might cause tests to fail if they run across midnight. Consider mocking the current time or allowing for a time tolerance to ensure reliable test results.
sequencing/sequencer.go (3)

`669-679`: **Implement missing DA verification in `VerifyBatch` method**

The `VerifyBatch` method contains a TODO comment indicating that DA (Data Availability) verification needs to be implemented.

I can assist in implementing the DA verification logic. Would you like me to help draft the required code or open a GitHub issue to track this task?
Line range hint `705-714`: **Handle context cancellation in `batchSubmissionLoop`**

In the `batchSubmissionLoop` method, there is a select statement that checks `c.ctx.Done()`, but it might not properly handle context cancellation during the `publishBatch` call.

Consider checking for context cancellation after `publishBatch` to ensure the loop exits promptly when the context is cancelled.
`592-595`: **Validate transaction size before adding to queue**

In `SubmitRollupTransaction`, there is no validation of the transaction size before adding it to the transaction queue. This could lead to oversized transactions being processed, potentially causing issues downstream.

Consider adding a size check to ensure transactions do not exceed a maximum allowed size.
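A minimal sketch of such a guard; the 1 MiB cap and the names are hypothetical — in the real sequencer the limit would presumably derive from its `maxSize`:

```go
package main

import (
	"errors"
	"fmt"
)

// maxTxSize is a hypothetical cap; the real value would come from
// the sequencer's configured maximum blob/batch size.
const maxTxSize = 1024 * 1024

var errTxTooLarge = errors.New("transaction exceeds maximum allowed size")

// validateTxSize rejects oversized transactions before they are queued.
func validateTxSize(tx []byte) error {
	if len(tx) > maxTxSize {
		return errTxTooLarge
	}
	return nil
}

func main() {
	small := make([]byte, 10)
	big := make([]byte, maxTxSize+1)
	fmt.Println(validateTxSize(small) == nil)            // prints true
	fmt.Println(errors.Is(validateTxSize(big), errTxTooLarge)) // prints true
}
```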
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
⛔ Files ignored due to path filters (1)
`go.sum` is excluded by `!**/*.sum`
📒 Files selected for processing (6)
- .gitignore (1 hunks)
- da/da.go (2 hunks)
- go.mod (3 hunks)
- main.go (3 hunks)
- sequencing/sequencer.go (10 hunks)
- sequencing/sequencer_test.go (6 hunks)
✅ Files skipped from review due to trivial changes (1)
- .gitignore
🔇 Additional comments (10)
main.go (2)
`34-34`: **LGTM: Addition of `db_path` variable**

The addition of the `db_path` variable is appropriate for implementing the persistence feature as outlined in issue #17. The variable type and placement are consistent with the existing code style.

`63-63`: **LGTM: Updated `NewSequencer` call with `db_path`**

The addition of `db_path` to the `NewSequencer` function call is correct and aligns with the PR objective of adding persistence.

To ensure that the `NewSequencer` function in the `sequencing` package has been updated to handle the new `db_path` parameter, please run the following verification script. This script will help verify that the `NewSequencer` function has been properly updated to accept the new `db_path` parameter.

Consider adding error handling specific to the `db_path` parameter before calling `NewSequencer`. For example:

```go
if db_path == "" {
	log.Fatalf("Database path is required. Please provide a valid path using the -db_path flag.")
}
```

This will ensure that a valid database path is always provided before initializing the sequencer.
go.mod (7)
`12-12`: **Approve go-sequencing update to stable version.**

The update of go-sequencing to v0.2.0 is a positive change, moving from a development version to a stable release. This aligns with the PR objective to upgrade dependencies.

`13-13`: **Approve rollkit update.**

The update of rollkit to v0.13.7 is a minor version bump, which aligns with the PR objective to upgrade dependencies. This update likely includes bug fixes and minor improvements.

`36-36`: **Approve glog update.**

The update of glog to v1.2.2 is a patch version bump, which aligns with the PR objective to upgrade dependencies. This update likely includes minor bug fixes or improvements.

`89-89`: **Approve gRPC update.**

The update of gRPC to v1.67.1 is a minor version bump, which aligns with the PR objective to upgrade dependencies. This update likely includes bug fixes and minor improvements.

Line range hint `1-93`: **Summary of dependency updates.**

The changes in the `go.mod` file align well with the PR objectives:

- Addition of BadgerDB for persistence.
- Upgrades to key dependencies like go-da, go-sequencing, and rollkit.
- Other minor updates to improve the overall package ecosystem.

These updates should enhance the project's functionality and keep it up-to-date with the latest improvements in its dependencies.

To ensure smooth integration of these updates, please:

- Run all tests and CI pipelines to catch any potential compatibility issues.
- Review the changelogs of significantly updated packages (e.g., go-da, go-jsonrpc) for any breaking changes or new features that might affect the project.
- Update any necessary documentation to reflect these dependency changes.

`32-32`: **Approve go-jsonrpc update with compatibility check.**

The update of go-jsonrpc to v0.6.0 aligns with the PR objective to upgrade dependencies.

Given the significant version jump (from v0.3.1 to v0.6.0), please verify that this update doesn't introduce any breaking changes. Run the following command to check for any compatibility issues:

✅ Verification successful

**go-jsonrpc update verified.**

No direct usages of the `go-jsonrpc` package were found in the codebase, ensuring that the update to `v0.6.0` does not introduce any compatibility issues.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

```shell
#!/bin/bash
# Search for usage of go-jsonrpc package and check for any deprecated or changed APIs
rg --type go -A 5 'github\.com/filecoin-project/go-jsonrpc'
```

Length of output: 61
`11-11`: **Approve go-da update with compatibility check.**

The update of go-da to v0.8.0 aligns with the PR objective to upgrade dependencies.

Please ensure that this significant version jump (from v0.5.0 to v0.8.0) doesn't introduce any breaking changes that could affect the project. Run the following command to check for any compatibility issues:
da/da.go (1)

Line range hint `1-274`: **Reminder: Address persistence and linked issues**

While the changes in this file are good, I noticed that this PR also aims to address issues #16 and #17, particularly adding persistence using BadgerDB or cometdb. However, this file doesn't contain any changes related to these objectives.

To ensure we're not missing any relevant changes, let's check for new files or modifications in other parts of the codebase.

If these changes are planned for a separate PR or file, please update the PR description to clarify the scope of this particular change.
Actionable comments posted: 11
🧹 Outside diff range and nitpick comments (6)
sequencing/sequencer_test.go (2)

`54-59`: **Consider providing a meaningful value for the new parameter in `NewSequencer`.**

The test is using an empty string for the additional parameter in `NewSequencer`. If this parameter affects the sequencer's behavior, consider providing a meaningful test value to ensure the new functionality is properly exercised.

On a positive note, the addition of the defer function to close the sequencer is a good practice for proper resource cleanup.

`95-107`: **LGTM: Good implementation of database initialization and cleanup.**

The changes in this segment effectively implement the persistence feature:

- Using `getDB()` to initialize the database is a good practice.
- Adding the `db` field to the Sequencer struct is necessary for persistence.
- The defer function ensures proper cleanup of resources.

Consider adding a check for the error returned by `seq.Close()` in the defer function, similar to how it's done in other test functions:

```go
defer func() {
	err := seq.Close()
	require.NoError(t, err)
}()
```

This ensures that any errors during cleanup are caught and reported.

sequencing/sequencer.go (4)
sequencing/sequencer.go (4)
7-7
: Group standard library imports togetherThe import statement for
"encoding/hex"
at line 7 should be grouped with other standard library imports for better readability. Consider grouping imports into standard libraries, third-party libraries, and local packages, separated by blank lines.
Line range hint `177-209`: **Optimize batch size calculation in `GetNextBatch`**

The loop starting at line 189 reduces `batchSize` by one each time until the batch fits the `max` size. This can be inefficient for large batches. Consider implementing a binary search to find the maximum batch size that fits within `max`.
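Since prefix byte-sizes grow monotonically with the number of transactions, the decrement loop can be replaced with `sort.Search` over prefix sums. A self-contained sketch (names are illustrative, not the actual sequencer code):

```go
package main

import (
	"fmt"
	"sort"
)

// maxFittingBatch returns the longest prefix of queue whose total
// byte size is at most max, found by binary search: prefix sizes
// are monotone, so sort.Search applies directly.
func maxFittingBatch(queue [][]byte, max uint64) [][]byte {
	// prefix[i] = total bytes of the first i transactions.
	prefix := make([]uint64, len(queue)+1)
	for i, tx := range queue {
		prefix[i+1] = prefix[i] + uint64(len(tx))
	}
	// First prefix length whose cumulative size exceeds max.
	n := sort.Search(len(queue)+1, func(i int) bool {
		return prefix[i] > max
	})
	// n-1 is the largest prefix that still fits (n == len+1 when all fit).
	return queue[:n-1]
}

func main() {
	queue := [][]byte{make([]byte, 4), make([]byte, 4), make([]byte, 4)}
	fmt.Println(len(maxFittingBatch(queue, 9))) // prints 2
}
```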
`357-365`: **Log errors when closing BadgerDB**

In the `Close` method (lines 357-365), if an error occurs while closing BadgerDB, it's returned but not logged. Since this is during shutdown, consider logging the error for troubleshooting purposes.
`660-672`: **Implement DA verification in `VerifyBatch`**

There's a TODO at line 662 indicating that DA verification needs to be implemented in the `VerifyBatch` method. Completing this functionality is important for ensuring the integrity of batch verification.

Would you like assistance in implementing the DA verification logic or creating a GitHub issue to track this task?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
- sequencing/sequencer.go (10 hunks)
- sequencing/sequencer_test.go (3 hunks)
🔇 Additional comments (9)
sequencing/sequencer_test.go (7)
`80-82`: **LGTM: Improved method calls and error handling.**

The changes in this segment are well-implemented:

- The use of a structured request object for `SubmitRollupTransaction` improves clarity.
- Checking for nil response values enhances error handling.
- Passing `seq.db` to `GetNextBatch` is consistent with the new persistence feature.

These modifications align well with the updated method signatures and the PR objectives.
Also applies to: 85-85, 88-90
`110-113`: **LGTM: Proper implementation of the updated GetNextBatch method.**

The changes in this segment correctly implement the new `GetNextBatch` signature:

- Using a structured request object improves clarity.
- Checking both the Timestamp and Batch in the response ensures the method behaves as expected.

These modifications align well with the updated method signature and provide good test coverage.
`117-130`: **LGTM: Consistent implementation of database handling and error checking.**

This segment maintains consistency with previous changes:

- Proper database initialization and cleanup.
- Use of a structured request object for `GetNextBatch`.
- Improved error handling with a nil response check.

These changes contribute to the overall robustness of the test suite.
Also applies to: 133-135
`139-159`: **LGTM: Good addition of edge case testing.**

This segment maintains consistency with previous changes and adds valuable test coverage:

- Proper database initialization and cleanup.
- Use of a structured request object for `GetNextBatch`.
- An additional test case for nil `LastBatchHash` improves edge case coverage.

These changes enhance the robustness of the test suite.
`161-168`: **LGTM: Well-implemented in-memory database initialization for testing.**

The `getDB` function is well-designed for testing purposes:

- Uses in-memory storage, which is ideal for tests.
- Disables logging, which can reduce noise in test output.
- Proper error handling and wrapping.

This implementation aligns well with best practices for database handling in test environments.
`175-231`: **LGTM: Consistent implementation and good use of hex encoding for batch hashes.**

This segment maintains consistency with previous changes and introduces good practices:

- Proper database initialization and cleanup.
- Use of structured request objects for `GetNextBatch` and `VerifyBatch`.
- Hex encoding of batch hashes for storage in the `seenBatches` map is a good practice.

Regarding the past review comment about inconsistent hex encoding: the use of `hex.EncodeToString(batchHash)` in line 221 is correct for storing the hash in the `seenBatches` map. However, when calling `VerifyBatch` in line 224, the raw `batchHash` is used. This is consistent with the method signature and doesn't require changes.

These modifications contribute to the overall robustness and correctness of the test suite.
Line range hint `1-231`: **Overall assessment: Well-implemented persistence and API updates.**

This PR successfully addresses the objectives outlined in issue #17:

- Persistence has been added using BadgerDB, with proper initialization and cleanup in tests.
- The Sequencer API has been updated to use structured request/response objects, improving clarity and type safety.
- Test coverage has been maintained and expanded to cover new functionality and edge cases.

The changes are consistent throughout the file and align well with best practices for Go testing and database handling. The use of in-memory storage for tests is particularly commendable.

Minor suggestions for improvement have been noted in previous comments, but overall, this is a solid implementation that enhances the robustness and functionality of the sequencer.
sequencing/sequencer.go (2)

`22-23`: **Ensure `ErrInvalidRollupId` is utilized**

The error variable `ErrInvalidRollupId` is declared at lines 22-23. Verify that this error is used throughout the codebase where appropriate to handle invalid rollup IDs. If not used, consider removing it to keep the code clean.
Line range hint `294-351`: **Handle initialization errors gracefully**

In the `NewSequencer` function (lines 294-351), errors during database loading prevent the sequencer from starting. Consider whether the sequencer should continue with partial data or implement a recovery mechanism to handle corrupt or missing data.
Does this branch need to be updated/rebased? Seems like a lot of other changes.
Actionable comments posted: 7
🧹 Outside diff range and nitpick comments (3)
sequencing/sequencer.go (3)
`71-101`: **Improve error handling in the `Next` method**

The `Next` method has been updated to handle both in-memory and persistent storage, which is good. However, the error handling can be improved:

- When returning errors, it's beneficial to provide more context about where the error occurred.
- The method returns an empty batch with nil transactions in some error cases, which might be confusing for the caller.

Consider wrapping errors with more context and returning a nil batch instead of an empty one in error cases.

Here's a suggested improvement:

```diff
 func (bq *BatchQueue) Next(db *badger.DB) (*sequencing.Batch, error) {
 	bq.mu.Lock()
 	defer bq.mu.Unlock()
 	if len(bq.queue) == 0 {
-		return &sequencing.Batch{Transactions: nil}, nil
+		return nil, nil
 	}
 	batch := bq.queue[0]
 	bq.queue = bq.queue[1:]

 	h, err := batch.Hash()
 	if err != nil {
-		return &sequencing.Batch{Transactions: nil}, err
+		return nil, fmt.Errorf("failed to hash batch: %w", err)
 	}

 	// Remove the batch from BadgerDB after processing
 	err = db.Update(func(txn *badger.Txn) error {
 		// Get the batch to ensure it exists in the DB before deleting
 		_, err := txn.Get(h)
 		if err != nil {
-			return err
+			return fmt.Errorf("batch not found in DB: %w", err)
 		}
 		// Delete the batch from BadgerDB
 		return txn.Delete(h)
 	})
 	if err != nil {
-		return &sequencing.Batch{Transactions: nil}, err
+		return nil, fmt.Errorf("failed to remove batch from DB: %w", err)
 	}

 	return &batch, nil
 }
```

This change provides more context in error messages and consistently returns `nil` for the batch in error cases.
Line range hint `168-203`: **Improve error handling and batch processing in `GetNextBatch`**

The `GetNextBatch` method has been updated to handle both in-memory and persistent storage, which is good. However, there are some potential issues:

- If an error occurs while processing transactions, the method returns an empty batch, potentially losing valid transactions.
- The method doesn't handle partial success scenarios where some transactions are successfully processed and others fail.

Consider implementing a transaction-level rollback mechanism and handling partial successes.

Here's a suggested improvement:

```diff
 func (tq *TransactionQueue) GetNextBatch(max uint64, db *badger.DB) sequencing.Batch {
 	tq.mu.Lock()
 	defer tq.mu.Unlock()

 	var batch [][]byte
 	batchSize := len(tq.queue)
 	if batchSize == 0 {
 		return sequencing.Batch{Transactions: nil}
 	}
 	for {
 		batch = tq.queue[:batchSize]
 		blobSize := totalBytes(batch)
 		if uint64(blobSize) <= max {
 			break
 		}
 		batchSize = batchSize - 1
 	}

+	var processedTxs [][]byte
+	var err error
 	// Retrieve transactions from BadgerDB and remove processed ones
-	for _, tx := range batch {
-		txHash := GetTransactionHash(tx)
-		err := db.Update(func(txn *badger.Txn) error {
+	err = db.Update(func(txn *badger.Txn) error {
+		for _, tx := range batch {
+			txHash := GetTransactionHash(tx)
 			// Get and then delete the transaction from BadgerDB
 			_, err := txn.Get([]byte(txHash))
 			if err != nil {
 				return err
 			}
-			return txn.Delete([]byte(txHash)) // Remove processed transaction
-		})
-		if err != nil {
-			return sequencing.Batch{Transactions: nil} // Return empty batch if any transaction retrieval fails
+			err = txn.Delete([]byte(txHash)) // Remove processed transaction
+			if err != nil {
+				return err
+			}
+			processedTxs = append(processedTxs, tx)
 		}
+		return nil
+	})
+	if err != nil {
+		// If there's an error, only remove the successfully processed transactions
+		tq.queue = tq.queue[len(processedTxs):]
+		return sequencing.Batch{Transactions: processedTxs}
 	}

 	tq.queue = tq.queue[batchSize:]
 	return sequencing.Batch{Transactions: batch}
 }
```

This change ensures that:

- All database operations are performed within a single transaction.
- In case of an error, we return a partial batch with only the successfully processed transactions.
- The in-memory queue is updated to reflect only the successfully processed transactions.
Line range hint `285-342`: **Improve error handling in NewSequencer initialization**

The `NewSequencer` function has been well updated to initialize BadgerDB and load data. However, there's room for improvement in error handling:

- If an error occurs during the loading process (e.g., loading the last batch hash, seen batches, or queues), the function returns immediately without cleaning up resources.
- This could lead to resource leaks, particularly the opened BadgerDB instance.

Consider implementing a cleanup mechanism for partial initialization failures.

Here's a suggested improvement:

```diff
 func NewSequencer(daAddress, daAuthToken string, daNamespace []byte, batchTime time.Duration, dbPath string) (*Sequencer, error) {
 	// ... (existing code)

 	s := &Sequencer{
 		dalc:        dalc,
 		batchTime:   batchTime,
 		ctx:         ctx,
 		maxSize:     maxBlobSize,
 		rollupId:    daNamespace,
 		tq:          NewTransactionQueue(),
 		bq:          NewBatchQueue(),
 		seenBatches: make(map[string]struct{}),
 		db:          db,
 	}

+	var initErr error
+	defer func() {
+		if initErr != nil {
+			// Clean up resources if initialization fails
+			if err := s.Close(); err != nil {
+				log.Errorf("Failed to clean up resources: %v", err)
+			}
+		}
+	}()

 	// Load last batch hash from DB to recover from crash
-	err = s.LoadLastBatchHashFromDB()
-	if err != nil {
-		return nil, fmt.Errorf("failed to load last batch hash from DB: %w", err)
+	initErr = s.LoadLastBatchHashFromDB()
+	if initErr != nil {
+		return nil, fmt.Errorf("failed to load last batch hash from DB: %w", initErr)
 	}

 	// Load seen batches from DB to recover from crash
-	err = s.LoadSeenBatchesFromDB()
-	if err != nil {
-		return nil, fmt.Errorf("failed to load seen batches from DB: %w", err)
+	initErr = s.LoadSeenBatchesFromDB()
+	if initErr != nil {
+		return nil, fmt.Errorf("failed to load seen batches from DB: %w", initErr)
 	}

 	// Load TransactionQueue and BatchQueue from DB to recover from crash
-	err = s.tq.LoadFromDB(s.db) // Load transactions
-	if err != nil {
-		return nil, fmt.Errorf("failed to load transaction queue from DB: %w", err)
+	initErr = s.tq.LoadFromDB(s.db) // Load transactions
+	if initErr != nil {
+		return nil, fmt.Errorf("failed to load transaction queue from DB: %w", initErr)
 	}
-	err = s.bq.LoadFromDB(s.db) // Load batches
-	if err != nil {
-		return nil, fmt.Errorf("failed to load batch queue from DB: %w", err)
+	initErr = s.bq.LoadFromDB(s.db) // Load batches
+	if initErr != nil {
+		return nil, fmt.Errorf("failed to load batch queue from DB: %w", initErr)
 	}

 	go s.batchSubmissionLoop(s.ctx)
 	return s, nil
 }
```

This change:

- Uses a deferred function to clean up resources if initialization fails.
- Uses a separate `initErr` variable to track initialization errors, ensuring that the cleanup logic is triggered for any initialization failure.
- Calls the `Close` method to properly clean up resources, including closing the BadgerDB instance.

This approach ensures that resources are properly cleaned up in case of partial initialization failures, preventing potential resource leaks.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
⛔ Files ignored due to path filters (1)
`go.sum` is excluded by `!**/*.sum`
📒 Files selected for processing (6)
- .gitignore (1 hunks)
- da/da.go (2 hunks)
- go.mod (3 hunks)
- main.go (3 hunks)
- sequencing/sequencer.go (10 hunks)
- sequencing/sequencer_test.go (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
- .gitignore
- da/da.go
- go.mod
- main.go
🧰 Additional context used
🔇 Additional comments (16)
sequencing/sequencer_test.go (11)
`5-6`: **LGTM: New imports added.**

The addition of the `"encoding/hex"` and `"fmt"` imports is appropriate for the new functionality in the tests.
`12-12`: **LGTM: BadgerDB import added.**

The addition of the BadgerDB import is appropriate for the new database functionality in the tests.
`54-59`: **Consider using a meaningful value for the new parameter in NewSequencer.**

While the addition of the deferred `Close()` call is a good practice, the new parameter in `NewSequencer` is set to an empty string. Consider using a meaningful test value to ensure the new functionality is properly exercised.

The deferred `Close()` call is a good addition to ensure proper cleanup after the test.
`70-75`: **LGTM: Proper cleanup with deferred Close.**

The addition of the deferred `Close()` call ensures proper cleanup after the test, which is a good practice.
`80-82`: **LGTM: Improved API usage and error handling.**

The use of a structured request for `SubmitRollupTransaction` and the nil check on the response improve the API usage and error handling in the test.
`85-85`: **LGTM: Updated GetNextBatch call.**

The addition of `seq.db` as a parameter to `GetNextBatch` is consistent with the new database functionality.
`88-90`: **LGTM: Improved error handling.**

The updates to the `SubmitRollupTransaction` call and the subsequent error checking enhance the test's robustness and align with the updated API.
`95-107`: **LGTM: Proper database initialization and cleanup.**

The addition of database initialization using `getDB()`, including it in the Sequencer struct, and the deferred `Close()` call demonstrate good practices for database handling in tests.
`110-113`: **LGTM: Updated GetNextBatch call and assertions.**

The use of a structured request for `GetNextBatch` and the updated assertions correctly reflect the changes in the method signature and return values.
`117-168`: **LGTM: Consistent database handling and in-memory database for tests.**

The addition of consistent database initialization and cleanup across multiple test functions demonstrates good testing practices. The `getDB` function's use of an in-memory database is particularly commendable, as it ensures test isolation and avoids file system dependencies.
`175-231`: **LGTM: Consistent updates across all test functions.**

The changes consistently apply the new patterns of database handling and API updates across all test functions. This includes proper initialization, cleanup, use of structured requests, and updated assertions.

Note the consistent use of `hex.EncodeToString` for encoding batch hashes, which aligns with the implementation in the main code.

sequencing/sequencer.go (5)
Line range hint `7-23`: **LGTM: New imports and error variable added**

The addition of the BadgerDB import and the introduction of the `ErrInvalidRollupId` variable are appropriate for the new persistent storage functionality and the error handling improvements.
`148-152`: **LGTM: Simplified GetTransactionHash function**

The simplification of the `GetTransactionHash` function using `sha256.Sum256` is an excellent improvement. It makes the code more concise and potentially more efficient by using a single function call instead of multiple steps.
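The simplified form described above can be sketched as follows (the function name is lower-cased here to keep the snippet self-contained; it is not the sequencer's actual code):

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// getTransactionHash hashes a transaction in a single sha256.Sum256
// call and hex-encodes the digest for use as a map or DB key.
func getTransactionHash(tx []byte) string {
	h := sha256.Sum256(tx)
	return hex.EncodeToString(h[:])
}

func main() {
	// A SHA-256 digest is 32 bytes, i.e. 64 hex characters.
	fmt.Println(len(getTransactionHash([]byte("example transaction")))) // prints 64
}
```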
`265-281`: **LGTM: Sequencer struct updated with BadgerDB support**

The updates to the `Sequencer` struct are well-implemented:

- The addition of the `db *badger.DB` field is necessary for the new persistent storage functionality.
- The `dbMux sync.Mutex` field is a good practice for ensuring safe concurrent access to the database.

These changes provide a solid foundation for implementing persistent storage in the Sequencer.
`347-356`: **LGTM: Well-implemented Close method**

The `Close` method is a good addition to the `Sequencer` struct:

- It properly checks if the `db` is nil before attempting to close it, preventing potential nil pointer dereferences.
- The error handling is well-implemented, wrapping the error with additional context.
- The method ensures that the BadgerDB instance is properly closed, preventing resource leaks.

This is a crucial method for proper resource management, especially when dealing with persistent storage.
`396-416`: **Refine key filtering in LoadSeenBatchesFromDB**

The `LoadSeenBatchesFromDB` method is a good addition for loading seen batches from BadgerDB. However, there's a potential issue: the method is loading all keys from the database into the `seenBatches` map without any

can you elaborate what you mean here?
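Assuming the truncated point above is that every key in the DB (transactions, batches, metadata) ends up in `seenBatches`, one fix is to scope the load to a dedicated key prefix. A toy sketch of the filtering idea, with a hypothetical `seen/` prefix and plain strings standing in for BadgerDB keys:

```go
package main

import (
	"fmt"
	"strings"
)

// loadSeenBatches keeps only keys under the hypothetical "seen/"
// prefix, so unrelated entries sharing the DB are not pulled into
// the seen-batches map.
func loadSeenBatches(keys []string) map[string]struct{} {
	const prefix = "seen/"
	seen := make(map[string]struct{})
	for _, k := range keys {
		if strings.HasPrefix(k, prefix) {
			seen[strings.TrimPrefix(k, prefix)] = struct{}{}
		}
	}
	return seen
}

func main() {
	keys := []string{"seen/abc", "tx/def", "seen/ghi", "lastBatchHash"}
	fmt.Println(len(loadSeenBatches(keys))) // prints 2
}
```

In BadgerDB itself this would map to iterating with a prefix (e.g. `IteratorOptions.Prefix` or `it.ValidForPrefix`) so only the scoped keys are touched.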
Actionable comments posted: 7
🧹 Outside diff range and nitpick comments (2)
sequencing/sequencer.go (2)
Line range hint
169-204
: Optimize GetNextBatch for better error handling and performanceThe changes to
GetNextBatch
improve consistency between in-memory and persistent storage. However, there are potential issues with error handling and performance:
- If an error occurs during the database operation for a single transaction, the entire batch is discarded.
- The method performs individual database operations for each transaction, which could be inefficient for large batches.
Consider the following improvements:
- Implement a more granular error handling approach that allows partial batch processing.
- Use a single database transaction for all operations to improve performance.
Here's a suggested implementation:
func (tq *TransactionQueue) GetNextBatch(max uint64, db *badger.DB) sequencing.Batch { tq.mu.Lock() defer tq.mu.Unlock() var batch [][]byte batchSize := len(tq.queue) if batchSize == 0 { return sequencing.Batch{Transactions: nil} } for { batch = tq.queue[:batchSize] blobSize := totalBytes(batch) if uint64(blobSize) <= max { break } batchSize = batchSize - 1 } + var processedTxs [][]byte + err := db.Update(func(txn *badger.Txn) error { for _, tx := range batch { txHash := GetTransactionHash(tx) - err := db.Update(func(txn *badger.Txn) error { - // Get and then delete the transaction from BadgerDB - _, err := txn.Get([]byte(txHash)) - if err != nil { - return err - } - return txn.Delete([]byte(txHash)) // Remove processed transaction - }) + // Get and then delete the transaction from BadgerDB + _, err := txn.Get([]byte(txHash)) if err != nil { - return sequencing.Batch{Transactions: nil} // Return empty batch if any transaction retrieval fails + log.Warnf("Failed to retrieve transaction %s: %v", txHash, err) + continue } + err = txn.Delete([]byte(txHash)) // Remove processed transaction + if err != nil { + log.Warnf("Failed to delete transaction %s: %v", txHash, err) + continue + } + processedTxs = append(processedTxs, tx) } + return nil + }) + if err != nil { + log.Errorf("Database transaction failed: %v", err) + return sequencing.Batch{Transactions: nil} + } - tq.queue = tq.queue[batchSize:] + tq.queue = tq.queue[len(processedTxs):] - return sequencing.Batch{Transactions: batch} + return sequencing.Batch{Transactions: processedTxs} }This implementation:
- Uses a single database transaction for all operations, improving performance.
- Continues processing transactions even if some fail, logging warnings for failed operations.
- Only removes successfully processed transactions from the in-memory queue.
- Returns a batch containing only the successfully processed transactions.
These changes improve error handling and performance while maintaining consistency between the in-memory queue and the database.
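The size-capped batch-selection loop discussed in this comment can be illustrated in isolation. The sketch below is a hypothetical standalone version (the `nextBatch` name and sample transactions are ours, and the BadgerDB bookkeeping is omitted); it only demonstrates how the window shrinks until the batch fits under the byte limit:

```go
package main

import "fmt"

// totalBytes sums the length of all transactions in the batch.
func totalBytes(txs [][]byte) int {
	size := 0
	for _, tx := range txs {
		size += len(tx)
	}
	return size
}

// nextBatch returns the longest prefix of queue whose total byte size
// fits within max, mirroring the shrinking loop in GetNextBatch.
func nextBatch(queue [][]byte, max uint64) [][]byte {
	batchSize := len(queue)
	if batchSize == 0 {
		return nil
	}
	for batchSize > 0 {
		batch := queue[:batchSize]
		if uint64(totalBytes(batch)) <= max {
			return batch
		}
		batchSize--
	}
	return nil
}

func main() {
	queue := [][]byte{[]byte("aaaa"), []byte("bbbb"), []byte("cc")}
	fmt.Println(len(nextBatch(queue, 8))) // first two 4-byte txs fit
	fmt.Println(len(nextBatch(queue, 3))) // nothing fits under 3 bytes
}
```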
Line range hint `266-343`: Improve error handling during Sequencer initialization

The changes to the `Sequencer` struct and `NewSequencer`
- If an error occurs during the loading of data from BadgerDB (last batch hash, seen batches, transaction queue, or batch queue), the function returns immediately, potentially leaving the sequencer in an inconsistent state.
- Resources (like the BadgerDB instance) might not be properly cleaned up in case of an error.
Consider implementing a more robust error handling mechanism that ensures proper cleanup and returns a detailed error message. Here's a suggested improvement:
```diff
 func NewSequencer(daAddress, daAuthToken string, daNamespace []byte, batchTime time.Duration, dbPath string) (*Sequencer, error) {
     // ... (existing code)

     s := &Sequencer{
         dalc:        dalc,
         batchTime:   batchTime,
         ctx:         ctx,
         maxSize:     maxBlobSize,
         rollupId:    daNamespace,
         tq:          NewTransactionQueue(),
         bq:          NewBatchQueue(),
         seenBatches: make(map[string]struct{}),
         db:          db,
     }

+    var initErrors []error
+
     // Load last batch hash from DB to recover from crash
     err = s.LoadLastBatchHashFromDB()
     if err != nil {
-        return nil, fmt.Errorf("failed to load last batch hash from DB: %w", err)
+        initErrors = append(initErrors, fmt.Errorf("failed to load last batch hash from DB: %w", err))
     }

     // Load seen batches from DB to recover from crash
     err = s.LoadSeenBatchesFromDB()
     if err != nil {
-        return nil, fmt.Errorf("failed to load seen batches from DB: %w", err)
+        initErrors = append(initErrors, fmt.Errorf("failed to load seen batches from DB: %w", err))
     }

     // Load TransactionQueue and BatchQueue from DB to recover from crash
     err = s.tq.LoadFromDB(s.db) // Load transactions
     if err != nil {
-        return nil, fmt.Errorf("failed to load transaction queue from DB: %w", err)
+        initErrors = append(initErrors, fmt.Errorf("failed to load transaction queue from DB: %w", err))
     }
     err = s.bq.LoadFromDB(s.db) // Load batches
     if err != nil {
-        return nil, fmt.Errorf("failed to load batch queue from DB: %w", err)
+        initErrors = append(initErrors, fmt.Errorf("failed to load batch queue from DB: %w", err))
     }

+    if len(initErrors) > 0 {
+        s.Close() // Ensure resources are cleaned up
+        return nil, fmt.Errorf("sequencer initialization failed with %d errors: %v", len(initErrors), initErrors)
+    }

     go s.batchSubmissionLoop(s.ctx)
     return s, nil
 }
```

This change:
- Collects all initialization errors instead of returning immediately on the first error.
- Calls `s.Close()` to clean up resources if any initialization errors occur.
- Returns a comprehensive error message detailing all initialization failures.
These improvements ensure that all initialization steps are attempted, resources are properly cleaned up in case of errors, and a detailed error report is provided if initialization fails.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
- sequencing/sequencer.go (10 hunks)
🧰 Additional context used
📓 Learnings (1)
sequencing/sequencer.go (1)
Learnt from: gupadhyaya PR: rollkit/centralized-sequencer#22 File: sequencing/sequencer.go:369-394 Timestamp: 2024-10-17T06:42:26.385Z Learning: In `sequencing/sequencer.go`, within the `LoadLastBatchHashFromDB` function, a nil `c.lastBatchHash` after loading is acceptable and properly handled elsewhere in the code.
🔇 Additional comments (5)
sequencing/sequencer.go (5)
Line range hint `7-24`: LGTM: New imports and error definition

The new imports are necessary for the added functionality, and the error definition for invalid rollup ID is appropriate.
`149-153`: LGTM: Simplified GetTransactionHash function

The simplification of the `GetTransactionHash` function using `sha256.Sum256` is a good improvement. It makes the code more concise and efficient.
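For reference, the hashing pattern endorsed here looks like this in isolation. This is an illustrative sketch, not the PR's code; the `txHash` helper name is ours, standing in for `GetTransactionHash`:

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// txHash hashes the raw transaction bytes with SHA-256 and
// hex-encodes the 32-byte digest.
func txHash(txBytes []byte) string {
	sum := sha256.Sum256(txBytes) // returns a [32]byte value directly
	return hex.EncodeToString(sum[:])
}

func main() {
	fmt.Println(txHash([]byte("hello")))
}
```

Note that `sha256.Sum256` returns an array, so it must be stored in a variable before slicing with `[:]`.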
`359-372`: LGTM: Thread-safe CompareAndSetMaxSize method

The `CompareAndSetMaxSize` method is well-implemented:
- It uses atomic operations to ensure thread-safety when updating `c.maxSize`.
- The loop structure correctly handles potential concurrent updates.
- It only updates the max size if the new size is smaller than the current size, which is a sensible approach for maintaining a maximum value.

This implementation is correct, efficient, and thread-safe.
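The compare-and-swap loop praised here can be demonstrated on a bare `uint64`. This sketch assumes a free function instead of the `Sequencer` method (a simplification of ours), but the retry logic is the same:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// compareAndSetMaxSize lowers *max to size only if size is smaller,
// retrying the CAS if another goroutine updated the value concurrently.
func compareAndSetMaxSize(max *uint64, size uint64) {
	for {
		current := atomic.LoadUint64(max)
		if size >= current {
			return // never grow the maximum
		}
		if atomic.CompareAndSwapUint64(max, current, size) {
			return
		}
	}
}

func main() {
	var maxSize uint64 = 100
	compareAndSetMaxSize(&maxSize, 80) // shrinks the maximum
	compareAndSetMaxSize(&maxSize, 90) // ignored: larger than current
	fmt.Println(atomic.LoadUint64(&maxSize))
}
```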
`48-69`: ⚠️ Potential issue: Improve error handling and atomicity in AddBatch
While the changes improve persistence by storing batches in BadgerDB, there's a potential atomicity issue:
- If the database operation fails, the batch remains in the in-memory queue.
- This could lead to inconsistency between the in-memory state and the persistent storage.
Consider wrapping both operations in a transaction or implementing a rollback mechanism for the in-memory queue if the database operation fails.
Here's a suggested improvement:
```diff
 func (bq *BatchQueue) AddBatch(batch sequencing.Batch, db *badger.DB) error {
-    bq.mu.Lock()
-    bq.queue = append(bq.queue, batch)
-    bq.mu.Unlock()

     // Get the hash and bytes of the batch
     h, err := batch.Hash()
     if err != nil {
         return err
     }

     // Marshal the batch
     batchBytes, err := batch.Marshal()
     if err != nil {
         return err
     }

-    // Store the batch in BadgerDB
-    err = db.Update(func(txn *badger.Txn) error {
-        return txn.Set(h, batchBytes)
-    })
-    return err
+    return db.Update(func(txn *badger.Txn) error {
+        err := txn.Set(h, batchBytes)
+        if err != nil {
+            return err
+        }
+        bq.mu.Lock()
+        defer bq.mu.Unlock()
+        bq.queue = append(bq.queue, batch)
+        return nil
+    })
 }
```

This ensures that the batch is only added to the in-memory queue if the database operation succeeds.
Likely invalid or redundant comment.
`374-401`: ⚠️ Potential issue: Improve error handling and nil checks in LoadLastBatchHashFromDB
The `LoadLastBatchHashFromDB` method is a good addition for loading the last batch hash from BadgerDB. However, there are some areas for improvement:
- The error handling could be more robust, particularly for the case where the key is not found.
- There's a potential issue with assigning a potentially nil `hash` to `c.lastBatchHash` without a nil check.

Consider refining the error handling and adding a nil check before assigning to `c.lastBatchHash`
. Here's a suggested improvement:

```diff
 func (c *Sequencer) LoadLastBatchHashFromDB() error {
     c.dbMux.Lock()
     defer c.dbMux.Unlock()

     var hash []byte
     err := c.db.View(func(txn *badger.Txn) error {
         item, err := txn.Get([]byte("lastBatchHash"))
         if err != nil {
             if errors.Is(err, badger.ErrKeyNotFound) {
                 // If no last batch hash exists, it's the first time or nothing was processed
                 c.lastBatchHash = nil
                 return nil
             }
             return fmt.Errorf("failed to get lastBatchHash: %w", err)
         }
         // Set lastBatchHash in memory from BadgerDB
         return item.Value(func(val []byte) error {
             hash = make([]byte, len(val))
             copy(hash, val)
             return nil
         })
     })
     if err != nil {
         return fmt.Errorf("failed to load lastBatchHash from DB: %w", err)
     }
+    if hash != nil {
         c.lastBatchHash = hash
+    }
     return nil
 }
```

This change:
- Improves error handling by wrapping errors with more context.
- Adds a nil check before assigning `hash` to `c.lastBatchHash`.
- Creates a copy of the value from BadgerDB to ensure we're not holding onto a reference to BadgerDB's internal memory.

These improvements make the method more robust and less prone to potential nil pointer dereferences or data races.
⛔ Skipped due to learnings
Actionable comments posted: 5
🧹 Outside diff range and nitpick comments (2)
sequencing/sequencer.go (2)
Line range hint `170-205`: Improve error handling and atomicity in GetNextBatch method.

The `GetNextBatch`
method now handles both in-memory and persistent storage removal, which is good. However, there are potential issues with error handling and atomicity:
- If an error occurs during the database operation for a single transaction, the method returns an empty batch, potentially losing valid transactions.
- The method doesn't ensure atomicity between the in-memory queue and the database operations.
Consider the following improvements:
- Use a single database transaction for all operations to ensure atomicity.
- Handle errors for individual transactions without discarding the entire batch.
Here's a suggested improvement:
```diff
 func (tq *TransactionQueue) GetNextBatch(max uint64, db *badger.DB) sequencing.Batch {
     tq.mu.Lock()
     defer tq.mu.Unlock()

     var batch [][]byte
     batchSize := len(tq.queue)
     if batchSize == 0 {
         return sequencing.Batch{Transactions: nil}
     }
     for {
         batch = tq.queue[:batchSize]
         blobSize := totalBytes(batch)
         if uint64(blobSize) <= max {
             break
         }
         batchSize = batchSize - 1
     }

+    var finalBatch [][]byte
+    err := db.Update(func(txn *badger.Txn) error {
     for _, tx := range batch {
         txHash := GetTransactionHash(tx)
-        err := db.Update(func(txn *badger.Txn) error {
-            // Get and then delete the transaction from BadgerDB
-            _, err := txn.Get([]byte(txHash))
-            if err != nil {
-                return err
-            }
-            return txn.Delete([]byte(txHash)) // Remove processed transaction
-        })
+        _, err := txn.Get([]byte(txHash))
         if err != nil {
-            return sequencing.Batch{Transactions: nil} // Return empty batch if any transaction retrieval fails
+            log.Warnf("Transaction %s not found in DB, skipping", txHash)
+            continue
         }
+        err = txn.Delete([]byte(txHash))
+        if err != nil {
+            return err
+        }
+        finalBatch = append(finalBatch, tx)
     }
+        return nil
+    })
+    if err != nil {
+        log.Errorf("Error processing batch: %v", err)
+        return sequencing.Batch{Transactions: nil}
+    }

-    tq.queue = tq.queue[batchSize:]
+    tq.queue = tq.queue[len(finalBatch):]

-    return sequencing.Batch{Transactions: batch}
+    return sequencing.Batch{Transactions: finalBatch}
 }
```

This implementation ensures atomicity between database operations and in-memory queue updates, and handles errors for individual transactions without discarding the entire batch.
Line range hint `287-346`: Improve error handling in NewSequencer for data loading operations.

The `NewSequencer`
function now initializes BadgerDB and loads data from it, which is good. However, there's no error handling for the data loading operations (LoadLastBatchHashFromDB, LoadSeenBatchesFromDB, LoadFromDB for TransactionQueue and BatchQueue).Consider adding error handling for these operations and potentially allowing the caller to decide how to proceed if a loading operation fails. Here's a suggested improvement:
```diff
 func NewSequencer(daAddress, daAuthToken string, daNamespace []byte, batchTime time.Duration, dbPath string) (*Sequencer, error) {
     // ... (existing code)

     s := &Sequencer{
         // ... (existing fields)
     }

-    // Load last batch hash from DB to recover from crash
-    err = s.LoadLastBatchHashFromDB()
-    if err != nil {
-        return nil, fmt.Errorf("failed to load last batch hash from DB: %w", err)
-    }
-
-    // Load seen batches from DB to recover from crash
-    err = s.LoadSeenBatchesFromDB()
-    if err != nil {
-        return nil, fmt.Errorf("failed to load seen batches from DB: %w", err)
-    }
-
-    // Load TransactionQueue and BatchQueue from DB to recover from crash
-    err = s.tq.LoadFromDB(s.db) // Load transactions
-    if err != nil {
-        return nil, fmt.Errorf("failed to load transaction queue from DB: %w", err)
-    }
-    err = s.bq.LoadFromDB(s.db) // Load batches
-    if err != nil {
-        return nil, fmt.Errorf("failed to load batch queue from DB: %w", err)
-    }
+    // Load data from DB to recover from crash
+    loadErrors := make(map[string]error)
+
+    if err := s.LoadLastBatchHashFromDB(); err != nil {
+        loadErrors["lastBatchHash"] = err
+    }
+
+    if err := s.LoadSeenBatchesFromDB(); err != nil {
+        loadErrors["seenBatches"] = err
+    }
+
+    if err := s.tq.LoadFromDB(s.db); err != nil {
+        loadErrors["transactionQueue"] = err
+    }
+
+    if err := s.bq.LoadFromDB(s.db); err != nil {
+        loadErrors["batchQueue"] = err
+    }
+
+    if len(loadErrors) > 0 {
+        // Log the errors
+        for key, err := range loadErrors {
+            log.Errorf("Failed to load %s from DB: %v", key, err)
+        }
+        // Optionally, return an error or allow the caller to decide how to proceed
+        // return nil, fmt.Errorf("failed to load some data from DB: %v", loadErrors)
+    }

     go s.batchSubmissionLoop(s.ctx)
     return s, nil
 }
```

This change collects all loading errors and logs them, allowing for more granular error handling and potentially giving the caller the option to decide how to proceed if some data fails to load.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
- sequencing/sequencer.go (10 hunks)
- sequencing/sequencer_test.go (3 hunks)
🧰 Additional context used
📓 Learnings (1)
sequencing/sequencer.go (1)
Learnt from: gupadhyaya PR: rollkit/centralized-sequencer#22 File: sequencing/sequencer.go:369-394 Timestamp: 2024-10-17T06:42:26.385Z Learning: In `sequencing/sequencer.go`, within the `LoadLastBatchHashFromDB` function, a nil `c.lastBatchHash` after loading is acceptable and properly handled elsewhere in the code.
🔇 Additional comments (11)
sequencing/sequencer.go (8)
`7-7`: New imports added for enhanced functionality.

The additions of `"encoding/hex"`, `"reflect"`, `"sync/atomic"`, and `"github.com/dgraph-io/badger/v3"` indicate new features related to hex encoding, reflection, atomic operations, and persistent storage using BadgerDB. These are appropriate for the changes implemented in this file.

Also applies to: 10-10, 12-12, 15-15
`24-25`: New error variable for invalid rollup ID.

The addition of `ErrInvalidRollupId` is a good practice for defining custom errors. It will help in providing clear error messages when an invalid rollup ID is encountered.
`360-373`: 🛠️ Refactor suggestion: Simplify CompareAndSetMaxSize function.

The `CompareAndSetMaxSize` function uses atomic operations to ensure thread-safety, which is good. However, the logic can be simplified.

Consider simplifying the function as follows:
```diff
 func (c *Sequencer) CompareAndSetMaxSize(size uint64) {
-    for {
-        current := atomic.LoadUint64(&c.maxSize)
-        if size >= current {
-            return
-        }
-        if atomic.CompareAndSwapUint64(&c.maxSize, current, size) {
-            return
-        }
-    }
+    for {
+        current := atomic.LoadUint64(&c.maxSize)
+        if size >= current {
+            return
+        }
+        if atomic.CompareAndSwapUint64(&c.maxSize, current, size) {
+            return
+        }
+    }
 }
```

This change simplifies the function while maintaining its thread-safe behavior. It will keep trying to update
`c.maxSize` until it succeeds or the new size is no longer smaller than the current size.

Likely invalid or redundant comment.
`236-255`: ⚠️ Potential issue: Improve atomicity and error handling in AddBatchBackToQueue method.

The `AddBatchBackToQueue` method is a good addition for handling failure scenarios. However, there are potential issues with atomicity and error handling:
- If an error occurs while adding a transaction back to the database, the method returns immediately, leaving the in-memory queue and database in an inconsistent state.
- The method doesn't handle partial failures well, as it stops processing at the first error.
Consider using a transaction to ensure atomicity and implementing a way to handle partial failures. Here's a suggested improvement:
```diff
 func (tq *TransactionQueue) AddBatchBackToQueue(batch sequencing.Batch, db *badger.DB) error {
     tq.mu.Lock()
     defer tq.mu.Unlock()

     // Add the batch back to the in-memory transaction queue
     tq.queue = append(tq.queue, batch.Transactions...)

+    return db.Update(func(txn *badger.Txn) error {
+        var errs []error
     // Optionally, persist the batch back to BadgerDB
     for _, tx := range batch.Transactions {
-        err := db.Update(func(txn *badger.Txn) error {
-            return txn.Set([]byte(GetTransactionHash(tx)), tx) // Store transaction back in DB
-        })
+        err := txn.Set([]byte(GetTransactionHash(tx)), tx) // Store transaction back in DB
         if err != nil {
-            return fmt.Errorf("failed to revert transaction to DB: %w", err)
+            errs = append(errs, fmt.Errorf("failed to revert transaction to DB: %w", err))
         }
     }
+        if len(errs) > 0 {
+            return fmt.Errorf("failed to revert %d transactions: %v", len(errs), errs)
+        }
+        return nil
+    })
-    return nil
 }
```

This change:
- Uses a single database transaction for all operations, ensuring atomicity.
- Collects all errors that occur during the process.
- Returns a comprehensive error message if any transactions fail to revert.
This approach ensures that either all transactions are reverted successfully, or none are, maintaining consistency between the in-memory queue and the database.
Likely invalid or redundant comment.
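The "persist first, then mutate the in-memory queue" ordering recommended across these comments can be shown without BadgerDB. In this sketch the `store` type is a hypothetical stand-in whose `Set` can be made to fail, so we can observe that a failed write leaves the queue untouched:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// store is a stand-in for BadgerDB: Set may fail, simulating a write error.
type store struct {
	data map[string][]byte
	fail bool
}

func (s *store) Set(key string, val []byte) error {
	if s.fail {
		return errors.New("simulated write failure")
	}
	s.data[key] = val
	return nil
}

// queue appends a tx to memory only after the store write succeeds,
// so a failed persist never leaves a phantom entry in the queue.
type queue struct {
	mu  sync.Mutex
	txs [][]byte
}

func (q *queue) AddTransaction(tx []byte, db *store) error {
	if err := db.Set(string(tx), tx); err != nil {
		return err // in-memory queue untouched on failure
	}
	q.mu.Lock()
	defer q.mu.Unlock()
	q.txs = append(q.txs, tx)
	return nil
}

func main() {
	db := &store{data: map[string][]byte{}}
	q := &queue{}
	_ = q.AddTransaction([]byte("tx1"), db)
	db.fail = true
	err := q.AddTransaction([]byte("tx2"), db)
	fmt.Println(len(q.txs), err != nil)
}
```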
`349-358`: ⚠️ Potential issue: Improve error handling in Close method.

The `Close` method is a good addition for safely closing the BadgerDB instance. However, there's room for improvement in error handling:
- The method returns the error directly without providing context about where the error occurred.
- It doesn't handle the case where `c.db` is nil, which could lead to a nil pointer dereference if `Close` is called multiple times.

Consider adding more context to the error and handling the nil case. Here's a suggested improvement:
```diff
 func (c *Sequencer) Close() error {
     if c.db != nil {
         err := c.db.Close()
+        c.db = nil // Prevent potential future calls to Close from accessing a closed database
         if err != nil {
-            return err
+            return fmt.Errorf("failed to close BadgerDB: %w", err)
         }
     }
     return nil
 }
```

This change:
- Adds more context to the error message, making it clear where the error occurred.
- Sets `c.db` to nil after closing, preventing potential issues if `Close` is called multiple times.
- Still returns nil if the database was already nil or closed successfully.
These improvements make the `Close` method more robust and informative.

Likely invalid or redundant comment.
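The nil-out-after-close pattern suggested here can be exercised with a toy resource. The `resource` and `sequencer` types below are illustrative stand-ins of ours, not the PR's types; the point is that the second `Close` call is a safe no-op:

```go
package main

import "fmt"

// resource mimics a closable handle such as a BadgerDB instance.
type resource struct{ closed bool }

func (r *resource) Close() error { r.closed = true; return nil }

type sequencer struct{ db *resource }

// Close nils out the handle after closing so repeated calls
// never touch an already-closed database.
func (s *sequencer) Close() error {
	if s.db != nil {
		err := s.db.Close()
		s.db = nil // later calls see nil and return immediately
		if err != nil {
			return fmt.Errorf("failed to close DB: %w", err)
		}
	}
	return nil
}

func main() {
	s := &sequencer{db: &resource{}}
	fmt.Println(s.Close(), s.Close()) // second call is a safe no-op
}
```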
`375-402`: ⚠️ Potential issue: Improve nil handling in LoadLastBatchHashFromDB method.
The `LoadLastBatchHashFromDB` method is a good addition for loading the last batch hash from BadgerDB. However, there's a potential issue with assigning a potentially nil `hash` to `c.lastBatchHash` without a nil check.

Consider adding a nil check before assigning to `c.lastBatchHash`. Here's a suggested improvement:
. Here's a suggested improvement:func (c *Sequencer) LoadLastBatchHashFromDB() error { c.dbMux.Lock() defer c.dbMux.Unlock() var hash []byte err := c.db.View(func(txn *badger.Txn) error { item, err := txn.Get([]byte("lastBatchHash")) if errors.Is(err, badger.ErrKeyNotFound) { // If no last batch hash exists, it's the first time or nothing was processed c.lastBatchHash = nil return nil } if err != nil { return err } // Set lastBatchHash in memory from BadgerDB return item.Value(func(val []byte) error { hash = make([]byte, len(val)) copy(hash, val) return nil }) }) - // Set the in-memory lastBatchHash after successfully loading it from DB - c.lastBatchHash = hash + if err == nil && hash != nil { + c.lastBatchHash = hash + } return err }This change ensures that
`c.lastBatchHash` is only updated if the database operation was successful and the loaded hash is not nil. It also creates a copy of the value from BadgerDB to ensure we're not holding onto a reference to BadgerDB's internal memory.

⛔ Skipped due to learnings
`157-166`: ⚠️ Potential issue: Improve atomicity in AddTransaction method.

Similar to the `AddBatch` method, there's a potential atomicity issue in `AddTransaction`. If the database operation fails, the transaction remains in the in-memory queue, which could lead to inconsistency between the in-memory state and the persistent storage.

Consider wrapping both operations in a transaction to ensure atomicity. Here's a suggested improvement:
```diff
 func (tq *TransactionQueue) AddTransaction(tx sequencing.Tx, db *badger.DB) error {
-    tq.mu.Lock()
-    tq.queue = append(tq.queue, tx)
-    tq.mu.Unlock()
-    // Store transaction in BadgerDB
-    err := db.Update(func(txn *badger.Txn) error {
-        return txn.Set([]byte(GetTransactionHash(tx)), tx)
-    })
-    return err
+    return db.Update(func(txn *badger.Txn) error {
+        err := txn.Set([]byte(GetTransactionHash(tx)), tx)
+        if err != nil {
+            return err
+        }
+        tq.mu.Lock()
+        defer tq.mu.Unlock()
+        tq.queue = append(tq.queue, tx)
+        return nil
+    })
 }
```

This ensures that the transaction is only added to the in-memory queue if the database operation succeeds.
Likely invalid or redundant comment.
`49-70`: ⚠️ Potential issue: Improve atomicity in AddBatch method.
While the method now handles both in-memory and persistent storage, there's a potential atomicity issue. If the database operation fails, the batch remains in the in-memory queue, which could lead to inconsistency between the in-memory state and the persistent storage.
Consider wrapping both operations in a transaction or implementing a rollback mechanism for the in-memory queue if the database operation fails. Here's a suggested improvement:
```diff
 func (bq *BatchQueue) AddBatch(batch sequencing.Batch, db *badger.DB) error {
-    bq.mu.Lock()
-    bq.queue = append(bq.queue, batch)
-    bq.mu.Unlock()

     // Get the hash and bytes of the batch
     h, err := batch.Hash()
     if err != nil {
         return err
     }

     // Marshal the batch
     batchBytes, err := batch.Marshal()
     if err != nil {
         return err
     }

-    // Store the batch in BadgerDB
-    err = db.Update(func(txn *badger.Txn) error {
-        return txn.Set(h, batchBytes)
-    })
-    return err
+    // Store the batch in BadgerDB and update in-memory queue atomically
+    return db.Update(func(txn *badger.Txn) error {
+        err := txn.Set(h, batchBytes)
+        if err != nil {
+            return err
+        }
+        bq.mu.Lock()
+        defer bq.mu.Unlock()
+        bq.queue = append(bq.queue, batch)
+        return nil
+    })
 }
```

This ensures that the batch is only added to the in-memory queue if the database operation succeeds.
Likely invalid or redundant comment.
sequencing/sequencer_test.go (3)
`54-59`: Proper resource cleanup with `seq.Close()`

Adding the `defer` function to close `seq` ensures that all resources are properly released after the test completes. This prevents potential resource leaks.
`166-173`: Good use of in-memory database for testing

Utilizing an in-memory BadgerDB instance improves test isolation and performance by avoiding file system dependencies. This enhances the reliability of the tests.
`226-236`: ⚠️ Potential issue: Ensure consistent encoding of `batchHash` in `VerifyBatch`

The `batchHash` is stored in `seq.seenBatches` as a hex-encoded string using `hex.EncodeToString(batchHash)`, but in `VerifyBatch`, the raw `batchHash` bytes are used for the lookup. This could lead to mismatches during batch verification.

Run the following script to check if `VerifyBatch` correctly encodes `BatchHash` before performing the lookup:
Actionable comments posted: 3
🧹 Outside diff range and nitpick comments (2)
sequencing/sequencer.go (2)
Line range hint `170-205`: Improve error handling and atomicity in GetNextBatch method.

The changes to `GetNextBatch`
improve consistency between in-memory and persistent storage. However, there are potential issues with error handling and atomicity:
- If an error occurs while removing a transaction from BadgerDB, the method returns an empty batch, potentially losing valid transactions.
- The in-memory queue is modified before ensuring all database operations succeed.
Consider the following improvements:
- Use a single database transaction for all operations to ensure atomicity.
- Only modify the in-memory queue if all database operations succeed.
- Implement better error handling to avoid losing valid transactions on partial failures.
Here's a suggested improvement:
```diff
 func (tq *TransactionQueue) GetNextBatch(max uint64, db *badger.DB) sequencing.Batch {
     tq.mu.Lock()
     defer tq.mu.Unlock()

     var batch [][]byte
     batchSize := len(tq.queue)
     if batchSize == 0 {
         return sequencing.Batch{Transactions: nil}
     }
     for {
         batch = tq.queue[:batchSize]
         blobSize := totalBytes(batch)
         if uint64(blobSize) <= max {
             break
         }
         batchSize = batchSize - 1
     }

+    var processedTxs [][]byte
+    err := db.Update(func(txn *badger.Txn) error {
     // Retrieve transactions from BadgerDB and remove processed ones
     for _, tx := range batch {
         txHash := GetTransactionHash(tx)
-        err := db.Update(func(txn *badger.Txn) error {
-            // Get and then delete the transaction from BadgerDB
-            _, err := txn.Get([]byte(txHash))
-            if err != nil {
-                return err
-            }
-            return txn.Delete([]byte(txHash)) // Remove processed transaction
-        })
+        // Get and then delete the transaction from BadgerDB
+        _, err := txn.Get([]byte(txHash))
         if err != nil {
-            return sequencing.Batch{Transactions: nil} // Return empty batch if any transaction retrieval fails
+            return fmt.Errorf("failed to get transaction %s: %w", txHash, err)
         }
+        err = txn.Delete([]byte(txHash))
+        if err != nil {
+            return fmt.Errorf("failed to delete transaction %s: %w", txHash, err)
+        }
+        processedTxs = append(processedTxs, tx)
     }
+        return nil
+    })
+    if err != nil {
+        log.Errorf("Error processing batch: %v", err)
+        return sequencing.Batch{Transactions: nil}
+    }

-    tq.queue = tq.queue[batchSize:]
+    tq.queue = tq.queue[len(processedTxs):]

-    return sequencing.Batch{Transactions: batch}
+    return sequencing.Batch{Transactions: processedTxs}
 }
```

This implementation ensures atomicity of the database operations and only modifies the in-memory queue if all operations succeed. It also provides better error handling and logging.
Line range hint `287-344`: Improve error handling in NewSequencer initialization.

The changes to `NewSequencer`
are good additions for implementing persistence. However, there's a potential issue with error handling during the loading process:
- If an error occurs during any of the loading steps (last batch hash, seen batches, transaction queue, or batch queue), the function returns immediately without cleaning up resources.
- This could lead to resource leaks, particularly the open BadgerDB instance.
Consider implementing a cleanup mechanism and more comprehensive error handling.
Here's a suggested improvement:
```diff
 func NewSequencer(daAddress, daAuthToken string, daNamespace []byte, batchTime time.Duration, dbPath string) (*Sequencer, error) {
     // ... (existing code)

     s := &Sequencer{
         // ... (existing fields)
     }

+    var initErr error
+    defer func() {
+        if initErr != nil {
+            s.Close() // Ensure resources are cleaned up if initialization fails
+        }
+    }()

     // Load last batch hash from DB to recover from crash
-    err = s.LoadLastBatchHashFromDB()
-    if err != nil {
-        return nil, fmt.Errorf("failed to load last batch hash from DB: %w", err)
+    initErr = s.LoadLastBatchHashFromDB()
+    if initErr != nil {
+        return nil, fmt.Errorf("failed to load last batch hash from DB: %w", initErr)
     }

     // Load seen batches from DB to recover from crash
-    err = s.LoadSeenBatchesFromDB()
-    if err != nil {
-        return nil, fmt.Errorf("failed to load seen batches from DB: %w", err)
+    initErr = s.LoadSeenBatchesFromDB()
+    if initErr != nil {
+        return nil, fmt.Errorf("failed to load seen batches from DB: %w", initErr)
     }

     // Load TransactionQueue and BatchQueue from DB to recover from crash
-    err = s.tq.LoadFromDB(s.db) // Load transactions
-    if err != nil {
-        return nil, fmt.Errorf("failed to load transaction queue from DB: %w", err)
+    initErr = s.tq.LoadFromDB(s.db) // Load transactions
+    if initErr != nil {
+        return nil, fmt.Errorf("failed to load transaction queue from DB: %w", initErr)
     }
-    err = s.bq.LoadFromDB(s.db) // Load batches
-    if err != nil {
-        return nil, fmt.Errorf("failed to load batch queue from DB: %w", err)
+    initErr = s.bq.LoadFromDB(s.db) // Load batches
+    if initErr != nil {
+        return nil, fmt.Errorf("failed to load batch queue from DB: %w", initErr)
     }

     go s.batchSubmissionLoop(s.ctx)
     return s, nil
 }
```

This change ensures that if any initialization step fails, all resources (including the BadgerDB instance) are properly cleaned up.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
- sequencing/sequencer.go (10 hunks)
🔇 Additional comments (9)
sequencing/sequencer.go (9)
`7-15`: New imports and error variable look good.

The added imports are necessary for the new functionality, and the `ErrInvalidRollupId` error variable is well-defined for handling invalid rollup ID cases.

Also applies to: 24-25
`267-283`: Sequencer struct changes look good.

The additions of `db *badger.DB` and `dbMux sync.Mutex` to the Sequencer struct are appropriate for implementing persistence and ensuring thread-safety when accessing the database.
`360-373`: CompareAndSetMaxSize method looks good.

The `CompareAndSetMaxSize` method is well-implemented using atomic operations to ensure thread-safety when updating the `maxSize`. The use of a loop with `CompareAndSwapUint64` is a correct approach for handling potential concurrent updates.
`150-154`: 🛠️ Refactor suggestion: Simplify GetTransactionHash function.

The `GetTransactionHash` function can be simplified using the `sha256.Sum256` function, which returns the checksum directly.

Consider simplifying the function as follows:
```diff
 func GetTransactionHash(txBytes []byte) string {
-    hashBytes := sha256.Sum256(txBytes)
-    return hex.EncodeToString(hashBytes[:])
+    return hex.EncodeToString(sha256.Sum256(txBytes)[:])
 }
```

This change makes the code more concise and efficient.
Likely invalid or redundant comment.
`349-358`: ⚠️ Potential issue: Improve error handling in Close method.
The `Close` method is a good addition for safely closing the BadgerDB instance. However, there's a potential issue with error handling:
- The method returns the error directly without providing context about where the error occurred.
- It doesn't handle the case where `c.db` is nil, which could lead to a nil pointer dereference if `Close` is called multiple times.

Consider adding more context to the error and handling the nil case.
Here's a suggested improvement:
```diff
 func (c *Sequencer) Close() error {
     if c.db != nil {
         err := c.db.Close()
+        c.db = nil // Prevent potential future calls to Close from accessing a closed database
         if err != nil {
-            return err
+            return fmt.Errorf("failed to close BadgerDB: %w", err)
         }
     }
     return nil
 }
```

This change:
- Adds more context to the error message, making it clear where the error occurred.
- Sets `c.db` to nil after closing, preventing potential issues if `Close` is called multiple times.
- Still returns nil if the database was already nil or closed successfully.
These improvements make the `Close` method more robust and informative.

Likely invalid or redundant comment.
`49-70`: ⚠️ Potential issue: Consider improving atomicity in AddBatch method.
While the changes improve persistence by storing batches in BadgerDB, there's a potential atomicity issue:
- If the database operation fails, the batch remains in the in-memory queue.
- This could lead to inconsistency between the in-memory state and the persistent storage.
Consider wrapping both operations in a transaction or implementing a rollback mechanism for the in-memory queue if the database operation fails.
Here's a suggested improvement:
```diff
 func (bq *BatchQueue) AddBatch(batch sequencing.Batch, db *badger.DB) error {
-	bq.mu.Lock()
-	bq.queue = append(bq.queue, batch)
-	bq.mu.Unlock()
-	// Get the hash and bytes of the batch
-	h, err := batch.Hash()
-	if err != nil {
-		return err
-	}
-	// Marshal the batch
-	batchBytes, err := batch.Marshal()
-	if err != nil {
-		return err
-	}
-	// Store the batch in BadgerDB
-	err = db.Update(func(txn *badger.Txn) error {
-		return txn.Set(h, batchBytes)
-	})
-	return err
+	return db.Update(func(txn *badger.Txn) error {
+		h, err := batch.Hash()
+		if err != nil {
+			return err
+		}
+		batchBytes, err := batch.Marshal()
+		if err != nil {
+			return err
+		}
+		err = txn.Set(h, batchBytes)
+		if err != nil {
+			return err
+		}
+		bq.mu.Lock()
+		defer bq.mu.Unlock()
+		bq.queue = append(bq.queue, batch)
+		return nil
+	})
 }
```

This ensures that the batch is only added to the in-memory queue if the database operation succeeds.

Likely invalid or redundant comment.
`375-402`: ⚠️ Potential issue

**Improve error handling and nil checks in LoadLastBatchHashFromDB.**

The `LoadLastBatchHashFromDB` method is a good addition for loading the last batch hash from BadgerDB. However, there are some areas for improvement:

- The error handling could be more robust, particularly for the case where the key is not found.
- There's a potential issue with assigning a potentially nil `hash` to `c.lastBatchHash` without a nil check.

Consider refining the error handling and adding a nil check before assigning to `c.lastBatchHash`. Here's a suggested improvement:
```diff
 func (c *Sequencer) LoadLastBatchHashFromDB() error {
 	c.dbMux.Lock()
 	defer c.dbMux.Unlock()
 	var hash []byte
 	err := c.db.View(func(txn *badger.Txn) error {
 		item, err := txn.Get([]byte("lastBatchHash"))
 		if err != nil {
 			if errors.Is(err, badger.ErrKeyNotFound) {
 				// If no last batch hash exists, it's the first time or nothing was processed
 				c.lastBatchHash = nil
 				return nil
 			}
 			return fmt.Errorf("failed to get lastBatchHash: %w", err)
 		}
 		// Set lastBatchHash in memory from BadgerDB
 		return item.Value(func(val []byte) error {
 			hash = make([]byte, len(val))
 			copy(hash, val)
 			return nil
 		})
 	})
 	if err != nil {
 		return fmt.Errorf("failed to load lastBatchHash from DB: %w", err)
 	}
+	if hash != nil {
 		c.lastBatchHash = hash
+	}
 	return nil
 }
```

<details>
<summary>⛔ Skipped due to learnings</summary>
Learnt from: gupadhyaya
PR: #22
File: sequencing/sequencer.go:369-394
Timestamp: 2024-10-17T06:42:26.385Z
Learning: In `sequencing/sequencer.go`, within the `LoadLastBatchHashFromDB` function, a nil `c.lastBatchHash` after loading is acceptable and properly handled elsewhere in the code.

</details>

---

`157-166`: ⚠️ Potential issue

**Improve atomicity in AddTransaction method.**

While the changes to `AddTransaction` improve persistence by storing transactions in BadgerDB, there's a potential atomicity issue:

1. If the database operation fails, the transaction remains in the in-memory queue.
2. This could lead to inconsistency between the in-memory state and the persistent storage.

Consider wrapping both operations in a transaction or implementing a rollback mechanism for the in-memory queue if the database operation fails.

Here's a suggested improvement:

```diff
 func (tq *TransactionQueue) AddTransaction(tx sequencing.Tx, db *badger.DB) error {
-	tq.mu.Lock()
-	tq.queue = append(tq.queue, tx)
-	tq.mu.Unlock()
-	// Store transaction in BadgerDB
-	err := db.Update(func(txn *badger.Txn) error {
-		return txn.Set([]byte(GetTransactionHash(tx)), tx)
-	})
-	return err
+	return db.Update(func(txn *badger.Txn) error {
+		err := txn.Set([]byte(GetTransactionHash(tx)), tx)
+		if err != nil {
+			return err
+		}
+		tq.mu.Lock()
+		defer tq.mu.Unlock()
+		tq.queue = append(tq.queue, tx)
+		return nil
+	})
 }
```
This ensures that the transaction is only added to the in-memory queue if the database operation succeeds.
Likely invalid or redundant comment.
`105-134`: ⚠️ Potential issue

**Consider pagination in LoadFromDB method for large datasets.**

The `LoadFromDB` method is a good addition for reloading batches after a crash or restart. However, there are two potential issues:
- The method loads all batches into memory at once, which could be problematic for large datasets.
- If an error occurs during the iteration, the method will return immediately, potentially leaving some batches unloaded.
Consider the following improvements:
- Implement a pagination mechanism to load batches in chunks, reducing memory usage.
- Collect errors during iteration and continue loading other batches.
Here's a suggested improvement:
```diff
 func (bq *BatchQueue) LoadFromDB(db *badger.DB) error {
 	bq.mu.Lock()
 	defer bq.mu.Unlock()
+	const batchSize = 1000 // Adjust based on your requirements
+	var loadErrors []error
 	err := db.View(func(txn *badger.Txn) error {
 		it := txn.NewIterator(badger.DefaultIteratorOptions)
 		defer it.Close()
+		count := 0
 		for it.Rewind(); it.Valid(); it.Next() {
 			item := it.Item()
 			err := item.Value(func(val []byte) error {
 				var batch sequencing.Batch
 				err := batch.Unmarshal(val)
 				if err != nil {
-					return err
+					loadErrors = append(loadErrors, fmt.Errorf("failed to unmarshal batch: %w", err))
+					return nil
 				}
 				bq.queue = append(bq.queue, batch)
 				return nil
 			})
 			if err != nil {
-				return err
+				loadErrors = append(loadErrors, fmt.Errorf("failed to process item: %w", err))
 			}
+			count++
+			if count >= batchSize {
+				// Yield to allow other operations
+				return nil
+			}
 		}
 		return nil
 	})
+	if err != nil {
+		loadErrors = append(loadErrors, fmt.Errorf("database view failed: %w", err))
+	}
+	if len(loadErrors) > 0 {
+		return fmt.Errorf("encountered %d errors while loading batches: %v", len(loadErrors), loadErrors)
+	}
 	return nil
 }
```

This implementation loads batches in chunks, continues loading even if some batches fail, and provides a summary of all encountered errors.

Likely invalid or redundant comment.
utAck
🎉 This PR is included in version 0.2.0 🎉

The release is available on GitHub release.

Your semantic-release bot 📦🚀
Fixes #17 #16
Also upgrades dependencies to latest: go-sequencing and go-da
**Summary by CodeRabbit**

- New Features
- Bug Fixes
- Documentation
- Chores