-
Notifications
You must be signed in to change notification settings - Fork 86
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ISSUE #1004]🚀optimize and improve consume logic⚡️ #1008
Conversation
WalkthroughThe changes in this pull request introduce significant modifications to the RocketMQ client implementation, particularly focusing on enhancing the locking and unlocking mechanisms for message queues. New asynchronous methods are added to various structs and traits, improving the handling of message consumption and queue management. Additionally, new request and response structures are introduced to facilitate batch operations for locking and unlocking message queues. Changes
Assessment against linked issues
Possibly related PRs
Suggested labels
Suggested reviewers
Poem
📜 Recent review detailsConfiguration used: .coderabbit.yaml ⛔ Files ignored due to path filters (1)
📒 Files selected for processing (7)
🚧 Files skipped from review as they are similar to previous changes (5)
🔇 Additional comments (2)
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥 |
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #1008 +/- ##
==========================================
- Coverage 19.95% 19.92% -0.04%
==========================================
Files 413 418 +5
Lines 34440 34502 +62
==========================================
+ Hits 6874 6876 +2
- Misses 27566 27626 +60 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 22
🧹 Outside diff range and nitpick comments (18)
rocketmq-remoting/src/protocol/body/response/lock_batch_response_body.rs (1)
23-27
: Consider adding documentation and reviewing trait derivations.The
LockBatchResponseBody
struct looks good overall, but consider the following suggestions:
- Add documentation comments (///) for the struct and its field to improve API clarity.
- The
#[serde(rename = "lockOKMQSet")]
attribute uses camelCase, which is atypical for Rust. If this is for compatibility with an external JSON format, consider adding a comment explaining this choice.- Review if the
Default
trait derivation is necessary. If there's no clear default state for this response body, it might be better to omit it to avoid potential misuse.Example improvement:
/// Represents the response body for a batch locking operation in RocketMQ. #[derive(Serialize, Deserialize, Debug)] pub struct LockBatchResponseBody { /// Set of message queues that were successfully locked. #[serde(rename = "lockOKMQSet")] // Note: camelCase used for compatibility with external JSON format pub lock_ok_mq_set: HashSet<MessageQueue>, }rocketmq-remoting/src/protocol/body/request/lock_batch_request_body.rs (1)
24-31
: LGTM! Consider adding documentation.The
LockBatchRequestBody
struct is well-designed with appropriate field types and derive macros. The use ofOption<String>
forconsumer_group
andclient_id
, andHashSet<MessageQueue>
formq_set
are good choices.Consider adding documentation comments to the struct and its fields to improve code readability and maintainability. For example:
/// Represents a batch request for locking message queues. #[derive(Serialize, Deserialize, Debug, Default)] #[serde(rename_all = "camelCase")] pub struct LockBatchRequestBody { /// The consumer group making the lock request. pub consumer_group: Option<String>, /// The ID of the client making the lock request. pub client_id: Option<String>, /// Whether to restrict the operation to the current broker only. pub only_this_broker: bool, /// The set of message queues to be locked. pub mq_set: HashSet<MessageQueue>, }rocketmq-remoting/src/protocol/body/unlock_batch_request_body.rs (2)
24-31
: LGTM: Struct definition is well-structured. Consider adding documentation.The
UnlockBatchRequestBody
struct is correctly defined with appropriate field types and derive macros. The use ofOption<String>
for optional fields andHashSet<MessageQueue>
for unique message queues is a good design choice.Consider adding documentation comments (///) for the struct and its fields to improve code readability and maintainability. For example:
/// Represents a request body for unlocking a batch of messages. #[derive(Serialize, Deserialize, Debug, Default)] #[serde(rename_all = "camelCase")] pub struct UnlockBatchRequestBody { /// The consumer group associated with the unlock request. pub consumer_group: Option<String>, /// The client ID associated with the unlock request. pub client_id: Option<String>, /// Indicates whether the operation should be restricted to the current broker. pub only_this_broker: bool, /// The set of message queues to be unlocked. pub mq_set: HashSet<MessageQueue>, }
33-45
: LGTM: Display trait implementation is correct. Minor style improvement suggested.The Display trait implementation for UnlockBatchRequestBody is well-structured and correctly handles optional fields. It provides a comprehensive string representation of the struct.
Consider using the
format!
macro instead ofwrite!
for better readability:impl Display for UnlockBatchRequestBody { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", format!( "UnlockBatchRequestBody [consumer_group={}, client_id={}, only_this_broker={}, mq_set={:?}]", self.consumer_group.as_deref().unwrap_or(""), self.client_id.as_deref().unwrap_or(""), self.only_this_broker, self.mq_set )) } }This change also uses
as_deref()
instead ofas_ref()
for a more idiomatic approach to handlingOption<String>
.rocketmq-remoting/src/protocol/header/lock_batch_mq_request_header.rs (2)
26-31
: LGTM! Consider adding documentation.The
LockBatchMqRequestHeader
struct is well-defined with appropriate derive macros and serde attributes. The use ofOption<RpcRequestHeader>
provides flexibility.Consider adding documentation comments to describe the purpose of this struct and its field. This would improve code readability and maintainability. For example:
/// Represents the header for a batch lock request in the RocketMQ protocol. #[derive(Serialize, Deserialize, Debug, Default)] #[serde(rename_all = "camelCase")] pub struct LockBatchMqRequestHeader { /// The RPC request header, flattened into the JSON structure. #[serde(flatten)] pub rpc_request_header: Option<RpcRequestHeader>, }
33-43
: LGTM! Consider handling empty map case.The implementation of
CommandCustomHeader
forLockBatchMqRequestHeader
is correct and efficiently handles the optionalrpc_request_header
.Consider returning
None
if the resulting map is empty. This could provide more meaningful information to the caller. Here's a suggested implementation:impl CommandCustomHeader for LockBatchMqRequestHeader { fn to_map(&self) -> Option<HashMap<String, String>> { self.rpc_request_header .as_ref() .and_then(|header| header.to_map()) .filter(|map| !map.is_empty()) } }This implementation is more concise and returns
None
ifrpc_request_header
isNone
or if itsto_map()
method returns an empty map.rocketmq-remoting/src/protocol/header/unlock_batch_mq_request_header.rs (1)
26-31
: LGTM: Struct definition is correct. Consider adding documentation.The
UnlockBatchMqRequestHeader
struct is well-defined with appropriate serde attributes. The use ofOption<RpcRequestHeader>
is a good choice for handling cases where the header might not be present.Consider adding documentation comments to explain the purpose of this struct and its field. For example:
/// Represents a request header for unlocking a batch of message queues. #[derive(Serialize, Deserialize, Debug, Default)] #[serde(rename_all = "camelCase")] pub struct UnlockBatchMqRequestHeader { /// The RPC request header, if present. #[serde(flatten)] pub rpc_request_header: Option<RpcRequestHeader>, }rocketmq-client/src/consumer/consumer_impl/re_balance.rs (3)
Line range hint
47-51
: Possible Redundancy in Function NameThe method
remove_unnecessary_pop_message_queue_pop
has "pop" repeated twice in its name, which may be redundant or a typo. Consider renaming it toremove_unnecessary_pop_message_queue
orremove_unnecessary_pop_process_queue
for clarity.
61-61
: Inconsistent Return Types betweencompute_pull_from_where
MethodsThe method
compute_pull_from_where_with_exception
returns aResult<i64>
, whereascompute_pull_from_where
returns ani64
directly. For consistency and better error handling, consider havingcompute_pull_from_where
also return aResult<i64>
.
65-67
: Inconsistent Asynchronicity Between Dispatch MethodsThe method
dispatch_pull_request
is asynchronous (async fn
), whiledispatch_pop_pull_request
is synchronous (fn
). If both methods perform operations that could be asynchronous, consider makingdispatch_pop_pull_request
asynchronous for consistency and maintainability.rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs (1)
470-476
: Use appropriate logging level for successful operationsIn the success case of the
unlock
method, you are usingwarn!
to log the message indicating a successful unlock:warn!( "unlock messageQueue. group:{}, clientId:{}, mq:{}", self.rebalance_impl_inner.consumer_group.as_ref().unwrap(), client.client_id, mq )Logging successful operations at the warning level can be misleading. Consider using
info!
instead to indicate normal operation.Apply this diff to change the logging level:
} else { - warn!( + info!( "unlock messageQueue. group:{}, clientId:{}, mq:{}", self.rebalance_impl_inner.consumer_group.as_ref().unwrap(), client.client_id, mq ) }rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs (7)
20-20
: ImportDerefMut
may not be necessaryThe import
use std::ops::DerefMut;
is added, but it's only used once withderef_mut()
in the code. Consider removing the explicit call toderef_mut()
and use pattern matching or adjust the code to avoid needing this import if possible.
Line range hint
242-253
: Check for proper lock acquisition inupdate_process_queue_table_in_rebalance
At line 242, the code attempts to lock message queues if
is_order
is true. Thelock
method is now asynchronous and modifiesprocess_queue_table
. Ensure that the mutable borrow ofprocess_queue_table
does not conflict with other borrows in the loop.Consider restructuring the code to prevent mutable and immutable borrows from overlapping. Alternatively, move the lock acquisition outside the loop or collect the queues to lock and process them after releasing the write lock.
469-522
: New methodlock_all
lacks error handling for broker unavailabilityIn the
lock_all
method, iffind_broker_result
isNone
, the code silently continues without logging or handling the error. This might make debugging broker connectivity issues difficult.Consider adding a warning log when
find_broker_result
isNone
:if let Some(find_broker_result) = find_broker_result { // existing code } else { warn!( "Could not find broker address for broker: {}", broker_name ); }
445-463
: Improve error handling inlock
methodWhen handling the result of
lock_batch_mq
, the error case at lines 459-462 only logs the error but does not provide detailed context.Enhance the error logging to include more context:
error!( "Failed to lock batch MQ for {:?} at broker {}: {}", self.consumer_group, find_broker_result.broker_addr, e );
185-200
: Incorrect logging level and message formattingAt lines 194-200, an
error!
macro is used for logging. The message seems to indicate an attempt to fix an issue rather than an actual error.Consider changing the logging level to
warn!
and improving the message clarity.-error!( +warn!( "doRebalance, {:?}, removing unnecessary mq: {}, because pull is paused.", self.consumer_group, mq.get_topic() );
Line range hint
242-253
: Avoid holding write lock during asynchronous operationsAt line 239, a write lock on
process_queue_table
is acquired. Inside the loop starting at line 241, asynchronous operations are performed, including calls toself.lock
. Holding the write lock across awaits can lead to contention and potential deadlocks.Refactor the code to minimize the time the write lock is held. Collect necessary data, release the lock, and then perform asynchronous operations.
469-522
: Lack of timeout handling forlock_batch_mq
requestsThe
lock_batch_mq
method is called with a timeout of1_000
milliseconds. If the broker does not respond in time, the method will fail silently for that broker.Consider implementing retries or exponential backoff when a timeout occurs to improve reliability.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (14)
- rocketmq-client/src/consumer/consumer_impl/process_queue.rs (5 hunks)
- rocketmq-client/src/consumer/consumer_impl/re_balance.rs (2 hunks)
- rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs (5 hunks)
- rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs (4 hunks)
- rocketmq-client/src/implementation/mq_client_api_impl.rs (3 hunks)
- rocketmq-remoting/src/protocol/body.rs (1 hunks)
- rocketmq-remoting/src/protocol/body/request.rs (1 hunks)
- rocketmq-remoting/src/protocol/body/request/lock_batch_request_body.rs (1 hunks)
- rocketmq-remoting/src/protocol/body/response.rs (1 hunks)
- rocketmq-remoting/src/protocol/body/response/lock_batch_response_body.rs (1 hunks)
- rocketmq-remoting/src/protocol/body/unlock_batch_request_body.rs (1 hunks)
- rocketmq-remoting/src/protocol/header.rs (2 hunks)
- rocketmq-remoting/src/protocol/header/lock_batch_mq_request_header.rs (1 hunks)
- rocketmq-remoting/src/protocol/header/unlock_batch_mq_request_header.rs (1 hunks)
🔇 Additional comments (24)
rocketmq-remoting/src/protocol/body/request.rs (1)
17-17
: LGTM! New module added for batch locking requests.The addition of the
lock_batch_request_body
module aligns with the PR objectives of optimizing and improving consume logic. This new module likely encapsulates functionality for handling batch lock requests, which could potentially improve efficiency in message queue management.To ensure the proper implementation of this new module, please run the following verification script:
This script will help verify:
- The existence of the implementation file
- The basic structure of the
LockBatchRequestBody
- Any usage of the new module in other parts of the codebase
Please review the results to ensure the new module is properly implemented and integrated.
✅ Verification successful
It seems there was an error in the verification script when checking for the implementation file. Let's correct the
fd
command and rerun the tests to ensure everything is in order.
Verification Successful 🚀
The
lock_batch_request_body
module is properly implemented and integrated into the codebase. TheLockBatchRequestBody
struct is correctly defined, and the module is actively used in the following locations:
rocketmq-client/src/implementation/mq_client_api_impl.rs
rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs
All tests passed, and no issues were found.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the implementation of the new lock_batch_request_body module # Test 1: Check if the implementation file exists echo "Checking for the implementation file..." fd --type f --full-path "*/lock_batch_request_body.rs" # Test 2: Verify the contents of the implementation file echo "Checking the contents of the implementation file..." rg --type rust "pub struct LockBatchRequestBody" -A 10 # Test 3: Check for any usage of the new module echo "Checking for usage of the new module..." rg --type rust "use .*lock_batch_request_body::LockBatchRequestBody"Length of output: 2210
Script:
#!/bin/bash # Description: Corrected verification of the new lock_batch_request_body module # Test 1: Check if the implementation file exists echo "Checking for the implementation file..." fd --glob '*lock_batch_request_body.rs' # Test 2: Verify the contents of the implementation file echo "Checking the contents of the implementation file..." rg --type rust "pub struct LockBatchRequestBody" -A 10 # Test 3: Check for any usage of the new module echo "Checking for usage of the new module..." rg --type rust "use .*lock_batch_request_body::LockBatchRequestBody"Length of output: 2023
rocketmq-remoting/src/protocol/body/response.rs (2)
1-17
: LGTM: License header is correctly included.The Apache License 2.0 header is properly included at the beginning of the file, which is essential for maintaining compliance with open-source licensing requirements.
17-17
: Verify the implementation of the new module.The addition of the
lock_batch_response_body
module is a good step towards expanding the functionality for batch locking operations. However, it's important to ensure that the implementation of this module is complete and consistent with the project's standards.To verify the implementation, please run the following script:
This script will help verify:
- The existence of the module file.
- The presence of expected structures or types within the module.
- The existence of related unit tests.
Please review the output of this script to ensure the module is properly implemented and tested.
rocketmq-remoting/src/protocol/body/response/lock_batch_response_body.rs (2)
1-16
: LGTM: Appropriate license header included.The file includes the correct Apache License, Version 2.0 header, which is essential for open-source projects and consistent with the RocketMQ project's licensing.
17-21
: LGTM: Appropriate imports.The imports are concise and relevant to the struct being defined:
HashSet
from the standard library for the main field type.MessageQueue
from a custom module, likely representing the elements in the set.Serialize
andDeserialize
traits from serde for JSON serialization/deserialization.These imports provide all necessary types and traits for the struct's functionality.
rocketmq-remoting/src/protocol/body.rs (1)
33-34
: Approved: New modules align with PR objectives. Please provide more context.The addition of
request
,response
, andunlock_batch_request_body
modules aligns with the PR objectives to optimize and improve consume logic. These new modules likely introduce structures for enhanced request-response handling and batch unlocking operations, which could significantly improve the efficiency of message queue management.To better understand the impact of these changes, could you please provide more information about the contents and purpose of these new modules? This will help ensure they integrate well with the existing codebase.
To verify the integration and usage of these new modules, please run the following script:
This script will help us understand how these new modules are being used throughout the codebase, ensuring they're properly integrated and not introducing any unintended side effects.
Also applies to: 37-37
✅ Verification successful
Verified: New modules are properly integrated and utilized within the codebase.
The
request
,response
, andunlock_batch_request_body
modules are being used appropriately in the following locations:
rocketmq-client/src/implementation/mq_client_api_impl.rs
rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs
rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs
No issues were found regarding the integration of these modules.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Check for usage of new modules in the codebase echo "Checking usage of new request module:" rg --type rust -g '!src/protocol/body.rs' 'use .*protocol::body::request' echo "Checking usage of new response module:" rg --type rust -g '!src/protocol/body.rs' 'use .*protocol::body::response' echo "Checking usage of new unlock_batch_request_body module:" rg --type rust -g '!src/protocol/body.rs' 'use .*protocol::body::unlock_batch_request_body' echo "Checking for any direct references to the new modules:" rg --type rust -g '!src/protocol/body.rs' 'body::(request|response|unlock_batch_request_body)'Length of output: 2331
rocketmq-remoting/src/protocol/body/request/lock_batch_request_body.rs (1)
1-45
: LGTM! Well-structured and follows best practices.The overall structure of the file is good, and it adheres to Rust best practices:
- Appropriate license header is included.
- Imports are well-organized and grouped.
- The code follows Rust naming conventions.
- The use of external crates and types is appropriate.
- The visibility of the struct and its implementation is correctly set to public.
rocketmq-remoting/src/protocol/body/unlock_batch_request_body.rs (2)
1-16
: LGTM: License header is correct and properly formatted.The Apache License 2.0 header is present and correctly formatted at the beginning of the file.
17-22
: LGTM: Imports are correct and well-organized.The necessary imports from std, rocketmq_common, and serde are present and logically grouped. There are no unused imports.
rocketmq-remoting/src/protocol/header/unlock_batch_mq_request_header.rs (2)
1-24
: LGTM: License and imports are correctly implemented.The file includes the appropriate Apache 2.0 license header, and the import statements are well-organized and relevant to the functionality implemented in this file.
1-52
: Overall assessment: Good implementation with room for minor improvements.The
UnlockBatchMqRequestHeader
struct and its trait implementations provide a solid foundation for handling batch unlock requests in RocketMQ. The code is generally well-structured and follows Rust conventions.To further enhance the quality of this implementation, consider:
- Adding documentation comments to explain the purpose and usage of the struct and its methods.
- Optimizing the
to_map
method in theCommandCustomHeader
implementation for better efficiency.- Improving error handling in the
FromMap
implementation to handle cases whereRpcRequestHeader
creation fails.These improvements will make the code more robust, maintainable, and easier to use for other developers working on the project.
rocketmq-remoting/src/protocol/header.rs (3)
36-36
: Request for additional context on batch locking/unlocking improvements.The additions of
lock_batch_mq_request_header
andunlock_batch_mq_request_header
modules suggest improvements in batch operations for message queue locking. However, the PR description lacks details about the specific enhancements these new modules bring.Could you please provide more information on:
- The motivation behind introducing batch locking/unlocking.
- The expected performance improvements or other benefits.
- Any potential impacts on existing consume logic.
This context will help in better understanding the optimization efforts and their alignment with the project's goals.
Also applies to: 50-50
36-36
: LGTM: New module for batch locking of message queues.The addition of the
lock_batch_mq_request_header
module aligns with the PR objective of optimizing and improving consume logic. Its placement in the file follows the alphabetical order convention.To ensure the module is properly implemented, please run the following script:
#!/bin/bash # Description: Verify the implementation of the lock_batch_mq_request_header module # Test: Check if the module file exists and contains expected content rg --type rust -g 'lock_batch_mq_request_header.rs' -C 5 'struct LockBatchMqRequestHeader'
50-50
: LGTM: New module for batch unlocking of message queues.The addition of the
unlock_batch_mq_request_header
module complements thelock_batch_mq_request_header
module and aligns with the PR objective of optimizing and improving consume logic. Its placement in the file follows the alphabetical order convention.To ensure the module is properly implemented, please run the following script:
✅ Verification successful
Verified:
unlock_batch_mq_request_header
module is correctly implemented.The
unlock_batch_mq_request_header
module contains the expectedUnlockBatchMqRequestHeader
struct, confirming its proper implementation and alignment with the project's objectives.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the implementation of the unlock_batch_mq_request_header module # Test: Check if the module file exists and contains expected content rg --type rust -g 'unlock_batch_mq_request_header.rs' -C 5 'struct UnlockBatchMqRequestHeader'Length of output: 1256
rocketmq-client/src/consumer/consumer_impl/process_queue.rs (4)
128-130
: LGTM: New methodinc_try_unlock_times
looks good.The new
inc_try_unlock_times
method is a clean and thread-safe way to increment thetry_unlock_times
counter. It uses atomic operations, which is appropriate for this use case.
316-319
: LGTM: New methodset_last_lock_timestamp
is well-implemented.The
set_last_lock_timestamp
method is a clean and thread-safe way to update thelast_lock_timestamp
. It correctly uses an atomic store operation, which is appropriate for this concurrent scenario.
Line range hint
1-334
: Summary: Changes align well with PR objectives.The modifications in this file, including the switch to
RocketMQTokioRwLock
and the addition of methods for tracking unlock attempts and lock timestamps, are consistent with the PR's goal of optimizing and improving the consume logic. These changes enhance the locking mechanism and provide better management of concurrent operations, which should contribute to improved performance and reliability of the message consumption process.
33-33
: Approve the change toRocketMQTokioRwLock
and verify its impact.The change from
Arc<RwLock<()>>
toArc<RocketMQTokioRwLock<()>>
for theconsume_lock
field is a good optimization for asynchronous operations. This custom lock is likely designed to work more efficiently with the Tokio runtime.To ensure this change doesn't introduce any unexpected behavior, please run the following verification:
This will help identify any places where the standard
RwLock
is still being used, ensuring consistency across the codebase.Also applies to: 60-60, 81-81
rocketmq-client/src/consumer/consumer_impl/re_balance.rs (1)
76-76
: Updated Signature ofunlock
Requires VerificationThe method
unlock
has been changed to be asynchronous and now takes&mut self
instead of&self
, andmq
is now passed by reference&MessageQueue
instead of by valueMessageQueue
. Ensure that all implementations of theRebalanceLocal
trait and all calls tounlock
are updated to match the new signature.Run the following script to verify all usages of
unlock
:✅ Verification successful
Verification Successful: All
unlock
Method Usages Are Correctly UpdatedAll implementations of the
RebalanceLocal
trait and calls to theunlock
method have been updated to match the new asynchronous signature.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Find all implementations and calls of `unlock` that may need updating. # Find all implementations of `RebalanceLocal` and inspect `unlock` method signatures. rg --type rust 'impl.*RebalanceLocal' -A 20 | rg 'fn unlock' # Find all calls to the `unlock` method. rg --type rust 'unlock\('Length of output: 635
rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs (2)
20-20
: ImportingDuration
for timeout handlingThe addition of
use std::time::Duration;
is appropriate and necessary for handling timeouts when attempting to acquire locks.
32-32
: ImportingUnlockBatchRequestBody
for unlock requestsThe import of
UnlockBatchRequestBody
is essential for constructing unlock batch requests to the broker.rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs (3)
421-467
:⚠️ Potential issueModified
lock
method signature impacts existing implementationsThe
lock
method signature has changed to:pub async fn lock( &mut self, mq: &MessageQueue, process_queue_table: &mut HashMap<MessageQueue, Arc<ProcessQueue>>, ) -> boolThis modification requires updating all calls to
lock
to passprocess_queue_table
. Ensure that all callers are updated accordingly to prevent compilation errors.Run the following script to find all usages of
lock
and verify they are updated:#!/bin/bash # Find all usages of the `lock` method rg --type rust 'self\.lock\('
207-227
: Potential race condition when modifyingprocess_queue_table
When removing items from
process_queue_table
at lines 219-220, ensure that no other asynchronous tasks are modifying the same data concurrently without proper synchronization. Since the write lock is acquired, this should prevent concurrent writes, but verify that all accesses are properly synchronized.Automate verification by checking all accesses to
process_queue_table
:Ensure that all accesses are protected by appropriate locks.
421-467
: Potential infinite recursion inlock
methodThe
lock
method callsclient.get_broker_name_from_message_queue(mq).await
, which may internally rely on methods that eventually calllock
again, leading to potential infinite recursion.Check the implementation of
get_broker_name_from_message_queue
to ensure it doesn't create a recursive loop.
impl Display for LockBatchRequestBody { | ||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
write!( | ||
f, | ||
"UnlockBatchRequestBody [consumer_group={}, client_id={}, only_this_broker={}, \ | ||
mq_set={:?}]", | ||
self.consumer_group.as_ref().unwrap_or(&"".to_string()), | ||
self.client_id.as_ref().unwrap_or(&"".to_string()), | ||
self.only_this_broker, | ||
self.mq_set | ||
) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix incorrect struct name in Display implementation.
The Display
implementation for LockBatchRequestBody
is generally well-implemented. However, there's a minor issue in the output string:
The struct name in the output string is incorrect. It says "UnlockBatchRequestBody" instead of "LockBatchRequestBody". Please apply the following fix:
- "UnlockBatchRequestBody [consumer_group={}, client_id={}, only_this_broker={}, \
+ "LockBatchRequestBody [consumer_group={}, client_id={}, only_this_broker={}, \
mq_set={:?}]",
This will ensure that the string representation accurately reflects the struct name.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
impl Display for LockBatchRequestBody { | |
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | |
write!( | |
f, | |
"UnlockBatchRequestBody [consumer_group={}, client_id={}, only_this_broker={}, \ | |
mq_set={:?}]", | |
self.consumer_group.as_ref().unwrap_or(&"".to_string()), | |
self.client_id.as_ref().unwrap_or(&"".to_string()), | |
self.only_this_broker, | |
self.mq_set | |
) | |
} | |
} | |
impl Display for LockBatchRequestBody { | |
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | |
write!( | |
f, | |
"LockBatchRequestBody [consumer_group={}, client_id={}, only_this_broker={}, \ | |
mq_set={:?}]", | |
self.consumer_group.as_ref().unwrap_or(&"".to_string()), | |
self.client_id.as_ref().unwrap_or(&"".to_string()), | |
self.only_this_broker, | |
self.mq_set | |
) | |
} | |
} |
impl FromMap for LockBatchMqRequestHeader { | ||
type Target = Self; | ||
|
||
fn from(map: &HashMap<String, String>) -> Option<Self::Target> { | ||
let rpc_request_header = <RpcRequestHeader as FromMap>::from(map); | ||
Some(LockBatchMqRequestHeader { rpc_request_header }) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Improve error handling and consistency with to_map
.
While the implementation of FromMap
for LockBatchMqRequestHeader
is functional, there are opportunities for improvement:
-
Error Handling: The current implementation always returns
Some(...)
, even ifRpcRequestHeader::from
returnsNone
. This could lead to unexpected behavior. -
Consistency: The
to_map
method returnsNone
for an empty map (after the suggested improvement), butfrom
always returnsSome(...)
. This inconsistency could be confusing.
Consider the following implementation to address these issues:
impl FromMap for LockBatchMqRequestHeader {
type Target = Self;
fn from(map: &HashMap<String, String>) -> Option<Self::Target> {
if map.is_empty() {
None
} else {
Some(LockBatchMqRequestHeader {
rpc_request_header: <RpcRequestHeader as FromMap>::from(map),
})
}
}
}
This implementation:
- Returns
None
if the input map is empty, maintaining consistency with theto_map
method. - Preserves the
Option
returned byRpcRequestHeader::from
, allowing for better error handling downstream.
impl CommandCustomHeader for UnlockBatchMqRequestHeader { | ||
fn to_map(&self) -> Option<HashMap<String, String>> { | ||
let mut map = HashMap::new(); | ||
if let Some(value) = self.rpc_request_header.as_ref() { | ||
if let Some(value) = value.to_map() { | ||
map.extend(value); | ||
} | ||
} | ||
Some(map) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Consider optimizing the to_map
method implementation.
The current implementation of to_map
is functional but can be improved for better efficiency and idiomatic Rust.
Consider refactoring the to_map
method as follows:
impl CommandCustomHeader for UnlockBatchMqRequestHeader {
fn to_map(&self) -> Option<HashMap<String, String>> {
self.rpc_request_header
.as_ref()
.and_then(|header| header.to_map())
}
}
This implementation:
- Directly returns
None
ifrpc_request_header
isNone
. - Only creates a
HashMap
ifrpc_request_header
isSome
and itsto_map
method returnsSome
. - Avoids creating an empty
HashMap
when it's not needed.
impl FromMap for UnlockBatchMqRequestHeader { | ||
type Target = Self; | ||
|
||
fn from(map: &HashMap<String, String>) -> Option<Self::Target> { | ||
let rpc_request_header = <RpcRequestHeader as FromMap>::from(map); | ||
Some(UnlockBatchMqRequestHeader { rpc_request_header }) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Improve error handling in the FromMap
implementation.
The current implementation of from
always returns Some
, which might not be appropriate for all cases.
Consider refactoring the from
method to handle potential errors:
impl FromMap for UnlockBatchMqRequestHeader {
type Target = Self;
fn from(map: &HashMap<String, String>) -> Option<Self::Target> {
let rpc_request_header = <RpcRequestHeader as FromMap>::from(map);
// Only create the UnlockBatchMqRequestHeader if RpcRequestHeader was successfully created
rpc_request_header.map(|header| UnlockBatchMqRequestHeader {
rpc_request_header: Some(header),
})
}
}
This implementation:
- Only creates an
UnlockBatchMqRequestHeader
ifRpcRequestHeader
was successfully created from the map. - Returns
None
ifRpcRequestHeader
couldn't be created, which is more appropriate for error cases.
@@ -53,22 +54,34 @@ pub trait RebalanceLocal { | |||
} | |||
|
|||
fn consume_type(&self) -> ConsumeType; | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider Changing &self
to &mut self
in remove_dirty_offset
The method remove_dirty_offset
may modify internal state when removing offsets. If this is the case, it should take a mutable reference &mut self
instead of an immutable reference &self
to reflect that it mutates the receiver.
use rocketmq_common::WeakCellWrapper; | ||
use rocketmq_remoting::protocol::body::request::lock_batch_request_body::LockBatchRequestBody; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unused import LockBatchRequestBody
The import LockBatchRequestBody
is added but not used in the code provided. If it's intended for future use, consider adding a comment explaining its purpose. Otherwise, remove the unused import.
Apply this diff to remove the unused import:
-use rocketmq_remoting::protocol::body::request::lock_batch_request_body::LockBatchRequestBody;
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
use rocketmq_remoting::protocol::body::request::lock_batch_request_body::LockBatchRequestBody; |
async fn build_process_queue_table_by_broker_name( | ||
&self, | ||
) -> HashMap<String /* brokerName */, HashSet<MessageQueue>> { | ||
let mut result = HashMap::new(); | ||
let process_queue_table = self.process_queue_table.read().await; | ||
let client = self.client_instance.as_ref().unwrap(); | ||
for (mq, pq) in process_queue_table.iter() { | ||
if pq.is_dropped() { | ||
continue; | ||
} | ||
let broker_name = client.get_broker_name_from_message_queue(mq).await; | ||
let entry = result.entry(broker_name).or_insert(HashSet::new()); | ||
entry.insert(mq.clone()); | ||
} | ||
result |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Inefficient cloning in build_process_queue_table_by_broker_name
At line 536, message queues are cloned and inserted into a HashSet
. If MessageQueue
implements Copy
, cloning can be avoided. If not, consider if it's necessary to clone here or if references can be used instead.
Adjust the code to avoid unnecessary cloning:
- entry.insert(mq.clone());
+ entry.insert(mq.to_owned());
Ensure that to_owned()
is implemented efficiently for MessageQueue
.
Committable suggestion was skipped due to low confidence.
pub async fn unlock_batch_mq( | ||
&mut self, | ||
addr: &str, | ||
request_body: UnlockBatchRequestBody, | ||
timeout_millis: u64, | ||
oneway: bool, | ||
) -> Result<()> { | ||
let mut request = RemotingCommand::create_request_command( | ||
RequestCode::UnlockBatchMq, | ||
UnlockBatchMqRequestHeader::default(), | ||
); | ||
request.set_body_mut_ref(Some(request_body.encode())); | ||
if oneway { | ||
self.remoting_client | ||
.invoke_oneway(addr.to_string(), request, timeout_millis) | ||
.await; | ||
Ok(()) | ||
} else { | ||
let response = self | ||
.remoting_client | ||
.invoke_async( | ||
Some(mix_all::broker_vip_channel( | ||
self.client_config.vip_channel_enabled, | ||
addr, | ||
)), | ||
request, | ||
timeout_millis, | ||
) | ||
.await?; | ||
if ResponseCode::from(response.code()) == ResponseCode::Success { | ||
Ok(()) | ||
} else { | ||
Err(MQBrokerError( | ||
response.code(), | ||
response.remark().map_or("".to_string(), |s| s.to_string()), | ||
addr.to_string(), | ||
)) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Refactor to reduce code duplication in unlock_batch_mq
Both branches of the if oneway
condition in unlock_batch_mq
share similar logic for creating the request and handling the response. Consider refactoring to reduce code duplication and improve maintainability.
You could extract the common request creation into a separate function or reorganize the code to minimize repetition.
LockBatchResponseBody::decode(response.body().as_ref().unwrap().as_ref()) | ||
.map(|body| body.lock_ok_mq_set) | ||
.map_err(|e| MQBrokerError(response.code(), e.to_string(), addr.to_string())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid potential panic due to unwrap()
on response.body()
In lock_batch_mq
, the call to response.body().as_ref().unwrap()
may panic if response.body()
is None
. This can happen if the response does not contain a body, leading to a runtime panic.
To prevent a potential panic, you should check if response.body()
is Some
before attempting to use it. Handle the None
case appropriately.
Apply this diff:
- LockBatchResponseBody::decode(response.body().as_ref().unwrap().as_ref())
+ if let Some(body) = response.body() {
+ LockBatchResponseBody::decode(body.as_ref())
.map(|body| body.lock_ok_mq_set)
.map_err(|e| MQBrokerError(response.code(), e.to_string(), addr.to_string()))
+ } else {
+ Err(MQBrokerError(
+ response.code(),
+ "Response body is empty".to_string(),
+ addr.to_string(),
+ ))
+ }
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
LockBatchResponseBody::decode(response.body().as_ref().unwrap().as_ref()) | |
.map(|body| body.lock_ok_mq_set) | |
.map_err(|e| MQBrokerError(response.code(), e.to_string(), addr.to_string())) | |
if let Some(body) = response.body() { | |
LockBatchResponseBody::decode(body.as_ref()) | |
.map(|body| body.lock_ok_mq_set) | |
.map_err(|e| MQBrokerError(response.code(), e.to_string(), addr.to_string())) | |
} else { | |
Err(MQBrokerError( | |
response.code(), | |
"Response body is empty".to_string(), | |
addr.to_string(), | |
)) | |
} |
self.remoting_client | ||
.invoke_oneway(addr.to_string(), request, timeout_millis) | ||
.await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Handle errors from invoke_oneway
in unlock_batch_mq
In the oneway branch of unlock_batch_mq
, the result of invoke_oneway
is awaited, but any potential errors are not being handled. This could lead to unhandled errors if the method fails.
To properly handle errors, you should propagate the error by using await?
, so that any errors are returned to the caller.
Apply this diff:
self.remoting_client
- .invoke_oneway(addr.to_string(), request, timeout_millis)
- .await;
+ .invoke_oneway(addr.to_string(), request, timeout_millis)
+ .await?;
Ok(())
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
self.remoting_client | |
.invoke_oneway(addr.to_string(), request, timeout_millis) | |
.await; | |
self.remoting_client | |
.invoke_oneway(addr.to_string(), request, timeout_millis) | |
.await?; | |
Ok(()) |
Ensure consistent use of VIP channel in invoke_oneway
In the oneway branch, invoke_oneway
is called with addr.to_string()
, while in the else branch, invoke_async
uses mix_all::broker_vip_channel
to potentially adjust the address based on the VIP channel configuration. This inconsistency may cause issues when vip_channel_enabled
is true.
Consider modifying the invoke_oneway
call to use mix_all::broker_vip_channel
for consistent address handling.
Apply this diff:
self.remoting_client
- .invoke_oneway(addr.to_string(), request, timeout_millis)
+ .invoke_oneway(
+ mix_all::broker_vip_channel(self.client_config.vip_channel_enabled, addr),
+ request,
+ timeout_millis,
+ )
.await?;
Ok(())
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
self.remoting_client | |
.invoke_oneway(addr.to_string(), request, timeout_millis) | |
.await; | |
self.remoting_client | |
.invoke_oneway( | |
mix_all::broker_vip_channel(self.client_config.vip_channel_enabled, addr), | |
request, | |
timeout_millis, | |
) | |
.await; |
🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥 |
Which Issue(s) This PR Fixes(Closes)
Fixes #1004
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
Release Notes
New Features
ProcessQueue
struct with methods for better tracking of unlock attempts and timestamps.RebalanceLocal
trait with several new asynchronous methods for improved message queue management.Bug Fixes
Documentation
These updates aim to enhance the overall user experience by improving the efficiency and reliability of message queue operations.