Cannot be used with Tokio? #36

Open
Praying opened this issue Sep 18, 2024 · 2 comments

Praying commented Sep 18, 2024

I wrote the code below, but it fails to compile with the error that follows. How do I use this crate with Tokio?

use crate::com::CODE_PORT_IN_USE;
use std::net::SocketAddr;
use std::process;
use tokio::net::TcpStream;
use tracing::{error, info, instrument, warn};
use crate::com::{create_reuse_port_listener, ClusterConfig};
use futures::{SinkExt, StreamExt};
use redis_protocol::{
    codec::{resp3_encode_command, Resp3},
    resp3::types::{BytesFrame, Resp3Frame, RespVersion},
};
use tokio_util::codec::{Framed, Decoder, Encoder};

#[instrument]
pub async fn handle_front_conn(socket: TcpStream) {
    if let Err(err) = socket.set_nodelay(true) {
        warn!("cluster failed to set nodelay, skipping: {:?}", err);
    }
    let mut client_framed = Framed::new(socket, Resp3::default());
    while let Some(result) = client_framed.next().await {
        match result {
            Ok(request) => {
                info!("cluster receive request: {:?}", request)
            },
            Err(e) => {
                eprintln!("Request decode error: {:?}", e);
                break;
            }
        }
    }
    todo!()
}

#[instrument]
pub async fn start(cc: ClusterConfig) -> anyhow::Result<()> {
    let addr = cc.listen_addr.parse::<SocketAddr>().expect("parse socket never fail");
    let listener = match create_reuse_port_listener(&addr).await {
        Ok(v) => v,
        Err(e) => {
            error!("listen {} error: {}", addr, e);
            process::exit(CODE_PORT_IN_USE);
        }
    };
    info!("Listening on {}", addr);

    loop {
        let (socket, addr) = listener.accept().await?;
        info!("Accepted connection from {}", addr);

        tokio::spawn(async move {
            handle_front_conn(socket).await;
        });
    }
}

error[E0599]: the method `next` exists for struct `Framed<TcpStream, Resp3>`, but its trait bounds were not satisfied
  --> src\proxy2\standalone\mod.rs:21:44
   |
21 |       while let Some(result) = client_framed.next().await {
   |                                              ^^^^ method cannot be called on `Framed<TcpStream, Resp3>` due to unsatisfied trait bounds
   |
  ::: C:\Users\codeg\.cargo\registry\src\rsproxy.cn-0dccff568467c15b\tokio-util-0.6.10\src\codec\framed.rs:16:1
   |
16 | / pin_project! {
17 | |     /// A unified [`Stream`] and [`Sink`] interface to an underlying I/O object, using
18 | |     /// the `Encoder` and `Decoder` traits to encode and decode frames.
19 | |     ///
...  |
30 | |     }
31 | | }
   | |_- doesn't satisfy `_: StreamExt` or `_: Stream`
   |
   = note: the following trait bounds were not satisfied:
           `tokio_util::codec::Framed<tokio::net::TcpStream, Resp3>: Stream`
           which is required by `tokio_util::codec::Framed<tokio::net::TcpStream, Resp3>: StreamExt`

aembke (Owner) commented Sep 18, 2024

Hi @Praying. The codec interface should work with Tokio, but from this code it's not obvious what the issue is. I won't be able to reproduce this due to the references to your other crate types though.

Here's an example of the tests that use Tokio: https://github.com/aembke/redis-protocol.rs/blob/main/tests/codec/mod.rs

Praying (Author) commented Sep 19, 2024

@aembke
It seems to work with

tokio-util = { version = "0.7.12", features = ["codec"] }

but does not work with

tokio-util = { version = "0.6", features = ["codec"] }
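
A likely explanation, stated here as an assumption rather than something confirmed in this thread: the crate's `Resp3` codec is compiled against the `Decoder` and `Encoder` traits from tokio-util 0.7, while `Framed` from tokio-util 0.6.10 (the version in the error output) only implements `Stream` for codecs that implement its own 0.6 copy of `Decoder`. Cargo treats 0.6 and 0.7 as incompatible releases, so their traits are unrelated types even though they share a name. The std-only sketch below imitates that situation; the modules `util_v06` and `util_v07`, the `LineCodec` type, and `read_frames` are all hypothetical stand-ins, not real tokio-util or redis-protocol APIs.

```rust
// Stand-ins for two semver-incompatible releases of one crate. Each module
// declares its own `Decoder` trait, so the compiler treats them as unrelated.
mod util_v06 {
    pub trait Decoder {
        fn decode(&mut self, src: &mut Vec<u8>) -> Option<String>;
    }

    pub struct Framed<C> {
        pub buf: Vec<u8>,
        pub codec: C,
    }

    // `next` is only available when the codec implements *this* module's trait.
    impl<C: Decoder> Framed<C> {
        pub fn next(&mut self) -> Option<String> {
            self.codec.decode(&mut self.buf)
        }
    }
}

mod util_v07 {
    pub trait Decoder {
        fn decode(&mut self, src: &mut Vec<u8>) -> Option<String>;
    }

    pub struct Framed<C> {
        pub buf: Vec<u8>,
        pub codec: C,
    }

    // Same shape as above, but bound to the v0.7 stand-in trait.
    impl<C: Decoder> Framed<C> {
        pub fn next(&mut self) -> Option<String> {
            self.codec.decode(&mut self.buf)
        }
    }
}

// Hypothetical codec that implements only the v0.7 stand-in trait.
pub struct LineCodec;

impl util_v07::Decoder for LineCodec {
    // Removes one newline-terminated line from the buffer, if one is complete.
    fn decode(&mut self, src: &mut Vec<u8>) -> Option<String> {
        let pos = src.iter().position(|b| *b == b'\n')?;
        let line: Vec<u8> = src.drain(..=pos).collect();
        Some(String::from_utf8_lossy(&line[..pos]).into_owned())
    }
}

// Matching versions: the bound on `next` is satisfied and frames come out.
pub fn read_frames(input: &[u8]) -> Vec<String> {
    let mut framed = util_v07::Framed { buf: input.to_vec(), codec: LineCodec };
    let mut out = Vec::new();
    while let Some(frame) = framed.next() {
        out.push(frame);
    }
    out
}

// Mismatched versions: the struct can be built, but calling `next` is
// rejected because `LineCodec` does not implement `util_v06::Decoder`.
// let mut bad = util_v06::Framed { buf: Vec::new(), codec: LineCodec };
// bad.next(); // does not compile: trait bounds were not satisfied
```

Calling `read_frames(b"PING\nPONG\n")` yields the two lines in order. If the same cause applies to the real crates, `cargo tree -d` should list tokio-util twice, and aligning the direct dependency with the version the codec crate uses removes the duplicate.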
