Skip to content

Commit

Permalink
[ISSUE #811]🎨Refactor Client error handle🔥 (#812)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Jul 19, 2024
1 parent 6c65f51 commit 4c14e4a
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 101 deletions.
18 changes: 14 additions & 4 deletions rocketmq-namesrv/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use std::time::Duration;

use rocketmq_common::common::namesrv::namesrv_config::NamesrvConfig;
use rocketmq_common::common::server::config::ServerConfig;
use rocketmq_remoting::clients::rocketmq_default_impl::RocketmqDefaultClient;
use rocketmq_remoting::runtime::config::client_config::TokioClientConfig;
use rocketmq_remoting::runtime::server::RocketMQServer;
use rocketmq_runtime::RocketMQRuntime;
use tokio::select;
Expand All @@ -42,10 +44,12 @@ pub struct Builder {

struct NameServerRuntime {
name_server_config: Arc<NamesrvConfig>,
tokio_client_config: Arc<TokioClientConfig>,
server_config: Arc<ServerConfig>,
route_info_manager: Arc<parking_lot::RwLock<RouteInfoManager>>,
kvconfig_manager: Arc<parking_lot::RwLock<KVConfigManager>>,
name_server_runtime: Option<RocketMQRuntime>,
remoting_client: RocketmqDefaultClient,
}

impl NameServerBootstrap {
Expand Down Expand Up @@ -78,7 +82,7 @@ impl NameServerRuntime {
self.name_server_config.clone(),
self.kvconfig_manager.clone(),
);
let default_request_processor = DefaultRequestProcessor::new_with(
let default_request_processor = DefaultRequestProcessor::new(
self.route_info_manager.clone(),
self.kvconfig_manager.clone(),
);
Expand Down Expand Up @@ -136,17 +140,23 @@ impl Builder {
pub fn build(self) -> NameServerBootstrap {
let name_server_config = Arc::new(self.name_server_config.unwrap());
let runtime = RocketMQRuntime::new_multi(10, "namesrv-thread");
let tokio_client_config = Arc::new(TokioClientConfig::default());
let remoting_client = RocketmqDefaultClient::new(tokio_client_config.clone());

NameServerBootstrap {
name_server_runtime: NameServerRuntime {
name_server_config: name_server_config.clone(),
tokio_client_config,
server_config: Arc::new(self.server_config.unwrap()),
route_info_manager: Arc::new(parking_lot::RwLock::new(
RouteInfoManager::new_with_config(name_server_config.clone()),
)),
route_info_manager: Arc::new(parking_lot::RwLock::new(RouteInfoManager::new(
name_server_config.clone(),
Arc::new(remoting_client.clone()),
))),
kvconfig_manager: Arc::new(parking_lot::RwLock::new(KVConfigManager::new(
name_server_config,
))),
name_server_runtime: Some(runtime),
remoting_client,
},
}
}
Expand Down
14 changes: 2 additions & 12 deletions rocketmq-namesrv/src/processor/default_request_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use std::sync::Arc;
use bytes::Bytes;
use rocketmq_common::common::mix_all;
use rocketmq_common::common::mq_version::RocketMqVersion;
use rocketmq_common::common::namesrv::namesrv_config::NamesrvConfig;
use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
use rocketmq_common::CRC32Utils;
use rocketmq_remoting::code::request_code::RequestCode;
Expand Down Expand Up @@ -61,7 +60,7 @@ use tracing::warn;
use crate::route::route_info_manager::RouteInfoManager;
use crate::KVConfigManager;

#[derive(Debug, Clone)]
#[derive(Clone)]
pub struct DefaultRequestProcessor {
route_info_manager: Arc<parking_lot::RwLock<RouteInfoManager>>,
kvconfig_manager: Arc<parking_lot::RwLock<KVConfigManager>>,
Expand Down Expand Up @@ -212,16 +211,7 @@ impl DefaultRequestProcessor {

#[allow(clippy::new_without_default)]
impl DefaultRequestProcessor {
pub fn new(namesrv_config: Arc<NamesrvConfig>) -> Self {
Self {
route_info_manager: Arc::new(parking_lot::RwLock::new(RouteInfoManager::new())),
kvconfig_manager: Arc::new(parking_lot::RwLock::new(KVConfigManager::new(
namesrv_config,
))),
}
}

pub fn new_with(
pub fn new(
route_info_manager: Arc<parking_lot::RwLock<RouteInfoManager>>,
kvconfig_manager: Arc<parking_lot::RwLock<KVConfigManager>>,
) -> Self {
Expand Down
41 changes: 23 additions & 18 deletions rocketmq-namesrv/src/route/route_info_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use std::collections::HashMap;
use std::collections::HashSet;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;

use rocketmq_common::common::config::TopicConfig;
Expand All @@ -29,7 +28,8 @@ use rocketmq_common::common::namesrv::namesrv_config::NamesrvConfig;
use rocketmq_common::common::topic::TopicValidator;
use rocketmq_common::common::TopicSysFlag;
use rocketmq_common::TimeUtils;
use rocketmq_remoting::clients::RemoteClient;
use rocketmq_remoting::clients::rocketmq_default_impl::RocketmqDefaultClient;
use rocketmq_remoting::clients::RemotingClient;
use rocketmq_remoting::code::request_code::RequestCode;
use rocketmq_remoting::protocol::body::broker_body::broker_member_group::BrokerMemberGroup;
use rocketmq_remoting::protocol::body::broker_body::cluster_info::ClusterInfo;
Expand Down Expand Up @@ -65,7 +65,6 @@ type FilterServerTable =
type TopicQueueMappingInfoTable =
HashMap<String /* topic */, HashMap<String /* brokerName */, TopicQueueMappingInfo>>;

#[derive(Debug, Default)]
pub struct RouteInfoManager {
pub(crate) topic_queue_table: TopicQueueTable,
pub(crate) broker_addr_table: BrokerAddrTable,
Expand All @@ -74,16 +73,15 @@ pub struct RouteInfoManager {
pub(crate) filter_server_table: FilterServerTable,
pub(crate) topic_queue_mapping_info_table: TopicQueueMappingInfoTable,
pub(crate) namesrv_config: Arc<NamesrvConfig>,
pub(crate) remote_client: RemoteClient,
pub(crate) remoting_client: Arc<RocketmqDefaultClient>,
}

#[allow(private_interfaces)]
impl RouteInfoManager {
pub fn new() -> Self {
Self::default()
}

pub fn new_with_config(namesrv_config: Arc<NamesrvConfig>) -> Self {
pub fn new(
namesrv_config: Arc<NamesrvConfig>,
remoting_client: Arc<RocketmqDefaultClient>,
) -> Self {
RouteInfoManager {
topic_queue_table: HashMap::new(),
broker_addr_table: HashMap::new(),
Expand All @@ -92,7 +90,7 @@ impl RouteInfoManager {
filter_server_table: HashMap::new(),
topic_queue_mapping_info_table: HashMap::new(),
namesrv_config,
remote_client: RemoteClient::new(),
remoting_client,
}
}
}
Expand Down Expand Up @@ -599,14 +597,21 @@ impl RouteInfoManager {
self.choose_broker_addrs_to_notify(broker_addr_map, offline_broker_addr)
{
for broker_addr in broker_addrs_notify {
let _ = self.remote_client.invoke_oneway(
broker_addr,
RemotingCommand::create_request_command(
RequestCode::NotifyMinBrokerIdChange,
request_header.clone(),
),
Duration::from_millis(3000),
);
let remoting_client = self.remoting_client.clone();
let requst_header = request_header.clone();
let broker_addr = broker_addr.clone();
tokio::spawn(async move {
let _ = remoting_client
.invoke_oneway(
broker_addr,
RemotingCommand::create_request_command(
RequestCode::NotifyMinBrokerIdChange,
requst_header,
),
3000,
)
.await;
});
}
}
}
Expand Down
45 changes: 0 additions & 45 deletions rocketmq-remoting/src/clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,6 @@
* limitations under the License.
*/

use std::collections::HashMap;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::time::Duration;

pub use blocking_client::BlockingClient;
pub use client::Client;

Expand All @@ -35,46 +30,6 @@ mod blocking_client;
mod client;
pub mod rocketmq_default_impl;

#[derive(Default)]
pub struct RemoteClient {
inner: HashMap<String, BlockingClient>,
}

impl Clone for RemoteClient {
fn clone(&self) -> Self {
Self {
inner: HashMap::new(),
}
}
}

impl Debug for RemoteClient {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str("RemoteClient")
}
}

impl RemoteClient {
/// Create a new `RemoteClient` instance.
pub fn new() -> Self {
Self {
inner: HashMap::new(),
}
}

pub fn invoke_oneway(
&mut self,
addr: String,
request: RemotingCommand,
timeout: Duration,
) -> anyhow::Result<()> {
self.inner
.entry(addr.clone())
.or_insert_with(|| BlockingClient::connect(addr).unwrap())
.invoke_oneway(request, timeout)
}
}

/// `RemotingClient` trait extends `RemotingService` to provide client-specific remote interaction
/// functionalities.
///
Expand Down
7 changes: 3 additions & 4 deletions rocketmq-remoting/src/clients/blocking_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,13 @@ impl BlockingClient {
&mut self,
request: RemotingCommand,
timeout: Duration,
) -> anyhow::Result<()> {
) -> crate::Result<()> {
match self
.rt
.block_on(tokio::time::timeout(timeout, self.inner.send(request)))
{
Ok(Ok(_)) => Ok(()),
Ok(Err(err)) => Err(err),
Err(err) => Err(err.into()),
Ok(value) => value,
Err(err) => Err(crate::error::Error::Elapsed(err)),
}
}
}
46 changes: 28 additions & 18 deletions rocketmq-remoting/src/clients/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use anyhow::anyhow;

use futures_util::SinkExt;
use tokio_stream::StreamExt;

use crate::connection::Connection;
use crate::error::Error::ConnectionInvalid;
use crate::error::Error::Io;
use crate::protocol::remoting_command::RemotingCommand;
use crate::Result;

pub struct Client {
/// The TCP connection decorated with the rocketmq remoting protocol encoder / decoder
Expand All @@ -42,12 +45,13 @@ impl Client {
/// # Returns
///
/// A new `Client` instance wrapped in a `Result`. Returns an error if the connection fails.
pub async fn connect<T: tokio::net::ToSocketAddrs>(addr: T) -> anyhow::Result<Client> {
let tcp_stream = tokio::net::TcpStream::connect(addr).await?;
// let tcp_stream = timeout(timeout_duration, tokio::net::TcpStream::connect(addr)).await??;
//let socket_addr = tcp_stream.peer_addr()?;
pub async fn connect<T: tokio::net::ToSocketAddrs>(addr: T) -> Result<Client> {
let tcp_stream = tokio::net::TcpStream::connect(addr).await;
if tcp_stream.is_err() {
return Err(Io(tcp_stream.err().unwrap()));
}
Ok(Client {
connection: Connection::new(tcp_stream),
connection: Connection::new(tcp_stream.unwrap()),
})
}

Expand All @@ -61,7 +65,7 @@ impl Client {
///
/// The `RemotingCommand` representing the response, wrapped in a `Result`. Returns an error if
/// the invocation fails.
pub async fn send_read(&mut self, request: RemotingCommand) -> anyhow::Result<RemotingCommand> {
pub async fn send_read(&mut self, request: RemotingCommand) -> Result<RemotingCommand> {
self.send(request).await?;
let response = self.read().await?;
Ok(response)
Expand Down Expand Up @@ -91,13 +95,16 @@ impl Client {
/// # Returns
///
/// A `Result` indicating success or failure in sending the request.
pub async fn send(&mut self, request: RemotingCommand) -> anyhow::Result<()> {
pub async fn send(&mut self, request: RemotingCommand) -> Result<()> {
match self.connection.framed.send(request).await {
Ok(_) => Ok(()),
Err(_) => {
self.connection.ok = false;
Err(anyhow!("Failed to send request"))
}
Err(error) => match error {
Io(value) => {
self.connection.ok = false;
Err(ConnectionInvalid(value.to_string()))
}
_ => Err(error),
},
}
}

Expand All @@ -107,18 +114,21 @@ impl Client {
///
/// The `RemotingCommand` representing the response, wrapped in a `Result`. Returns an error if
/// reading the response fails.
async fn read(&mut self) -> anyhow::Result<RemotingCommand> {
async fn read(&mut self) -> Result<RemotingCommand> {
match self.connection.framed.next().await {
None => {
self.connection.ok = false;
Err(anyhow!("Failed to read response"))
Err(ConnectionInvalid("connection disconnection".to_string()))
}
Some(result) => match result {
Ok(response) => Ok(response),
Err(err) => {
self.connection.ok = false;
Err(anyhow!(err))
}
Err(error) => match error {
Io(value) => {
self.connection.ok = false;
Err(ConnectionInvalid(value.to_string()))
}
_ => Err(error),
},
},
}
}
Expand Down
6 changes: 6 additions & 0 deletions rocketmq-remoting/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ pub enum Error {

#[error("Not support serialize type: {0}")]
NotSupportSerializeType(u8),

#[error("{0}")]
ConnectionInvalid(String),

#[error("{0}")]
Elapsed(#[from] tokio::time::error::Elapsed),
}

#[cfg(test)]
Expand Down

0 comments on commit 4c14e4a

Please sign in to comment.