Skip to content

Commit

Permalink
[ISSUE #981]🔥Optimize client clusting consume⚡️ (#985)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Sep 22, 2024
1 parent ba55978 commit f96d323
Show file tree
Hide file tree
Showing 17 changed files with 267 additions and 77 deletions.
2 changes: 1 addition & 1 deletion rocketmq-client/examples/quickstart/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub const TAG: &str = "*";
#[rocketmq::main]
pub async fn main() -> Result<()> {
//init logger
rocketmq_common::log::init_logger();
//rocketmq_common::log::init_logger();

// create a producer builder with default configuration
let builder = DefaultMQPushConsumer::builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use rocketmq_common::TimeUtils::get_current_millis;
use rocketmq_common::WeakCellWrapper;
use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult;
use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel;
use rocketmq_runtime::RocketMQRuntime;
use tracing::info;
use tracing::warn;

Expand All @@ -53,7 +52,7 @@ pub struct ConsumeMessageConcurrentlyService {
pub(crate) consumer_config: ArcRefCellWrapper<ConsumerConfig>,
pub(crate) consumer_group: Arc<String>,
pub(crate) message_listener: ArcBoxMessageListenerConcurrently,
pub(crate) consume_runtime: Arc<RocketMQRuntime>,
// pub(crate) consume_runtime: Arc<RocketMQRuntime>,
}

impl ConsumeMessageConcurrentlyService {
Expand All @@ -71,10 +70,10 @@ impl ConsumeMessageConcurrentlyService {
consumer_config,
consumer_group: Arc::new(consumer_group),
message_listener,
consume_runtime: Arc::new(RocketMQRuntime::new_multi(
/*consume_runtime: Arc::new(RocketMQRuntime::new_multi(
consume_thread as usize,
"ConsumeMessageThread_",
)),
)),*/
}
}
}
Expand Down Expand Up @@ -140,12 +139,6 @@ impl ConsumeMessageConcurrentlyService {
}
if !msg_back_failed.is_empty() {
consume_request.msgs.append(&mut msg_back_success);
/* let msg_back_failed_switched = msg_back_failed
.into_iter()
.map(|msg| MessageClientExt {
message_ext_inner: msg,
})
.collect();*/
self.submit_consume_request_later(
msg_back_failed,
consume_request.process_queue.clone(),
Expand Down Expand Up @@ -182,7 +175,7 @@ impl ConsumeMessageConcurrentlyService {
message_queue: MessageQueue,
) {
let this = self.clone();
self.consume_runtime.get_handle().spawn(async move {
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(5)).await;
this.submit_consume_request(msgs, process_queue, message_queue, true)
.await;
Expand Down Expand Up @@ -231,8 +224,8 @@ impl ConsumeMessageServiceTrait for ConsumeMessageConcurrentlyService {
});
}

fn shutdown(&self, await_terminate_millis: u64) {
todo!()
fn shutdown(&mut self, await_terminate_millis: u64) {
// todo!()
}

fn update_core_pool_size(&self, core_pool_size: usize) {
Expand Down Expand Up @@ -267,7 +260,7 @@ impl ConsumeMessageServiceTrait for ConsumeMessageConcurrentlyService {
dispatch_to_consume: bool,
) {
let consume_batch_size = self.consumer_config.consume_message_batch_max_size;
if msgs.len() < consume_batch_size as usize {
if msgs.len() <= consume_batch_size as usize {
let mut consume_request = ConsumeRequest {
msgs: msgs.clone(),
message_listener: self.message_listener.clone(),
Expand All @@ -278,7 +271,8 @@ impl ConsumeMessageServiceTrait for ConsumeMessageConcurrentlyService {
default_mqpush_consumer_impl: self.default_mqpush_consumer_impl.clone(),
};
let consume_message_concurrently_service = self.clone();
self.consume_runtime.get_handle().spawn(async move {

tokio::spawn(async move {
consume_request
.run(consume_message_concurrently_service)
.await
Expand All @@ -301,7 +295,12 @@ impl ConsumeMessageServiceTrait for ConsumeMessageConcurrentlyService {
default_mqpush_consumer_impl: self.default_mqpush_consumer_impl.clone(),
};
let consume_message_concurrently_service = self.clone();
self.consume_runtime.get_handle().spawn(async move {
/* self.consume_runtime.get_handle().spawn(async move {
consume_request
.run(consume_message_concurrently_service)
.await
});*/
tokio::spawn(async move {
consume_request
.run(consume_message_concurrently_service)
.await
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ impl ConsumeMessageServiceTrait for ConsumeMessageOrderlyService {
todo!()
}

fn shutdown(&self, await_terminate_millis: u64) {
todo!()
fn shutdown(&mut self, await_terminate_millis: u64) {
unimplemented!("shutdown")
}

fn update_core_pool_size(&self, core_pool_size: usize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl ConsumeMessageServiceTrait for ConsumeMessagePopConcurrentlyService {
// nothing to do
}

fn shutdown(&self, await_terminate_millis: u64) {
fn shutdown(&mut self, await_terminate_millis: u64) {
todo!()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl ConsumeMessageServiceTrait for ConsumeMessagePopOrderlyService {
todo!()
}

fn shutdown(&self, await_terminate_millis: u64) {
fn shutdown(&mut self, await_terminate_millis: u64) {
todo!()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ where
pub trait ConsumeMessageServiceTrait {
fn start(&mut self);

fn shutdown(&self, await_terminate_millis: u64);
fn shutdown(&mut self, await_terminate_millis: u64);

fn update_core_pool_size(&self, core_pool_size: usize);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData;
use rocketmq_remoting::protocol::namespace_util::NamespaceUtil;
use rocketmq_remoting::runtime::RPCHook;
use tokio::runtime::Handle;
use tokio::sync::Mutex;
use tracing::error;
use tracing::info;
use tracing::warn;
Expand Down Expand Up @@ -96,6 +97,7 @@ const _1MB: u64 = 1024 * 1024;

#[derive(Clone)]
pub struct DefaultMQPushConsumerImpl {
pub(crate) global_lock: Arc<Mutex<()>>,
pub(crate) pull_time_delay_mills_when_exception: u64,
pub(crate) client_config: ArcRefCellWrapper<ClientConfig>,
pub(crate) consumer_config: ArcRefCellWrapper<ConsumerConfig>,
Expand Down Expand Up @@ -139,6 +141,7 @@ impl DefaultMQPushConsumerImpl {
rpc_hook: Option<Arc<Box<dyn RPCHook>>>,
) -> Self {
let mut this = Self {
global_lock: Arc::new(Default::default()),
pull_time_delay_mills_when_exception: 3_000,
client_config: ArcRefCellWrapper::new(client_config.clone()),
consumer_config: consumer_config.clone(),
Expand Down Expand Up @@ -372,6 +375,52 @@ impl DefaultMQPushConsumerImpl {
Ok(())
}

pub async fn shutdown(&mut self, await_terminate_millis: u64) {
let _lock = self.global_lock.lock().await;
match *self.service_state {
ServiceState::CreateJust => {
warn!(
"the consumer [{}] do not start, so do nothing",
self.consumer_config.consumer_group
);
}
ServiceState::Running => {
if let Some(consume_message_concurrently_service) =
self.consume_message_concurrently_service.as_mut()
{
consume_message_concurrently_service
.consume_message_concurrently_service
.shutdown(await_terminate_millis);
}
self.persist_consumer_offset().await;
let client = self.client_instance.as_mut().unwrap();
client
.unregister_consumer(self.consumer_config.consumer_group.as_str())
.await;
client.shutdown().await;
info!(
"the consumer [{}] shutdown OK",
self.consumer_config.consumer_group.as_str()
);
self.rebalance_impl.destroy();
*self.service_state = ServiceState::ShutdownAlready;
}
ServiceState::ShutdownAlready => {
warn!(
"the consumer [{}] has been shutdown, do nothing",
self.consumer_config.consumer_group
);
}
ServiceState::StartFailed => {
warn!(
"the consumer [{}] start failed, do nothing",
self.consumer_config.consumer_group
);
}
}
drop(_lock);
}

async fn update_topic_subscribe_info_when_subscription_changed(&mut self) {
if DO_NOT_UPDATE_TOPIC_SUBSCRIBE_INFO_WHEN_SUBSCRIPTION_CHANGED {
return;
Expand Down Expand Up @@ -1239,8 +1288,27 @@ impl MQConsumerInner for DefaultMQPushConsumerImpl {
Ok(false)
}

fn persist_consumer_offset(&self) {
todo!()
async fn persist_consumer_offset(&self) {
if let Err(err) = self.make_sure_state_ok() {
error!(
"group: {} persistConsumerOffset exception:{}",
self.consumer_config.consumer_group, err
);
} else {
let guard = self
.rebalance_impl
.rebalance_impl_inner
.process_queue_table
.read()
.await;
let allocate_mq = guard.keys().cloned().collect::<HashSet<_>>();
self.offset_store
.as_ref()
.unwrap()
.mut_from_ref()
.persist_all(&allocate_mq)
.await;
}
}

async fn update_topic_subscribe_info(&mut self, topic: &str, info: &HashSet<MessageQueue>) {
Expand Down
25 changes: 14 additions & 11 deletions rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use crate::factory::mq_client_instance::MQClientInstance;
use crate::hook::filter_message_context::FilterMessageContext;
use crate::hook::filter_message_hook::FilterMessageHook;
use crate::implementation::communication_mode::CommunicationMode;
use crate::implementation::mq_client_api_impl::MQClientAPIImpl;
use crate::Result;

#[derive(Clone)]
Expand Down Expand Up @@ -107,6 +108,7 @@ impl PullAPIWrapper {
self.client_instance.client_config.decode_read_body,
self.client_instance.client_config.decode_decompress_body,
);

let mut need_decode_inner_message = false;
for msg in &msg_vec {
if MessageSysFlag::check(
Expand Down Expand Up @@ -191,6 +193,7 @@ impl PullAPIWrapper {
msg.message_ext_inner.queue_offset += offset_delta;
}
}

pull_result_ext.pull_result.msg_found_list = msg_list_filter_again
.into_iter()
.map(ArcRefCellWrapper::new)
Expand Down Expand Up @@ -233,7 +236,7 @@ impl PullAPIWrapper {
pull_callback: PCB,
) -> Result<Option<PullResultExt>>
where
PCB: PullCallback,
PCB: PullCallback + 'static,
{
let broker_name = self
.client_instance
Expand Down Expand Up @@ -320,16 +323,16 @@ impl PullAPIWrapper {
.compute_pull_from_which_filter_server(mq.get_topic(), broker_addr.as_str())
.await?;
}
self.client_instance
.get_mq_client_api_impl()
.pull_message(
broker_addr.as_str(),
request_header,
timeout_millis,
communication_mode,
pull_callback,
)
.await

MQClientAPIImpl::pull_message(
self.client_instance.get_mq_client_api_impl(),
broker_addr,
request_header,
timeout_millis,
communication_mode,
pull_callback,
)
.await
} else {
Err(MQClientErr(
-1,
Expand Down
1 change: 1 addition & 0 deletions rocketmq-client/src/consumer/consumer_impl/re_balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,5 @@ pub trait RebalanceLocal {
async fn do_rebalance(&mut self, is_order: bool) -> bool;

fn client_rebalance(&mut self, topic: &str) -> bool;
fn destroy(&mut self);
}
Loading

0 comments on commit f96d323

Please sign in to comment.