From 8a4c26410b2c781ef4a29641dcb825a6927a94c1 Mon Sep 17 00:00:00 2001 From: mxsm Date: Sat, 17 Feb 2024 00:43:03 +0800 Subject: [PATCH 1/2] =?UTF-8?q?[ISSUE=20#169]=F0=9F=9A=80Implementing=20ne?= =?UTF-8?q?twork=20communication=20for=20the=20Broker?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/bin/broker_bootstrap_server.rs | 3 +- rocketmq-broker/src/broker_controller.rs | 2 +- rocketmq-broker/src/lib.rs | 1 + rocketmq-broker/src/mqtrace.rs | 18 ++ .../src/mqtrace/send_message_context.rs | 56 ++++++ .../src/processor/send_message_processor.rs | 34 +++- rocketmq-common/src/common/message.rs | 1 + .../src/common/message/message_enum.rs | 111 +++++++++++ rocketmq-remoting/src/lib.rs | 2 + rocketmq-remoting/src/protocol/header.rs | 1 + .../header/message_operation_header.rs | 17 ++ .../send_message_request_header.rs | 176 ++++++++++++++++++ .../src/protocol/static_topic.rs | 1 + .../topic_queue_mapping_context.rs | 48 +++++ rocketmq-remoting/src/rpc.rs | 17 ++ .../src/rpc/rpc_request_header.rs | 32 ++++ .../src/server/rocketmq_server.rs | 38 ++-- rocketmq-store/src/status.rs | 15 ++ 18 files changed, 550 insertions(+), 23 deletions(-) create mode 100644 rocketmq-broker/src/mqtrace.rs create mode 100644 rocketmq-broker/src/mqtrace/send_message_context.rs create mode 100644 rocketmq-common/src/common/message/message_enum.rs create mode 100644 rocketmq-remoting/src/protocol/header/message_operation_header.rs create mode 100644 rocketmq-remoting/src/protocol/header/message_operation_header/send_message_request_header.rs create mode 100644 rocketmq-remoting/src/protocol/static_topic/topic_queue_mapping_context.rs create mode 100644 rocketmq-remoting/src/rpc.rs create mode 100644 rocketmq-remoting/src/rpc/rpc_request_header.rs diff --git a/rocketmq-broker/src/bin/broker_bootstrap_server.rs b/rocketmq-broker/src/bin/broker_bootstrap_server.rs index 7fa4a420..d4af2b12 100644 --- a/rocketmq-broker/src/bin/broker_bootstrap_server.rs +++ b/rocketmq-broker/src/bin/broker_bootstrap_server.rs @@ -67,7 +67,6 @@ async fn start_broker_controller(broker_controller: BrokerController) -> anyhow: "Rocketmq name server(Rust) running on {}:{}", broker_controller.broker_config.broker_ip1, broker_controller.broker_config.listen_port, ); - let future = broker_controller.start(); - join!(future); + join!(broker_controller.start()); Ok(()) } diff --git a/rocketmq-broker/src/broker_controller.rs b/rocketmq-broker/src/broker_controller.rs index fd1c2b57..1a6f43e7 100644 --- a/rocketmq-broker/src/broker_controller.rs +++ b/rocketmq-broker/src/broker_controller.rs @@ -174,7 +174,7 @@ impl BrokerController { if let Some(ref mut broker_server) = self.broker_server { broker_server.start().await; - }; + } //other service start } diff --git a/rocketmq-broker/src/lib.rs b/rocketmq-broker/src/lib.rs index 2b975180..adedd074 100644 --- a/rocketmq-broker/src/lib.rs +++ b/rocketmq-broker/src/lib.rs @@ -26,6 +26,7 @@ mod coldctr; mod controller; mod filter; mod longpolling; +mod mqtrace; mod offset; mod processor; mod schedule; diff --git a/rocketmq-broker/src/mqtrace.rs b/rocketmq-broker/src/mqtrace.rs new file mode 100644 index 00000000..11befc06 --- /dev/null +++ b/rocketmq-broker/src/mqtrace.rs @@ -0,0 +1,18 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +pub(crate) mod send_message_context; diff --git a/rocketmq-broker/src/mqtrace/send_message_context.rs b/rocketmq-broker/src/mqtrace/send_message_context.rs new file mode 100644 index 00000000..f2a495e3 --- /dev/null +++ b/rocketmq-broker/src/mqtrace/send_message_context.rs @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use std::{any::Any, collections::HashMap}; + +use rocketmq_common::common::message::message_enum::MessageType; +use rocketmq_store::status::StatsType; + +#[derive(Debug, Default)] +pub struct SendMessageContext { + pub namespace: String, + pub producer_group: String, + pub topic: String, + pub msg_id: String, + pub origin_msg_id: String, + pub queue_id: Option, + pub queue_offset: Option, + pub broker_addr: String, + pub born_host: String, + pub body_length: i32, + pub code: i32, + pub error_msg: String, + pub msg_props: String, + pub mq_trace_context: Option>, + pub ext_props: HashMap, + pub broker_region_id: String, + pub msg_unique_key: String, + pub born_time_stamp: i64, + pub request_time_stamp: i64, + pub msg_type: MessageType, + pub is_success: bool, + pub account_auth_type: String, + pub account_owner_parent: String, + pub account_owner_self: String, + pub send_msg_num: i32, + pub send_msg_size: i32, + pub send_stat: StatsType, + pub commercial_send_msg_num: i32, + pub commercial_owner: String, + pub commercial_send_stats: StatsType, + pub commercial_send_size: i32, + pub commercial_send_times: i32, +} diff --git a/rocketmq-broker/src/processor/send_message_processor.rs b/rocketmq-broker/src/processor/send_message_processor.rs index 916c4146..2545a9ad 100644 --- a/rocketmq-broker/src/processor/send_message_processor.rs +++ b/rocketmq-broker/src/processor/send_message_processor.rs @@ -15,19 +15,45 @@ * limitations under the License. */ use rocketmq_remoting::{ - protocol::remoting_command::RemotingCommand, + code::request_code::RequestCode, + protocol::{ + header::message_operation_header::send_message_request_header::parse_request_header, + remoting_command::RemotingCommand, + }, runtime::{processor::RequestProcessor, server::ConnectionHandlerContext}, }; #[derive(Default)] -pub struct SendMessageProcessor {} +pub struct SendMessageProcessor { + inner: SendMessageProcessorInner, +} impl RequestProcessor for SendMessageProcessor { fn process_request( &mut self, - _ctx: ConnectionHandlerContext, - _request: RemotingCommand, + ctx: ConnectionHandlerContext, + request: RemotingCommand, ) -> RemotingCommand { + let request_code = RequestCode::from(request.code()); + match request_code { + RequestCode::ConsumerSendMsgBack => self.inner.consumer_send_msg_back(&ctx, &request), + _ => { + let _request_header = parse_request_header(&request).unwrap(); + } + } + RemotingCommand::create_response_command() + } +} + +#[derive(Default)] +struct SendMessageProcessorInner {} + +impl SendMessageProcessorInner { + fn consumer_send_msg_back( + &mut self, + _ctx: &ConnectionHandlerContext, + _request: &RemotingCommand, + ) { todo!() } } diff --git a/rocketmq-common/src/common/message.rs b/rocketmq-common/src/common/message.rs index 8e7ea80c..a5251420 100644 --- a/rocketmq-common/src/common/message.rs +++ b/rocketmq-common/src/common/message.rs @@ -23,6 +23,7 @@ use std::{ use lazy_static::lazy_static; pub mod message_batch; +pub mod message_enum; pub mod message_id; pub mod message_queue; pub mod message_single; diff --git a/rocketmq-common/src/common/message/message_enum.rs b/rocketmq-common/src/common/message/message_enum.rs new file mode 100644 index 00000000..c247db9f --- /dev/null +++ b/rocketmq-common/src/common/message/message_enum.rs @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#[derive(Debug, PartialEq, Copy, Clone, Default)] +pub enum MessageType { + #[default] + NormalMsg, + TransMsgHalf, + TransMsgCommit, + DelayMsg, + OrderMsg, +} + +impl MessageType { + pub fn get_short_name(&self) -> &'static str { + match self { + MessageType::NormalMsg => "Normal", + MessageType::TransMsgHalf => "Trans", + MessageType::TransMsgCommit => "TransCommit", + MessageType::DelayMsg => "Delay", + MessageType::OrderMsg => "Order", + } + } + + pub fn get_by_short_name(short_name: &str) -> MessageType { + match short_name { + "Normal" => MessageType::NormalMsg, + "Trans" => MessageType::TransMsgHalf, + "TransCommit" => MessageType::TransMsgCommit, + "Delay" => MessageType::DelayMsg, + "Order" => MessageType::OrderMsg, + _ => MessageType::NormalMsg, + } + } +} + +#[derive(Debug, PartialEq, Copy, Clone)] +pub enum MessageRequestMode { + Pull, + Pop, +} + +impl MessageRequestMode { + pub fn get_name(&self) -> &'static str { + match self { + MessageRequestMode::Pull => "PULL", + MessageRequestMode::Pop => "POP", + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_get_short_name() { + assert_eq!(MessageType::NormalMsg.get_short_name(), "Normal"); + assert_eq!(MessageType::TransMsgHalf.get_short_name(), "Trans"); + assert_eq!(MessageType::TransMsgCommit.get_short_name(), "TransCommit"); + assert_eq!(MessageType::DelayMsg.get_short_name(), "Delay"); + assert_eq!(MessageType::OrderMsg.get_short_name(), "Order"); + } + + #[test] + fn test_get_by_short_name() { + assert_eq!( + MessageType::get_by_short_name("Normal"), + MessageType::NormalMsg + ); + assert_eq!( + MessageType::get_by_short_name("Trans"), + MessageType::TransMsgHalf + ); + assert_eq!( + MessageType::get_by_short_name("TransCommit"), + MessageType::TransMsgCommit + ); + assert_eq!( + MessageType::get_by_short_name("Delay"), + MessageType::DelayMsg + ); + assert_eq!( + MessageType::get_by_short_name("Order"), + MessageType::OrderMsg + ); + assert_eq!( + MessageType::get_by_short_name("Invalid"), + MessageType::NormalMsg + ); + } + + #[test] + fn test_get_name() { + assert_eq!(MessageRequestMode::Pull.get_name(), "PULL"); + assert_eq!(MessageRequestMode::Pop.get_name(), "POP"); + } +} diff --git a/rocketmq-remoting/src/lib.rs b/rocketmq-remoting/src/lib.rs index 0b8a1e4a..59a001b3 100644 --- a/rocketmq-remoting/src/lib.rs +++ b/rocketmq-remoting/src/lib.rs @@ -1,3 +1,4 @@ +#![feature(future_join)] /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -23,6 +24,7 @@ pub mod error; pub mod net; pub mod protocol; pub mod remoting; +pub mod rpc; pub mod runtime; pub mod server; diff --git a/rocketmq-remoting/src/protocol/header.rs b/rocketmq-remoting/src/protocol/header.rs index 68f6b719..5b89d31c 100644 --- a/rocketmq-remoting/src/protocol/header.rs +++ b/rocketmq-remoting/src/protocol/header.rs @@ -16,4 +16,5 @@ */ pub mod broker; pub mod client_request_header; +pub mod message_operation_header; pub mod namesrv; diff --git a/rocketmq-remoting/src/protocol/header/message_operation_header.rs b/rocketmq-remoting/src/protocol/header/message_operation_header.rs new file mode 100644 index 00000000..9cbffaaa --- /dev/null +++ b/rocketmq-remoting/src/protocol/header/message_operation_header.rs @@ -0,0 +1,17 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +pub mod send_message_request_header; diff --git a/rocketmq-remoting/src/protocol/header/message_operation_header/send_message_request_header.rs b/rocketmq-remoting/src/protocol/header/message_operation_header/send_message_request_header.rs new file mode 100644 index 00000000..785d015a --- /dev/null +++ b/rocketmq-remoting/src/protocol/header/message_operation_header/send_message_request_header.rs @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use rocketmq_macros::{RemotingSerializable, RequestHeaderCodec}; +use serde::{Deserialize, Serialize}; + +use crate::{code::request_code::RequestCode, protocol::remoting_command::RemotingCommand}; + +#[derive(Debug, Clone, Serialize, Deserialize, RemotingSerializable, RequestHeaderCodec)] +#[serde(rename_all = "camelCase")] +pub struct SendMessageRequestHeader { + // the namespace name + pub ns: Option, + // if the data has been namespaced + pub nsd: Option, + // the abstract remote addr name, usually the physical broker name + pub bname: Option, + // oneway + pub oway: Option, + + pub lo: Option, + + pub producer_group: String, + pub topic: String, + pub default_topic: String, + pub default_topic_queue_nums: i32, + pub queue_id: i32, + pub sys_flag: i32, + pub born_timestamp: i64, + pub flag: i32, + pub properties: Option, + pub reconsume_times: Option, + pub unit_mode: Option, + pub batch: Option, + pub max_reconsume_times: Option, +} + +impl SendMessageRequestHeader { + pub fn new( + producer_group: String, + topic: String, + default_topic: String, + default_topic_queue_nums: i32, + queue_id: i32, + sys_flag: i32, + born_timestamp: i64, + flag: i32, + ) -> Self { + SendMessageRequestHeader { + ns: None, + nsd: None, + bname: None, + oway: None, + lo: None, + producer_group, + topic, + default_topic, + default_topic_queue_nums, + queue_id, + sys_flag, + born_timestamp, + flag, + properties: None, + reconsume_times: None, + unit_mode: None, + batch: None, + max_reconsume_times: None, + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, RemotingSerializable, RequestHeaderCodec)] +#[serde(rename_all = "camelCase")] +pub struct SendMessageRequestHeaderV2 { + // the namespace name + pub ns: Option, + // if the data has been namespaced + pub nsd: Option, + // the abstract remote addr name, usually the physical broker name + pub bname: Option, + // oneway + pub oway: Option, + + pub lo: Option, + + pub a: String, // producerGroup + pub b: String, // topic + pub c: String, // defaultTopic + pub d: i32, // defaultTopicQueueNums + pub e: i32, // queueId + pub f: i32, // sysFlag + pub g: i64, // bornTimestamp + pub h: i32, // flag + pub i: Option, // properties + pub j: Option, // reconsumeTimes + pub k: Option, // unitMode + pub l: Option, // consumeRetryTimes + pub m: Option, // batch + pub n: Option, // brokerName +} + +impl SendMessageRequestHeaderV2 { + pub fn new(a: String, b: String, c: String, d: i32, e: i32, f: i32, g: i64, h: i32) -> Self { + SendMessageRequestHeaderV2 { + ns: None, + nsd: None, + bname: None, + oway: None, + lo: None, + a, + b, + c, + d, + e, + f, + g, + h, + i: None, + j: None, + k: None, + l: None, + m: None, + n: None, + } + } + + pub fn create_send_message_request_header_v1(&self) -> SendMessageRequestHeader { + SendMessageRequestHeader { + ns: self.ns.as_ref().cloned(), + nsd: self.nsd.as_ref().cloned(), + bname: self.n.as_ref().cloned(), + oway: self.oway.as_ref().cloned(), + lo: self.lo.as_ref().cloned(), + producer_group: self.a.clone(), + topic: self.b.clone(), + default_topic: self.c.clone(), + default_topic_queue_nums: self.d, + queue_id: self.e, + sys_flag: self.f, + born_timestamp: self.g, + flag: self.h, + properties: self.i.as_ref().cloned(), + reconsume_times: self.j, + unit_mode: self.k, + batch: self.m, + max_reconsume_times: self.l, + } + } +} + +pub fn parse_request_header(request: &RemotingCommand) -> Option { + let mut request_header_v2 = None; + if RequestCode::SendMessageV2.to_i32() == request.code() + || RequestCode::SendBatchMessage.to_i32() == request.code() + { + request_header_v2 = request.decode_command_custom_header::(); + } + + match request_header_v2 { + Some(header) => Some(header.create_send_message_request_header_v1()), + None => request.decode_command_custom_header::(), + } +} diff --git a/rocketmq-remoting/src/protocol/static_topic.rs b/rocketmq-remoting/src/protocol/static_topic.rs index f06344be..d19facd4 100644 --- a/rocketmq-remoting/src/protocol/static_topic.rs +++ b/rocketmq-remoting/src/protocol/static_topic.rs @@ -17,4 +17,5 @@ pub mod logic_queue_mapping_item; pub mod topic_queue_info; +mod topic_queue_mapping_context; pub mod topic_queue_mapping_detail; diff --git a/rocketmq-remoting/src/protocol/static_topic/topic_queue_mapping_context.rs b/rocketmq-remoting/src/protocol/static_topic/topic_queue_mapping_context.rs new file mode 100644 index 00000000..b55b6cb8 --- /dev/null +++ b/rocketmq-remoting/src/protocol/static_topic/topic_queue_mapping_context.rs @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use crate::protocol::static_topic::{ + logic_queue_mapping_item::LogicQueueMappingItem, + topic_queue_mapping_detail::TopicQueueMappingDetail, +}; + +pub struct TopicQueueMappingContext { + pub topic: String, + pub global_id: Option, + pub mapping_detail: Option, + pub mapping_item_list: Vec, + pub leader_item: Option, + pub current_item: Option, +} + +impl TopicQueueMappingContext { + pub fn new( + topic: impl Into, + global_id: Option, + mapping_detail: Option, + mapping_item_list: Vec, + leader_item: Option, + ) -> Self { + Self { + topic: topic.into(), + global_id, + mapping_detail, + mapping_item_list, + leader_item, + current_item: None, + } + } +} diff --git a/rocketmq-remoting/src/rpc.rs b/rocketmq-remoting/src/rpc.rs new file mode 100644 index 00000000..289d1f87 --- /dev/null +++ b/rocketmq-remoting/src/rpc.rs @@ -0,0 +1,17 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +pub mod rpc_request_header; diff --git a/rocketmq-remoting/src/rpc/rpc_request_header.rs b/rocketmq-remoting/src/rpc/rpc_request_header.rs new file mode 100644 index 00000000..50f360cb --- /dev/null +++ b/rocketmq-remoting/src/rpc/rpc_request_header.rs @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct RpcRequestHeader { + // the namespace name + pub ns: Option, + // if the data has been namespaced + pub nsd: Option, + // the abstract remote addr name, usually the physical broker name + pub bname: Option, + // oneway + pub oway: Option, + + pub lo: Option, +} diff --git a/rocketmq-remoting/src/server/rocketmq_server.rs b/rocketmq-remoting/src/server/rocketmq_server.rs index e076fde2..722ab800 100644 --- a/rocketmq-remoting/src/server/rocketmq_server.rs +++ b/rocketmq-remoting/src/server/rocketmq_server.rs @@ -18,8 +18,9 @@ use std::{error::Error, net::SocketAddr, sync::Arc}; +use futures::executor::block_on; use rocketmq_common::TokioExecutorService; -use tokio::{net::TcpListener, sync::broadcast}; +use tokio::{net::TcpListener, sync::broadcast, task::JoinHandle}; use crate::{ protocol::remoting_command::RemotingCommand, @@ -31,6 +32,7 @@ use crate::{ pub struct RocketmqDefaultServer { pub(crate) broker_server_config: BrokerServerConfig, pub(crate) server_inner: ServerInner, + pub future: Option>, } impl RocketmqDefaultServer { @@ -38,32 +40,36 @@ impl RocketmqDefaultServer { Self { broker_server_config, server_inner: ServerInner::new(), + future: None, } } } impl RemotingService for RocketmqDefaultServer { - async fn start(&mut self) { - let listener = TcpListener::bind(&format!( - "{}:{}", - self.broker_server_config.bind_address.as_str(), - self.broker_server_config.listen_port - )) - .await - .unwrap(); + fn start(&mut self) -> impl std::future::Future + Send { + let address = self.broker_server_config.bind_address.as_str(); + let port = self.broker_server_config.listen_port; + let listener = block_on(async move { + TcpListener::bind(&format!("{}:{}", address, port)) + .await + .unwrap() + }); let (notify_conn_disconnect, _) = broadcast::channel::(100); + let default_request_processor = self + .server_inner + .default_request_processor_pair + .as_ref() + .unwrap() + .clone(); + let processor_table = self.server_inner.processor_table.as_ref().unwrap().clone(); + run( listener, tokio::signal::ctrl_c(), - self.server_inner - .default_request_processor_pair - .as_ref() - .unwrap() - .clone(), - self.server_inner.processor_table.as_ref().unwrap().clone(), + default_request_processor, + processor_table, Some(notify_conn_disconnect), ) - .await } fn shutdown(&mut self) { diff --git a/rocketmq-store/src/status.rs b/rocketmq-store/src/status.rs index 9cb480ca..74920efa 100644 --- a/rocketmq-store/src/status.rs +++ b/rocketmq-store/src/status.rs @@ -15,3 +15,18 @@ * limitations under the License. */ pub mod manager; + +#[derive(Debug, PartialEq, Copy, Clone, Default)] +pub enum StatsType { + #[default] + SendSuccess, + SendFailure, + RcvSuccess, + RcvEpolls, + SendBack, + SendBackToDlq, + SendOrder, + SendTimer, + SendTransaction, + PermFailure, +} From 0acba7a63b1cde0346a86f7a40b330c838827264 Mon Sep 17 00:00:00 2001 From: mxsm Date: Sat, 17 Feb 2024 00:56:00 +0800 Subject: [PATCH 2/2] add test case --- .../send_message_request_header.rs | 90 +++++++++++++++++++ rocketmq-store/src/status.rs | 60 +++++++++++++ 2 files changed, 150 insertions(+) diff --git a/rocketmq-remoting/src/protocol/header/message_operation_header/send_message_request_header.rs b/rocketmq-remoting/src/protocol/header/message_operation_header/send_message_request_header.rs index 785d015a..6d922171 100644 --- a/rocketmq-remoting/src/protocol/header/message_operation_header/send_message_request_header.rs +++ b/rocketmq-remoting/src/protocol/header/message_operation_header/send_message_request_header.rs @@ -174,3 +174,93 @@ pub fn parse_request_header(request: &RemotingCommand) -> Option request.decode_command_custom_header::(), } } + +#[cfg(test)] +mod tests { + use RemotingCommand; + + use super::*; + + #[test] + fn test_send_message_request_header_new() { + let header = SendMessageRequestHeader::new( + String::from("group"), + String::from("topic"), + String::from("default_topic"), + 1, + 0, + 0, + 0, + 0, + ); + assert_eq!(header.producer_group, "group"); + assert_eq!(header.topic, "topic"); + assert_eq!(header.default_topic, "default_topic"); + assert_eq!(header.default_topic_queue_nums, 1); + assert_eq!(header.queue_id, 0); + assert_eq!(header.sys_flag, 0); + assert_eq!(header.born_timestamp, 0); + assert_eq!(header.flag, 0); + } + + #[test] + fn test_send_message_request_header_v2_new() { + let header_v2 = SendMessageRequestHeaderV2::new( + String::from("group"), + String::from("topic"), + String::from("default_topic"), + 1, + 0, + 0, + 0, + 0, + ); + assert_eq!(header_v2.a, "group"); + assert_eq!(header_v2.b, "topic"); + assert_eq!(header_v2.c, "default_topic"); + assert_eq!(header_v2.d, 1); + assert_eq!(header_v2.e, 0); + assert_eq!(header_v2.f, 0); + assert_eq!(header_v2.g, 0); + assert_eq!(header_v2.h, 0); + } + + #[test] + fn test_send_message_request_header_v2_create_v1() { + let header_v2 = SendMessageRequestHeaderV2::new( + String::from("group"), + String::from("topic"), + String::from("default_topic"), + 1, + 0, + 0, + 0, + 0, + ); + let header_v1 = header_v2.create_send_message_request_header_v1(); + assert_eq!(header_v1.producer_group, "group"); + assert_eq!(header_v1.topic, "topic"); + assert_eq!(header_v1.default_topic, "default_topic"); + assert_eq!(header_v1.default_topic_queue_nums, 1); + assert_eq!(header_v1.queue_id, 0); + assert_eq!(header_v1.sys_flag, 0); + assert_eq!(header_v1.born_timestamp, 0); + assert_eq!(header_v1.flag, 0); + } + + #[test] + fn test_parse_request_header_v1() { + let mut request = RemotingCommand::create_response_command(); + request = request.set_code(RequestCode::SendMessage.to_i32()); + let header = parse_request_header(&request); + assert_eq!(header.is_none(), true); + } + + #[test] + fn test_parse_request_header_v2() { + let mut request = RemotingCommand::create_response_command(); + request = request.set_code(RequestCode::SendMessage.to_i32()); + let header = parse_request_header(&request); + assert_eq!(header.is_none(), true); + } +} diff --git a/rocketmq-store/src/status.rs b/rocketmq-store/src/status.rs index 74920efa..58b448d9 100644 --- a/rocketmq-store/src/status.rs +++ b/rocketmq-store/src/status.rs @@ -30,3 +30,63 @@ pub enum StatsType { SendTransaction, PermFailure, } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_default() { + assert_eq!(StatsType::default(), StatsType::SendSuccess); + } + + #[test] + fn test_send_success() { + assert_eq!(StatsType::SendSuccess, StatsType::SendSuccess); + } + + #[test] + fn test_send_failure() { + assert_eq!(StatsType::SendFailure, StatsType::SendFailure); + } + + #[test] + fn test_rcv_success() { + assert_eq!(StatsType::RcvSuccess, StatsType::RcvSuccess); + } + + #[test] + fn test_rcv_epolls() { + assert_eq!(StatsType::RcvEpolls, StatsType::RcvEpolls); + } + + #[test] + fn test_send_back() { + assert_eq!(StatsType::SendBack, StatsType::SendBack); + } + + #[test] + fn test_send_back_to_dlq() { + assert_eq!(StatsType::SendBackToDlq, StatsType::SendBackToDlq); + } + + #[test] + fn test_send_order() { + assert_eq!(StatsType::SendOrder, StatsType::SendOrder); + } + + #[test] + fn test_send_timer() { + assert_eq!(StatsType::SendTimer, StatsType::SendTimer); + } + + #[test] + fn test_send_transaction() { + assert_eq!(StatsType::SendTransaction, StatsType::SendTransaction); + } + + #[test] + fn test_perm_failure() { + assert_eq!(StatsType::PermFailure, StatsType::PermFailure); + } +}