Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #981]🔥Optimize client clusting consume⚡️ #985

Merged
merged 1 commit into from
Sep 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rocketmq-client/examples/quickstart/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub const TAG: &str = "*";
#[rocketmq::main]
pub async fn main() -> Result<()> {
//init logger
rocketmq_common::log::init_logger();
//rocketmq_common::log::init_logger();

// create a producer builder with default configuration
let builder = DefaultMQPushConsumer::builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
use rocketmq_common::WeakCellWrapper;
use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult;
use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel;
use rocketmq_runtime::RocketMQRuntime;
use tracing::info;
use tracing::warn;

Expand All @@ -53,7 +52,7 @@
pub(crate) consumer_config: ArcRefCellWrapper<ConsumerConfig>,
pub(crate) consumer_group: Arc<String>,
pub(crate) message_listener: ArcBoxMessageListenerConcurrently,
pub(crate) consume_runtime: Arc<RocketMQRuntime>,
// pub(crate) consume_runtime: Arc<RocketMQRuntime>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove commented-out field consume_runtime for cleaner code

The field consume_runtime is commented out in the struct definition. If it's no longer required, consider removing it entirely to maintain code cleanliness and readability.

Apply this diff to remove the commented-out field:

-// pub(crate) consume_runtime: Arc<RocketMQRuntime>,
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// pub(crate) consume_runtime: Arc<RocketMQRuntime>,

}

impl ConsumeMessageConcurrentlyService {
Expand All @@ -71,10 +70,10 @@
consumer_config,
consumer_group: Arc::new(consumer_group),
message_listener,
consume_runtime: Arc::new(RocketMQRuntime::new_multi(
/*consume_runtime: Arc::new(RocketMQRuntime::new_multi(
consume_thread as usize,
"ConsumeMessageThread_",
)),
)),*/
Comment on lines +73 to +76
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eliminate commented-out initialization code

The initialization of consume_runtime within the new method is commented out. If this code is obsolete, removing it can reduce clutter and improve maintainability.

Apply this diff to remove the commented-out code:

-/*consume_runtime: Arc::new(RocketMQRuntime::new_multi(
-    consume_thread as usize,
-    "ConsumeMessageThread_",
-)),*/
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/*consume_runtime: Arc::new(RocketMQRuntime::new_multi(
consume_thread as usize,
"ConsumeMessageThread_",
)),
)),*/

}
}
}
Expand Down Expand Up @@ -140,12 +139,6 @@
}
if !msg_back_failed.is_empty() {
consume_request.msgs.append(&mut msg_back_success);
/* let msg_back_failed_switched = msg_back_failed
.into_iter()
.map(|msg| MessageClientExt {
message_ext_inner: msg,
})
.collect();*/
self.submit_consume_request_later(
msg_back_failed,
consume_request.process_queue.clone(),
Expand Down Expand Up @@ -182,7 +175,7 @@
message_queue: MessageQueue,
) {
let this = self.clone();
self.consume_runtime.get_handle().spawn(async move {
tokio::spawn(async move {

Check warning on line 178 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L178

Added line #L178 was not covered by tests
tokio::time::sleep(Duration::from_secs(5)).await;
this.submit_consume_request(msgs, process_queue, message_queue, true)
.await;
Expand Down Expand Up @@ -231,8 +224,8 @@
});
}

fn shutdown(&self, await_terminate_millis: u64) {
todo!()
fn shutdown(&mut self, await_terminate_millis: u64) {

Check warning on line 227 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L227

Added line #L227 was not covered by tests
// todo!()
Comment on lines +227 to +228
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confirm necessity of mutable reference in shutdown method

The shutdown method's receiver has changed from &self to &mut self. Verify that this modification is intentional and necessary, as it implies that the method will mutate the instance. Ensure this does not introduce unintended side effects or borrowing conflicts elsewhere in the code.

}

fn update_core_pool_size(&self, core_pool_size: usize) {
Expand Down Expand Up @@ -267,7 +260,7 @@
dispatch_to_consume: bool,
) {
let consume_batch_size = self.consumer_config.consume_message_batch_max_size;
if msgs.len() < consume_batch_size as usize {
if msgs.len() <= consume_batch_size as usize {

Check warning on line 263 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L263

Added line #L263 was not covered by tests
let mut consume_request = ConsumeRequest {
msgs: msgs.clone(),
message_listener: self.message_listener.clone(),
Expand All @@ -278,7 +271,8 @@
default_mqpush_consumer_impl: self.default_mqpush_consumer_impl.clone(),
};
let consume_message_concurrently_service = self.clone();
self.consume_runtime.get_handle().spawn(async move {

tokio::spawn(async move {

Check warning on line 275 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L275

Added line #L275 was not covered by tests
consume_request
.run(consume_message_concurrently_service)
.await
Expand All @@ -301,7 +295,12 @@
default_mqpush_consumer_impl: self.default_mqpush_consumer_impl.clone(),
};
let consume_message_concurrently_service = self.clone();
self.consume_runtime.get_handle().spawn(async move {
/* self.consume_runtime.get_handle().spawn(async move {
consume_request
.run(consume_message_concurrently_service)
.await
});*/
tokio::spawn(async move {

Check warning on line 303 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L303

Added line #L303 was not covered by tests
Comment on lines +298 to +303
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove obsolete commented-out task spawning code

The previous code using self.consume_runtime.get_handle().spawn is commented out. If the custom runtime is deprecated or no longer needed, consider removing this code to enhance clarity.

Apply this diff to remove the commented-out spawning code:

-/*                self.consume_runtime.get_handle().spawn(async move {
-    consume_request
-        .run(consume_message_concurrently_service)
-        .await
-});*/
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/* self.consume_runtime.get_handle().spawn(async move {
consume_request
.run(consume_message_concurrently_service)
.await
});*/
tokio::spawn(async move {
tokio::spawn(async move {

consume_request
.run(consume_message_concurrently_service)
.await
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
todo!()
}

fn shutdown(&self, await_terminate_millis: u64) {
todo!()
fn shutdown(&mut self, await_terminate_millis: u64) {
unimplemented!("shutdown")

Check warning on line 37 in rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs#L36-L37

Added lines #L36 - L37 were not covered by tests
Comment on lines +36 to +37
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implement the shutdown logic.

The function signature change to take a mutable reference to self is appropriate, as the shutdown function is expected to modify the state of the ConsumeMessageOrderlyService instance.

However, the function is currently unimplemented. Please implement the shutdown logic, considering the following:

  • The function should wait for the specified await_terminate_millis duration before terminating.
  • The function should gracefully shutdown any running tasks or processes.
  • The function should update the state of the ConsumeMessageOrderlyService instance to reflect the shutdown.

}

fn update_core_pool_size(&self, core_pool_size: usize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
// nothing to do
}

fn shutdown(&self, await_terminate_millis: u64) {
fn shutdown(&mut self, await_terminate_millis: u64) {

Check warning on line 64 in rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs#L64

Added line #L64 was not covered by tests
todo!()
}
Comment on lines +64 to 66
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Complete the shutdown function implementation.

The function signature change to take a mutable reference to self is appropriate, as it allows the function to modify the state of the ConsumeMessagePopConcurrentlyService instance during shutdown.

However, the function body is currently unimplemented and contains a todo!() macro. Please complete the implementation to ensure the shutdown functionality works as intended.


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
todo!()
}

fn shutdown(&self, await_terminate_millis: u64) {
fn shutdown(&mut self, await_terminate_millis: u64) {

Check warning on line 36 in rocketmq-client/src/consumer/consumer_impl/consume_message_pop_orderly_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/consume_message_pop_orderly_service.rs#L36

Added line #L36 was not covered by tests
todo!()
}
Comment on lines +36 to 38
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Breaking change: shutdown now requires a mutable reference to self.

The function signature has been modified to take a mutable reference to self, which is a breaking change for any existing code that calls this function. Please ensure that this change is necessary and update the calling code accordingly.

Additionally, the function body is not implemented and contains a todo!() macro. Please provide more context on the planned implementation and ensure that the function is fully implemented before merging this pull request.


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ where
pub trait ConsumeMessageServiceTrait {
fn start(&mut self);

fn shutdown(&self, await_terminate_millis: u64);
fn shutdown(&mut self, await_terminate_millis: u64);

fn update_core_pool_size(&self, core_pool_size: usize);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
use rocketmq_remoting::protocol::namespace_util::NamespaceUtil;
use rocketmq_remoting::runtime::RPCHook;
use tokio::runtime::Handle;
use tokio::sync::Mutex;
use tracing::error;
use tracing::info;
use tracing::warn;
Expand Down Expand Up @@ -96,6 +97,7 @@

#[derive(Clone)]
pub struct DefaultMQPushConsumerImpl {
pub(crate) global_lock: Arc<Mutex<()>>,

Check warning on line 100 in rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs#L100

Added line #L100 was not covered by tests
pub(crate) pull_time_delay_mills_when_exception: u64,
pub(crate) client_config: ArcRefCellWrapper<ClientConfig>,
pub(crate) consumer_config: ArcRefCellWrapper<ConsumerConfig>,
Expand Down Expand Up @@ -139,6 +141,7 @@
rpc_hook: Option<Arc<Box<dyn RPCHook>>>,
) -> Self {
let mut this = Self {
global_lock: Arc::new(Default::default()),

Check warning on line 144 in rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs#L144

Added line #L144 was not covered by tests
pull_time_delay_mills_when_exception: 3_000,
client_config: ArcRefCellWrapper::new(client_config.clone()),
consumer_config: consumer_config.clone(),
Expand Down Expand Up @@ -372,6 +375,52 @@
Ok(())
}

pub async fn shutdown(&mut self, await_terminate_millis: u64) {

Check warning on line 378 in rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs#L378

Added line #L378 was not covered by tests
let _lock = self.global_lock.lock().await;
match *self.service_state {
ServiceState::CreateJust => {
warn!(
"the consumer [{}] do not start, so do nothing",
self.consumer_config.consumer_group
);
}
ServiceState::Running => {
if let Some(consume_message_concurrently_service) =
self.consume_message_concurrently_service.as_mut()
{
consume_message_concurrently_service
.consume_message_concurrently_service
.shutdown(await_terminate_millis);
}
self.persist_consumer_offset().await;
let client = self.client_instance.as_mut().unwrap();
client
.unregister_consumer(self.consumer_config.consumer_group.as_str())
.await;
client.shutdown().await;
info!(
"the consumer [{}] shutdown OK",
self.consumer_config.consumer_group.as_str()
);
self.rebalance_impl.destroy();
*self.service_state = ServiceState::ShutdownAlready;
}
ServiceState::ShutdownAlready => {
warn!(
"the consumer [{}] has been shutdown, do nothing",
self.consumer_config.consumer_group
);
}
ServiceState::StartFailed => {
warn!(
"the consumer [{}] start failed, do nothing",
self.consumer_config.consumer_group
);
}
}
drop(_lock);
}

Check warning on line 422 in rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs#L422

Added line #L422 was not covered by tests
Comment on lines +378 to +422
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid holding the global_lock across .await points in shutdown method

In the shutdown method, the global_lock is acquired and held across several .await calls. Holding an async mutex lock across .await points can lead to performance bottlenecks and potential deadlocks, as it prevents other tasks from acquiring the lock while the current task is suspended.

Suggestion:

Refactor the shutdown method to limit the scope of the global_lock. Acquire the lock only for the critical section that modifies the service_state, and release it before making any asynchronous calls.

Proposed change:

 pub async fn shutdown(&mut self, await_terminate_millis: u64) {
-    let _lock = self.global_lock.lock().await;
+    {
+        let _lock = self.global_lock.lock().await;
+        match *self.service_state {
+            ServiceState::CreateJust => {
+                warn!(
+                    "the consumer [{}] do not start, so do nothing",
+                    self.consumer_config.consumer_group
+                );
+                return;
+            }
+            ServiceState::Running => {
+                *self.service_state = ServiceState::ShutdownAlready;
+            }
+            ServiceState::ShutdownAlready => {
+                warn!(
+                    "the consumer [{}] has been shutdown, do nothing",
+                    self.consumer_config.consumer_group
+                );
+                return;
+            }
+            ServiceState::StartFailed => {
+                warn!(
+                    "the consumer [{}] start failed, do nothing",
+                    self.consumer_config.consumer_group
+                );
+                return;
+            }
+        }
+    } // The lock is released here

    if let Some(consume_message_concurrently_service) =
        self.consume_message_concurrently_service.as_mut()
    {
        consume_message_concurrently_service
            .consume_message_concurrently_service
            .shutdown(await_terminate_millis);
    }
    self.persist_consumer_offset().await;
    let client = self.client_instance.as_mut().unwrap();
    client
        .unregister_consumer(self.consumer_config.consumer_group.as_str())
        .await;
    client.shutdown().await;
    info!(
        "the consumer [{}] shutdown OK",
        self.consumer_config.consumer_group.as_str()
    );
    self.rebalance_impl.destroy();
 }
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pub async fn shutdown(&mut self, await_terminate_millis: u64) {
let _lock = self.global_lock.lock().await;
match *self.service_state {
ServiceState::CreateJust => {
warn!(
"the consumer [{}] do not start, so do nothing",
self.consumer_config.consumer_group
);
}
ServiceState::Running => {
if let Some(consume_message_concurrently_service) =
self.consume_message_concurrently_service.as_mut()
{
consume_message_concurrently_service
.consume_message_concurrently_service
.shutdown(await_terminate_millis);
}
self.persist_consumer_offset().await;
let client = self.client_instance.as_mut().unwrap();
client
.unregister_consumer(self.consumer_config.consumer_group.as_str())
.await;
client.shutdown().await;
info!(
"the consumer [{}] shutdown OK",
self.consumer_config.consumer_group.as_str()
);
self.rebalance_impl.destroy();
*self.service_state = ServiceState::ShutdownAlready;
}
ServiceState::ShutdownAlready => {
warn!(
"the consumer [{}] has been shutdown, do nothing",
self.consumer_config.consumer_group
);
}
ServiceState::StartFailed => {
warn!(
"the consumer [{}] start failed, do nothing",
self.consumer_config.consumer_group
);
}
}
drop(_lock);
}
pub async fn shutdown(&mut self, await_terminate_millis: u64) {
{
let _lock = self.global_lock.lock().await;
match *self.service_state {
ServiceState::CreateJust => {
warn!(
"the consumer [{}] do not start, so do nothing",
self.consumer_config.consumer_group
);
return;
}
ServiceState::Running => {
*self.service_state = ServiceState::ShutdownAlready;
}
ServiceState::ShutdownAlready => {
warn!(
"the consumer [{}] has been shutdown, do nothing",
self.consumer_config.consumer_group
);
return;
}
ServiceState::StartFailed => {
warn!(
"the consumer [{}] start failed, do nothing",
self.consumer_config.consumer_group
);
return;
}
}
} // The lock is released here
if let Some(consume_message_concurrently_service) =
self.consume_message_concurrently_service.as_mut()
{
consume_message_concurrently_service
.consume_message_concurrently_service
.shutdown(await_terminate_millis);
}
self.persist_consumer_offset().await;
let client = self.client_instance.as_mut().unwrap();
client
.unregister_consumer(self.consumer_config.consumer_group.as_str())
.await;
client.shutdown().await;
info!(
"the consumer [{}] shutdown OK",
self.consumer_config.consumer_group.as_str()
);
self.rebalance_impl.destroy();
}


async fn update_topic_subscribe_info_when_subscription_changed(&mut self) {
if DO_NOT_UPDATE_TOPIC_SUBSCRIBE_INFO_WHEN_SUBSCRIPTION_CHANGED {
return;
Expand Down Expand Up @@ -1239,8 +1288,27 @@
Ok(false)
}

fn persist_consumer_offset(&self) {
todo!()
async fn persist_consumer_offset(&self) {

Check warning on line 1291 in rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs#L1291

Added line #L1291 was not covered by tests
if let Err(err) = self.make_sure_state_ok() {
error!(
"group: {} persistConsumerOffset exception:{}",
self.consumer_config.consumer_group, err
);
} else {
let guard = self
.rebalance_impl
.rebalance_impl_inner
.process_queue_table
.read()
.await;
let allocate_mq = guard.keys().cloned().collect::<HashSet<_>>();
self.offset_store
.as_ref()
.unwrap()
.mut_from_ref()
.persist_all(&allocate_mq)
.await;
}
}

async fn update_topic_subscribe_info(&mut self, topic: &str, info: &HashSet<MessageQueue>) {
Expand Down
25 changes: 14 additions & 11 deletions rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use crate::factory::mq_client_instance::MQClientInstance;
use crate::hook::filter_message_context::FilterMessageContext;
use crate::hook::filter_message_hook::FilterMessageHook;
use crate::implementation::communication_mode::CommunicationMode;
use crate::implementation::mq_client_api_impl::MQClientAPIImpl;
use crate::Result;

#[derive(Clone)]
Expand Down Expand Up @@ -107,6 +108,7 @@ impl PullAPIWrapper {
self.client_instance.client_config.decode_read_body,
self.client_instance.client_config.decode_decompress_body,
);

let mut need_decode_inner_message = false;
for msg in &msg_vec {
if MessageSysFlag::check(
Expand Down Expand Up @@ -191,6 +193,7 @@ impl PullAPIWrapper {
msg.message_ext_inner.queue_offset += offset_delta;
}
}

pull_result_ext.pull_result.msg_found_list = msg_list_filter_again
.into_iter()
.map(ArcRefCellWrapper::new)
Expand Down Expand Up @@ -233,7 +236,7 @@ impl PullAPIWrapper {
pull_callback: PCB,
) -> Result<Option<PullResultExt>>
where
PCB: PullCallback,
PCB: PullCallback + 'static,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential over-restriction with 'static lifetime bound on PCB

Adding a 'static lifetime bound to PCB (the PullCallback trait) may unnecessarily restrict implementations by requiring that callbacks have a 'static lifetime. This could limit flexibility, especially if closures or references with shorter lifetimes are needed.

Consider refactoring to remove the 'static lifetime bound if it isn't strictly necessary. If the bound is required due to async lifetime restrictions, adding a comment explaining this necessity would improve code clarity.

{
let broker_name = self
.client_instance
Expand Down Expand Up @@ -320,16 +323,16 @@ impl PullAPIWrapper {
.compute_pull_from_which_filter_server(mq.get_topic(), broker_addr.as_str())
.await?;
}
self.client_instance
.get_mq_client_api_impl()
.pull_message(
broker_addr.as_str(),
request_header,
timeout_millis,
communication_mode,
pull_callback,
)
.await

MQClientAPIImpl::pull_message(
self.client_instance.get_mq_client_api_impl(),
broker_addr,
request_header,
timeout_millis,
communication_mode,
pull_callback,
)
.await
Comment on lines +326 to +335
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ensure correct invocation of pull_message method

The pull_message method is called as an associated function of MQClientAPIImpl, passing in the client API instance as a parameter. Verify that pull_message is designed to be used as a static method and that this change aligns with the intended design.

If pull_message is an instance method, it should be called directly on the instance. Consider modifying the call:

- MQClientAPIImpl::pull_message(
-     self.client_instance.get_mq_client_api_impl(),
+ self.client_instance.get_mq_client_api_impl().pull_message(
      broker_addr,
      request_header,
      timeout_millis,
      communication_mode,
      pull_callback,
  )
  .await

Committable suggestion was skipped due to low confidence.

} else {
Err(MQClientErr(
-1,
Expand Down
1 change: 1 addition & 0 deletions rocketmq-client/src/consumer/consumer_impl/re_balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,5 @@ pub trait RebalanceLocal {
async fn do_rebalance(&mut self, is_order: bool) -> bool;

fn client_rebalance(&mut self, topic: &str) -> bool;
fn destroy(&mut self);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding a default implementation for the destroy method.

Adding a new method to the trait is a breaking change, as it requires all existing implementers of the trait to implement this method. To avoid breaking changes and provide a fallback behavior for implementers who may not need any cleanup, consider adding a default implementation to the trait.

Here's an example of how you can add a default implementation:

fn destroy(&mut self) {
    // Default implementation that does nothing
}

This way, implementers who need cleanup behavior can override the default implementation, while others can rely on the default implementation.

}
Loading
Loading