Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #981]🔥Optimize client clusting consume⚡️ #985

Merged
merged 1 commit into from
Sep 22, 2024
Merged

Conversation

mxsm
Copy link
Owner

@mxsm mxsm commented Sep 22, 2024

Which Issue(s) This PR Fixes(Closes)

Fixes #981

Brief Description

How Did You Test This Change?

Summary by CodeRabbit

  • New Features

    • Introduced a global lock mechanism for enhanced thread safety during shutdown operations in the consumer implementation.
    • Added new asynchronous methods for managing client registrations and offsets, including unregister_consumer and unregister_producer.
  • Bug Fixes

    • Updated method signatures for shutdown across multiple services to allow mutable references, enabling state modifications during shutdown.
  • Improvements

    • Enhanced logging and error handling in the rebalance process for better traceability.
    • Simplified message pulling logic in the PullAPIWrapper for clearer structure and execution.
  • Documentation

    • Updated method signatures and added comments for clarity on asynchronous operations and their implications.

Copy link
Contributor

coderabbitai bot commented Sep 22, 2024

Walkthrough

The changes in this pull request involve significant modifications to the RocketMQ client, primarily focusing on the consumer functionality. Key updates include the removal of the RocketMQRuntime dependency, enhancements to asynchronous task handling, and the introduction of new methods for managing consumer and producer registrations. Additionally, several method signatures have been updated to support asynchronous operations, and logging has been improved for better traceability. Overall, these changes aim to optimize the client’s message consumption and handling processes.

Changes

Files Change Summary
rocketmq-client/examples/quickstart/consumer.rs Commented out logger initialization in the main function.
rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs Removed consume_runtime field, updated task spawning to use tokio::spawn, modified shutdown method signature to mutable reference, and adjusted conditional checks in dispatch_to_consume.
rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs Changed shutdown method signature from immutable to mutable reference and updated implementation to unimplemented!().
rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs Modified shutdown method signature from immutable to mutable reference.
rocketmq-client/src/consumer/consumer_impl/consume_message_pop_orderly_service.rs Changed shutdown method signature from immutable to mutable reference.
rocketmq-client/src/consumer/consumer_impl/consume_message_service.rs Updated shutdown method signature from immutable to mutable reference.
rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs Added a global lock for thread safety, introduced a new shutdown method, modified persist_consumer_offset to be asynchronous, and enhanced error handling.
rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs Added new import for MQClientAPIImpl, initialized need_decode_inner_message, simplified handling of msg_found_list, and updated pull_message method signature to require 'static lifetime.
rocketmq-client/src/consumer/consumer_impl/re_balance.rs Added fn destroy(&mut self); method to RebalanceLocal trait.
rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs Enhanced logging, updated variable names for clarity, and retained rebalance logic with new variable names.
rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs Added destroy method to RebalancePushImpl struct, currently unimplemented.
rocketmq-client/src/consumer/mq_consumer_inner.rs Updated persist_consumer_offset method to be asynchronous.
rocketmq-client/src/factory/mq_client_instance.rs Introduced new asynchronous methods for client management, including shutdown, unregister_consumer, and unregister_producer.
rocketmq-client/src/implementation/mq_client_api_impl.rs Added unregister_client method for client unregistration and modified pull_message method signature for better async handling.
rocketmq-remoting/src/clients/rocketmq_default_impl.rs Updated request handling to mark requests as one-way RPC before sending.
rocketmq-remoting/src/protocol/header/unregister_client_request_header.rs Changed rpc_request_header field to be optional, updated conversion methods accordingly.
rocketmq-remoting/src/protocol/remoting_command.rs Added mark_oneway_rpc_ref method to mark commands as one-way RPC.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Consumer
    participant Broker

    Client->>Consumer: Request to consume messages
    Consumer->>Broker: Pull messages
    Broker-->>Consumer: Return messages
    Consumer->>Client: Deliver messages
Loading

Assessment against linked issues

Objective Addressed Explanation
Optimize client clustering consume ( #981 )

Possibly related PRs

Suggested labels

approved, auto merge

Poem

🐰 In the meadow, changes bloom,
With consumers now in full zoom.
Async tasks dance in the light,
Pulling messages, oh what a sight!
Clusters unite, in harmony they play,
RocketMQ hops into a brighter day! 🌼


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

Share
Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    -- I pushed a fix in commit <commit_id>, please review it.
    -- Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    -- @coderabbitai generate unit testing code for this file.
    -- @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    -- @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    -- @coderabbitai read src/utils.ts and generate unit testing code.
    -- @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    -- @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@mxsm
Copy link
Owner Author

mxsm commented Sep 22, 2024

🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥

Copy link

codecov bot commented Sep 22, 2024

Codecov Report

Attention: Patch coverage is 0% with 45 lines in your changes missing coverage. Please review.

Project coverage is 19.83%. Comparing base (ba55978) to head (9c7a87e).
Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
...rotocol/header/unregister_client_request_header.rs 0.00% 20 Missing ⚠️
...sumer_impl/consume_message_concurrently_service.rs 0.00% 5 Missing ⚠️
...mer/consumer_impl/default_mq_push_consumer_impl.rs 0.00% 5 Missing ⚠️
rocketmq-remoting/src/protocol/remoting_command.rs 0.00% 4 Missing ⚠️
...mq-client/src/implementation/mq_client_api_impl.rs 0.00% 3 Missing ⚠️
...r/consumer_impl/consume_message_orderly_service.rs 0.00% 2 Missing ⚠️
...er/consumer_impl/re_balance/rebalance_push_impl.rs 0.00% 2 Missing ⚠️
...etmq-remoting/src/clients/rocketmq_default_impl.rs 0.00% 2 Missing ⚠️
...r_impl/consume_message_pop_concurrently_service.rs 0.00% 1 Missing ⚠️
...nsumer_impl/consume_message_pop_orderly_service.rs 0.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #985      +/-   ##
==========================================
- Coverage   19.84%   19.83%   -0.02%     
==========================================
  Files         410      410              
  Lines       33645    33665      +20     
==========================================
  Hits         6676     6676              
- Misses      26969    26989      +20     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@mxsm mxsm merged commit f96d323 into main Sep 22, 2024
16 of 18 checks passed
@SpaceXCN SpaceXCN deleted the feature-981 branch September 22, 2024 12:39
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 17

Outside diff range and nitpick comments (2)
rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs (1)

276-280: Improve logging statement.

Consider making the following changes to the logging statement:

  • Replace println! with a proper logging macro like debug!, info!, warn!, or error! from the tracing crate for consistency with the rest of the codebase.
  • Improve the formatting of the log message to make it more readable. For example:
    info!(
        "Dispatching {} pull requests for topic {}",
        pull_request_list.len(),
        topic
    );
  • Guard the logging statement with a feature flag or log level check to allow controlling verbosity in production builds. For example:
    #[cfg(debug_assertions)]
    info!(...);
    or
    if log_enabled!(Level::Info) {
        info!(...);
    }
rocketmq-client/src/implementation/mq_client_api_impl.rs (1)

836-837: Reconsider the Change to the Method Signature of pull_message

  • Receiver Change: Changing the method receiver from &mut self to mut this: ArcRefCellWrapper<Self> introduces additional complexity due to reference counting and interior mutability. Ensure this change is necessary and that it doesn't introduce unintended side effects. If the goal is to allow the method to be called in contexts requiring ownership or to be sent across threads, consider documenting this rationale.

  • Parameter Type Change: Changing addr from &str to String results in an ownership transfer. If the function does not need to own addr, consider keeping it as &str to avoid unnecessary cloning and improve performance.

Also applies to: 844-844

Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

Commits

Files that changed from the base of the PR and between ba55978 and 9c7a87e.

Files selected for processing (17)
  • rocketmq-client/examples/quickstart/consumer.rs (1 hunks)
  • rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs (7 hunks)
  • rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs (1 hunks)
  • rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs (1 hunks)
  • rocketmq-client/src/consumer/consumer_impl/consume_message_pop_orderly_service.rs (1 hunks)
  • rocketmq-client/src/consumer/consumer_impl/consume_message_service.rs (1 hunks)
  • rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs (5 hunks)
  • rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs (5 hunks)
  • rocketmq-client/src/consumer/consumer_impl/re_balance.rs (1 hunks)
  • rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs (4 hunks)
  • rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs (1 hunks)
  • rocketmq-client/src/consumer/mq_consumer_inner.rs (1 hunks)
  • rocketmq-client/src/factory/mq_client_instance.rs (3 hunks)
  • rocketmq-client/src/implementation/mq_client_api_impl.rs (5 hunks)
  • rocketmq-remoting/src/clients/rocketmq_default_impl.rs (1 hunks)
  • rocketmq-remoting/src/protocol/header/unregister_client_request_header.rs (2 hunks)
  • rocketmq-remoting/src/protocol/remoting_command.rs (1 hunks)
Additional comments not posted (25)
rocketmq-client/src/consumer/mq_consumer_inner.rs (1)

44-44: LGTM! Verify the asynchronous behavior is handled correctly.

The change from a synchronous to an asynchronous method signature for persist_consumer_offset aligns with the overall objective of optimizing the client's message consumption and handling processes.

Please ensure that the implementation of this method and its usage throughout the codebase are updated to properly handle the asynchronous behavior. Run the following script to verify the usage of persist_consumer_offset:

rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs (1)

36-37: Verify the impact of the function signature change.

The function signature change from fn shutdown(&self, await_terminate_millis: u64) to fn shutdown(&mut self, await_terminate_millis: u64) is a breaking change.

Please ensure that all calls to this function have been updated to pass a mutable reference to the ConsumeMessageOrderlyService instance.

Run the following script to verify the function usage:

Verification successful

No existing calls to ConsumeMessageOrderlyService::shutdown were found. Verification successful.

Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify all function calls to `shutdown` pass a mutable reference.

# Test: Search for the function usage. Expect: Only occurrences of the new signature.
rg --type rust -A 5 $'ConsumeMessageOrderlyService::shutdown'

Length of output: 60


Script:

#!/bin/bash
# Description: Find all calls to the `shutdown` method and verify they use a mutable reference.

# Using ast-grep to find method calls to `shutdown` on mutable references.
ast-grep --lang rust --pattern '
    impl _ {
        $_:shutdown($_) {
            $$$
        }
    }
'

# Alternatively, using ripgrep to search for `shutdown` method calls with context.
rg --type rust 'shutdown\s*\(' -A 5

Length of output: 36430

rocketmq-client/examples/quickstart/consumer.rs (1)

35-35: Verify the reason for disabling the logger initialization.

The logger initialization has been commented out. This change disables logging during the execution of the main function, which could lead to missing important log messages that are useful for debugging or monitoring the application.

Please verify the reason for disabling the logger initialization. If it's intentional, consider adding a comment to explain the change.

rocketmq-client/src/consumer/consumer_impl/consume_message_service.rs (1)

48-48: LGTM!

The change to the shutdown method signature in the ConsumeMessageServiceTrait trait is appropriate and consistent with the method's purpose. Allowing the method to modify the state of the implementing struct during the shutdown process is a reasonable update.

Ensure that all implementations of the ConsumeMessageServiceTrait trait are updated to match the new method signature.

rocketmq-remoting/src/protocol/header/unregister_client_request_header.rs (3)

34-34: LGTM!

The change to make rpc_request_header an optional field in the UnregisterClientRequestHeader struct is a good design decision. It provides flexibility to construct instances of the struct without always requiring an RpcRequestHeader.


58-58: LGTM!

The update to the from method in the FromMap implementation for UnregisterClientRequestHeader correctly handles the optional RpcRequestHeader value. It is consistent with the field type change and assigns the value obtained from the map appropriately.


65-85: LGTM!

The modifications to the to_map method in the CommandCustomHeader implementation for UnregisterClientRequestHeader are well-structured and handle the optional fields correctly. The method initializes a new HashMap, inserts the client_id, producer_group, and consumer_group fields conditionally, and extends the rpc_request_header map into the main map if present. The changes are consistent with the field type update and ensure that the method returns the expected Option<HashMap<String, String>>.

rocketmq-remoting/src/clients/rocketmq_default_impl.rs (1)

376-377: LGTM!

The code change correctly marks the request as a one-way RPC before sending it. This optimization is beneficial for scenarios where a response is not expected, as it avoids unnecessary response handling and can improve performance.

rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs (3)

297-298: LGTM!

The variable renaming from ci_all to cid_all improves code clarity.


Line range hint 304-318: LGTM!

The changes in this code segment, including the variable renaming and the warning log message, are appropriate and improve code clarity.


Line range hint 325-378: LGTM!

The changes in this code segment, including the variable renaming and the enhanced logging statement, are beneficial and improve code clarity and observability.

rocketmq-remoting/src/protocol/remoting_command.rs (1)

360-363: LGTM!

The mark_oneway_rpc_ref function correctly marks the command as a one-way RPC reference by setting the corresponding bit in the flag field. The implementation is consistent with the existing mark_oneway_rpc function and modifies the flag field appropriately using a mutable reference.

rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs (3)

46-46: Import statement for MQClientAPIImpl is appropriate

The import of MQClientAPIImpl is necessary for the updated method calls and is correctly added.


Line range hint 111-196: Efficient handling of inner batched messages

The added logic correctly identifies and processes messages that require inner decoding. This ensures that messages with INNER_BATCH_FLAG and NEED_UNWRAP_FLAG are appropriately decoded.


196-196: Proper assignment to msg_found_list

The assignment of msg_found_list with messages wrapped in ArcRefCellWrapper aligns with the expected data structures and usage patterns.

rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs (3)

263-263: Review the logic change in batch size condition

The condition has been updated from msgs.len() < consume_batch_size to msgs.len() <= consume_batch_size. This change affects when messages are processed in batches. Confirm that this adjustment is intentional and that it correctly handles cases when msgs.len() equals consume_batch_size.


275-277: Task spawning with tokio::spawn is appropriate

The use of tokio::spawn to run the consume_request task aligns with asynchronous best practices in Rust. This change simplifies task management and is suitable within the Tokio runtime environment.


178-181: Verify appropriate use of tokio::spawn for task scheduling

The code now uses tokio::spawn directly to spawn asynchronous tasks. Ensure that the application is correctly configured with a Tokio runtime and that this change aligns with the overall task management strategy.

Run the following script to check for remaining usages of custom runtimes and confirm consistent task spawning:

Verification successful

Tokio::spawn Usage Verified

The use of tokio::spawn in consume_message_concurrently_service.rs is appropriate and aligns with the Tokio runtime configuration.

Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Search for any occurrences of `RocketMQRuntime` in the codebase.

rg --type rust 'RocketMQRuntime'

Length of output: 2989

rocketmq-client/src/implementation/mq_client_api_impl.rs (2)

54-54: Import UnregisterClientRequestHeader Correctly

The UnregisterClientRequestHeader is correctly imported to support the new unregister_client method.


1011-1041: unregister_client Method Implemented Correctly

The new unregister_client method is correctly implemented, constructing the request header and handling the response appropriately. Parameters are appropriately utilized.

rocketmq-client/src/factory/mq_client_instance.rs (1)

944-949: Methods unregister_consumer and unregister_producer are correctly implemented

The methods unregister_consumer and unregister_producer correctly delegate to unregister_client with the appropriate parameters.

rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs (4)

50-50: Importing tokio::sync::Mutex

The addition of tokio::sync::Mutex is appropriate for introducing asynchronous mutex functionality.


100-100: Introduction of global_lock field for thread safety

The new field global_lock: Arc<Mutex<()>> in DefaultMQPushConsumerImpl enhances concurrency control during shutdown operations.


144-144: Initializing global_lock correctly

The global_lock is correctly initialized in the constructor using Arc::new(Default::default()).


1291-1311: Ensure all calls to persist_consumer_offset are updated for asynchronous execution

The persist_consumer_offset method has been changed from a synchronous to an asynchronous function. Please verify that all calls to this method have been updated to await its result to prevent any runtime errors.

Run the following script to identify all calls to persist_consumer_offset that need to be updated:

Verification successful

All calls to persist_consumer_offset are correctly updated with .await. No issues found.

Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Find all calls to `persist_consumer_offset` that may need to be updated.

# Search for invocations of `persist_consumer_offset` in the codebase
rg --type rust 'persist_consumer_offset\('

# Expected: All calls should now use `.await` on `persist_consumer_offset()`.

Length of output: 492

Comment on lines +36 to 38
fn shutdown(&mut self, await_terminate_millis: u64) {
todo!()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Breaking change: shutdown now requires a mutable reference to self.

The function signature has been modified to take a mutable reference to self, which is a breaking change for any existing code that calls this function. Please ensure that this change is necessary and update the calling code accordingly.

Additionally, the function body is not implemented and contains a todo!() macro. Please provide more context on the planned implementation and ensure that the function is fully implemented before merging this pull request.

Comment on lines +36 to +37
fn shutdown(&mut self, await_terminate_millis: u64) {
unimplemented!("shutdown")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implement the shutdown logic.

The function signature change to take a mutable reference to self is appropriate, as the shutdown function is expected to modify the state of the ConsumeMessageOrderlyService instance.

However, the function is currently unimplemented. Please implement the shutdown logic, considering the following:

  • The function should wait for the specified await_terminate_millis duration before terminating.
  • The function should gracefully shutdown any running tasks or processes.
  • The function should update the state of the ConsumeMessageOrderlyService instance to reflect the shutdown.

@@ -70,4 +70,5 @@ pub trait RebalanceLocal {
async fn do_rebalance(&mut self, is_order: bool) -> bool;

fn client_rebalance(&mut self, topic: &str) -> bool;
fn destroy(&mut self);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding a default implementation for the destroy method.

Adding a new method to the trait is a breaking change, as it requires all existing implementers of the trait to implement this method. To avoid breaking changes and provide a fallback behavior for implementers who may not need any cleanup, consider adding a default implementation to the trait.

Here's an example of how you can add a default implementation:

fn destroy(&mut self) {
    // Default implementation that does nothing
}

This way, implementers who need cleanup behavior can override the default implementation, while others can rely on the default implementation.

Comment on lines +64 to 66
fn shutdown(&mut self, await_terminate_millis: u64) {
todo!()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Complete the shutdown function implementation.

The function signature change to take a mutable reference to self is appropriate, as it allows the function to modify the state of the ConsumeMessagePopConcurrentlyService instance during shutdown.

However, the function body is currently unimplemented and contains a todo!() macro. Please complete the implementation to ensure the shutdown functionality works as intended.

Comment on lines +463 to +465
fn destroy(&mut self) {
unimplemented!()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implement the destroy method.

The destroy method is currently unimplemented. Please add the necessary cleanup logic or resource deallocation code to ensure proper cleanup when the RebalancePushImpl instance is no longer needed.

Consider adding a TODO comment to track the pending implementation:

     fn destroy(&mut self) {
+        // TODO: Implement cleanup logic
         unimplemented!()
     }

Committable suggestion was skipped due to low confidence.

Comment on lines +904 to +907
println!(
"++++++++++++++++++++++++pull_message_async response: {}",
response
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replace println! with Logging Macros for Response Logging

Replace the println! statement with a logging macro to adhere to consistent logging practices.

Apply this diff:

+use tracing::debug;

 ...

-    println!(
+    debug!(
         "++++++++++++++++++++++++pull_message_async response: {}",
         response
     );

Committable suggestion was skipped due to low confidence.

@@ -253,6 +253,8 @@ where
Ok(())
}

pub async fn shutdown(&mut self) {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implement the shutdown method or mark it as unimplemented

The shutdown method currently has an empty implementation. This could lead to confusion or runtime errors if the method is called. Consider implementing the necessary shutdown logic or explicitly mark it as unimplemented to indicate that it's a work in progress.

Apply this diff to address the issue:

 pub async fn shutdown(&mut self) {
-    // Empty implementation
+    unimplemented!("Shutdown not implemented yet");
 }
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pub async fn shutdown(&mut self) {}
pub async fn shutdown(&mut self) {
unimplemented!("Shutdown not implemented yet");
}

Comment on lines +556 to +559
let consumer_table = self.consumer_table.read().await;
for (_, value) in consumer_table.iter() {
value.persist_consumer_offset().await;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid holding a lock across an .await point

Holding the read lock on consumer_table while awaiting persist_consumer_offset().await can lead to deadlocks or performance bottlenecks because the lock is held for the duration of the asynchronous call. To prevent this, collect the consumers into a list, release the lock, and then perform the asynchronous operations.

Apply this diff to fix the issue:

-        let consumer_table = self.consumer_table.read().await;
-        for (_, value) in consumer_table.iter() {
-            value.persist_consumer_offset().await;
-        }
+        let consumers = {
+            let consumer_table = self.consumer_table.read().await;
+            consumer_table.values().cloned().collect::<Vec<_>>()
+        };
+        for consumer in consumers {
+            consumer.persist_consumer_offset().await;
+        }
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let consumer_table = self.consumer_table.read().await;
for (_, value) in consumer_table.iter() {
value.persist_consumer_offset().await;
}
let consumers = {
let consumer_table = self.consumer_table.read().await;
consumer_table.values().cloned().collect::<Vec<_>>()
};
for consumer in consumers {
consumer.persist_consumer_offset().await;
}

Comment on lines +951 to +979
async fn unregister_client(
&mut self,
producer_group: Option<String>,
consumer_group: Option<String>,
) {
let broker_addr_table = self.broker_addr_table.read().await;
for (broker_name, broker_addrs) in broker_addr_table.iter() {
for (id, addr) in broker_addrs.iter() {
if let Err(err) = self
.mq_client_api_impl
.unregister_client(
addr,
self.client_id.as_str(),
producer_group.clone(),
consumer_group.clone(),
self.client_config.mq_client_api_timeout,
)
.await
{
} else {
info!(
"unregister client[Producer: {:?} Consumer: {:?}] from broker[{} {} {}] \
success",
producer_group, consumer_group, broker_name, id, addr,
);
}
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handle errors in the unregister_client call

The error handling block in the if let Err(err) statement is empty. This means that if an error occurs during the unregister_client call, it will be silently ignored, which can make debugging difficult. Consider logging the error to provide visibility into potential issues.

Apply this diff to handle the error:

                     )
                     .await
                 {
+                    error!(
+                        "Failed to unregister client[Producer: {:?} Consumer: {:?}] from broker[{} {} {}]: {}",
+                        producer_group, consumer_group, broker_name, id, addr, err
+                    );
                 } else {
                     info!(
                         "unregister client[Producer: {:?} Consumer: {:?}] from broker[{} {} {}] success",
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async fn unregister_client(
&mut self,
producer_group: Option<String>,
consumer_group: Option<String>,
) {
let broker_addr_table = self.broker_addr_table.read().await;
for (broker_name, broker_addrs) in broker_addr_table.iter() {
for (id, addr) in broker_addrs.iter() {
if let Err(err) = self
.mq_client_api_impl
.unregister_client(
addr,
self.client_id.as_str(),
producer_group.clone(),
consumer_group.clone(),
self.client_config.mq_client_api_timeout,
)
.await
{
} else {
info!(
"unregister client[Producer: {:?} Consumer: {:?}] from broker[{} {} {}] \
success",
producer_group, consumer_group, broker_name, id, addr,
);
}
}
}
}
async fn unregister_client(
&mut self,
producer_group: Option<String>,
consumer_group: Option<String>,
) {
let broker_addr_table = self.broker_addr_table.read().await;
for (broker_name, broker_addrs) in broker_addr_table.iter() {
for (id, addr) in broker_addrs.iter() {
if let Err(err) = self
.mq_client_api_impl
.unregister_client(
addr,
self.client_id.as_str(),
producer_group.clone(),
consumer_group.clone(),
self.client_config.mq_client_api_timeout,
)
.await
{
error!(
"Failed to unregister client[Producer: {:?} Consumer: {:?}] from broker[{} {} {}]: {}",
producer_group, consumer_group, broker_name, id, addr, err
);
} else {
info!(
"unregister client[Producer: {:?} Consumer: {:?}] from broker[{} {} {}] \
success",
producer_group, consumer_group, broker_name, id, addr,
);
}
}
}
}

Comment on lines +378 to +422
pub async fn shutdown(&mut self, await_terminate_millis: u64) {
let _lock = self.global_lock.lock().await;
match *self.service_state {
ServiceState::CreateJust => {
warn!(
"the consumer [{}] do not start, so do nothing",
self.consumer_config.consumer_group
);
}
ServiceState::Running => {
if let Some(consume_message_concurrently_service) =
self.consume_message_concurrently_service.as_mut()
{
consume_message_concurrently_service
.consume_message_concurrently_service
.shutdown(await_terminate_millis);
}
self.persist_consumer_offset().await;
let client = self.client_instance.as_mut().unwrap();
client
.unregister_consumer(self.consumer_config.consumer_group.as_str())
.await;
client.shutdown().await;
info!(
"the consumer [{}] shutdown OK",
self.consumer_config.consumer_group.as_str()
);
self.rebalance_impl.destroy();
*self.service_state = ServiceState::ShutdownAlready;
}
ServiceState::ShutdownAlready => {
warn!(
"the consumer [{}] has been shutdown, do nothing",
self.consumer_config.consumer_group
);
}
ServiceState::StartFailed => {
warn!(
"the consumer [{}] start failed, do nothing",
self.consumer_config.consumer_group
);
}
}
drop(_lock);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid holding the global_lock across .await points in shutdown method

In the shutdown method, the global_lock is acquired and held across several .await calls. Holding an async mutex lock across .await points can lead to performance bottlenecks and potential deadlocks, as it prevents other tasks from acquiring the lock while the current task is suspended.

Suggestion:

Refactor the shutdown method to limit the scope of the global_lock. Acquire the lock only for the critical section that modifies the service_state, and release it before making any asynchronous calls.

Proposed change:

 pub async fn shutdown(&mut self, await_terminate_millis: u64) {
-    let _lock = self.global_lock.lock().await;
+    {
+        let _lock = self.global_lock.lock().await;
+        match *self.service_state {
+            ServiceState::CreateJust => {
+                warn!(
+                    "the consumer [{}] do not start, so do nothing",
+                    self.consumer_config.consumer_group
+                );
+                return;
+            }
+            ServiceState::Running => {
+                *self.service_state = ServiceState::ShutdownAlready;
+            }
+            ServiceState::ShutdownAlready => {
+                warn!(
+                    "the consumer [{}] has been shutdown, do nothing",
+                    self.consumer_config.consumer_group
+                );
+                return;
+            }
+            ServiceState::StartFailed => {
+                warn!(
+                    "the consumer [{}] start failed, do nothing",
+                    self.consumer_config.consumer_group
+                );
+                return;
+            }
+        }
+    } // The lock is released here

    if let Some(consume_message_concurrently_service) =
        self.consume_message_concurrently_service.as_mut()
    {
        consume_message_concurrently_service
            .consume_message_concurrently_service
            .shutdown(await_terminate_millis);
    }
    self.persist_consumer_offset().await;
    let client = self.client_instance.as_mut().unwrap();
    client
        .unregister_consumer(self.consumer_config.consumer_group.as_str())
        .await;
    client.shutdown().await;
    info!(
        "the consumer [{}] shutdown OK",
        self.consumer_config.consumer_group.as_str()
    );
    self.rebalance_impl.destroy();
 }
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pub async fn shutdown(&mut self, await_terminate_millis: u64) {
let _lock = self.global_lock.lock().await;
match *self.service_state {
ServiceState::CreateJust => {
warn!(
"the consumer [{}] do not start, so do nothing",
self.consumer_config.consumer_group
);
}
ServiceState::Running => {
if let Some(consume_message_concurrently_service) =
self.consume_message_concurrently_service.as_mut()
{
consume_message_concurrently_service
.consume_message_concurrently_service
.shutdown(await_terminate_millis);
}
self.persist_consumer_offset().await;
let client = self.client_instance.as_mut().unwrap();
client
.unregister_consumer(self.consumer_config.consumer_group.as_str())
.await;
client.shutdown().await;
info!(
"the consumer [{}] shutdown OK",
self.consumer_config.consumer_group.as_str()
);
self.rebalance_impl.destroy();
*self.service_state = ServiceState::ShutdownAlready;
}
ServiceState::ShutdownAlready => {
warn!(
"the consumer [{}] has been shutdown, do nothing",
self.consumer_config.consumer_group
);
}
ServiceState::StartFailed => {
warn!(
"the consumer [{}] start failed, do nothing",
self.consumer_config.consumer_group
);
}
}
drop(_lock);
}
pub async fn shutdown(&mut self, await_terminate_millis: u64) {
{
let _lock = self.global_lock.lock().await;
match *self.service_state {
ServiceState::CreateJust => {
warn!(
"the consumer [{}] do not start, so do nothing",
self.consumer_config.consumer_group
);
return;
}
ServiceState::Running => {
*self.service_state = ServiceState::ShutdownAlready;
}
ServiceState::ShutdownAlready => {
warn!(
"the consumer [{}] has been shutdown, do nothing",
self.consumer_config.consumer_group
);
return;
}
ServiceState::StartFailed => {
warn!(
"the consumer [{}] start failed, do nothing",
self.consumer_config.consumer_group
);
return;
}
}
} // The lock is released here
if let Some(consume_message_concurrently_service) =
self.consume_message_concurrently_service.as_mut()
{
consume_message_concurrently_service
.consume_message_concurrently_service
.shutdown(await_terminate_millis);
}
self.persist_consumer_offset().await;
let client = self.client_instance.as_mut().unwrap();
client
.unregister_consumer(self.consumer_config.consumer_group.as_str())
.await;
client.shutdown().await;
info!(
"the consumer [{}] shutdown OK",
self.consumer_config.consumer_group.as_str()
);
self.rebalance_impl.destroy();
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Enhancement⚡️] Optimize client clusting consume
2 participants