diff --git a/rocketmq-remoting/src/protocol/remoting_command.rs b/rocketmq-remoting/src/protocol/remoting_command.rs index e9e52cd4..b7b3877a 100644 --- a/rocketmq-remoting/src/protocol/remoting_command.rs +++ b/rocketmq-remoting/src/protocol/remoting_command.rs @@ -284,11 +284,13 @@ impl RemotingCommand { self } + #[inline] pub fn set_opaque(mut self, opaque: i32) -> Self { self.opaque = opaque; self } + #[inline] pub fn set_opaque_mut(&mut self, opaque: i32) { self.opaque = opaque; } diff --git a/rocketmq-remoting/src/remoting_server/server.rs b/rocketmq-remoting/src/remoting_server/server.rs index 805d17d2..764cca8d 100644 --- a/rocketmq-remoting/src/remoting_server/server.rs +++ b/rocketmq-remoting/src/remoting_server/server.rs @@ -106,6 +106,7 @@ impl ConnectionHandler { impl ConnectionHandler { async fn handle(&mut self) -> Result<()> { while !self.shutdown.is_shutdown { + //Get the next frame from the connection. let frame = tokio::select! { res = self.connection_handler_context.channel.connection.reader.next() => res, _ = self.shutdown.recv() =>{ @@ -135,38 +136,100 @@ impl ConnectionHandler { } continue; } - - //handle request - let mut exception = match self.do_before_rpc_hooks(&self.channel, &mut cmd) { + let opaque = cmd.opaque(); + let oneway_rpc = cmd.is_oneway_rpc(); + //before handle request hooks + let exception = match self.do_before_rpc_hooks(&self.channel, &mut cmd) { Ok(_) => None, Err(error) => Some(error), }; - let opaque = cmd.opaque(); - let oneway_rpc = cmd.is_oneway_rpc(); - let mut response = if exception.is_some() { - Some(RemotingCommand::create_remoting_command( - ResponseCode::SystemError, - )) - } else { + //handle error if return have + match self.handle_error(oneway_rpc, opaque, exception).await { + HandleErrorResult::Continue => continue, + HandleErrorResult::ReturnMethod => return Ok(()), + HandleErrorResult::GoHead => {} + } + + let mut response = { let channel = self.channel.clone(); let ctx = ArcRefCellWrapper::downgrade(&self.connection_handler_context); tokio::select! { - result = self.request_processor.process_request(channel,ctx,cmd) => result?, + result = self.request_processor.process_request(channel,ctx,cmd) => match result{ + Ok(value) => value, + Err(_err) => Some(RemotingCommand::create_response_command_with_code( + ResponseCode::SystemError, + )), + }, } }; - if response.is_none() { + + let exception = + match self.do_before_rpc_hooks(&self.channel, response.as_mut().unwrap()) { + Ok(_) => None, + Err(error) => Some(error), + }; + + match self.handle_error(oneway_rpc, opaque, exception).await { + HandleErrorResult::Continue => continue, + HandleErrorResult::ReturnMethod => return Ok(()), + HandleErrorResult::GoHead => {} + } + if response.is_none() || oneway_rpc { continue; } - exception = match self.do_before_rpc_hooks(&self.channel, response.as_mut().unwrap()) { - Ok(_) => None, - Err(error) => Some(error), - }; + let response = response.unwrap(); + tokio::select! { + result =self.connection_handler_context.channel.connection.writer.send(response.set_opaque(opaque)) => match result{ + Ok(_) =>{}, + Err(err) => { + match err { + Error::Io(io_error) => { + error!("connection disconnect: {}", io_error); + return Ok(()) + } + _ => { error!("send response failed: {}", err);} + } + }, + }, + } + } + Ok(()) + } - if let Some(exception_inner) = exception { - match exception_inner { - Error::AbortProcessException(code, message) => { + async fn handle_error( + &mut self, + oneway_rpc: bool, + opaque: i32, + exception: Option, + ) -> HandleErrorResult { + if let Some(exception_inner) = exception { + match exception_inner { + Error::AbortProcessException(code, message) => { + if oneway_rpc { + return HandleErrorResult::Continue; + } + let response = + RemotingCommand::create_response_command_with_code_remark(code, message); + tokio::select! { + result =self.connection_handler_context.channel.connection.writer.send(response.set_opaque(opaque)) => match result{ + Ok(_) =>{}, + Err(err) => { + match err { + Error::Io(io_error) => { + error!("send response failed: {}", io_error); + return HandleErrorResult::ReturnMethod; + } + _ => { error!("send response failed: {}", err);} + } + }, + }, + } + } + _ => { + if !oneway_rpc { let response = RemotingCommand::create_response_command_with_code_remark( - code, message, + ResponseCode::SystemError, + exception_inner.to_string(), ); tokio::select! { result =self.connection_handler_context.channel.connection.writer.send(response.set_opaque(opaque)) => match result{ @@ -175,7 +238,7 @@ impl ConnectionHandler { match err { Error::Io(io_error) => { error!("send response failed: {}", io_error); - return Ok(()) + return HandleErrorResult::ReturnMethod; } _ => { error!("send response failed: {}", err);} } @@ -183,53 +246,21 @@ impl ConnectionHandler { }, } } - _ => { - if !oneway_rpc { - let response = - RemotingCommand::create_response_command_with_code_remark( - ResponseCode::SystemError, - exception_inner.to_string(), - ); - tokio::select! { - result =self.connection_handler_context.channel.connection.writer.send(response.set_opaque(opaque)) => match result{ - Ok(_) =>{}, - Err(err) => { - match err { - Error::Io(io_error) => { - error!("send response failed: {}", io_error); - return Ok(()) - } - _ => { error!("send response failed: {}", err);} - } - }, - }, - } - } - } } - continue; - } - - let response = response.unwrap(); - tokio::select! { - result =self.connection_handler_context.channel.connection.writer.send(response.set_opaque(opaque)) => match result{ - Ok(_) =>{}, - Err(err) => { - match err { - Error::Io(io_error) => { - error!("send response failed: {}", io_error); - return Ok(()) - } - _ => { error!("send response failed: {}", err);} - } - }, - }, } + HandleErrorResult::Continue + } else { + HandleErrorResult::GoHead } - Ok(()) } } +enum HandleErrorResult { + Continue, + ReturnMethod, + GoHead, +} + /// Server listener state. Created in the `run` call. It includes a `run` method /// which performs the TCP listening and initialization of per-connection state. struct ConnectionListener {