Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Channel::into_io_parts() for AsyncRead and AsyncWrite data transfers. #181

Merged
merged 14 commits into from
Sep 22, 2023

Conversation

lowlevl
Copy link
Contributor

@lowlevl lowlevl commented Sep 12, 2023

Hi,

as mentioned in #162, I wanted to contribute to this crate by adding an easy way to pipe a tokio::process::Command I/O to a SSH Channel.

This PR introduces a few changes:

  • A structure ChannelTx, implementing AsyncWrite
  • A structure ChannelRx, implementing AsyncRead
  • Two new methods on the Channel struct:
/// Setup the [`Channel`] to be able to send and receive [`ChannelMsg::Data`]
/// through [`io::ChannelTx`] and [`io::ChannelRx`].
pub fn into_io_parts(&mut self) -> (io::ChannelTx<S>, io::ChannelRx<'_, S>) {
    self.into_io_parts_ext(None)
}

/// Setup the [`Channel`] to be able to send and receive [`ChannelMsg::Data`]
/// or [`ChannelMsg::ExtendedData`] through [`io::ChannelTx`] and [`io::ChannelRx`]
/// depending on the `ext` parameter.
pub fn into_io_parts_ext(
    &mut self,
    ext: Option<u32>,
) {
    /* ... */
}

This implementation allows to Drop either of the structs independently and regain access to the Channel struct when dropping the Rx side.

  • As a minor change, I also did create a directory for the channels module to house the original code as well as my new io module.
  • It optionally includes an auto-close mechanism for the SSH Channels, it improves a bit the handling of Channels but might be a breaking change: Edit: Reverted due to breaking-change and failing a test.
Code snippet
impl<S> Drop for Channel<S>
where
    S: From<(ChannelId, ChannelMsg)>,
{
    fn drop(&mut self) {
        let _ = futures::executor::block_on(
            self.sender
                .send((self.id, ChannelMsg::Close).into())
                .inspect_err(|err| {
                    log::error!("Failed to automatically close channel #{}: {err}", self.id)
                }),
        );
    }
}

Finally the usage of this is as such:

pub async fn execute(
    mut command: tokio::process::Command,
    channel: &mut russh::Channel<russh::server::Msg>,
) -> eyre::Result<std::process::ExitStatus> {
    let mut child = command.spawn()?;

    let (mut stdout, mut stdin) = (
        child
            .stdout
            .take()
            .expect("Unable to take service's `stdout` handle"),
        child
            .stdin
            .take()
            .expect("Unable to take service's `stdin` handle"),
    );
    let (mut tx, mut rx) = channel.into_io_parts();

    tokio::try_join!(
        async move {
            let res = tokio::io::copy(&mut rx, &mut stdin).await;
            drop(stdin); // Dropping stdin hints the process about the tty being disconnected

            res
        },
        tokio::io::copy(&mut stdout, &mut tx)
    )?;

    Ok(child.wait().await?)
}

Thanks for reading,
Feel free to ask questions, or add suggestions :)

@lowlevl lowlevl force-pushed the feature/channel-into-io-part branch 4 times, most recently from 4a9f54d to c797274 Compare September 13, 2023 12:37
@lowlevl
Copy link
Contributor Author

lowlevl commented Sep 13, 2023

Upon reviewing the design and some issues here and there, I figured the issue with splitting rx and tx in the current design is the handling of the ChannelMsg::WindowAdjusted messages for data transfer, so I also made a proposal to move the handling of those messages in the Session::run loop to alleviate the need to manage it at multiple places, the advantage of this method is that it is synchronized even if no-one is polling the receive side.

See 0692d2f.

@lowlevl lowlevl force-pushed the feature/channel-into-io-part branch 3 times, most recently from 360a822 to 0692d2f Compare September 13, 2023 13:27
@Eugeny
Copy link
Owner

Eugeny commented Sep 13, 2023

Appreciate your work and the direction is looking good on the first look. I'm away until mid next week but should be able to properly review after that ✌️

@lowlevl
Copy link
Contributor Author

lowlevl commented Sep 13, 2023

Thanks for your quick response, have a great week until then 🎉

@lowlevl lowlevl force-pushed the feature/channel-into-io-part branch 2 times, most recently from 6ca4556 to 0f40451 Compare September 20, 2023 16:02
@Eugeny
Copy link
Owner

Eugeny commented Sep 20, 2023

Just checked it and everything looks good except a few nitpicks on the API.

  • You've removed Channel::writable_packet_size - can you readd it back to ChannelTx?
  • The into_io_parts is a bit wonky since it invites comparison with Socket::into_split, but doesn't actually consume the Channel as the into_ prefix implies. Moreover, the way to creating two Tx sides for stdout and stderr (calling into_io_parts twice on the same object) is not obvious and feels wrong to the user. I suggest splitting it into make_writer and make_reader, which makes it clear that the halves are independent and can be created separately and/or multiple times.

P.S. can you please also add tests to cover the new code, and also replace the now half-obsolete ChannelStream with a simple wrapper for ChannelTx+ChannelRx?

@lowlevl
Copy link
Contributor Author

lowlevl commented Sep 20, 2023

Hi,
thanks for your review, I plan to do the changes tomorrow.

  • Sure, I'll add the writable_packet_size method again:
    Done, added back to Channel though as ChannelRx & ChannelTx are private.
  • Agreed on the into_io_parts method, about the make_reader and make_writer, do you wish them to take the ext: Option<u32> parameter directly or have separate counterparts for extended data (make_reader_ext and make_writer_ext) ?:
    I did go with the make_{reader, writer} + make_{reader, writer}_ext design, don't hesitate to discuss it if you want a combined version.
  • Also, do you want me to remove the two commits about the close-on-drop from history as mentioned in the original comment ? (The commit itself and it's revert):
    I removed the commits.
  • About the ChannelStream wouldn't it be better to drop it altogether, since both ends are independent from each-other and can close at different moments ? this would be a breaking change though.

can you please also add tests to cover the new code

For sure, however I'm unsure about how to create such test, could it work with an integration test with a client and a server sending data to each other within a tokio::join ? Is there an example of a similar test ?
I think unit-testing could be a bit hard because of the nature of the WindowAdjusted packets.

Thanks !

@lowlevl lowlevl force-pushed the feature/channel-into-io-part branch 3 times, most recently from 26f58bd to b06f23e Compare September 21, 2023 10:00
@lowlevl
Copy link
Contributor Author

lowlevl commented Sep 21, 2023

Kinda related topic, but is there a reason to take &mut self in the Channel methods now that the window_size is thread-safely handled ? Most (if not all) of these methods would only requiring a &self reference.

@lowlevl
Copy link
Contributor Author

lowlevl commented Sep 21, 2023

It's done 🎉, about my proposition to remove the ChannelStream altogether here, what do you think ?

Once this is sorted the last thing to do will be the tests :)

@Eugeny
Copy link
Owner

Eugeny commented Sep 21, 2023

I think we should keep the ChannelStream interface (even if as a simple wrapper) since it's used by e.g. russh_sftp which expects an AsyncRead + AsyncWrite

@lowlevl
Copy link
Contributor Author

lowlevl commented Sep 21, 2023

I have some troubles sorting the ChannelStream matter because of io::ChannelRx's lifetime, is there a russh chat channel somewhere I can join and discuss design ?

@Eugeny
Copy link
Owner

Eugeny commented Sep 21, 2023

I don't have a channel but I guess we can just discuss it in this thread :) I see two options:

  • Add a lifetime to ChannelStream
  • Change ChannelRx to consume Channel instead of referencing it: Channel::into_reader(self) -> ChannelRx + ChannelRx::into_channel(self) -> Channel

@lowlevl
Copy link
Contributor Author

lowlevl commented Sep 21, 2023

Okay, I wasn't sure if it was okay to flood here.

  • Adding the lifetime to ChannelStream is what I went for, but unfortunately, SftpSession::new requires S: 'static, I could bubble down this change if you agree.
  • I'd say changing ChannelRx to consume Channel would mean we would have no way to close the channel once we created a ChannelRx which is what I wanted to prevent when I set it to capture &mut Channel.

The other way this could be done would be with the help of owning_ref but it seems unmaintained and to have some RUSTSEC issues..

@lowlevl
Copy link
Contributor Author

lowlevl commented Sep 21, 2023

I'm not really fond of the into_reader() + into_channel() design because it requires owning the channel beforehand and is a pain to work with when it's part of a structure.

@lowlevl
Copy link
Contributor Author

lowlevl commented Sep 21, 2023

I ended up creating an enum that can either hold an owned struct or a ref of Channel to solve the issue.

@lowlevl
Copy link
Contributor Author

lowlevl commented Sep 21, 2023

Other thing, I feel like the S: From<(ChannelId, ChannelMsg) constraint could be removed altogether with the template type, since we always end up creating Msg in the Channel. Is there a reason this template is here ?

@Eugeny
Copy link
Owner

Eugeny commented Sep 21, 2023

@nuRRL it's due to there being two different Msg enums (for the server and the client). Nice trick with ChannelAsMut!

@lowlevl
Copy link
Contributor Author

lowlevl commented Sep 21, 2023

Ah yes, that makes sense, that means we could split ChannelMsg into client::ChannelMsg & server::ChannelMsg at some point to implement the client-only methods on Channel<client::ChannelMsg> and server-only methods on Channel<server::ChannelMsg> I think !

I'll look into it for a further PR.

@lowlevl
Copy link
Contributor Author

lowlevl commented Sep 21, 2023

Hi again, I fixed some flushing issues and added an end-to-end test for the data streaming from client to server and back.

@Eugeny
Copy link
Owner

Eugeny commented Sep 22, 2023

LGTM! @all-contributors please add @nuRRL for code

@Eugeny Eugeny merged commit 737a49c into Eugeny:main Sep 22, 2023
2 of 3 checks passed
@allcontributors
Copy link
Contributor

@Eugeny

I've put up a pull request to add @nuRRL! 🎉

@lowlevl lowlevl deleted the feature/channel-into-io-part branch September 22, 2023 12:15
@lowlevl
Copy link
Contributor Author

lowlevl commented Sep 22, 2023

Merging might have closed #99 & #178 @Eugeny 👀

@Eugeny
Copy link
Owner

Eugeny commented Sep 22, 2023

@nuRRL could you please also make ChannelRx and ChannelTx public? And make ChannelStream::new pub(crate)

@lowlevl
Copy link
Contributor Author

lowlevl commented Sep 25, 2023

Hi there, just came back from week-end, sure for ChannelStream, even pub(super) works, about ChannelRx and ChannelTx, isn't it better to even hide them via a impl ? They don't expose any methods and keeping the public interface small helps mitigate breaking changes.

@lowlevl lowlevl mentioned this pull request Sep 25, 2023
@Eugeny
Copy link
Owner

Eugeny commented Sep 26, 2023

Agreed!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants