You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Hello, Client::receive_message is taking a &mut self, so it is not possible to send message if I want to listen for a messages, without cancelling the future via, for example, select. That way if I need to send a message, I quit listening network and sending a message. But looking at the source code I found out that it is not cancel safe - data may be lost.
#[cfg(not(feature = "tls"))]asyncfnreceive_packet<'c,T:Read + Write>(buffer:&mut[u8],buffer_len:usize,recv_buffer:&mut[u8],conn:&'c mutNetworkConnection<T>,) -> Result<usize,ReasonCode>{usecrate::utils::buffer_writer::RemLenError;let target_len:usize;letmut rem_len:Result<VariableByteInteger,RemLenError>;letmut writer = BuffWriter::new(buffer, buffer_len);letmut i = 0;// Get len of packettrace!("Reading lenght of packet");loop{trace!(" Reading in loop!");let len:usize = conn
.receive(&mut recv_buffer[writer.position..(writer.position + 1)]).await?;trace!(" Received data!");if len == 0{trace!("Zero byte len packet received, dropping connection.");returnErr(ReasonCode::NetworkError);}
i += len;ifletErr(_e) = writer.insert_ref(len,&recv_buffer[writer.position..i]){error!("Error occurred during write to buffer!");returnErr(ReasonCode::BuffError);}if i > 1{
rem_len = writer.get_rem_len();if rem_len.is_ok(){break;}if i >= 5{error!("Could not read len of packet!");returnErr(ReasonCode::NetworkError);}}}trace!("Lenght done!");let rem_len_len = i;
i = 0;ifletOk(l) = VariableByteIntegerDecoder::decode(rem_len.unwrap()){trace!("Reading packet with target len {}", l);
target_len = l asusize;}else{error!("Could not decode len of packet!");returnErr(ReasonCode::BuffError);}loop{if writer.position == target_len + rem_len_len {trace!("Received packet with len: {}", (target_len + rem_len_len));returnOk(target_len + rem_len_len);}let len:usize = conn
.receive(&mut recv_buffer[writer.position..writer.position + (target_len - i)]).await?;
i += len;ifletErr(_e) =
writer.insert_ref(len,&recv_buffer[writer.position..(writer.position + i)]){error!("Error occurred during write to buffer!");returnErr(ReasonCode::BuffError);}}}
For example, if somewhere during the loop, in between awaits future will be canceled, all data previously read to buffer will remain inside, but writer has it's state saved on stack, so writer.position will be lost, as current stack frame will be destroyed during cancellation. But conn will continue to give bytes from the packet, so any receive_packet will likely end with and Error.
The text was updated successfully, but these errors were encountered:
Rumqttc does not have that issue, because they have an event loop and publish is taking &self instead of rust-mqtt's send_message(&mut self) by pushing request into a queue, and that request will be handled already after everything is received.
Hello,
Client::receive_message
is taking a&mut self
, so it is not possible to send message if I want to listen for a messages, without cancelling the future via, for example,select
. That way if I need to send a message, I quit listening network and sending a message. But looking at the source code I found out that it is not cancel safe - data may be lost.For example, if somewhere during the loop, in between awaits future will be canceled, all data previously read to
buffer
will remain inside, butwriter
has it's state saved on stack, sowriter.position
will be lost, as current stack frame will be destroyed during cancellation. Butconn
will continue to give bytes from the packet, so anyreceive_packet
will likely end with and Error.The text was updated successfully, but these errors were encountered: