Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ractor WebSockets #912

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 40 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions websockets/echo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ name = "websocket-client"
path = "src/client.rs"

[dependencies]
actix.workspace = true
actix-files.workspace = true
actix-web.workspace = true
actix-web-actors.workspace = true
actix-ws.workspace = true
awc.workspace = true

env_logger.workspace = true
futures-util = { workspace = true, features = ["sink"] }
log.workspace = true
ractor = { version = "0.10", default-features = false }
tokio = { workspace = true, features = ["full"] }
tokio-stream.workspace = true
18 changes: 15 additions & 3 deletions websockets/echo/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,30 @@

use actix_files::NamedFile;
use actix_web::{middleware, web, App, Error, HttpRequest, HttpResponse, HttpServer, Responder};
use actix_web_actors::ws;
use ractor::Actor;

mod server;
use self::server::MyWebSocket;
use self::server::{AsMessage, MyWebSocket};

async fn index() -> impl Responder {
NamedFile::open_async("./static/index.html").await.unwrap()
}

/// WebSocket handshake and start `MyWebSocket` actor.
async fn echo_ws(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse, Error> {
ws::start(MyWebSocket::new(), &req, stream)
let (res, session, stream) = actix_ws::handle(&req, stream)?;

let (actor, _handle) = Actor::spawn(None, MyWebSocket, session).await.unwrap();

actix_web::rt::spawn(async move {
let mut stream = stream.aggregate_continuations();

while let Some(Ok(msg)) = stream.recv().await {
actor.send_message(AsMessage::Ws(msg)).unwrap();
}
});

Ok(res)
}

// the actor-based WebSocket examples REQUIRE `actix_web::main` for actor support
Expand Down
135 changes: 84 additions & 51 deletions websockets/echo/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,78 +1,111 @@
use actix_ws::AggregatedMessage;
use ractor::{ActorProcessingErr, ActorRef};
use std::time::{Duration, Instant};

use actix::prelude::*;
use actix_web_actors::ws;

/// How often heartbeat pings are sent
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);

/// How long before lack of client response causes a timeout
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);

#[derive(Debug)]
pub(crate) enum AsMessage {
Ws(actix_ws::AggregatedMessage),
Hb,
}

/// websocket connection is long running connection, it easier
/// to handle with an actor
pub struct MyWebSocket {
/// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
/// otherwise we drop connection.
hb: Instant,
}
pub(crate) struct MyWebSocket;

impl MyWebSocket {
pub fn new() -> Self {
Self { hb: Instant::now() }
async fn handle_hb(
&self,
state: &mut (Instant, actix_ws::Session),
myself: &ActorRef<AsMessage>,
) -> Result<(), ActorProcessingErr> {
if Instant::now().duration_since(state.0) > CLIENT_TIMEOUT {
// heartbeat timed out
println!("Websocket Client heartbeat failed, disconnecting!");

let _ = state.1.clone().close(None).await;
myself.stop(None);

// don't try to send a ping
} else {
state.1.ping(b"").await?;
};

Ok(())
}

/// helper method that sends ping to client every 5 seconds (HEARTBEAT_INTERVAL).
///
/// also this method checks heartbeats from client
fn hb(&self, ctx: &mut <Self as Actor>::Context) {
ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
// check client heartbeats
if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
// heartbeat timed out
println!("Websocket Client heartbeat failed, disconnecting!");

// stop actor
ctx.stop();

// don't try to send a ping
return;
async fn handle_ws_msg(
&self,
msg: AggregatedMessage,
state: &mut (Instant, actix_ws::Session),
myself: ActorRef<AsMessage>,
) -> Result<(), ActorProcessingErr> {
println!("WS: {msg:?}");

match msg {
AggregatedMessage::Ping(msg) => {
state.0 = Instant::now();
state.1.pong(&msg).await?;
}

ctx.ping(b"");
});
AggregatedMessage::Pong(_) => {
state.0 = Instant::now();
}

AggregatedMessage::Text(text) => {
state.1.text(text).await?;
}

AggregatedMessage::Binary(bin) => {
state.1.binary(bin).await?;
}

AggregatedMessage::Close(reason) => {
let _ = state.1.clone().close(reason).await;
myself.stop(None);
}
};

Ok(())
}
}

impl Actor for MyWebSocket {
type Context = ws::WebsocketContext<Self>;
impl ractor::Actor for MyWebSocket {
type Msg = AsMessage;
type State = (Instant, actix_ws::Session);
type Arguments = actix_ws::Session;

async fn pre_start(
&self,
myself: ActorRef<Self::Msg>,
session: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
myself.send_interval(HEARTBEAT_INTERVAL, || AsMessage::Hb);

/// Method is called on actor start. We start the heartbeat process here.
fn started(&mut self, ctx: &mut Self::Context) {
self.hb(ctx);
Ok((Instant::now(), session))
}
}

/// Handler for `ws::Message`
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWebSocket {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
// process websocket messages
println!("WS: {msg:?}");
match msg {
Ok(ws::Message::Ping(msg)) => {
self.hb = Instant::now();
ctx.pong(&msg);
async fn handle(
&self,
myself: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
AsMessage::Hb => {
self.handle_hb(state, &myself).await?;
}
Ok(ws::Message::Pong(_)) => {
self.hb = Instant::now();
}
Ok(ws::Message::Text(text)) => ctx.text(text),
Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
Ok(ws::Message::Close(reason)) => {
ctx.close(reason);
ctx.stop();

AsMessage::Ws(msg) => {
self.handle_ws_msg(msg, state, myself).await?;
}
_ => ctx.stop(),
}

Ok(())
}
}
Loading