Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client::receive_message is not cancel safe #38

Open
Ddystopia opened this issue Apr 5, 2024 · 2 comments
Open

Client::receive_message is not cancel safe #38

Ddystopia opened this issue Apr 5, 2024 · 2 comments

Comments

@Ddystopia
Copy link

Hello, Client::receive_message is taking a &mut self, so it is not possible to send message if I want to listen for a messages, without cancelling the future via, for example, select. That way if I need to send a message, I quit listening network and sending a message. But looking at the source code I found out that it is not cancel safe - data may be lost.

#[cfg(not(feature = "tls"))]
async fn receive_packet<'c, T: Read + Write>(
    buffer: &mut [u8],
    buffer_len: usize,
    recv_buffer: &mut [u8],
    conn: &'c mut NetworkConnection<T>,
) -> Result<usize, ReasonCode> {
    use crate::utils::buffer_writer::RemLenError;

    let target_len: usize;
    let mut rem_len: Result<VariableByteInteger, RemLenError>;
    let mut writer = BuffWriter::new(buffer, buffer_len);
    let mut i = 0;

    // Get len of packet
    trace!("Reading lenght of packet");
    loop {
        trace!("    Reading in loop!");
        let len: usize = conn
            .receive(&mut recv_buffer[writer.position..(writer.position + 1)])
            .await?;
        trace!("    Received data!");
        if len == 0 {
            trace!("Zero byte len packet received, dropping connection.");
            return Err(ReasonCode::NetworkError);
        }
        i += len;
        if let Err(_e) = writer.insert_ref(len, &recv_buffer[writer.position..i]) {
            error!("Error occurred during write to buffer!");
            return Err(ReasonCode::BuffError);
        }
        if i > 1 {
            rem_len = writer.get_rem_len();
            if rem_len.is_ok() {
                break;
            }
            if i >= 5 {
                error!("Could not read len of packet!");
                return Err(ReasonCode::NetworkError);
            }
        }
    }
    trace!("Lenght done!");
    let rem_len_len = i;
    i = 0;
    if let Ok(l) = VariableByteIntegerDecoder::decode(rem_len.unwrap()) {
        trace!("Reading packet with target len {}", l);
        target_len = l as usize;
    } else {
        error!("Could not decode len of packet!");
        return Err(ReasonCode::BuffError);
    }

    loop {
        if writer.position == target_len + rem_len_len {
            trace!("Received packet with len: {}", (target_len + rem_len_len));
            return Ok(target_len + rem_len_len);
        }
        let len: usize = conn
            .receive(&mut recv_buffer[writer.position..writer.position + (target_len - i)])
            .await?;
        i += len;
        if let Err(_e) =
            writer.insert_ref(len, &recv_buffer[writer.position..(writer.position + i)])
        {
            error!("Error occurred during write to buffer!");
            return Err(ReasonCode::BuffError);
        }
    }
}

For example, if somewhere during the loop, in between awaits future will be canceled, all data previously read to buffer will remain inside, but writer has it's state saved on stack, so writer.position will be lost, as current stack frame will be destroyed during cancellation. But conn will continue to give bytes from the packet, so any receive_packet will likely end with and Error.

@Ddystopia
Copy link
Author

Rumqttc does not have that issue, because they have an event loop and publish is taking &self instead of rust-mqtt's send_message(&mut self) by pushing request into a queue, and that request will be handled already after everything is received.

@obabec
Copy link
Owner

obabec commented Apr 17, 2024

@Ddystopia Hi, thanks for the issue. Feel free to open a PR. I haven't had much time lately to take a look into that.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants