diff --git a/Cargo.lock b/Cargo.lock index f310fa46..cdd0710f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1880,6 +1880,7 @@ name = "rocketmq-client" version = "0.3.0" dependencies = [ "bytes", + "futures", "lazy_static", "num_cpus", "once_cell", diff --git a/Cargo.toml b/Cargo.toml index 749d74ac..a846d46a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -76,4 +76,7 @@ cfg-if = "1.0.0" sysinfo = "0.31.4" uuid = { version = "1.10.0", features = ["v4", # Lets you generate random UUIDs "fast-rng", # Use a faster (but still sufficiently random) RNG - "macro-diagnostics", ] } \ No newline at end of file + "macro-diagnostics", ] } + + +futures = "0.3" \ No newline at end of file diff --git a/rocketmq-client/Cargo.toml b/rocketmq-client/Cargo.toml index a6ba04c6..645ec29a 100644 --- a/rocketmq-client/Cargo.toml +++ b/rocketmq-client/Cargo.toml @@ -39,6 +39,9 @@ regex = { version = "1.10.6", features = [] } parking_lot = { workspace = true } once_cell = { workspace = true } bytes = { workspace = true } + + +futures = { workspace = true } [[example]] name = "simple-producer" path = "examples/producer/simple_producer.rs" diff --git a/rocketmq-client/src/consumer/consumer_impl/process_queue.rs b/rocketmq-client/src/consumer/consumer_impl/process_queue.rs index 13fea726..72c9bea9 100644 --- a/rocketmq-client/src/consumer/consumer_impl/process_queue.rs +++ b/rocketmq-client/src/consumer/consumer_impl/process_queue.rs @@ -30,6 +30,7 @@ use rocketmq_common::MessageAccessor::MessageAccessor; use rocketmq_common::TimeUtils::get_current_millis; use rocketmq_common::WeakCellWrapper; use rocketmq_remoting::protocol::body::process_queue_info::ProcessQueueInfo; +use rocketmq_rust::RocketMQTokioRwLock; use tokio::sync::RwLock; use crate::consumer::consumer_impl::default_mq_push_consumer_impl::DefaultMQPushConsumerImpl; @@ -56,7 +57,7 @@ pub(crate) struct ProcessQueue { Arc>>>, pub(crate) msg_count: Arc, pub(crate) msg_size: Arc, - pub(crate) consume_lock: Arc>, + pub(crate) consume_lock: Arc>, pub(crate) consuming_msg_orderly_tree_map: Arc>>>, pub(crate) try_unlock_times: Arc, @@ -77,7 +78,7 @@ impl ProcessQueue { msg_tree_map: Arc::new(RwLock::new(std::collections::BTreeMap::new())), msg_count: Arc::new(AtomicU64::new(0)), msg_size: Arc::new(AtomicU64::new(0)), - consume_lock: Arc::new(RwLock::new(())), + consume_lock: Arc::new(RocketMQTokioRwLock::new(())), consuming_msg_orderly_tree_map: Arc::new( RwLock::new(std::collections::BTreeMap::new()), ), @@ -124,6 +125,10 @@ impl ProcessQueue { > *REBALANCE_LOCK_MAX_LIVE_TIME } + pub(crate) fn inc_try_unlock_times(&self) { + self.try_unlock_times.fetch_add(1, Ordering::AcqRel); + } + pub(crate) async fn clean_expired_msg( &self, push_consumer: Option>, @@ -308,6 +313,11 @@ impl ProcessQueue { .store(last_pull_timestamp, std::sync::atomic::Ordering::Release); } + pub(crate) fn set_last_lock_timestamp(&self, last_lock_timestamp: u64) { + self.last_lock_timestamp + .store(last_lock_timestamp, std::sync::atomic::Ordering::Release); + } + pub fn msg_count(&self) -> u64 { self.msg_count.load(std::sync::atomic::Ordering::Acquire) } diff --git a/rocketmq-client/src/consumer/consumer_impl/re_balance.rs b/rocketmq-client/src/consumer/consumer_impl/re_balance.rs index c41eb56b..be2a1d42 100644 --- a/rocketmq-client/src/consumer/consumer_impl/re_balance.rs +++ b/rocketmq-client/src/consumer/consumer_impl/re_balance.rs @@ -44,6 +44,7 @@ pub trait RebalanceLocal { ) -> bool; fn remove_unnecessary_pop_message_queue(&self, mq: MessageQueue, pq: ProcessQueue) -> bool; + fn remove_unnecessary_pop_message_queue_pop( &self, _mq: MessageQueue, @@ -53,22 +54,34 @@ pub trait RebalanceLocal { } fn consume_type(&self) -> ConsumeType; - async fn remove_dirty_offset(&self, mq: &MessageQueue); + + async fn remove_dirty_offset(&mut self, mq: &MessageQueue); async fn compute_pull_from_where_with_exception(&mut self, mq: &MessageQueue) -> Result; + async fn compute_pull_from_where(&mut self, mq: &MessageQueue) -> i64; fn get_consume_init_mode(&self) -> i32; + async fn dispatch_pull_request(&self, pull_request_list: Vec, delay: u64); + fn dispatch_pop_pull_request(&self, pull_request_list: Vec, delay: u64); + fn create_process_queue(&self) -> ProcessQueue; + fn create_pop_process_queue(&self) -> PopProcessQueue; + async fn remove_process_queue(&mut self, mq: &MessageQueue); - fn unlock(&self, mq: MessageQueue, oneway: bool); + + async fn unlock(&mut self, mq: &MessageQueue, oneway: bool); + fn lock_all(&self); + fn unlock_all(&self, oneway: bool); + async fn do_rebalance(&mut self, is_order: bool) -> bool; fn client_rebalance(&mut self, topic: &str) -> bool; + fn destroy(&mut self); } diff --git a/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs b/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs index 048755fd..86cff0c1 100644 --- a/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs +++ b/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs @@ -17,12 +17,15 @@ use std::collections::HashMap; use std::collections::HashSet; +use std::ops::DerefMut; use std::sync::Arc; use rocketmq_common::common::message::message_queue::MessageQueue; use rocketmq_common::common::mix_all; use rocketmq_common::ArcRefCellWrapper; +use rocketmq_common::TimeUtils::get_current_millis; use rocketmq_common::WeakCellWrapper; +use rocketmq_remoting::protocol::body::request::lock_batch_request_body::LockBatchRequestBody; use rocketmq_remoting::protocol::heartbeat::consume_type::ConsumeType; use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel; use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData; @@ -173,45 +176,55 @@ where let mut changed = false; let mut remove_queue_map = HashMap::new(); let process_queue_table_cloned = self.process_queue_table.clone(); - let mut process_queue_table = process_queue_table_cloned.write().await; - // Drop process queues no longer belong to me - for (mq, pq) in process_queue_table.iter() { - if mq.get_topic() == topic { - if !mq_set.contains(mq) { - pq.set_dropped(true); - remove_queue_map.insert(mq.clone(), pq.clone()); - } else if pq.is_pull_expired() { - if let Some(sub_rebalance) = self.sub_rebalance_impl.as_mut().unwrap().upgrade() - { - if sub_rebalance.consume_type() == ConsumeType::ConsumePassively { - pq.set_dropped(true); - remove_queue_map.insert(mq.clone(), pq.clone()); - error!( - "[BUG]doRebalance, {:?}, try remove unnecessary mq, {}, because \ - pull is pause, so try to fixed it", - self.consumer_group, - mq.get_topic() - ); + { + let process_queue_table = process_queue_table_cloned.read().await; + // Drop process queues no longer belong to me + for (mq, pq) in process_queue_table.iter() { + if mq.get_topic() == topic { + if !mq_set.contains(mq) { + pq.set_dropped(true); + remove_queue_map.insert(mq.clone(), pq.clone()); + } else if pq.is_pull_expired() { + if let Some(sub_rebalance) = + self.sub_rebalance_impl.as_mut().unwrap().upgrade() + { + if sub_rebalance.consume_type() == ConsumeType::ConsumePassively { + pq.set_dropped(true); + remove_queue_map.insert(mq.clone(), pq.clone()); + error!( + "[BUG]doRebalance, {:?}, try remove unnecessary mq, {}, \ + because pull is pause, so try to fixed it", + self.consumer_group, + mq.get_topic() + ); + } } } } } } - // Remove message queues no longer belong to me - for (mq, pq) in remove_queue_map { - if let Some(mut sub_rebalance) = self.sub_rebalance_impl.as_mut().unwrap().upgrade() { - if sub_rebalance - .remove_unnecessary_message_queue(&mq, &pq) - .await - { - process_queue_table.remove(&mq); - changed = true; - info!( - "doRebalance, {:?}, remove unnecessary mq, {}", - self.consumer_group, - mq.get_topic() - ); + { + if !remove_queue_map.is_empty() { + let mut process_queue_table = process_queue_table_cloned.write().await; + // Remove message queues no longer belong to me + for (mq, pq) in remove_queue_map { + if let Some(mut sub_rebalance) = + self.sub_rebalance_impl.as_mut().unwrap().upgrade() + { + if sub_rebalance + .remove_unnecessary_message_queue(&mq, &pq) + .await + { + process_queue_table.remove(&mq); + changed = true; + info!( + "doRebalance, {:?}, remove unnecessary mq, {}", + self.consumer_group, + mq.get_topic() + ); + } + } } } } @@ -223,9 +236,10 @@ where return false; } let mut sub_rebalance_impl = sub_rebalance_impl.unwrap(); + let mut process_queue_table = process_queue_table_cloned.write().await; for mq in mq_set { if !process_queue_table.contains_key(mq) { - if is_order && !self.lock(mq) { + if is_order && !self.lock(mq, process_queue_table.deref_mut()).await { warn!( "doRebalance, {:?}, add a new mq failed, {}, because lock failed", self.consumer_group, @@ -236,7 +250,7 @@ where } sub_rebalance_impl.remove_dirty_offset(mq).await; - let pq = Arc::new(ProcessQueue::new()); + let pq = Arc::new(sub_rebalance_impl.create_process_queue()); pq.set_locked(true); let next_offset = sub_rebalance_impl.compute_pull_from_where(mq).await; if next_offset >= 0 { @@ -404,7 +418,193 @@ where queue_set } - pub fn lock(&self, mq: &MessageQueue) -> bool { - unimplemented!() + pub async fn lock( + &mut self, + mq: &MessageQueue, + process_queue_table: &mut HashMap>, + ) -> bool { + let client = self.client_instance.as_mut().unwrap(); + let broker_name = client.get_broker_name_from_message_queue(mq).await; + let find_broker_result = client + .find_broker_address_in_subscribe(broker_name.as_str(), mix_all::MASTER_ID, true) + .await; + if let Some(find_broker_result) = find_broker_result { + let mut request_body = LockBatchRequestBody { + consumer_group: Some(self.consumer_group.clone().unwrap()), + client_id: Some(client.client_id.clone()), + ..Default::default() + }; + request_body.mq_set.insert(mq.clone()); + let result = client + .mq_client_api_impl + .as_mut() + .unwrap() + .lock_batch_mq(find_broker_result.broker_addr.as_str(), request_body, 1_000) + .await; + match result { + Ok(locked_mq) => { + for mq in &locked_mq { + if let Some(pq) = process_queue_table.get(mq) { + pq.set_locked(true); + pq.set_last_pull_timestamp(get_current_millis()); + } + } + let lock_ok = locked_mq.contains(mq); + info!( + "message queue lock {}, {:?} {}", + lock_ok, self.consumer_group, mq + ); + lock_ok + } + Err(e) => { + error!("lockBatchMQ exception {},{}", mq, e); + false + } + } + } else { + false + } + } + + pub async fn lock_all(&mut self) { + let broker_mqs = self.build_process_queue_table_by_broker_name().await; + + let map = broker_mqs + .into_iter() + .map(|(broker_name, mqs)| { + let mut client_instance = self.client_instance.clone(); + let process_queue_table = self.process_queue_table.clone(); + let consumer_group = self.consumer_group.clone().unwrap(); + async move { + if mqs.is_empty() { + return; + } + let client = client_instance.as_mut().unwrap(); + let find_broker_result = client + .find_broker_address_in_subscribe( + broker_name.as_str(), + mix_all::MASTER_ID, + true, + ) + .await; + if let Some(find_broker_result) = find_broker_result { + let request_body = LockBatchRequestBody { + consumer_group: Some(consumer_group.to_owned()), + client_id: Some(client.client_id.clone()), + mq_set: mqs.clone(), + ..Default::default() + }; + let result = client + .mq_client_api_impl + .as_mut() + .unwrap() + .lock_batch_mq( + find_broker_result.broker_addr.as_str(), + request_body, + 1_000, + ) + .await; + match result { + Ok(lock_okmqset) => { + let process_queue_table = process_queue_table.read().await; + for mq in &mqs { + if let Some(pq) = process_queue_table.get(mq) { + if lock_okmqset.contains(mq) { + if pq.is_locked() { + info!( + "the message queue locked OK, Group: {:?} {}", + consumer_group, mq + ); + } + pq.set_locked(true); + pq.set_last_lock_timestamp(get_current_millis()); + } else { + pq.set_locked(false); + warn!( + "the message queue locked Failed, Group: {:?} {}", + consumer_group, mq + ); + } + } + } + } + Err(e) => { + error!("lockBatchMQ exception {}", e); + } + } + } + } + }) + .collect::>(); + futures::future::join_all(map).await; + + /* for (broker_name, mqs) in broker_mqs { + if mqs.is_empty() { + continue; + } + let client = self.client_instance.as_mut().unwrap(); + let find_broker_result = client + .find_broker_address_in_subscribe(broker_name.as_str(), mix_all::MASTER_ID, true) + .await; + if let Some(find_broker_result) = find_broker_result { + let request_body = LockBatchRequestBody { + consumer_group: Some(self.consumer_group.clone().unwrap()), + client_id: Some(client.client_id.clone()), + mq_set: mqs.clone(), + ..Default::default() + }; + let result = client + .mq_client_api_impl + .as_mut() + .unwrap() + .lock_batch_mq(find_broker_result.broker_addr.as_str(), request_body, 1_000) + .await; + match result { + Ok(lock_okmqset) => { + let process_queue_table = self.process_queue_table.read().await; + for mq in &mqs { + if let Some(pq) = process_queue_table.get(mq) { + if lock_okmqset.contains(mq) { + if pq.is_locked() { + info!( + "the message queue locked OK, Group: {:?} {}", + self.consumer_group, mq + ); + } + pq.set_locked(true); + pq.set_last_lock_timestamp(get_current_millis()); + } else { + pq.set_locked(false); + warn!( + "the message queue locked Failed, Group: {:?} {}", + self.consumer_group, mq + ); + } + } + } + } + Err(e) => { + error!("lockBatchMQ exception {}", e); + } + } + } + }*/ + } + + async fn build_process_queue_table_by_broker_name( + &self, + ) -> HashMap> { + let mut result = HashMap::new(); + let process_queue_table = self.process_queue_table.read().await; + let client = self.client_instance.as_ref().unwrap(); + for (mq, pq) in process_queue_table.iter() { + if pq.is_dropped() { + continue; + } + let broker_name = client.get_broker_name_from_message_queue(mq).await; + let entry = result.entry(broker_name).or_insert(HashSet::new()); + entry.insert(mq.to_owned()); + } + result } } diff --git a/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs b/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs index 8fbac49c..659c0e14 100644 --- a/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs +++ b/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs @@ -17,6 +17,7 @@ use std::collections::HashMap; use std::collections::HashSet; use std::sync::Arc; +use std::time::Duration; use once_cell::sync::Lazy; use rocketmq_common::common::constant::consume_init_mode::ConsumeInitMode; @@ -28,6 +29,7 @@ use rocketmq_common::ArcRefCellWrapper; use rocketmq_common::TimeUtils::get_current_millis; use rocketmq_common::WeakCellWrapper; use rocketmq_remoting::code::response_code::ResponseCode; +use rocketmq_remoting::protocol::body::unlock_batch_request_body::UnlockBatchRequestBody; use rocketmq_remoting::protocol::heartbeat::consume_type::ConsumeType; use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel; use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData; @@ -136,17 +138,31 @@ impl RebalancePushImpl { { let force_unlock = pq.is_dropped() && (get_current_millis() > pq.get_last_lock_timestamp() + *UNLOCK_DELAY_TIME_MILLS); - if force_unlock { + let consume_lock = pq + .consume_lock + .try_write_timeout(Duration::from_millis(500)) + .await; + if force_unlock || consume_lock.is_some() { let offset_store = default_mqpush_consumer_impl.offset_store.as_mut().unwrap(); offset_store.persist(mq).await; offset_store.remove_offset(mq).await; + pq.set_locked(true); + self.unlock(mq, true).await; + return true; + } else { + pq.inc_try_unlock_times(); + warn!( + "Failed to acquire consume_lock for {}, incrementing try_unlock_times.", + mq + ); } } - false } } +const UNLOCK_BATCH_MQ_TIMEOUT_MS: u64 = 1_000; + impl Rebalance for RebalancePushImpl { async fn message_queue_changed( &mut self, @@ -234,7 +250,7 @@ impl Rebalance for RebalancePushImpl { ConsumeType::ConsumePassively } - async fn remove_dirty_offset(&self, mq: &MessageQueue) { + async fn remove_dirty_offset(&mut self, mq: &MessageQueue) { if let Some(mut default_mqpush_consumer_impl) = self .default_mqpush_consumer_impl .as_ref() @@ -430,8 +446,47 @@ impl Rebalance for RebalancePushImpl { } } - fn unlock(&self, mq: MessageQueue, oneway: bool) { - todo!() + async fn unlock(&mut self, mq: &MessageQueue, oneway: bool) { + let client = match self.rebalance_impl_inner.client_instance.as_mut() { + Some(client) => client, + None => { + warn!("Client instance is not available."); + return; + } + }; + let broker_name = client.get_broker_name_from_message_queue(mq).await; + let find_broker_result = client + .find_broker_address_in_subscribe(broker_name.as_str(), mix_all::MASTER_ID, true) + .await; + if let Some(find_broker_result) = find_broker_result { + let mut request_body = UnlockBatchRequestBody { + consumer_group: Some(self.rebalance_impl_inner.consumer_group.clone().unwrap()), + client_id: Some(client.client_id.clone()), + ..Default::default() + }; + request_body.mq_set.insert(mq.clone()); + let result = client + .mq_client_api_impl + .as_mut() + .unwrap() + .unlock_batch_mq( + find_broker_result.broker_addr.as_str(), + request_body, + UNLOCK_BATCH_MQ_TIMEOUT_MS, + oneway, + ) + .await; + if let Err(e) = result { + warn!("unlockBatchMQ exception, {},{}", mq, e); + } else { + warn!( + "unlock messageQueue. group:{}, clientId:{}, mq:{}", + self.rebalance_impl_inner.consumer_group.as_ref().unwrap(), + client.client_id, + mq + ) + } + } } fn lock_all(&self) { diff --git a/rocketmq-client/src/implementation/mq_client_api_impl.rs b/rocketmq-client/src/implementation/mq_client_api_impl.rs index 16197642..26b61f5d 100644 --- a/rocketmq-client/src/implementation/mq_client_api_impl.rs +++ b/rocketmq-client/src/implementation/mq_client_api_impl.rs @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +use std::collections::HashSet; use std::sync::atomic::AtomicU32; use std::sync::Arc; use std::time::Instant; @@ -40,10 +41,14 @@ use rocketmq_remoting::code::request_code::RequestCode; use rocketmq_remoting::code::response_code::ResponseCode; use rocketmq_remoting::protocol::body::check_client_request_body::CheckClientRequestBody; use rocketmq_remoting::protocol::body::get_consumer_listby_group_response_body::GetConsumerListByGroupResponseBody; +use rocketmq_remoting::protocol::body::request::lock_batch_request_body::LockBatchRequestBody; +use rocketmq_remoting::protocol::body::response::lock_batch_response_body::LockBatchResponseBody; +use rocketmq_remoting::protocol::body::unlock_batch_request_body::UnlockBatchRequestBody; use rocketmq_remoting::protocol::header::client_request_header::GetRouteInfoRequestHeader; use rocketmq_remoting::protocol::header::consumer_send_msg_back_request_header::ConsumerSendMsgBackRequestHeader; use rocketmq_remoting::protocol::header::get_consumer_listby_group_request_header::GetConsumerListByGroupRequestHeader; use rocketmq_remoting::protocol::header::heartbeat_request_header::HeartbeatRequestHeader; +use rocketmq_remoting::protocol::header::lock_batch_mq_request_header::LockBatchMqRequestHeader; use rocketmq_remoting::protocol::header::message_operation_header::send_message_request_header::SendMessageRequestHeader; use rocketmq_remoting::protocol::header::message_operation_header::send_message_request_header_v2::SendMessageRequestHeaderV2; use rocketmq_remoting::protocol::header::message_operation_header::send_message_response_header::SendMessageResponseHeader; @@ -51,6 +56,7 @@ use rocketmq_remoting::protocol::header::pull_message_request_header::PullMessag use rocketmq_remoting::protocol::header::pull_message_response_header::PullMessageResponseHeader; use rocketmq_remoting::protocol::header::query_consumer_offset_request_header::QueryConsumerOffsetRequestHeader; use rocketmq_remoting::protocol::header::query_consumer_offset_response_header::QueryConsumerOffsetResponseHeader; +use rocketmq_remoting::protocol::header::unlock_batch_mq_request_header::UnlockBatchMqRequestHeader; use rocketmq_remoting::protocol::header::unregister_client_request_header::UnregisterClientRequestHeader; use rocketmq_remoting::protocol::header::update_consumer_offset_header::UpdateConsumerOffsetRequestHeader; use rocketmq_remoting::protocol::heartbeat::heartbeat_data::HeartbeatData; @@ -1035,4 +1041,88 @@ impl MQClientAPIImpl { )) } } + + pub async fn unlock_batch_mq( + &mut self, + addr: &str, + request_body: UnlockBatchRequestBody, + timeout_millis: u64, + oneway: bool, + ) -> Result<()> { + let mut request = RemotingCommand::create_request_command( + RequestCode::UnlockBatchMq, + UnlockBatchMqRequestHeader::default(), + ); + request.set_body_mut_ref(Some(request_body.encode())); + if oneway { + self.remoting_client + .invoke_oneway(addr.to_string(), request, timeout_millis) + .await; + Ok(()) + } else { + let response = self + .remoting_client + .invoke_async( + Some(mix_all::broker_vip_channel( + self.client_config.vip_channel_enabled, + addr, + )), + request, + timeout_millis, + ) + .await?; + if ResponseCode::from(response.code()) == ResponseCode::Success { + Ok(()) + } else { + Err(MQBrokerError( + response.code(), + response.remark().map_or("".to_string(), |s| s.to_string()), + addr.to_string(), + )) + } + } + } + + pub async fn lock_batch_mq( + &mut self, + addr: &str, + request_body: LockBatchRequestBody, + timeout_millis: u64, + ) -> Result> { + let mut request = RemotingCommand::create_request_command( + RequestCode::LockBatchMq, + LockBatchMqRequestHeader::default(), + ); + request.set_body_mut_ref(Some(request_body.encode())); + let response = self + .remoting_client + .invoke_async( + Some(mix_all::broker_vip_channel( + self.client_config.vip_channel_enabled, + addr, + )), + request, + timeout_millis, + ) + .await?; + if ResponseCode::from(response.code()) == ResponseCode::Success { + if let Some(body) = response.body() { + LockBatchResponseBody::decode(body.as_ref()) + .map(|body| body.lock_ok_mq_set) + .map_err(|e| MQBrokerError(response.code(), e.to_string(), addr.to_string())) + } else { + Err(MQBrokerError( + response.code(), + "Response body is empty".to_string(), + addr.to_string(), + )) + } + } else { + Err(MQBrokerError( + response.code(), + response.remark().map_or("".to_string(), |s| s.to_string()), + addr.to_string(), + )) + } + } } diff --git a/rocketmq-remoting/src/protocol/body.rs b/rocketmq-remoting/src/protocol/body.rs index 49ed1db7..35ac52e5 100644 --- a/rocketmq-remoting/src/protocol/body.rs +++ b/rocketmq-remoting/src/protocol/body.rs @@ -30,5 +30,8 @@ pub mod group_list; pub mod kv_table; pub mod pop_process_queue_info; pub mod process_queue_info; +pub mod request; +pub mod response; pub mod topic; pub mod topic_info_wrapper; +pub mod unlock_batch_request_body; diff --git a/rocketmq-remoting/src/protocol/body/request.rs b/rocketmq-remoting/src/protocol/body/request.rs new file mode 100644 index 00000000..c35a5fba --- /dev/null +++ b/rocketmq-remoting/src/protocol/body/request.rs @@ -0,0 +1,17 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +pub mod lock_batch_request_body; diff --git a/rocketmq-remoting/src/protocol/body/request/lock_batch_request_body.rs b/rocketmq-remoting/src/protocol/body/request/lock_batch_request_body.rs new file mode 100644 index 00000000..712b2f57 --- /dev/null +++ b/rocketmq-remoting/src/protocol/body/request/lock_batch_request_body.rs @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use std::collections::HashSet; +use std::fmt::Display; + +use rocketmq_common::common::message::message_queue::MessageQueue; +use serde::Deserialize; +use serde::Serialize; + +#[derive(Serialize, Deserialize, Debug, Default)] +#[serde(rename_all = "camelCase")] +pub struct LockBatchRequestBody { + pub consumer_group: Option, + pub client_id: Option, + pub only_this_broker: bool, + pub mq_set: HashSet, +} + +impl Display for LockBatchRequestBody { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "LockBatchRequestBody [consumer_group={}, client_id={}, only_this_broker={}, \ + mq_set={:?}]", + self.consumer_group.as_ref().unwrap_or(&"".to_string()), + self.client_id.as_ref().unwrap_or(&"".to_string()), + self.only_this_broker, + self.mq_set + ) + } +} diff --git a/rocketmq-remoting/src/protocol/body/response.rs b/rocketmq-remoting/src/protocol/body/response.rs new file mode 100644 index 00000000..46cb9a50 --- /dev/null +++ b/rocketmq-remoting/src/protocol/body/response.rs @@ -0,0 +1,17 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +pub mod lock_batch_response_body; diff --git a/rocketmq-remoting/src/protocol/body/response/lock_batch_response_body.rs b/rocketmq-remoting/src/protocol/body/response/lock_batch_response_body.rs new file mode 100644 index 00000000..b7255321 --- /dev/null +++ b/rocketmq-remoting/src/protocol/body/response/lock_batch_response_body.rs @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use std::collections::HashSet; + +use rocketmq_common::common::message::message_queue::MessageQueue; +use serde::Deserialize; +use serde::Serialize; + +#[derive(Serialize, Deserialize, Debug, Default)] +pub struct LockBatchResponseBody { + #[serde(rename = "lockOKMQSet")] + pub lock_ok_mq_set: HashSet, +} diff --git a/rocketmq-remoting/src/protocol/body/unlock_batch_request_body.rs b/rocketmq-remoting/src/protocol/body/unlock_batch_request_body.rs new file mode 100644 index 00000000..3b561310 --- /dev/null +++ b/rocketmq-remoting/src/protocol/body/unlock_batch_request_body.rs @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use std::collections::HashSet; +use std::fmt::Display; + +use rocketmq_common::common::message::message_queue::MessageQueue; +use serde::Deserialize; +use serde::Serialize; + +#[derive(Serialize, Deserialize, Debug, Default)] +#[serde(rename_all = "camelCase")] +pub struct UnlockBatchRequestBody { + pub consumer_group: Option, + pub client_id: Option, + pub only_this_broker: bool, + pub mq_set: HashSet, +} + +impl Display for UnlockBatchRequestBody { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "UnlockBatchRequestBody [consumer_group={}, client_id={}, only_this_broker={}, \ + mq_set={:?}]", + self.consumer_group.as_ref().unwrap_or(&"".to_string()), + self.client_id.as_ref().unwrap_or(&"".to_string()), + self.only_this_broker, + self.mq_set + ) + } +} diff --git a/rocketmq-remoting/src/protocol/header.rs b/rocketmq-remoting/src/protocol/header.rs index 310797d6..868ee896 100644 --- a/rocketmq-remoting/src/protocol/header.rs +++ b/rocketmq-remoting/src/protocol/header.rs @@ -33,6 +33,7 @@ pub mod get_min_offset_response_header; pub mod get_topic_config_request_header; pub mod get_topic_stats_request_header; pub mod heartbeat_request_header; +pub mod lock_batch_mq_request_header; pub mod message_operation_header; pub mod namesrv; pub mod notify_consumer_ids_changed_request_header; @@ -46,6 +47,7 @@ pub mod query_topic_consume_by_who_request_header; pub mod query_topics_by_consumer_request_header; pub mod reply_message_request_header; pub mod search_offset_response_header; +pub mod unlock_batch_mq_request_header; pub mod unregister_client_request_header; pub mod update_consumer_offset_header; pub mod view_message_request_header; diff --git a/rocketmq-remoting/src/protocol/header/lock_batch_mq_request_header.rs b/rocketmq-remoting/src/protocol/header/lock_batch_mq_request_header.rs new file mode 100644 index 00000000..23683fda --- /dev/null +++ b/rocketmq-remoting/src/protocol/header/lock_batch_mq_request_header.rs @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use std::collections::HashMap; + +use serde::Deserialize; +use serde::Serialize; + +use crate::protocol::command_custom_header::CommandCustomHeader; +use crate::protocol::command_custom_header::FromMap; +use crate::rpc::rpc_request_header::RpcRequestHeader; + +#[derive(Serialize, Deserialize, Debug, Default)] +#[serde(rename_all = "camelCase")] +pub struct LockBatchMqRequestHeader { + #[serde(flatten)] + pub rpc_request_header: Option, +} + +impl CommandCustomHeader for LockBatchMqRequestHeader { + fn to_map(&self) -> Option> { + let mut map = HashMap::new(); + if let Some(value) = self.rpc_request_header.as_ref() { + if let Some(value) = value.to_map() { + map.extend(value); + } + } + Some(map) + } +} + +impl FromMap for LockBatchMqRequestHeader { + type Target = Self; + + fn from(map: &HashMap) -> Option { + let rpc_request_header = ::from(map); + Some(LockBatchMqRequestHeader { rpc_request_header }) + } +} diff --git a/rocketmq-remoting/src/protocol/header/unlock_batch_mq_request_header.rs b/rocketmq-remoting/src/protocol/header/unlock_batch_mq_request_header.rs new file mode 100644 index 00000000..4ed90472 --- /dev/null +++ b/rocketmq-remoting/src/protocol/header/unlock_batch_mq_request_header.rs @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use std::collections::HashMap; + +use serde::Deserialize; +use serde::Serialize; + +use crate::protocol::command_custom_header::CommandCustomHeader; +use crate::protocol::command_custom_header::FromMap; +use crate::rpc::rpc_request_header::RpcRequestHeader; + +#[derive(Serialize, Deserialize, Debug, Default)] +#[serde(rename_all = "camelCase")] +pub struct UnlockBatchMqRequestHeader { + #[serde(flatten)] + pub rpc_request_header: Option, +} + +impl CommandCustomHeader for UnlockBatchMqRequestHeader { + fn to_map(&self) -> Option> { + let mut map = HashMap::new(); + if let Some(value) = self.rpc_request_header.as_ref() { + if let Some(value) = value.to_map() { + map.extend(value); + } + } + Some(map) + } +} + +impl FromMap for UnlockBatchMqRequestHeader { + type Target = Self; + + fn from(map: &HashMap) -> Option { + let rpc_request_header = ::from(map); + Some(UnlockBatchMqRequestHeader { rpc_request_header }) + } +}