Skip to content

Commit

Permalink
Wait for data channel open
Browse files Browse the repository at this point in the history
  • Loading branch information
Ma233 committed Sep 1, 2023
1 parent 7e7054d commit a33d096
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 21 deletions.
31 changes: 21 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 2 additions & 5 deletions core/src/swarm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,12 +271,9 @@ impl Swarm {
) -> Result<()> {
match events.as_slice() {
[] => Ok(()),
[x] => {
let evs = self.handle_message_handler_event(x).await?;
self.handle_message_handler_events(&evs).await
}
[x, xs @ ..] => {
self.handle_message_handler_events(&vec![x.clone()]).await?;
let evs = self.handle_message_handler_event(x).await?;
self.handle_message_handler_events(&evs).await?;
self.handle_message_handler_events(&xs.to_vec()).await
}
}
Expand Down
3 changes: 2 additions & 1 deletion transport/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ edition = "2021"
# Include nothing by default
default = []
dummy = []
native-webrtc = ["webrtc"]
native-webrtc = ["webrtc", "tokio"]
web-sys-webrtc = ["wasm-bindgen", "js-sys", "web-sys"]

[dependencies]
# Dependencies for webrtc feature
tokio = { version = "1.32.0", optional = true, features = ["time"] }
webrtc = { version = "0.8.0", optional = true }

# Dependencies for web_sys_webrtc feature
Expand Down
48 changes: 43 additions & 5 deletions transport/src/connections/native_webrtc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use bytes::Bytes;
use webrtc::data_channel::data_channel_message::DataChannelMessage;
use webrtc::data_channel::data_channel_state::RTCDataChannelState;
use webrtc::data_channel::RTCDataChannel;
use webrtc::ice::mdns::MulticastDnsMode;
use webrtc::ice_transport::ice_candidate_type::RTCIceCandidateType;
Expand Down Expand Up @@ -32,12 +34,11 @@ pub struct WebrtcConnection {
}

impl WebrtcConnection {
pub async fn new(webrtc_conn: RTCPeerConnection) -> Result<Self> {
let webrtc_data_channel = webrtc_conn.create_data_channel("rings", None).await?;
Ok(Self {
pub fn new(webrtc_conn: RTCPeerConnection, webrtc_data_channel: Arc<RTCDataChannel>) -> Self {
Self {
webrtc_conn: Arc::new(webrtc_conn),
webrtc_data_channel,
})
}
}

async fn webrtc_gather(&self) -> Result<RTCSessionDescription> {
Expand All @@ -53,6 +54,25 @@ impl WebrtcConnection {
.ok_or(Error::WebrtcLocalSdpGenerationError)
}

async fn webrtc_wait_for_data_channel_ready(&self) -> Result<()> {
loop {
if matches!(
self.webrtc_connection_state(),
WebrtcConnectionState::Failed
| WebrtcConnectionState::Closed
| WebrtcConnectionState::Disconnected
) {
return Err(Error::DataChannelOpen("Connection unavailable".to_string()));
}

if self.webrtc_data_channel.ready_state() == RTCDataChannelState::Open {
return Ok(());
}

tokio::time::sleep(Duration::from_millis(100)).await;
}
}

async fn close(&self) -> Result<()> {
self.webrtc_conn.close().await.map_err(|e| e.into())
}
Expand All @@ -64,6 +84,7 @@ impl SharedConnection for WebrtcConnection {
type Error = Error;

async fn send_message(&self, msg: TransportMessage) -> Result<()> {
self.webrtc_wait_for_data_channel_ready().await?;
let data = bincode::serialize(&msg).map(Bytes::from)?;
self.webrtc_data_channel.send(&data).await?;
Ok(())
Expand Down Expand Up @@ -121,6 +142,9 @@ impl SharedTransport for Transport<WebrtcConnection> {
where
CE: std::error::Error + Send + Sync + 'static,
{
//
// Setup webrtc connection env
//
let ice_servers = self.ice_servers.iter().cloned().map(|x| x.into()).collect();

let webrtc_config = RTCConfiguration {
Expand All @@ -142,8 +166,14 @@ impl SharedTransport for Transport<WebrtcConnection> {
.with_setting_engine(setting)
.build();

//
// Create webrtc connection
//
let webrtc_conn = webrtc_api.new_peer_connection(webrtc_config).await?;

//
// Set callbacks
//
let inner_cb = Arc::new(InnerCallback::new(cid, callback));

let data_channel_inner_cb = inner_cb.clone();
Expand Down Expand Up @@ -182,7 +212,15 @@ impl SharedTransport for Transport<WebrtcConnection> {
})
}));

let conn = WebrtcConnection::new(webrtc_conn).await?;
//
// Create data channel
//
let webrtc_data_channel = webrtc_conn.create_data_channel("rings", None).await?;

//
// Consturct the Connection

Check warning on line 221 in transport/src/connections/native_webrtc/mod.rs

View workflow job for this annotation

GitHub Actions / Check rustfmt style && run clippy

"Consturct" should be "Construct".
//
let conn = WebrtcConnection::new(webrtc_conn, webrtc_data_channel);
self.connections.insert(cid.to_string(), conn.clone());

Ok(conn)
Expand Down
3 changes: 3 additions & 0 deletions transport/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ pub enum Error {
#[error("IceServer error: {0}")]
IceServer(#[from] IceServerError),

#[error("Failed when waiting for data channel open: {0}")]
DataChannelOpen(String),

#[error("WebRTC local SDP generation error")]
WebrtcLocalSdpGenerationError,

Expand Down

0 comments on commit a33d096

Please sign in to comment.