diff --git a/rocketmq-client/examples/quickstart/consumer.rs b/rocketmq-client/examples/quickstart/consumer.rs index 8f258b5f..eaa2b004 100644 --- a/rocketmq-client/examples/quickstart/consumer.rs +++ b/rocketmq-client/examples/quickstart/consumer.rs @@ -32,7 +32,7 @@ pub const TAG: &str = "*"; #[rocketmq::main] pub async fn main() -> Result<()> { //init logger - rocketmq_common::log::init_logger(); + //rocketmq_common::log::init_logger(); // create a producer builder with default configuration let builder = DefaultMQPushConsumer::builder(); diff --git a/rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs b/rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs index f541b2de..abe18e06 100644 --- a/rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs +++ b/rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs @@ -30,7 +30,6 @@ use rocketmq_common::TimeUtils::get_current_millis; use rocketmq_common::WeakCellWrapper; use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult; use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel; -use rocketmq_runtime::RocketMQRuntime; use tracing::info; use tracing::warn; @@ -53,7 +52,7 @@ pub struct ConsumeMessageConcurrentlyService { pub(crate) consumer_config: ArcRefCellWrapper, pub(crate) consumer_group: Arc, pub(crate) message_listener: ArcBoxMessageListenerConcurrently, - pub(crate) consume_runtime: Arc, + // pub(crate) consume_runtime: Arc, } impl ConsumeMessageConcurrentlyService { @@ -71,10 +70,10 @@ impl ConsumeMessageConcurrentlyService { consumer_config, consumer_group: Arc::new(consumer_group), message_listener, - consume_runtime: Arc::new(RocketMQRuntime::new_multi( + /*consume_runtime: Arc::new(RocketMQRuntime::new_multi( consume_thread as usize, "ConsumeMessageThread_", - )), + )),*/ } } } @@ -140,12 +139,6 @@ impl ConsumeMessageConcurrentlyService { } if !msg_back_failed.is_empty() { consume_request.msgs.append(&mut msg_back_success); - /* let msg_back_failed_switched = msg_back_failed - .into_iter() - .map(|msg| MessageClientExt { - message_ext_inner: msg, - }) - .collect();*/ self.submit_consume_request_later( msg_back_failed, consume_request.process_queue.clone(), @@ -182,7 +175,7 @@ impl ConsumeMessageConcurrentlyService { message_queue: MessageQueue, ) { let this = self.clone(); - self.consume_runtime.get_handle().spawn(async move { + tokio::spawn(async move { tokio::time::sleep(Duration::from_secs(5)).await; this.submit_consume_request(msgs, process_queue, message_queue, true) .await; @@ -231,8 +224,8 @@ impl ConsumeMessageServiceTrait for ConsumeMessageConcurrentlyService { }); } - fn shutdown(&self, await_terminate_millis: u64) { - todo!() + fn shutdown(&mut self, await_terminate_millis: u64) { + // todo!() } fn update_core_pool_size(&self, core_pool_size: usize) { @@ -267,7 +260,7 @@ impl ConsumeMessageServiceTrait for ConsumeMessageConcurrentlyService { dispatch_to_consume: bool, ) { let consume_batch_size = self.consumer_config.consume_message_batch_max_size; - if msgs.len() < consume_batch_size as usize { + if msgs.len() <= consume_batch_size as usize { let mut consume_request = ConsumeRequest { msgs: msgs.clone(), message_listener: self.message_listener.clone(), @@ -278,7 +271,8 @@ impl ConsumeMessageServiceTrait for ConsumeMessageConcurrentlyService { default_mqpush_consumer_impl: self.default_mqpush_consumer_impl.clone(), }; let consume_message_concurrently_service = self.clone(); - self.consume_runtime.get_handle().spawn(async move { + + tokio::spawn(async move { consume_request .run(consume_message_concurrently_service) .await @@ -301,7 +295,12 @@ impl ConsumeMessageServiceTrait for ConsumeMessageConcurrentlyService { default_mqpush_consumer_impl: self.default_mqpush_consumer_impl.clone(), }; let consume_message_concurrently_service = self.clone(); - self.consume_runtime.get_handle().spawn(async move { + /* self.consume_runtime.get_handle().spawn(async move { + consume_request + .run(consume_message_concurrently_service) + .await + });*/ + tokio::spawn(async move { consume_request .run(consume_message_concurrently_service) .await diff --git a/rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs b/rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs index feb8c9e6..7137682f 100644 --- a/rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs +++ b/rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs @@ -33,8 +33,8 @@ impl ConsumeMessageServiceTrait for ConsumeMessageOrderlyService { todo!() } - fn shutdown(&self, await_terminate_millis: u64) { - todo!() + fn shutdown(&mut self, await_terminate_millis: u64) { + unimplemented!("shutdown") } fn update_core_pool_size(&self, core_pool_size: usize) { diff --git a/rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs b/rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs index f11f10c4..f2ba89cb 100644 --- a/rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs +++ b/rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs @@ -61,7 +61,7 @@ impl ConsumeMessageServiceTrait for ConsumeMessagePopConcurrentlyService { // nothing to do } - fn shutdown(&self, await_terminate_millis: u64) { + fn shutdown(&mut self, await_terminate_millis: u64) { todo!() } diff --git a/rocketmq-client/src/consumer/consumer_impl/consume_message_pop_orderly_service.rs b/rocketmq-client/src/consumer/consumer_impl/consume_message_pop_orderly_service.rs index 68a90486..40ee7f47 100644 --- a/rocketmq-client/src/consumer/consumer_impl/consume_message_pop_orderly_service.rs +++ b/rocketmq-client/src/consumer/consumer_impl/consume_message_pop_orderly_service.rs @@ -33,7 +33,7 @@ impl ConsumeMessageServiceTrait for ConsumeMessagePopOrderlyService { todo!() } - fn shutdown(&self, await_terminate_millis: u64) { + fn shutdown(&mut self, await_terminate_millis: u64) { todo!() } diff --git a/rocketmq-client/src/consumer/consumer_impl/consume_message_service.rs b/rocketmq-client/src/consumer/consumer_impl/consume_message_service.rs index 1d124735..ecf58a89 100644 --- a/rocketmq-client/src/consumer/consumer_impl/consume_message_service.rs +++ b/rocketmq-client/src/consumer/consumer_impl/consume_message_service.rs @@ -45,7 +45,7 @@ where pub trait ConsumeMessageServiceTrait { fn start(&mut self); - fn shutdown(&self, await_terminate_millis: u64); + fn shutdown(&mut self, await_terminate_millis: u64); fn update_core_pool_size(&self, core_pool_size: usize); diff --git a/rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs b/rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs index e00ae46d..b7e215c6 100644 --- a/rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs +++ b/rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs @@ -47,6 +47,7 @@ use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData; use rocketmq_remoting::protocol::namespace_util::NamespaceUtil; use rocketmq_remoting::runtime::RPCHook; use tokio::runtime::Handle; +use tokio::sync::Mutex; use tracing::error; use tracing::info; use tracing::warn; @@ -96,6 +97,7 @@ const _1MB: u64 = 1024 * 1024; #[derive(Clone)] pub struct DefaultMQPushConsumerImpl { + pub(crate) global_lock: Arc>, pub(crate) pull_time_delay_mills_when_exception: u64, pub(crate) client_config: ArcRefCellWrapper, pub(crate) consumer_config: ArcRefCellWrapper, @@ -139,6 +141,7 @@ impl DefaultMQPushConsumerImpl { rpc_hook: Option>>, ) -> Self { let mut this = Self { + global_lock: Arc::new(Default::default()), pull_time_delay_mills_when_exception: 3_000, client_config: ArcRefCellWrapper::new(client_config.clone()), consumer_config: consumer_config.clone(), @@ -372,6 +375,52 @@ impl DefaultMQPushConsumerImpl { Ok(()) } + pub async fn shutdown(&mut self, await_terminate_millis: u64) { + let _lock = self.global_lock.lock().await; + match *self.service_state { + ServiceState::CreateJust => { + warn!( + "the consumer [{}] do not start, so do nothing", + self.consumer_config.consumer_group + ); + } + ServiceState::Running => { + if let Some(consume_message_concurrently_service) = + self.consume_message_concurrently_service.as_mut() + { + consume_message_concurrently_service + .consume_message_concurrently_service + .shutdown(await_terminate_millis); + } + self.persist_consumer_offset().await; + let client = self.client_instance.as_mut().unwrap(); + client + .unregister_consumer(self.consumer_config.consumer_group.as_str()) + .await; + client.shutdown().await; + info!( + "the consumer [{}] shutdown OK", + self.consumer_config.consumer_group.as_str() + ); + self.rebalance_impl.destroy(); + *self.service_state = ServiceState::ShutdownAlready; + } + ServiceState::ShutdownAlready => { + warn!( + "the consumer [{}] has been shutdown, do nothing", + self.consumer_config.consumer_group + ); + } + ServiceState::StartFailed => { + warn!( + "the consumer [{}] start failed, do nothing", + self.consumer_config.consumer_group + ); + } + } + drop(_lock); + } + async fn update_topic_subscribe_info_when_subscription_changed(&mut self) { if DO_NOT_UPDATE_TOPIC_SUBSCRIBE_INFO_WHEN_SUBSCRIPTION_CHANGED { return; @@ -1239,8 +1288,27 @@ impl MQConsumerInner for DefaultMQPushConsumerImpl { Ok(false) } - fn persist_consumer_offset(&self) { - todo!() + async fn persist_consumer_offset(&self) { + if let Err(err) = self.make_sure_state_ok() { + error!( + "group: {} persistConsumerOffset exception:{}", + self.consumer_config.consumer_group, err + ); + } else { + let guard = self + .rebalance_impl + .rebalance_impl_inner + .process_queue_table + .read() + .await; + let allocate_mq = guard.keys().cloned().collect::>(); + self.offset_store + .as_ref() + .unwrap() + .mut_from_ref() + .persist_all(&allocate_mq) + .await; + } } async fn update_topic_subscribe_info(&mut self, topic: &str, info: &HashSet) { diff --git a/rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs b/rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs index 1f37d4f4..5069ff03 100644 --- a/rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs +++ b/rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs @@ -43,6 +43,7 @@ use crate::factory::mq_client_instance::MQClientInstance; use crate::hook::filter_message_context::FilterMessageContext; use crate::hook::filter_message_hook::FilterMessageHook; use crate::implementation::communication_mode::CommunicationMode; +use crate::implementation::mq_client_api_impl::MQClientAPIImpl; use crate::Result; #[derive(Clone)] @@ -107,6 +108,7 @@ impl PullAPIWrapper { self.client_instance.client_config.decode_read_body, self.client_instance.client_config.decode_decompress_body, ); + let mut need_decode_inner_message = false; for msg in &msg_vec { if MessageSysFlag::check( @@ -191,6 +193,7 @@ impl PullAPIWrapper { msg.message_ext_inner.queue_offset += offset_delta; } } + pull_result_ext.pull_result.msg_found_list = msg_list_filter_again .into_iter() .map(ArcRefCellWrapper::new) @@ -233,7 +236,7 @@ impl PullAPIWrapper { pull_callback: PCB, ) -> Result> where - PCB: PullCallback, + PCB: PullCallback + 'static, { let broker_name = self .client_instance @@ -320,16 +323,16 @@ impl PullAPIWrapper { .compute_pull_from_which_filter_server(mq.get_topic(), broker_addr.as_str()) .await?; } - self.client_instance - .get_mq_client_api_impl() - .pull_message( - broker_addr.as_str(), - request_header, - timeout_millis, - communication_mode, - pull_callback, - ) - .await + + MQClientAPIImpl::pull_message( + self.client_instance.get_mq_client_api_impl(), + broker_addr, + request_header, + timeout_millis, + communication_mode, + pull_callback, + ) + .await } else { Err(MQClientErr( -1, diff --git a/rocketmq-client/src/consumer/consumer_impl/re_balance.rs b/rocketmq-client/src/consumer/consumer_impl/re_balance.rs index 93fc6751..c41eb56b 100644 --- a/rocketmq-client/src/consumer/consumer_impl/re_balance.rs +++ b/rocketmq-client/src/consumer/consumer_impl/re_balance.rs @@ -70,4 +70,5 @@ pub trait RebalanceLocal { async fn do_rebalance(&mut self, is_order: bool) -> bool; fn client_rebalance(&mut self, topic: &str) -> bool; + fn destroy(&mut self); } diff --git a/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs b/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs index 0ef777d8..e0de96ba 100644 --- a/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs +++ b/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs @@ -273,7 +273,11 @@ where if !all_mq_locked { self.client_instance.as_mut().unwrap().rebalance_later(500); } - + println!( + "PullRequestList============================={}================{}", + topic, + pull_request_list.len() + ); sub_rebalance_impl .dispatch_pull_request(pull_request_list, 500) .await; @@ -290,13 +294,14 @@ where let topic_sub_cloned = self.topic_subscribe_info_table.clone(); let topic_subscribe_info_table_inner = topic_sub_cloned.write().await; let mq_set = topic_subscribe_info_table_inner.get(topic); - let ci_all = self + //get consumer id list from broker + let cid_all = self .client_instance .as_mut() .unwrap() .find_consumer_id_list(topic, self.consumer_group.as_ref().unwrap()) .await; - if ci_all.is_none() && !topic.starts_with(mix_all::RETRY_GROUP_TOPIC_PREFIX) { + if cid_all.is_none() && !topic.starts_with(mix_all::RETRY_GROUP_TOPIC_PREFIX) { if let Some(mut sub_rebalance_impl) = self.sub_rebalance_impl.as_ref().unwrap().upgrade() { @@ -310,30 +315,28 @@ where ); } } - if ci_all.is_none() { + if cid_all.is_none() { warn!( "doRebalance, {}, but the topic[{}] not exist.", self.consumer_group.as_ref().unwrap(), topic ); } - if mq_set.is_some() && ci_all.is_some() { + if mq_set.is_some() && cid_all.is_some() { let mq_set = mq_set.unwrap(); let mut mq_all = mq_set.iter().cloned().collect::>(); mq_all.sort(); - let mut ci_all = ci_all.unwrap(); + let mut ci_all = cid_all.unwrap(); ci_all.sort(); - let allocate_result = match self - .allocate_message_queue_strategy - .as_ref() - .unwrap() - .allocate( - self.consumer_group.as_ref().unwrap(), - self.client_instance.as_ref().unwrap().client_id.as_ref(), - mq_all.as_slice(), - ci_all.as_slice(), - ) { + let strategy = self.allocate_message_queue_strategy.as_ref().unwrap(); + let strategy_name = strategy.get_name(); + let allocate_result = match strategy.allocate( + self.consumer_group.as_ref().unwrap(), + self.client_instance.as_ref().unwrap().client_id.as_ref(), + mq_all.as_slice(), + ci_all.as_slice(), + ) { Ok(value) => value, Err(e) => { error!( @@ -358,7 +361,21 @@ where ) .await; if changed { - // info + info!( + "client rebalanced result changed. \ + allocateMessageQueueStrategyName={}, group={}, topic={}, \ + clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, \ + rebalanceResultSet={:?}", + strategy_name, + self.consumer_group.as_ref().unwrap(), + topic, + self.client_instance.as_ref().unwrap().client_id, + mq_set.len(), + ci_all.len(), + allocate_result_set.len(), + allocate_result_set + ); + if let Some(mut sub_rebalance_impl) = self.sub_rebalance_impl.as_ref().unwrap().upgrade() { diff --git a/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs b/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs index f3a3dd59..34256bc9 100644 --- a/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs +++ b/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs @@ -459,4 +459,8 @@ impl Rebalance for RebalancePushImpl { false } } + + fn destroy(&mut self) { + unimplemented!() + } } diff --git a/rocketmq-client/src/consumer/mq_consumer_inner.rs b/rocketmq-client/src/consumer/mq_consumer_inner.rs index 4e34b7c5..3328d9b5 100644 --- a/rocketmq-client/src/consumer/mq_consumer_inner.rs +++ b/rocketmq-client/src/consumer/mq_consumer_inner.rs @@ -41,7 +41,7 @@ pub trait MQConsumerInnerLocal: MQConsumerInnerAny + Sync + 'static { async fn try_rebalance(&self) -> Result; - fn persist_consumer_offset(&self); + async fn persist_consumer_offset(&self); async fn update_topic_subscribe_info(&mut self, topic: &str, info: &HashSet); diff --git a/rocketmq-client/src/factory/mq_client_instance.rs b/rocketmq-client/src/factory/mq_client_instance.rs index b1d81772..f1c9f7d0 100644 --- a/rocketmq-client/src/factory/mq_client_instance.rs +++ b/rocketmq-client/src/factory/mq_client_instance.rs @@ -253,6 +253,8 @@ where Ok(()) } + pub async fn shutdown(&mut self) {} + pub async fn register_producer(&mut self, group: &str, producer: impl MQProducerInner) -> bool { if group.is_empty() { return false; @@ -551,7 +553,10 @@ where } pub async fn persist_all_consumer_offset(&mut self) { - println!("updateTopicRouteInfoFromNameServer") + let consumer_table = self.consumer_table.read().await; + for (_, value) in consumer_table.iter() { + value.persist_consumer_offset().await; + } } pub async fn clean_offline_broker(&mut self) { @@ -935,6 +940,43 @@ where let consumer_table = self.consumer_table.read().await; consumer_table.get(group).cloned() } + + pub async fn unregister_consumer(&mut self, group: impl Into) { + self.unregister_client(None, Some(group.into())).await; + } + pub async fn unregister_producer(&mut self, group: impl Into) { + self.unregister_client(Some(group.into()), None).await; + } + + async fn unregister_client( + &mut self, + producer_group: Option, + consumer_group: Option, + ) { + let broker_addr_table = self.broker_addr_table.read().await; + for (broker_name, broker_addrs) in broker_addr_table.iter() { + for (id, addr) in broker_addrs.iter() { + if let Err(err) = self + .mq_client_api_impl + .unregister_client( + addr, + self.client_id.as_str(), + producer_group.clone(), + consumer_group.clone(), + self.client_config.mq_client_api_timeout, + ) + .await + { + } else { + info!( + "unregister client[Producer: {:?} Consumer: {:?}] from broker[{} {} {}] \ + success", + producer_group, consumer_group, broker_name, id, addr, + ); + } + } + } + } } pub fn topic_route_data2topic_publish_info( diff --git a/rocketmq-client/src/implementation/mq_client_api_impl.rs b/rocketmq-client/src/implementation/mq_client_api_impl.rs index 9ab2cc50..5cf467bf 100644 --- a/rocketmq-client/src/implementation/mq_client_api_impl.rs +++ b/rocketmq-client/src/implementation/mq_client_api_impl.rs @@ -51,6 +51,7 @@ use rocketmq_remoting::protocol::header::pull_message_request_header::PullMessag use rocketmq_remoting::protocol::header::pull_message_response_header::PullMessageResponseHeader; use rocketmq_remoting::protocol::header::query_consumer_offset_request_header::QueryConsumerOffsetRequestHeader; use rocketmq_remoting::protocol::header::query_consumer_offset_response_header::QueryConsumerOffsetResponseHeader; +use rocketmq_remoting::protocol::header::unregister_client_request_header::UnregisterClientRequestHeader; use rocketmq_remoting::protocol::header::update_consumer_offset_header::UpdateConsumerOffsetRequestHeader; use rocketmq_remoting::protocol::heartbeat::heartbeat_data::HeartbeatData; use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData; @@ -832,15 +833,15 @@ impl MQClientAPIImpl { } pub async fn pull_message( - &mut self, - addr: &str, + mut this: ArcRefCellWrapper, + addr: String, request_header: PullMessageRequestHeader, timeout_millis: u64, communication_mode: CommunicationMode, pull_callback: PCB, ) -> Result> where - PCB: PullCallback, + PCB: PullCallback + 'static, { let request = if PullSysFlag::has_lite_pull_flag(request_header.sys_flag as u32) { RemotingCommand::create_request_command(RequestCode::LitePullMessage, request_header) @@ -849,14 +850,22 @@ impl MQClientAPIImpl { }; match communication_mode { CommunicationMode::Sync => { - let result_ext = self - .pull_message_sync(addr, request, timeout_millis) + let result_ext = this + .pull_message_sync(addr.as_str(), request, timeout_millis) .await?; Ok(Some(result_ext)) } CommunicationMode::Async => { - self.pull_message_async(addr, request, timeout_millis, pull_callback) - .await?; + tokio::spawn(async move { + let instant = Instant::now(); + let _ = this + .pull_message_async(addr.as_str(), request, timeout_millis, pull_callback) + .await; + println!( + ">>>>>>>>>>>>>>>>>>pull_message_async cost: {:?}", + instant.elapsed() + ); + }); Ok(None) } CommunicationMode::Oneway => Ok(None), @@ -892,7 +901,12 @@ impl MQClientAPIImpl { .await { Ok(response) => { + println!( + "++++++++++++++++++++++++pull_message_async response: {}", + response + ); let result = self.process_pull_response(response, addr).await; + match result { Ok(pull_result) => { pull_callback.on_success(pull_result).await; @@ -994,4 +1008,35 @@ impl MQClientAPIImpl { )) } } + + pub async fn unregister_client( + &mut self, + addr: &str, + client_id: &str, + producer_group: Option, + consumer_group: Option, + timeout_millis: u64, + ) -> Result<()> { + let request_header = UnregisterClientRequestHeader { + client_id: client_id.to_string(), + producer_group, + consumer_group, + rpc_request_header: None, + }; + let request = + RemotingCommand::create_request_command(RequestCode::UnregisterClient, request_header); + let response = self + .remoting_client + .invoke_async(Some(addr.to_string()), request, timeout_millis) + .await?; + if ResponseCode::from(response.code()) == ResponseCode::Success { + Ok(()) + } else { + Err(MQBrokerError( + response.code(), + response.remark().map_or("".to_string(), |s| s.to_string()), + addr.to_string(), + )) + } + } } diff --git a/rocketmq-remoting/src/clients/rocketmq_default_impl.rs b/rocketmq-remoting/src/clients/rocketmq_default_impl.rs index e673b89c..5a09123e 100644 --- a/rocketmq-remoting/src/clients/rocketmq_default_impl.rs +++ b/rocketmq-remoting/src/clients/rocketmq_default_impl.rs @@ -373,7 +373,8 @@ impl RemotingClient for RocketmqD Some(mut client) => { self.client_runtime.get_handle().spawn(async move { match time::timeout(Duration::from_millis(timeout_millis), async move { - //client.lock().await.send(request).await + let mut request = request; + request.mark_oneway_rpc_ref(); client.send(request).await }) .await diff --git a/rocketmq-remoting/src/protocol/header/unregister_client_request_header.rs b/rocketmq-remoting/src/protocol/header/unregister_client_request_header.rs index fa54572c..df6568c5 100644 --- a/rocketmq-remoting/src/protocol/header/unregister_client_request_header.rs +++ b/rocketmq-remoting/src/protocol/header/unregister_client_request_header.rs @@ -31,7 +31,7 @@ pub struct UnregisterClientRequestHeader { pub producer_group: Option, pub consumer_group: Option, #[serde(flatten)] - pub rpc_request_header: RpcRequestHeader, + pub rpc_request_header: Option, } impl UnregisterClientRequestHeader { @@ -55,27 +55,33 @@ impl FromMap for UnregisterClientRequestHeader { consumer_group: map .get(UnregisterClientRequestHeader::CONSUMER_GROUP) .cloned(), - rpc_request_header: ::from(map).unwrap(), + rpc_request_header: ::from(map), }) } } impl CommandCustomHeader for UnregisterClientRequestHeader { fn to_map(&self) -> Option> { - let mut map = self.rpc_request_header.to_map(); - map.as_mut() - .unwrap() - .insert(Self::CLIENT_ID.to_string(), self.client_id.clone()); - if let Some(ref producer_group) = self.producer_group { - map.as_mut() - .unwrap() - .insert(Self::PRODUCER_GROUP.to_string(), producer_group.clone()); + let mut map = HashMap::new(); + map.insert( + UnregisterClientRequestHeader::CLIENT_ID.to_string(), + self.client_id.clone(), + ); + if let Some(producer_group) = &self.producer_group { + map.insert( + UnregisterClientRequestHeader::PRODUCER_GROUP.to_string(), + producer_group.clone(), + ); } - if let Some(ref consumer_group) = self.consumer_group { - map.as_mut() - .unwrap() - .insert(Self::CONSUMER_GROUP.to_string(), consumer_group.clone()); + if let Some(consumer_group) = &self.consumer_group { + map.insert( + UnregisterClientRequestHeader::CONSUMER_GROUP.to_string(), + consumer_group.clone(), + ); } - map + if let Some(rpc_request_header) = &self.rpc_request_header { + map.extend(rpc_request_header.to_map()?); + } + Some(map) } } diff --git a/rocketmq-remoting/src/protocol/remoting_command.rs b/rocketmq-remoting/src/protocol/remoting_command.rs index b7b3877a..3f40723b 100644 --- a/rocketmq-remoting/src/protocol/remoting_command.rs +++ b/rocketmq-remoting/src/protocol/remoting_command.rs @@ -357,6 +357,10 @@ impl RemotingCommand { self.flag |= mark; self } + pub fn mark_oneway_rpc_ref(&mut self) { + let mark = 1 << Self::RPC_ONEWAY; + self.flag |= mark; + } pub fn get_serialize_type(&self) -> SerializeType { self.serialize_type