Skip to content

Commit

Permalink
bug fixed, timestamp calculation error, MSRV 1.33
Browse files Browse the repository at this point in the history
  • Loading branch information
zonyitoo committed Sep 29, 2022
1 parent 117d791 commit a0427ef
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 11 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tokio_kcp"
version = "0.8.2"
version = "0.9.0"
authors = ["Matrix <[email protected]>", "Y. T. Chung <[email protected]>"]
description = "A kcp implementation for tokio"
license = "MIT"
Expand All @@ -13,7 +13,7 @@ edition = "2018"
[dependencies]
bytes = "1.1"
futures = "0.3"
kcp = "0.4.16"
kcp = "0.5.0"
log = "0.4"
tokio = { version = "1.11", features = ["net", "sync", "rt", "macros", "time"] }
byte_string = "1"
Expand Down
18 changes: 14 additions & 4 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ impl KcpSession {
Ok(n) => {
let input_buffer = &input_buffer[..n];
let input_conv = kcp::get_conv(input_buffer);
trace!("[SESSION] UDP recv {} bytes, conv: {}, going to input {:?}", n, input_conv, ByteStr::new(input_buffer));
trace!("[SESSION] UDP recv {} bytes, conv: {}, going to input {:?}",
n, input_conv, ByteStr::new(input_buffer));

let mut socket = session.socket.lock();

Expand All @@ -108,7 +109,8 @@ impl KcpSession {
}
Ok(false) => {}
Err(err) => {
error!("[SESSION] UDP input {} bytes error: {}, input buffer {:?}", n, err, ByteStr::new(input_buffer));
error!("[SESSION] UDP input {} bytes error: {}, input buffer {:?}",
n, err, ByteStr::new(input_buffer));
}
}
}
Expand All @@ -120,8 +122,11 @@ impl KcpSession {
if let Some(input_buffer) = input_opt {
let mut socket = session.socket.lock();
match socket.input(&input_buffer) {
Ok(..) => {
trace!("[SESSION] UDP input {} bytes from channel {:?}", input_buffer.len(), ByteStr::new(&input_buffer));
Ok(waked) => {
// trace!("[SESSION] UDP input {} bytes from channel {:?}",
// input_buffer.len(), ByteStr::new(&input_buffer));
trace!("[SESSION] UDP input {} bytes from channel, waked? {} sender/receiver",
input_buffer.len(), waked);
}
Err(err) => {
error!("[SESSION] UDP input {} bytes from channel failed, error: {}, input buffer {:?}",
Expand Down Expand Up @@ -177,6 +182,11 @@ impl KcpSession {
}
}

// If window is full, flush it immediately
if socket.need_flush() {
let _ = socket.flush();
}

match socket.update() {
Ok(next_next) => Instant::from_std(next_next),
Err(err) => {
Expand Down
20 changes: 18 additions & 2 deletions src/skcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,16 +203,27 @@ impl KcpSocket {
}

match self.kcp.recv(buf) {
Ok(0) | Err(KcpError::RecvQueueEmpty) | Err(KcpError::ExpectingFragment) => {
e @ (Ok(0) | Err(KcpError::RecvQueueEmpty) | Err(KcpError::ExpectingFragment)) => {
trace!(
"[RECV] rcvwnd={} peeksize={} r={:?}",
self.kcp.rcv_wnd(),
self.kcp.peeksize().unwrap_or(0),
e
);

if let Some(waker) = self.pending_receiver.replace(cx.waker().clone()) {
if !cx.waker().will_wake(&waker) {
waker.wake();
}
}

Poll::Pending
}
Err(err) => Err(err).into(),
Ok(n) => Ok(n).into(),
Ok(n) => {
self.last_update = Instant::now();
Ok(n).into()
}
}
}

Expand Down Expand Up @@ -302,6 +313,11 @@ impl KcpSocket {
pub fn last_update_time(&self) -> Instant {
self.last_update
}

pub fn need_flush(&self) -> bool {
(self.kcp.wait_snd() >= self.kcp.snd_wnd() as usize || self.kcp.wait_snd() >= self.kcp.rmt_wnd() as usize)
&& !self.kcp.waiting_conv()
}
}

#[cfg(test)]
Expand Down
2 changes: 0 additions & 2 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ impl KcpStream {
match ready!(kcp.poll_recv(cx, buf)) {
Ok(n) => {
trace!("[CLIENT] recv directly {} bytes", n);
self.session.notify();
return Ok(n).into();
}
Err(KcpError::UserBufTooSmall) => {}
Expand All @@ -108,7 +107,6 @@ impl KcpStream {
Ok(0) => return Ok(0).into(),
Ok(n) => {
trace!("[CLIENT] recv buffered {} bytes", n);
self.session.notify();
self.recv_buffer_pos = 0;
self.recv_buffer_cap = n;
}
Expand Down
3 changes: 2 additions & 1 deletion src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ use std::time::{SystemTime, UNIX_EPOCH};
pub fn now_millis() -> u32 {
let start = SystemTime::now();
let since_the_epoch = start.duration_since(UNIX_EPOCH).expect("time went afterwards");
(since_the_epoch.as_secs() * 1000 + since_the_epoch.subsec_millis() as u64 / 1_000_000) as u32
// (since_the_epoch.as_secs() * 1000 + since_the_epoch.subsec_millis() as u64) as u32
since_the_epoch.as_millis() as u32
}

0 comments on commit a0427ef

Please sign in to comment.