-
Notifications
You must be signed in to change notification settings - Fork 116
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
bug: Internalize FNS logic to stage FinalizeBlock events #2399
Conversation
WalkthroughThe pull request introduces significant modifications across various files, primarily focusing on the handling of order book fill updates and subaccount updates. Key changes include the addition of a new field in the Changes
Possibly related PRs
Suggested reviewers
Poem
📜 Recent review detailsConfiguration used: CodeRabbit UI 📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (8)
protocol/streaming/types/interface.go (1)
Line range hint
1-72
: Summary of changes toFullNodeStreamingManager
interfaceThe changes to this interface align well with the PR objective of internalizing logic for FinalizeBlock events. Key observations:
- Consistent replacement of
execMode sdk.ExecMode
withctx sdk.Context
, providing more flexibility and information to the methods.- Shift towards handling individual updates (e.g.,
SendOrderbookFillUpdate
) rather than batches in some cases.- Removal of
StageFinalizeBlockFill
and addition ofSendSubaccountUpdate
, suggesting a restructuring of FinalizeBlock event handling.These changes appear to improve the interface's flexibility and granularity. However, please ensure that:
- The performance implications of processing individual updates instead of batches have been considered.
- The functionality previously handled by
StageFinalizeBlockFill
is adequately covered in the new structure.- All parts of the codebase that interact with this interface have been updated to reflect these changes.
Consider documenting these significant interface changes, especially the rationale behind moving from batch to individual processing and the new approach to handling FinalizeBlock events. This will help maintain the codebase's clarity and assist future developers in understanding the design decisions.
protocol/streaming/noop_streaming_manager.go (3)
35-35
: LGTM. Consider adding a comment explaining the noop behavior.The change from
execMode sdk.ExecMode
toctx sdk.Context
is appropriate and consistent with the PR objectives. It provides more flexibility and access to additional context information.Consider adding a comment explaining why this method does nothing, as it might not be immediately clear to other developers why a "noop" implementation is used here. For example:
// SendOrderbookUpdates is a no-op implementation as part of the NoopGrpcStreamingManager.
88-91
: LGTM. Consider adding a comment for clarity.The rename from
StageFinalizeBlockSubaccountUpdate
toSendSubaccountUpdate
is appropriate and aligns with the PR objectives of internalizing logic for FinalizeBlock events.Consider adding a comment to explain the purpose of this method and its no-op nature:
// SendSubaccountUpdate is a no-op implementation for sending subaccount updates // as part of the NoopGrpcStreamingManager.
Line range hint
1-103
: Overall changes look good. Consider updating documentation.The changes in this file are consistent with the PR objectives of internalizing logic for FinalizeBlock events. The shift from
execMode
toctx
and the renaming of methods improve flexibility and clarity.Consider updating the package-level documentation or README to reflect these changes in the event handling flow, especially:
- The removal of the
StageFinalizeBlockFill
method.- The renaming of methods from "stage" to "send" (e.g.,
StageFinalizeBlockSubaccountUpdate
toSendSubaccountUpdate
).- The change in
SendOrderbookFillUpdate
to handle individual updates instead of batches.This will help other developers understand the new architecture and event flow more easily.
protocol/x/clob/keeper/process_operations.go (3)
563-567
: LGTM! Consider extracting common parameters.The change from
StageFinalizeBlockFill
toSendOrderbookFillUpdate
looks good and appears to provide more detailed information for the update. This change is consistent with the modifications in other functions.Consider extracting the common parameters (
uint32(ctx.BlockHeight())
,ctx
,k.PerpetualIdToClobPairId
) into a separate struct or function to reduce duplication across similar function calls in this file.
850-852
: LGTM, but consider aligning with other functions for consistency.The change to use
SendOrderbookFillUpdate
is consistent with the overall refactoring. However, unlike the other modified functions, this one doesn't include the additional parameters (block height, context, and perpetual ID to CLOB pair ID mapping).For consistency and to potentially future-proof this function, consider aligning the
SendOrderbookFillUpdate
call with the implementation inPersistMatchOrdersToState
andPersistMatchLiquidationToState
. This might involve adding the missing parameters:k.SendOrderbookFillUpdate( streamOrderbookFill, uint32(ctx.BlockHeight()), ctx, k.PerpetualIdToClobPairId, )If these parameters are not needed in this context, it might be worth adding a comment explaining why to prevent future confusion.
563-567
: Overall LGTM. Consider addressing consistency and potential refactoring.The changes in this file primarily involve replacing
StageFinalizeBlockFill
withSendOrderbookFillUpdate
across multiple functions. This refactoring appears to improve the handling of orderbook fill updates by providing more detailed information.To further improve the code:
- Consider extracting the common parameters (
uint32(ctx.BlockHeight())
,ctx
,k.PerpetualIdToClobPairId
) into a separate struct or function to reduce duplication.- Align the implementation in
PersistMatchDeleveragingToState
with the other functions for consistency, or add a comment explaining the difference if it's intentional.- If this change is part of a larger refactoring effort, ensure that all related parts of the codebase are updated consistently.
These improvements will enhance code maintainability and reduce the likelihood of future inconsistencies or errors.
Also applies to: 675-679, 850-852
protocol/streaming/full_node_streaming_manager.go (1)
610-612
: TODO: Implement missing logicThere is a
// TODO
comment here indicating that some implementation is missing. It's important to address this to ensure the functionality is complete.Would you like me to help implement the missing logic or open a GitHub issue to track this task?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
⛔ Files ignored due to path filters (1)
protocol/x/clob/types/streaming.pb.go
is excluded by!**/*.pb.go
📒 Files selected for processing (11)
- proto/dydxprotocol/clob/streaming.proto (1 hunks)
- protocol/mocks/MemClobKeeper.go (1 hunks)
- protocol/streaming/full_node_streaming_manager.go (3 hunks)
- protocol/streaming/noop_streaming_manager.go (2 hunks)
- protocol/streaming/types/interface.go (2 hunks)
- protocol/testutil/memclob/keeper.go (1 hunks)
- protocol/x/clob/keeper/keeper.go (1 hunks)
- protocol/x/clob/keeper/process_operations.go (3 hunks)
- protocol/x/clob/memclob/memclob.go (1 hunks)
- protocol/x/clob/types/mem_clob_keeper.go (1 hunks)
- protocol/x/subaccounts/keeper/subaccount.go (1 hunks)
🔇 Additional comments (17)
protocol/streaming/types/interface.go (4)
37-41
: Approve changes toSendOrderbookFillUpdate
method, but consider performance implications.The method has been renamed and modified to handle individual orderbook fills instead of a batch. This change allows for more granular control and potentially easier error handling. The replacement of
execMode
withctx sdk.Context
is consistent with other changes and provides more flexibility.However, please consider the performance implications of processing individual fills instead of batches. Can you provide insights into why this change was made and if there are any potential performance concerns?
To verify the usage of this updated method, please run the following script:
#!/bin/bash # Description: Verify all calls to SendOrderbookFillUpdate use the new signature with individual fill and sdk.Context # Test: Search for function calls to SendOrderbookFillUpdate. Expect: Only occurrences of the new signature. rg --type go -A 5 $'SendOrderbookFillUpdate\('
Line range hint
1-1
: Clarify handling of FinalizeBlock fill events after removal ofStageFinalizeBlockFill
.The
StageFinalizeBlockFill
method has been removed from the interface. This aligns with the PR objective of internalizing logic for FinalizeBlock events. However, it's important to ensure that the functionality provided by this method is either no longer needed or has been replaced elsewhere.Could you please clarify how the functionality previously handled by
StageFinalizeBlockFill
is now being managed? Are there any other parts of the codebase that need to be updated due to this removal?To help verify the impact of this removal, please run the following script:
#!/bin/bash # Description: Search for any remaining references to StageFinalizeBlockFill # Test: Search for any occurrences of StageFinalizeBlockFill. Expect: No results. rg --type go $'StageFinalizeBlockFill'
53-56
: Approve addition ofSendSubaccountUpdate
method.The new
SendSubaccountUpdate
method is a welcome addition to the interface. It's consistent with the use ofsdk.Context
in other methods and appears to be part of the restructuring to internalize FinalizeBlock event logic.To ensure proper implementation and usage of this new method, please run the following script:
#!/bin/bash # Description: Verify the implementation and usage of the new SendSubaccountUpdate method # Test 1: Search for the implementation of SendSubaccountUpdate echo "Searching for SendSubaccountUpdate implementation:" rg --type go -A 10 $'func \(.*\) SendSubaccountUpdate\(' # Test 2: Search for calls to SendSubaccountUpdate echo "Searching for SendSubaccountUpdate usage:" rg --type go -A 5 $'SendSubaccountUpdate\('Please review the implementation to ensure it correctly handles the subaccount update logic, and verify that it's being called appropriately throughout the codebase.
35-35
: Approve change toSendOrderbookUpdates
method signature.The replacement of
execMode sdk.ExecMode
withctx sdk.Context
provides more flexibility and access to additional information that might be needed for sending orderbook updates. This change aligns with the PR objective of internalizing logic for FinalizeBlock events.To ensure consistency across the codebase, please run the following script to verify the usage of this method:
✅ Verification successful
Apologies for the confusion in the previous script. Please use the corrected script below to verify all calls to
SendOrderbookUpdates
utilize the new signature withsdk.Context
:
Verification Successful: All
SendOrderbookUpdates
Calls Updated AppropriatelyAll instances of
SendOrderbookUpdates
across the codebase now utilize the new signature withsdk.Context
, ensuring consistency and alignment with the PR objectives.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify all calls to SendOrderbookUpdates use the new signature with sdk.Context # Test: Search for function calls to SendOrderbookUpdates. Expect: Only occurrences of the new signature. rg --type go -A 5 $'SendOrderbookUpdates\('Length of output: 3828
Script:
#!/bin/bash # Description: Verify all calls to SendOrderbookUpdates use the new signature with sdk.Context # Test: Search for function calls to SendOrderbookUpdates. Expect: Only occurrences of the new signature. rg --type go -A 5 'SendOrderbookUpdates\('Length of output: 5901
protocol/streaming/noop_streaming_manager.go (1)
39-44
: LGTM. Consider updating related code for consistency.The changes to this method, including the rename and parameter updates, are appropriate and align with the PR objectives. The shift from batch processing to individual processing of order book fills may improve granularity and flexibility.
To ensure consistency across the codebase, please run the following script to check for any remaining references to the old method name:
If any results are found, consider updating those references to use the new method name and signature.
protocol/x/clob/types/mem_clob_keeper.go (1)
105-108
: Approve the change with considerations.The method signature change from
SendOrderbookFillUpdates
toSendOrderbookFillUpdate
simplifies the interface by handling single updates instead of batches. This change is acceptable, but there are some considerations:
- This change may impact the performance and behavior of the system, potentially requiring more frequent method calls for multiple updates.
- Ensure that all implementations and callers of this method are updated accordingly.
To verify the impact and consistency of this change, please run the following script:
Consider adding batch processing capabilities if handling multiple updates efficiently is still a requirement for the system.
protocol/x/clob/keeper/keeper.go (1)
317-327
: LGTM: Method refactored to handle single orderbook fill.The changes to
SendOrderbookFillUpdate
look good. The method has been refactored to handle a single orderbook fill instead of a batch, which aligns with the PR objective of internalizing logic to stage FinalizeBlock events.To ensure this change doesn't break existing functionality, please verify:
- All callers of this method have been updated to pass a single
StreamOrderbookFill
.- The
FullNodeStreamingManager
interface has been updated to accept a single fill.Run the following script to check for any remaining calls to the old method signature:
protocol/testutil/memclob/keeper.go (1)
511-514
: Method signature updated to handle single orderbook fillThe
SendOrderbookFillUpdates
method has been renamed toSendOrderbookFillUpdate
, and its parameter type has changed from a slice oftypes.StreamOrderbookFill
to a singletypes.StreamOrderbookFill
. This change aligns with processing individual orderbook fills rather than batches.To ensure this change is consistent across the codebase, please run the following script:
This script will help identify any places where the old method name or parameter type is still being used, ensuring consistency with the new implementation.
✅ Verification successful
Verification successful: No remaining references found
All references to
SendOrderbookFillUpdates
and[]types.StreamOrderbookFill
have been removed or updated accordingly.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Check for any remaining references to SendOrderbookFillUpdates and usages of []types.StreamOrderbookFill # Search for any remaining references to SendOrderbookFillUpdates echo "Searching for SendOrderbookFillUpdates references:" rg --type go "SendOrderbookFillUpdates" # Search for any remaining usages of []types.StreamOrderbookFill echo "Searching for []types.StreamOrderbookFill usages:" rg --type go "\[\]types\.StreamOrderbookFill"Length of output: 307
protocol/mocks/MemClobKeeper.go (1)
418-420
: LGTM! Verify usage across the codebase.The change from
SendOrderbookFillUpdates
toSendOrderbookFillUpdate
and the corresponding parameter change from a slice to a singleStreamOrderbookFill
looks good. This shift aligns with the PR objective of internalizing logic for FinalizeBlock events, potentially allowing for more granular control over orderbook fill updates.To ensure this change doesn't break existing functionality, please run the following script to check for any remaining usages of the old method name:
protocol/x/subaccounts/keeper/subaccount.go (2)
Line range hint
1-451
: Ensure comprehensive testing of the subaccount update process.While the change is isolated to the
UpdateSubaccounts
function, it's crucial to verify that this modification doesn't introduce any unintended side effects in the overall subaccount update process.To ensure the change doesn't affect other parts of the system, please run the following tests:
#!/bin/bash # Run all tests related to subaccount updates go test ./... -run "TestSubaccountUpdate|TestUpdateSubaccounts" # Check for any integration or end-to-end tests that involve subaccount updates fd -e go | xargs grep -l "UpdateSubaccounts" | xargs grep -l "func Test"Additionally, consider adding or updating integration tests to specifically cover the new
SendSubaccountUpdate
functionality in various scenarios.
448-451
: LGTM! Verify related changes for consistency.The change from
StageFinalizeBlockSubaccountUpdate
toSendSubaccountUpdate
aligns with the PR objective of internalizing the logic for staging FinalizeBlock events. This modification appears to maintain the existing functionality while potentially improving the directness and efficiency of sending subaccount updates.To ensure consistency across the codebase, please run the following script to check for any other occurrences of
StageFinalizeBlockSubaccountUpdate
that might need similar updates:protocol/x/clob/keeper/process_operations.go (1)
675-679
: LGTM! Consistent with previous changes.The modification here is consistent with the changes in
PersistMatchOrdersToState
, replacingStageFinalizeBlockFill
withSendOrderbookFillUpdate
and adding the same new parameters.This change reinforces the suggestion to consider extracting these common parameters into a separate struct or function to improve code maintainability.
protocol/x/clob/memclob/memclob.go (1)
405-405
: LGTM! Consider verifying theSendOrderbookFillUpdate
method signature.The change from
SendOrderbookFillUpdates
toSendOrderbookFillUpdate
simplifies the method call by passing a singleorderbookMatchFill
object instead of a slice. This is likely to improve performance slightly by avoiding the creation of a temporary slice.To ensure consistency, please run the following command to verify the signature of the
SendOrderbookFillUpdate
method:protocol/streaming/full_node_streaming_manager.go (4)
398-400
: Logic Update Approved: Early return when not in DeliverTx modeThe addition of the check for
!lib.IsDeliverTxMode(ctx)
ensures that subaccount updates are only staged duringDeliverTx
mode, preventing optimistic updates from being streamed inadvertently. This is a sensible safeguard.
618-627
: Logic Update Approved: Stream optimistic updates directly when not in DeliverTx modeThe added logic correctly handles optimistic updates by streaming them directly when not in
DeliverTx
mode, which aligns with the intended behavior.
629-639
:⚠️ Potential issueHandle errors instead of panicking
Similar to an earlier point, the use of
panic(err)
is present in this section when handling errors. Consider modifying the error handling to avoid panics.Apply this diff to handle the error properly:
func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdate( orderbookFill clobtypes.StreamOrderbookFill, blockHeight uint32, ctx sdk.Context, perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId, ) { // ... v1updates, err := streaming_util.GetOffchainUpdatesV1(offchainUpdates) if err != nil { - panic(err) + sm.logger.Error("Failed to get offchain updates V1", "error", err) + return } // ... }Likely invalid or redundant comment.
394-396
: Function Rename: Ensure consistency across the codebaseThe method
StageFinalizeBlockSubaccountUpdate
has been renamed toSendSubaccountUpdate
. Please verify that all references to the old method name have been updated to prevent any runtime errors.You can run the following script to check for any lingering references to
StageFinalizeBlockSubaccountUpdate
:✅ Verification successful
All references to
StageFinalizeBlockSubaccountUpdate
have been successfully updated toSendSubaccountUpdate
across the codebase.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Find references to the old method name in the codebase. # Expected result: No occurrences of `StageFinalizeBlockSubaccountUpdate`. rg --type go --fixed-strings 'StageFinalizeBlockSubaccountUpdate'Length of output: 65
Script:
#!/bin/bash # Comprehensive search for any references to 'StageFinalizeBlockSubaccountUpdate' across all file types. # Expected result: No occurrences of 'StageFinalizeBlockSubaccountUpdate' in the codebase. rg --fixed-strings 'StageFinalizeBlockSubaccountUpdate'Length of output: 55
Script:
#!/bin/bash # Case-insensitive search for 'StageFinalizeBlockSubaccountUpdate' across all file types. rg -i --fixed-strings 'StageFinalizeBlockSubaccountUpdate' # Search for dynamic references to 'StageFinalizeBlockSubaccountUpdate', such as within strings or concatenations. rg --regexp 'StageFinalizeBlockSubaccountUpdate|StageFinalizeBlock' # Search within documentation and comments for references to the old method name. rg --type markdown --type go --type txt --type doc --regexp 'StageFinalizeBlockSubaccountUpdate'Length of output: 253
Script:
#!/bin/bash # Case-insensitive search for 'StageFinalizeBlockSubaccountUpdate' across all file types. rg -i --fixed-strings 'StageFinalizeBlockSubaccountUpdate' # Search for dynamic references to 'StageFinalizeBlockSubaccountUpdate', such as within strings or concatenations. rg --regexp 'StageFinalizeBlockSubaccountUpdate|StageFinalizeBlock' # Search within documentation and comments for references to the old method name. rg --type markdown --type go --type txt --regexp 'StageFinalizeBlockSubaccountUpdate'Length of output: 214
@@ -12,5 +12,6 @@ message StagedFinalizeBlockEvent { | |||
oneof event { | |||
StreamOrderbookFill order_fill = 1; | |||
dydxprotocol.subaccounts.StreamSubaccountUpdate subaccount_update = 2; | |||
StreamOrderbookUpdate orderbook_update = 3; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
Missing handling for orderbook_update
in event switches
The new StreamOrderbookUpdate orderbook_update = 3;
has been added to the StagedFinalizeBlockEvent
message, but the corresponding handling in switch statements is missing. Please update the following file to handle the new event type:
protocol/streaming/full_node_streaming_manager.go
: Add a case for*clobtypes.StagedFinalizeBlockEvent_OrderbookUpdate
in the switch statement handlingStagedFinalizeBlockEvent.Event
.
🔗 Analysis chain
LGTM! Verify related code updates.
The addition of StreamOrderbookUpdate orderbook_update = 3;
to the StagedFinalizeBlockEvent
message is well-structured and maintains backward compatibility. This change aligns with the PR objective of internalizing logic to stage FinalizeBlock events.
To ensure comprehensive implementation, please verify that all code handling StagedFinalizeBlockEvent
has been updated to account for this new event type. Run the following script to identify potentially affected areas:
Review the output to ensure all relevant code paths have been updated to handle the new orderbook_update
event type.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Find usages of StagedFinalizeBlockEvent in the codebase
# Search for StagedFinalizeBlockEvent usage
echo "Searching for StagedFinalizeBlockEvent usage:"
rg --type go -A 5 "StagedFinalizeBlockEvent"
# Search for switch or if statements handling event types
echo "Searching for switch or if statements handling event types:"
rg --type go -A 10 -e "switch.*event" -e "if.*event\s*:?=\s*"
Length of output: 24292
if err != nil { | ||
panic(err) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Handle errors instead of panicking
The use of panic(err)
for error handling may not be appropriate in production code, as it can cause the application to crash. Consider returning the error or handling it gracefully to improve the robustness of the application.
Apply this diff to handle the error properly:
func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates(
offchainUpdates *clobtypes.OffchainUpdates,
blockHeight uint32,
ctx sdk.Context,
) {
// ...
v1updates, err := streaming_util.GetOffchainUpdatesV1(offchainUpdates)
if err != nil {
- panic(err)
+ sm.logger.Error("Failed to get offchain updates V1", "error", err)
+ return
}
// ...
}
Committable suggestion was skipped due to low confidence.
2e7d9c9
to
c4f1566
Compare
c4f1566
to
26723d6
Compare
26723d6
to
ee90970
Compare
// execution (once optimistically and optionally once on the canonical block), | ||
// we need to stage the events in transient store and later emit them | ||
// during `Precommit`. | ||
func (sm *FullNodeStreamingManagerImpl) StageFinalizeBlockFill( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Folded into SendOrderbookFillUpdate
v1updates := make([]ocutypes.OffChainUpdateV1, 0) | ||
for _, message := range offchainUpdates.Messages { | ||
var update ocutypes.OffChainUpdateV1 | ||
err := proto.Unmarshal(message.Message.Value, &update) | ||
if err != nil { | ||
return nil, err | ||
panic(fmt.Sprintf("Failed to get OffchainUpdatesV1: %v", err)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Upstream always panics anyways; not returning error for simplicity
@@ -501,28 +483,37 @@ func (sm *FullNodeStreamingManagerImpl) TracksSubaccountId(subaccountId satypes. | |||
} | |||
|
|||
func getStreamUpdatesFromOffchainUpdates( | |||
offchainUpdates *clobtypes.OffchainUpdates, | |||
v1updates []ocutypes.OffChainUpdateV1, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
🧹 Outside diff range and nitpick comments (3)
protocol/streaming/full_node_streaming_manager.go (3)
Line range hint
393-411
: LGTM: ImprovedSendSubaccountUpdate
functionThe changes to this function are well-structured and improve its functionality:
- The function name is now more generic, allowing for broader use.
- The
DeliverTx
mode check ensures that updates are only staged when appropriate.- Early return for non-
DeliverTx
modes prevents unnecessary processing.One minor suggestion:
Consider adding a comment explaining why optimistic subaccount updates are not streamed in non-
DeliverTx
mode. This would improve code readability and maintainability.// If not `DeliverTx`, return since we don't stream optimistic subaccount updates. if !lib.IsDeliverTxMode(ctx) { + // Optimistic updates are not streamed to ensure consistency with the finalized state return }
536-568
: LGTM: ImprovedSendOrderbookUpdates
with context-aware processingThe changes to this function are well-structured and improve its functionality:
- Using
sdk.Context
provides more flexibility and access to additional information.- The split between optimistic and consensus update handling is clear and logical.
- Staging events for consensus updates aligns with the overall design for handling
DeliverTx
mode updates.Suggestion for improvement:
Consider extracting the logic for creating and staging the event into a separate helper function. This would improve readability and potentially allow for reuse in similar scenarios.
+func (sm *FullNodeStreamingManagerImpl) stageOrderbookUpdate(ctx sdk.Context, v1updates []ocutypes.OffChainUpdateV1) { + stagedEvent := clobtypes.StagedFinalizeBlockEvent{ + Event: &clobtypes.StagedFinalizeBlockEvent_OrderbookUpdate{ + OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ + Updates: v1updates, + Snapshot: false, + }, + }, + } + sm.stageFinalizeBlockEvent(ctx, sm.cdc.MustMarshal(&stagedEvent)) +} // In the SendOrderbookUpdates function if lib.IsDeliverTxMode(ctx) { - stagedEvent := clobtypes.StagedFinalizeBlockEvent{ - Event: &clobtypes.StagedFinalizeBlockEvent_OrderbookUpdate{ - OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ - Updates: v1updates, - Snapshot: false, - }, - }, - } - sm.stageFinalizeBlockEvent( - ctx, - sm.cdc.MustMarshal(&stagedEvent), - ) + sm.stageOrderbookUpdate(ctx, v1updates) }
Line range hint
972-1004
: LGTM: UpdatedgetStagedEventsFromFinalizeBlock
to handle orderbook updatesThe changes to this function are well-implemented and consistent with the overall updates:
- The addition of
finalizedOrderbookUpdates
as a return value completes the set of finalized event types.- The handling of
StagedFinalizeBlockEvent_OrderbookUpdate
is correctly implemented.Suggestion for improvement:
Consider using a map to group the different event types instead of separate slices. This could make the function more scalable and easier to maintain if new event types are added in the future.
func (sm *FullNodeStreamingManagerImpl) getStagedEventsFromFinalizeBlock( ctx sdk.Context, ) ( - finalizedFills []clobtypes.StreamOrderbookFill, - finalizedSubaccountUpdates []satypes.StreamSubaccountUpdate, - finalizedOrderbookUpdates []clobtypes.StreamOrderbookUpdate, + finalizedEvents map[string]interface{}, ) { + finalizedEvents = make(map[string]interface{}) + finalizedEvents["fills"] = []clobtypes.StreamOrderbookFill{} + finalizedEvents["subaccountUpdates"] = []satypes.StreamSubaccountUpdate{} + finalizedEvents["orderbookUpdates"] = []clobtypes.StreamOrderbookUpdate{} // ... existing code ... for _, stagedEvent := range stagedEvents { switch event := stagedEvent.Event.(type) { case *clobtypes.StagedFinalizeBlockEvent_OrderFill: - finalizedFills = append(finalizedFills, *event.OrderFill) + finalizedEvents["fills"] = append(finalizedEvents["fills"].([]clobtypes.StreamOrderbookFill), *event.OrderFill) case *clobtypes.StagedFinalizeBlockEvent_SubaccountUpdate: - finalizedSubaccountUpdates = append(finalizedSubaccountUpdates, *event.SubaccountUpdate) + finalizedEvents["subaccountUpdates"] = append(finalizedEvents["subaccountUpdates"].([]satypes.StreamSubaccountUpdate), *event.SubaccountUpdate) case *clobtypes.StagedFinalizeBlockEvent_OrderbookUpdate: - finalizedOrderbookUpdates = append(finalizedOrderbookUpdates, *event.OrderbookUpdate) + finalizedEvents["orderbookUpdates"] = append(finalizedEvents["orderbookUpdates"].([]clobtypes.StreamOrderbookUpdate), *event.OrderbookUpdate) default: panic(fmt.Sprintf("Unhandled staged event type: %v\n", stagedEvent.Event)) } } - return finalizedFills, finalizedSubaccountUpdates, finalizedOrderbookUpdates + return finalizedEvents }This change would make it easier to add new event types in the future without modifying the function signature.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
⛔ Files ignored due to path filters (1)
protocol/x/clob/types/streaming.pb.go
is excluded by!**/*.pb.go
📒 Files selected for processing (13)
- indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/streaming.ts (5 hunks)
- proto/dydxprotocol/clob/streaming.proto (1 hunks)
- protocol/mocks/MemClobKeeper.go (1 hunks)
- protocol/streaming/full_node_streaming_manager.go (14 hunks)
- protocol/streaming/noop_streaming_manager.go (2 hunks)
- protocol/streaming/types/interface.go (1 hunks)
- protocol/streaming/util/util.go (1 hunks)
- protocol/testutil/memclob/keeper.go (1 hunks)
- protocol/x/clob/keeper/keeper.go (2 hunks)
- protocol/x/clob/keeper/process_operations.go (3 hunks)
- protocol/x/clob/memclob/memclob.go (1 hunks)
- protocol/x/clob/types/mem_clob_keeper.go (1 hunks)
- protocol/x/subaccounts/keeper/subaccount.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (11)
- indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/streaming.ts
- proto/dydxprotocol/clob/streaming.proto
- protocol/mocks/MemClobKeeper.go
- protocol/streaming/noop_streaming_manager.go
- protocol/streaming/types/interface.go
- protocol/testutil/memclob/keeper.go
- protocol/x/clob/keeper/keeper.go
- protocol/x/clob/keeper/process_operations.go
- protocol/x/clob/memclob/memclob.go
- protocol/x/clob/types/mem_clob_keeper.go
- protocol/x/subaccounts/keeper/subaccount.go
🔇 Additional comments (4)
protocol/streaming/full_node_streaming_manager.go (4)
23-24
: LGTM: New import added for off-chain update typesThe addition of the
ocutypes
import is consistent with the changes made in the file to handle off-chain updates.
870-885
: LGTM: Improved naming and documentation forcacheStreamUpdatesByClobPairWithLock
The changes to this function enhance its clarity and usability:
- The new name
cacheStreamUpdatesByClobPairWithLock
clearly indicates that the function requires a lock, preventing potential misuse.- The added comments provide valuable context about the function's purpose and the assumption of an acquired lock.
These improvements contribute to better code readability and maintainability.
Line range hint
887-902
: LGTM: New functioncacheStreamUpdatesBySubaccountWithLock
addedThe addition of
cacheStreamUpdatesBySubaccountWithLock
is a well-implemented extension of the caching functionality:
- It follows the same pattern as
cacheStreamUpdatesByClobPairWithLock
, maintaining consistency in the codebase.- The function name clearly indicates its purpose and the requirement for a lock.
- The implementation correctly caches subaccount stream updates and their corresponding subscription IDs.
This new function enhances the streaming manager's capability to handle subaccount-specific updates efficiently.
Line range hint
1-1074
: Overall assessment: Well-implemented improvements to the FullNodeStreamingManagerThe changes made to this file significantly enhance the functionality and efficiency of the FullNodeStreamingManager:
The introduction of context-aware processing in functions like
SendSubaccountUpdate
,SendOrderbookUpdates
, andSendOrderbookFillUpdate
improves flexibility and allows for better handling of different execution modes.The refactoring of update handling, particularly the separation of optimistic and consensus updates, aligns well with the system's architecture and improves clarity.
The addition of new functions like
cacheStreamUpdatesBySubaccountWithLock
and the modifications to existing ones likegetStreamUpdatesFromOffchainUpdates
demonstrate a thoughtful approach to extending the manager's capabilities.The improvements in error handling and the addition of more detailed comments contribute to better code maintainability.
These changes appear to be part of a larger effort to optimize the streaming process and handle various types of updates more efficiently. The consistency in approach across different functions is commendable.
While there are a few minor suggestions for improvements (as noted in individual comments), the overall quality of the changes is high. The refactoring seems to have successfully enhanced the streaming manager's ability to handle different types of updates in a more organized and efficient manner.
v1updates []ocutypes.OffChainUpdateV1, | ||
blockHeight uint32, | ||
execMode sdk.ExecMode, | ||
) (streamUpdates []clobtypes.StreamUpdate, clobPairIds []uint32) { | ||
// Group updates by clob pair id. | ||
updates := make(map[uint32]*clobtypes.OffchainUpdates) | ||
for _, message := range offchainUpdates.Messages { | ||
clobPairId := message.OrderId.ClobPairId | ||
if _, ok := updates[clobPairId]; !ok { | ||
updates[clobPairId] = clobtypes.NewOffchainUpdates() | ||
clobPairIdToV1Updates := make(map[uint32][]ocutypes.OffChainUpdateV1) | ||
for _, v1update := range v1updates { | ||
var clobPairId uint32 | ||
switch u := v1update.UpdateMessage.(type) { | ||
case *ocutypes.OffChainUpdateV1_OrderPlace: | ||
clobPairId = u.OrderPlace.Order.OrderId.ClobPairId | ||
case *ocutypes.OffChainUpdateV1_OrderReplace: | ||
clobPairId = u.OrderReplace.OldOrderId.ClobPairId | ||
case *ocutypes.OffChainUpdateV1_OrderRemove: | ||
clobPairId = u.OrderRemove.RemovedOrderId.ClobPairId | ||
case *ocutypes.OffChainUpdateV1_OrderUpdate: | ||
clobPairId = u.OrderUpdate.OrderId.ClobPairId | ||
default: | ||
panic(fmt.Sprintf("Unhandled UpdateMessage type: %v", u)) | ||
} | ||
updates[clobPairId].Messages = append(updates[clobPairId].Messages, message) | ||
|
||
if _, ok := clobPairIdToV1Updates[clobPairId]; !ok { | ||
clobPairIdToV1Updates[clobPairId] = []ocutypes.OffChainUpdateV1{} | ||
} | ||
clobPairIdToV1Updates[clobPairId] = append(clobPairIdToV1Updates[clobPairId], v1update) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
LGTM: Refactored getStreamUpdatesFromOffchainUpdates
for improved efficiency
The changes to this function are well-implemented:
- The function now works directly with
OffChainUpdateV1
types, which is more efficient. - The logic for grouping updates by clob pair id has been updated accordingly.
One suggestion for improvement:
Consider handling the default
case in the switch statement more gracefully. Instead of panicking, you could log an error and continue processing other updates. This would make the function more robust in case of unexpected update types.
default:
- panic(fmt.Sprintf("Unhandled UpdateMessage type: %v", u))
+ sm.logger.Error("Unhandled UpdateMessage type", "type", fmt.Sprintf("%T", u))
+ continue
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
v1updates []ocutypes.OffChainUpdateV1, | |
blockHeight uint32, | |
execMode sdk.ExecMode, | |
) (streamUpdates []clobtypes.StreamUpdate, clobPairIds []uint32) { | |
// Group updates by clob pair id. | |
updates := make(map[uint32]*clobtypes.OffchainUpdates) | |
for _, message := range offchainUpdates.Messages { | |
clobPairId := message.OrderId.ClobPairId | |
if _, ok := updates[clobPairId]; !ok { | |
updates[clobPairId] = clobtypes.NewOffchainUpdates() | |
clobPairIdToV1Updates := make(map[uint32][]ocutypes.OffChainUpdateV1) | |
for _, v1update := range v1updates { | |
var clobPairId uint32 | |
switch u := v1update.UpdateMessage.(type) { | |
case *ocutypes.OffChainUpdateV1_OrderPlace: | |
clobPairId = u.OrderPlace.Order.OrderId.ClobPairId | |
case *ocutypes.OffChainUpdateV1_OrderReplace: | |
clobPairId = u.OrderReplace.OldOrderId.ClobPairId | |
case *ocutypes.OffChainUpdateV1_OrderRemove: | |
clobPairId = u.OrderRemove.RemovedOrderId.ClobPairId | |
case *ocutypes.OffChainUpdateV1_OrderUpdate: | |
clobPairId = u.OrderUpdate.OrderId.ClobPairId | |
default: | |
panic(fmt.Sprintf("Unhandled UpdateMessage type: %v", u)) | |
} | |
updates[clobPairId].Messages = append(updates[clobPairId].Messages, message) | |
if _, ok := clobPairIdToV1Updates[clobPairId]; !ok { | |
clobPairIdToV1Updates[clobPairId] = []ocutypes.OffChainUpdateV1{} | |
} | |
clobPairIdToV1Updates[clobPairId] = append(clobPairIdToV1Updates[clobPairId], v1update) | |
v1updates []ocutypes.OffChainUpdateV1, | |
blockHeight uint32, | |
execMode sdk.ExecMode, | |
) (streamUpdates []clobtypes.StreamUpdate, clobPairIds []uint32) { | |
// Group updates by clob pair id. | |
clobPairIdToV1Updates := make(map[uint32][]ocutypes.OffChainUpdateV1) | |
for _, v1update := range v1updates { | |
var clobPairId uint32 | |
switch u := v1update.UpdateMessage.(type) { | |
case *ocutypes.OffChainUpdateV1_OrderPlace: | |
clobPairId = u.OrderPlace.Order.OrderId.ClobPairId | |
case *ocutypes.OffChainUpdateV1_OrderReplace: | |
clobPairId = u.OrderReplace.OldOrderId.ClobPairId | |
case *ocutypes.OffChainUpdateV1_OrderRemove: | |
clobPairId = u.OrderRemove.RemovedOrderId.ClobPairId | |
case *ocutypes.OffChainUpdateV1_OrderUpdate: | |
clobPairId = u.OrderUpdate.OrderId.ClobPairId | |
default: | |
sm.logger.Error("Unhandled UpdateMessage type", "type", fmt.Sprintf("%T", u)) | |
continue | |
} | |
if _, ok := clobPairIdToV1Updates[clobPairId]; !ok { | |
clobPairIdToV1Updates[clobPairId] = []ocutypes.OffChainUpdateV1{} | |
} | |
clobPairIdToV1Updates[clobPairId] = append(clobPairIdToV1Updates[clobPairId], v1update) |
finalizedFills, | ||
finalizedSubaccountUpdates, | ||
finalizedOrderbookUpdates := sm.getStagedEventsFromFinalizeBlock(ctx) | ||
|
||
sm.Lock() | ||
defer sm.Unlock() | ||
|
||
// Flush all pending updates, since we want the onchain updates to arrive in a batch. | ||
sm.FlushStreamUpdatesWithLock() | ||
|
||
orderbookStreamUpdates, orderbookClobPairIds := getStreamUpdatesFromOffchainUpdates( | ||
orderBookUpdatesToSyncLocalOpsQueue, | ||
// Cache updates to sync local ops queue | ||
sycnLocalUpdates, syncLocalClobPairIds := getStreamUpdatesFromOffchainUpdates( | ||
streaming_util.GetOffchainUpdatesV1(orderBookUpdatesToSyncLocalOpsQueue), | ||
uint32(ctx.BlockHeight()), | ||
ctx.ExecMode(), | ||
) | ||
sm.cacheStreamUpdatesByClobPairWithLock(sycnLocalUpdates, syncLocalClobPairIds) | ||
|
||
// Cache updates for finalized fills. | ||
fillStreamUpdates, fillClobPairIds := sm.getStreamUpdatesForOrderbookFills( | ||
finalizedFills, | ||
uint32(ctx.BlockHeight()), | ||
ctx.ExecMode(), | ||
perpetualIdToClobPairId, | ||
) | ||
sm.cacheStreamUpdatesByClobPairWithLock(fillStreamUpdates, fillClobPairIds) | ||
|
||
// Cache updates for finalized orderbook updates (e.g. RemoveOrderFillAmount in `EndBlocker`). | ||
for _, finalizedUpdate := range finalizedOrderbookUpdates { | ||
streamUpdates, clobPairIds := getStreamUpdatesFromOffchainUpdates( | ||
finalizedUpdate.Updates, | ||
uint32(ctx.BlockHeight()), | ||
ctx.ExecMode(), | ||
) | ||
sm.cacheStreamUpdatesByClobPairWithLock(streamUpdates, clobPairIds) | ||
} | ||
|
||
// Finally, cache updates for finalized subaccount updates | ||
subaccountStreamUpdates, subaccountIds := getStreamUpdatesForSubaccountUpdates( | ||
finalizedSubaccountUpdates, | ||
uint32(ctx.BlockHeight()), | ||
ctx.ExecMode(), | ||
) | ||
|
||
sm.Lock() | ||
defer sm.Unlock() | ||
|
||
// Flush all pending updates, since we want the onchain updates to arrive in a batch. | ||
sm.FlushStreamUpdatesWithLock() | ||
|
||
sm.addBatchUpdatesToCacheWithLock( | ||
orderbookStreamUpdates, | ||
orderbookClobPairIds, | ||
fillStreamUpdates, | ||
fillClobPairIds, | ||
subaccountStreamUpdates, | ||
subaccountIds, | ||
) | ||
sm.cacheStreamUpdatesBySubaccountWithLock(subaccountStreamUpdates, subaccountIds) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
LGTM: Enhanced StreamBatchUpdatesAfterFinalizeBlock
with improved update handling
The changes to this function significantly improve its functionality and organization:
- The addition of handling for finalized orderbook updates completes the set of update types processed.
- The use of helper functions (
getStagedEventsFromFinalizeBlock
,getStreamUpdatesFromOffchainUpdates
, etc.) improves code readability and maintainability. - The clear separation between different types of updates (sync local, fills, orderbook, subaccount) enhances the function's structure.
Suggestion for improvement:
Consider adding error handling for the calls to helper functions. While they may not return errors currently, adding error checks would make the function more robust to future changes.
// Cache updates to sync local ops queue
-sycnLocalUpdates, syncLocalClobPairIds := getStreamUpdatesFromOffchainUpdates(
+sycnLocalUpdates, syncLocalClobPairIds, err := getStreamUpdatesFromOffchainUpdates(
streaming_util.GetOffchainUpdatesV1(orderBookUpdatesToSyncLocalOpsQueue),
uint32(ctx.BlockHeight()),
ctx.ExecMode(),
)
+if err != nil {
+ sm.logger.Error("Failed to get stream updates for sync local ops queue", "error", err)
+ // Consider how to handle this error (e.g., skip these updates, return from the function, etc.)
+}
sm.cacheStreamUpdatesByClobPairWithLock(sycnLocalUpdates, syncLocalClobPairIds)
Apply similar error handling to other helper function calls in this method.
Committable suggestion was skipped due to low confidence.
// SendOrderbookFillUpdate groups fills by their clob pair ids and | ||
// sends messages to the subscribers. | ||
func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates( | ||
orderbookFills []clobtypes.StreamOrderbookFill, | ||
func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdate( | ||
orderbookFill clobtypes.StreamOrderbookFill, | ||
blockHeight uint32, | ||
execMode sdk.ExecMode, | ||
ctx sdk.Context, | ||
perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId, | ||
) { | ||
defer metrics.ModuleMeasureSince( | ||
metrics.FullNodeGrpc, | ||
metrics.GrpcSendOrderbookFillsLatency, | ||
time.Now(), | ||
) | ||
// If not `DeliverTx`, then updates are optimistic. Stream them directly. | ||
if !lib.IsDeliverTxMode(ctx) { | ||
defer metrics.ModuleMeasureSince( | ||
metrics.FullNodeGrpc, | ||
metrics.GrpcSendOrderbookFillsLatency, | ||
time.Now(), | ||
) | ||
|
||
streamUpdates, clobPairIds := sm.getStreamUpdatesForOrderbookFills( | ||
orderbookFills, | ||
blockHeight, | ||
execMode, | ||
perpetualIdToClobPairId, | ||
) | ||
streamUpdates, clobPairIds := sm.getStreamUpdatesForOrderbookFills( | ||
[]clobtypes.StreamOrderbookFill{orderbookFill}, | ||
blockHeight, | ||
ctx.ExecMode(), | ||
perpetualIdToClobPairId, | ||
) | ||
sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds) | ||
return | ||
} | ||
|
||
// If `DeliverTx`, updates should be staged to be streamed after consensus finalizese on a block. | ||
stagedEvent := clobtypes.StagedFinalizeBlockEvent{ | ||
Event: &clobtypes.StagedFinalizeBlockEvent_OrderFill{ | ||
OrderFill: &orderbookFill, | ||
}, | ||
} | ||
|
||
sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds) | ||
sm.stageFinalizeBlockEvent( | ||
ctx, | ||
sm.cdc.MustMarshal(&stagedEvent), | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
LGTM: Refactored SendOrderbookFillUpdate
for single-fill processing
The changes to this function align well with the improvements made to SendOrderbookUpdates
:
- Processing a single fill at a time may lead to more efficient and granular updates.
- The split between optimistic and consensus update handling is consistent with the overall design.
- Using
sdk.Context
provides more flexibility and consistency with other functions.
Suggestion for improvement:
For consistency with the SendOrderbookUpdates
function, consider extracting the logic for creating and staging the event into a separate helper function.
+func (sm *FullNodeStreamingManagerImpl) stageOrderbookFillUpdate(ctx sdk.Context, orderbookFill clobtypes.StreamOrderbookFill) {
+ stagedEvent := clobtypes.StagedFinalizeBlockEvent{
+ Event: &clobtypes.StagedFinalizeBlockEvent_OrderFill{
+ OrderFill: &orderbookFill,
+ },
+ }
+ sm.stageFinalizeBlockEvent(ctx, sm.cdc.MustMarshal(&stagedEvent))
+}
// In the SendOrderbookFillUpdate function
if lib.IsDeliverTxMode(ctx) {
- stagedEvent := clobtypes.StagedFinalizeBlockEvent{
- Event: &clobtypes.StagedFinalizeBlockEvent_OrderFill{
- OrderFill: &orderbookFill,
- },
- }
-
- sm.stageFinalizeBlockEvent(
- ctx,
- sm.cdc.MustMarshal(&stagedEvent),
- )
+ sm.stageOrderbookFillUpdate(ctx, orderbookFill)
}
This change would improve code consistency and readability across similar functions.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
// SendOrderbookFillUpdate groups fills by their clob pair ids and | |
// sends messages to the subscribers. | |
func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates( | |
orderbookFills []clobtypes.StreamOrderbookFill, | |
func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdate( | |
orderbookFill clobtypes.StreamOrderbookFill, | |
blockHeight uint32, | |
execMode sdk.ExecMode, | |
ctx sdk.Context, | |
perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId, | |
) { | |
defer metrics.ModuleMeasureSince( | |
metrics.FullNodeGrpc, | |
metrics.GrpcSendOrderbookFillsLatency, | |
time.Now(), | |
) | |
// If not `DeliverTx`, then updates are optimistic. Stream them directly. | |
if !lib.IsDeliverTxMode(ctx) { | |
defer metrics.ModuleMeasureSince( | |
metrics.FullNodeGrpc, | |
metrics.GrpcSendOrderbookFillsLatency, | |
time.Now(), | |
) | |
streamUpdates, clobPairIds := sm.getStreamUpdatesForOrderbookFills( | |
orderbookFills, | |
blockHeight, | |
execMode, | |
perpetualIdToClobPairId, | |
) | |
streamUpdates, clobPairIds := sm.getStreamUpdatesForOrderbookFills( | |
[]clobtypes.StreamOrderbookFill{orderbookFill}, | |
blockHeight, | |
ctx.ExecMode(), | |
perpetualIdToClobPairId, | |
) | |
sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds) | |
return | |
} | |
// If `DeliverTx`, updates should be staged to be streamed after consensus finalizese on a block. | |
stagedEvent := clobtypes.StagedFinalizeBlockEvent{ | |
Event: &clobtypes.StagedFinalizeBlockEvent_OrderFill{ | |
OrderFill: &orderbookFill, | |
}, | |
} | |
sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds) | |
sm.stageFinalizeBlockEvent( | |
ctx, | |
sm.cdc.MustMarshal(&stagedEvent), | |
) | |
// SendOrderbookFillUpdate groups fills by their clob pair ids and | |
// sends messages to the subscribers. | |
func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdate( | |
orderbookFill clobtypes.StreamOrderbookFill, | |
blockHeight uint32, | |
ctx sdk.Context, | |
perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId, | |
) { | |
// If not `DeliverTx`, then updates are optimistic. Stream them directly. | |
if !lib.IsDeliverTxMode(ctx) { | |
defer metrics.ModuleMeasureSince( | |
metrics.FullNodeGrpc, | |
metrics.GrpcSendOrderbookFillsLatency, | |
time.Now(), | |
) | |
streamUpdates, clobPairIds := sm.getStreamUpdatesForOrderbookFills( | |
[]clobtypes.StreamOrderbookFill{orderbookFill}, | |
blockHeight, | |
ctx.ExecMode(), | |
perpetualIdToClobPairId, | |
) | |
sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds) | |
return | |
} | |
// If `DeliverTx`, updates should be staged to be streamed after consensus finalizese on a block. | |
sm.stageOrderbookFillUpdate(ctx, orderbookFill) | |
} | |
func (sm *FullNodeStreamingManagerImpl) stageOrderbookFillUpdate(ctx sdk.Context, orderbookFill clobtypes.StreamOrderbookFill) { | |
stagedEvent := clobtypes.StagedFinalizeBlockEvent{ | |
Event: &clobtypes.StagedFinalizeBlockEvent_OrderFill{ | |
OrderFill: &orderbookFill, | |
}, | |
} | |
sm.stageFinalizeBlockEvent(ctx, sm.cdc.MustMarshal(&stagedEvent)) | |
} |
func GetOffchainUpdatesV1(offchainUpdates *clobtypes.OffchainUpdates) []ocutypes.OffChainUpdateV1 { | ||
v1updates := make([]ocutypes.OffChainUpdateV1, 0) | ||
for _, message := range offchainUpdates.Messages { | ||
var update ocutypes.OffChainUpdateV1 | ||
err := proto.Unmarshal(message.Message.Value, &update) | ||
if err != nil { | ||
return nil, err | ||
panic(fmt.Sprintf("Failed to get OffchainUpdatesV1: %v", err)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
Handle Errors Returned by GetOffchainUpdatesV1
in All Callers
The verification indicates that callers of GetOffchainUpdatesV1
in protocol/streaming/full_node_streaming_manager.go
do not handle the returned error. Please update these locations to appropriately handle errors to ensure application robustness.
protocol/streaming/full_node_streaming_manager.go
🔗 Analysis chain
Avoid panicking in utility functions; return errors instead
Panicking within GetOffchainUpdatesV1
on unmarshalling failure can cause the entire application to crash unexpectedly. It's better practice to return an error and allow the caller to handle it appropriately, enhancing robustness and flexibility.
Apply this diff to restore error handling:
-func GetOffchainUpdatesV1(offchainUpdates *clobtypes.OffchainUpdates) []ocutypes.OffChainUpdateV1 {
+func GetOffchainUpdatesV1(offchainUpdates *clobtypes.OffchainUpdates) ([]ocutypes.OffChainUpdateV1, error) {
v1updates := make([]ocutypes.OffChainUpdateV1, 0)
for _, message := range offchainUpdates.Messages {
var update ocutypes.OffChainUpdateV1
err := proto.Unmarshal(message.Message.Value, &update)
if err != nil {
- panic(fmt.Sprintf("Failed to get OffchainUpdatesV1: %v", err))
+ return nil, fmt.Errorf("failed to unmarshal OffChainUpdateV1: %v", err)
}
v1updates = append(v1updates, update)
}
- return v1updates
+ return v1updates, nil
}
Run the following script to verify that all callers of GetOffchainUpdatesV1
handle the updated error return:
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Find all usages of GetOffchainUpdatesV1 and check for error handling.
# Search for function calls to GetOffchainUpdatesV1
rg --type go 'GetOffchainUpdatesV1\(' -A 2
Length of output: 1173
if err != nil { | ||
panic(err) | ||
} | ||
for clobPairId, v1updates := range clobPairIdToV1Updates { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we iterate over maps deterministically here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
blockHeight uint32, | ||
execMode sdk.ExecMode, | ||
ctx sdk.Context, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
might as well read block height from ctx? or are you worried about the +1 problem for check tx?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed this function and couple of others to only take in ctx
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (3)
protocol/streaming/full_node_streaming_manager.go (3)
Line range hint
392-412
: LGTM: Improved SendSubaccountUpdate functionThe changes to this function enhance its consistency and efficiency:
- Renaming to
SendSubaccountUpdate
aligns with other function names.- Using
sdk.Context
as a parameter simplifies the function signature.- Early return for non-DeliverTx mode prevents unnecessary processing.
- Staging events only in DeliverTx mode ensures only finalized updates are staged.
These changes improve the overall design and efficiency of the function.
Consider updating the comment to be more precise:
-// If `DeliverTx`, updates should be staged to be streamed after consensus finalizes on a block. +// In `DeliverTx` mode, stage the update to be streamed after consensus finalizes on a block.
621-656
: LGTM: Improved SendOrderbookFillUpdate function with suggestionThe changes to this function enhance its functionality and consistency:
- Renaming to
SendOrderbookFillUpdate
and handling a single fill makes the function more focused.- Using
sdk.Context
as a parameter simplifies the function signature and provides more context.- The differentiation between DeliverTx and non-DeliverTx modes allows for appropriate handling of updates:
- In non-DeliverTx mode, updates are processed immediately.
- In DeliverTx mode, updates are staged for later processing.
These changes align well with the modifications made to the
SendOrderbookUpdates
function, improving overall consistency.For even better consistency with
SendOrderbookUpdates
, consider extracting the staging logic into a separate method:+func (sm *FullNodeStreamingManagerImpl) stageOrderbookFillUpdate(ctx sdk.Context, orderbookFill clobtypes.StreamOrderbookFill) { + stagedEvent := clobtypes.StagedFinalizeBlockEvent{ + Event: &clobtypes.StagedFinalizeBlockEvent_OrderFill{ + OrderFill: &orderbookFill, + }, + } + sm.stageFinalizeBlockEvent(ctx, sm.cdc.MustMarshal(&stagedEvent)) +} // In the SendOrderbookFillUpdate function if lib.IsDeliverTxMode(ctx) { - stagedEvent := clobtypes.StagedFinalizeBlockEvent{ - Event: &clobtypes.StagedFinalizeBlockEvent_OrderFill{ - OrderFill: &orderbookFill, - }, - } - - sm.stageFinalizeBlockEvent( - ctx, - sm.cdc.MustMarshal(&stagedEvent), - ) + sm.stageOrderbookFillUpdate(ctx, orderbookFill) }This change would improve code consistency and readability across similar functions.
Line range hint
926-1015
: LGTM: Comprehensive improvements to StreamBatchUpdatesAfterFinalizeBlockThe changes to this function significantly enhance its functionality and organization:
- Addition of handling for finalized orderbook updates completes the set of update types processed.
- Use of helper functions (
getStagedEventsFromFinalizeBlock
,getStreamUpdatesFromOffchainUpdates
, etc.) improves code readability and maintainability.- Clear separation between different types of updates (sync local, fills, orderbook, subaccount) enhances the function's structure.
- The function now provides a comprehensive batch update after block finalization, ensuring all relevant updates are processed and cached.
These improvements contribute to a more robust and efficient streaming process after block finalization.
Consider adding error handling for the calls to helper functions. While they may not return errors currently, adding error checks would make the function more robust to future changes:
// Cache updates to sync local ops queue -sycnLocalUpdates, syncLocalClobPairIds := getStreamUpdatesFromOffchainUpdates( +sycnLocalUpdates, syncLocalClobPairIds, err := getStreamUpdatesFromOffchainUpdates( streaming_util.GetOffchainUpdatesV1(orderBookUpdatesToSyncLocalOpsQueue), lib.MustConvertIntegerToUint32(ctx.BlockHeight()), ctx.ExecMode(), ) +if err != nil { + sm.logger.Error("Failed to get stream updates for sync local ops queue", "error", err) + // Consider how to handle this error (e.g., skip these updates, return from the function, etc.) +} sm.cacheStreamUpdatesByClobPairWithLock(sycnLocalUpdates, syncLocalClobPairIds)Apply similar error handling to other helper function calls in this method.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (5)
- protocol/streaming/full_node_streaming_manager.go (14 hunks)
- protocol/streaming/noop_streaming_manager.go (2 hunks)
- protocol/streaming/types/interface.go (1 hunks)
- protocol/x/clob/keeper/keeper.go (2 hunks)
- protocol/x/clob/keeper/process_operations.go (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
- protocol/streaming/noop_streaming_manager.go
- protocol/streaming/types/interface.go
- protocol/x/clob/keeper/keeper.go
- protocol/x/clob/keeper/process_operations.go
🔇 Additional comments (3)
protocol/streaming/full_node_streaming_manager.go (3)
23-24
: LGTM: Import added for off-chain update typesThe addition of the import for
ocutypes
is appropriate given the changes in the function signatures and implementations that now use off-chain update types directly.
547-581
: LGTM: Enhanced SendOrderbookUpdates functionThe changes to this function significantly improve its functionality and flexibility:
- Using
sdk.Context
as a parameter simplifies the function signature and provides more context.- The differentiation between DeliverTx and non-DeliverTx modes allows for appropriate handling of updates:
- In non-DeliverTx mode, updates are streamed directly, allowing for immediate processing of optimistic updates.
- In DeliverTx mode, updates are staged to be streamed later, ensuring that finalized updates are processed after consensus.
- The use of
lib.MustConvertIntegerToUint32(ctx.BlockHeight())
andctx.ExecMode()
ensures consistent handling of block height and execution mode.These changes enhance the overall design and efficiency of the function, making it more robust and adaptable to different execution contexts.
Line range hint
881-914
: LGTM: Well-implemented caching functions for stream updatesThe addition of
cacheStreamUpdatesByClobPairWithLock
andcacheStreamUpdatesBySubaccountWithLock
functions improves the organization and efficiency of update caching:
- Clear separation of concerns: Each function handles a specific type of update (clob pairs or subaccounts).
- Efficient implementation: The functions directly append updates to the cache and manage the corresponding mappings.
- Clear documentation: The comments explicitly state the assumption about lock acquisition, preventing potential misuse.
These functions contribute to a more modular and maintainable codebase.
v1updates []ocutypes.OffChainUpdateV1, | ||
blockHeight uint32, | ||
execMode sdk.ExecMode, | ||
) (streamUpdates []clobtypes.StreamUpdate, clobPairIds []uint32) { | ||
// Group updates by clob pair id. | ||
updates := make(map[uint32]*clobtypes.OffchainUpdates) | ||
for _, message := range offchainUpdates.Messages { | ||
clobPairId := message.OrderId.ClobPairId | ||
if _, ok := updates[clobPairId]; !ok { | ||
updates[clobPairId] = clobtypes.NewOffchainUpdates() | ||
clobPairIdToV1Updates := make(map[uint32][]ocutypes.OffChainUpdateV1) | ||
// unique list of clob pair Ids to send updates for. | ||
clobPairIds = make([]uint32, 0) | ||
for _, v1update := range v1updates { | ||
var clobPairId uint32 | ||
switch u := v1update.UpdateMessage.(type) { | ||
case *ocutypes.OffChainUpdateV1_OrderPlace: | ||
clobPairId = u.OrderPlace.Order.OrderId.ClobPairId | ||
case *ocutypes.OffChainUpdateV1_OrderReplace: | ||
clobPairId = u.OrderReplace.OldOrderId.ClobPairId | ||
case *ocutypes.OffChainUpdateV1_OrderRemove: | ||
clobPairId = u.OrderRemove.RemovedOrderId.ClobPairId | ||
case *ocutypes.OffChainUpdateV1_OrderUpdate: | ||
clobPairId = u.OrderUpdate.OrderId.ClobPairId | ||
default: | ||
panic(fmt.Sprintf("Unhandled UpdateMessage type: %v", u)) | ||
} | ||
updates[clobPairId].Messages = append(updates[clobPairId].Messages, message) | ||
|
||
if _, ok := clobPairIdToV1Updates[clobPairId]; !ok { | ||
clobPairIdToV1Updates[clobPairId] = []ocutypes.OffChainUpdateV1{} | ||
clobPairIds = append(clobPairIds, clobPairId) | ||
} | ||
clobPairIdToV1Updates[clobPairId] = append(clobPairIdToV1Updates[clobPairId], v1update) | ||
} | ||
|
||
// Unmarshal each per-clob pair message to v1 updates. | ||
streamUpdates = make([]clobtypes.StreamUpdate, 0) | ||
clobPairIds = make([]uint32, 0) | ||
for clobPairId, update := range updates { | ||
v1updates, err := streaming_util.GetOffchainUpdatesV1(update) | ||
if err != nil { | ||
panic(err) | ||
streamUpdates = make([]clobtypes.StreamUpdate, len(clobPairIds)) | ||
|
||
for _, clobPairId := range clobPairIds { | ||
v1updates, exists := clobPairIdToV1Updates[clobPairId] | ||
if !exists { | ||
panic(fmt.Sprintf( | ||
"clob pair id %v not found in clobPairIdToV1Updates: %v", | ||
clobPairId, | ||
clobPairIdToV1Updates, | ||
)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM: Improved getStreamUpdatesFromOffchainUpdates function with suggestion
The changes to this function significantly improve its functionality and efficiency:
- Using
[]ocutypes.OffChainUpdateV1
allows for more direct and efficient processing of updates. - Grouping updates by clob pair id and creating a unique list prevents duplicate processing.
- The switch statement ensures all update types are handled correctly.
These changes enhance the overall performance and robustness of the function.
However, the use of panic
for unhandled update types might be too aggressive for production code. Consider logging an error and continuing processing instead:
default:
- panic(fmt.Sprintf("Unhandled UpdateMessage type: %v", u))
+ sm.logger.Error("Unhandled UpdateMessage type", "type", fmt.Sprintf("%T", u))
+ continue
This change would make the function more resilient to unexpected update types without crashing the application.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
v1updates []ocutypes.OffChainUpdateV1, | |
blockHeight uint32, | |
execMode sdk.ExecMode, | |
) (streamUpdates []clobtypes.StreamUpdate, clobPairIds []uint32) { | |
// Group updates by clob pair id. | |
updates := make(map[uint32]*clobtypes.OffchainUpdates) | |
for _, message := range offchainUpdates.Messages { | |
clobPairId := message.OrderId.ClobPairId | |
if _, ok := updates[clobPairId]; !ok { | |
updates[clobPairId] = clobtypes.NewOffchainUpdates() | |
clobPairIdToV1Updates := make(map[uint32][]ocutypes.OffChainUpdateV1) | |
// unique list of clob pair Ids to send updates for. | |
clobPairIds = make([]uint32, 0) | |
for _, v1update := range v1updates { | |
var clobPairId uint32 | |
switch u := v1update.UpdateMessage.(type) { | |
case *ocutypes.OffChainUpdateV1_OrderPlace: | |
clobPairId = u.OrderPlace.Order.OrderId.ClobPairId | |
case *ocutypes.OffChainUpdateV1_OrderReplace: | |
clobPairId = u.OrderReplace.OldOrderId.ClobPairId | |
case *ocutypes.OffChainUpdateV1_OrderRemove: | |
clobPairId = u.OrderRemove.RemovedOrderId.ClobPairId | |
case *ocutypes.OffChainUpdateV1_OrderUpdate: | |
clobPairId = u.OrderUpdate.OrderId.ClobPairId | |
default: | |
panic(fmt.Sprintf("Unhandled UpdateMessage type: %v", u)) | |
} | |
updates[clobPairId].Messages = append(updates[clobPairId].Messages, message) | |
if _, ok := clobPairIdToV1Updates[clobPairId]; !ok { | |
clobPairIdToV1Updates[clobPairId] = []ocutypes.OffChainUpdateV1{} | |
clobPairIds = append(clobPairIds, clobPairId) | |
} | |
clobPairIdToV1Updates[clobPairId] = append(clobPairIdToV1Updates[clobPairId], v1update) | |
} | |
// Unmarshal each per-clob pair message to v1 updates. | |
streamUpdates = make([]clobtypes.StreamUpdate, 0) | |
clobPairIds = make([]uint32, 0) | |
for clobPairId, update := range updates { | |
v1updates, err := streaming_util.GetOffchainUpdatesV1(update) | |
if err != nil { | |
panic(err) | |
streamUpdates = make([]clobtypes.StreamUpdate, len(clobPairIds)) | |
for _, clobPairId := range clobPairIds { | |
v1updates, exists := clobPairIdToV1Updates[clobPairId] | |
if !exists { | |
panic(fmt.Sprintf( | |
"clob pair id %v not found in clobPairIdToV1Updates: %v", | |
clobPairId, | |
clobPairIdToV1Updates, | |
)) | |
v1updates []ocutypes.OffChainUpdateV1, | |
blockHeight uint32, | |
execMode sdk.ExecMode, | |
) (streamUpdates []clobtypes.StreamUpdate, clobPairIds []uint32) { | |
// Group updates by clob pair id. | |
clobPairIdToV1Updates := make(map[uint32][]ocutypes.OffChainUpdateV1) | |
// unique list of clob pair Ids to send updates for. | |
clobPairIds = make([]uint32, 0) | |
for _, v1update := range v1updates { | |
var clobPairId uint32 | |
switch u := v1update.UpdateMessage.(type) { | |
case *ocutypes.OffChainUpdateV1_OrderPlace: | |
clobPairId = u.OrderPlace.Order.OrderId.ClobPairId | |
case *ocutypes.OffChainUpdateV1_OrderReplace: | |
clobPairId = u.OrderReplace.OldOrderId.ClobPairId | |
case *ocutypes.OffChainUpdateV1_OrderRemove: | |
clobPairId = u.OrderRemove.RemovedOrderId.ClobPairId | |
case *ocutypes.OffChainUpdateV1_OrderUpdate: | |
clobPairId = u.OrderUpdate.OrderId.ClobPairId | |
default: | |
sm.logger.Error("Unhandled UpdateMessage type", "type", fmt.Sprintf("%T", u)) | |
continue | |
} | |
if _, ok := clobPairIdToV1Updates[clobPairId]; !ok { | |
clobPairIdToV1Updates[clobPairId] = []ocutypes.OffChainUpdateV1{} | |
clobPairIds = append(clobPairIds, clobPairId) | |
} | |
clobPairIdToV1Updates[clobPairId] = append(clobPairIdToV1Updates[clobPairId], v1update) | |
} | |
// Unmarshal each per-clob pair message to v1 updates. | |
streamUpdates = make([]clobtypes.StreamUpdate, len(clobPairIds)) | |
for _, clobPairId := range clobPairIds { | |
v1updates, exists := clobPairIdToV1Updates[clobPairId] | |
if !exists { | |
panic(fmt.Sprintf( | |
"clob pair id %v not found in clobPairIdToV1Updates: %v", | |
clobPairId, | |
clobPairIdToV1Updates, | |
)) |
@Mergifyio backport release/protocol/v7.x |
✅ Backports have been created
|
(cherry picked from commit d583dbc)
) (#2404) Co-authored-by: Teddy Ding <[email protected]>
Changelist
Context
The FNS manager interface now only includes
SendXXXXUpdate
, and decides whether to stage or send directly based on the input context.Test Plan
Testing on feature branch
feat/full-node-streaming
Author/Reviewer Checklist
state-breaking
label.indexer-postgres-breaking
label.PrepareProposal
orProcessProposal
, manually add the labelproposal-breaking
.feature:[feature-name]
.backport/[branch-name]
.refactor
,chore
,bug
.Summary by CodeRabbit
New Features
Changes
These enhancements improve the efficiency and clarity of order book and subaccount update processes.