diff --git a/Cargo.lock b/Cargo.lock index 60d54d2..5e87a5f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1079,7 +1079,7 @@ dependencies = [ [[package]] name = "omnip" -version = "0.4.18" +version = "0.4.19" dependencies = [ "android_logger", "anyhow", diff --git a/Cargo.toml b/Cargo.toml index d2c8fc2..33aafd4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "omnip" -version = "0.4.18" +version = "0.4.19" edition = "2021" [lib] diff --git a/src/server.rs b/src/server.rs index 4cb88bf..21c0f5d 100644 --- a/src/server.rs +++ b/src/server.rs @@ -27,7 +27,7 @@ use std::net::SocketAddr; use std::path::Path; use std::str::{self, FromStr}; use std::sync::{Arc, Mutex}; -use std::time::SystemTime; +use std::time::{Duration, SystemTime}; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::task::JoinHandle; @@ -384,7 +384,7 @@ impl Server { .await .map_err(|e| match e { ProxyError::BadRequest | ProxyError::BadGateway(_) => { - error!("{:?}", e); + error!("err: {e:?}"); } _ => {} }) @@ -395,7 +395,8 @@ impl Server { } Err(e) => { - error!("access server failed, err: {e}"); + error!("access server will wait due to err: {e}"); + tokio::time::sleep(Duration::from_secs(5)).await } } } @@ -459,11 +460,11 @@ impl Server { let mut buffer = [0u8; 512]; let mut proxy_handler = None; loop { - let nread = inbound_stream - .read(&mut buffer) - .await - .context("failed to read from inbound_stream") - .map_err(|_| ProxyError::BadRequest)?; + let nread = + tokio::time::timeout(Duration::from_secs(2), inbound_stream.read(&mut buffer)) + .await + .map_err(|_| ProxyError::BadRequest)? + .map_err(|_| ProxyError::BadRequest)?; if proxy_handler.is_none() { proxy_handler = Some(Self::create_proxy_handler( @@ -545,7 +546,10 @@ impl Server { } if outbound_type == OutboundType::Direct { - debug!("serve request directly: {addr}"); + debug!( + "serve request directly: {addr} from {}", + inbound_stream.peer_addr().unwrap() + ); outbound_stream = match &addr.host { Host::Domain(domain) => { let resolver = if addr.is_internal_domain() { @@ -628,14 +632,14 @@ impl Server { match dashboard_addr { Some(addr) => { debug!("dashboard request: {}", inbound_stream.peer_addr().unwrap()); - Ok(Self::create_tcp_stream(addr.clone()).await) + Ok(Self::create_tcp_stream(addr).await) } None => { log::warn!( "request routing to the proxy server itself is rejected: {}", inbound_stream.peer_addr().unwrap() ); - return Err(ProxyError::BadRequest); + Err(ProxyError::BadRequest) } } } @@ -671,9 +675,9 @@ impl Server { let result = match tokio::io::copy_bidirectional(&mut a_stream, &mut b_stream).await { Ok((tx_bytes, rx_bytes)) => { debug!( - "transfer, out:{tx_bytes}, in:{rx_bytes}, {:?} <-> {:?}", - a_stream.local_addr(), - b_stream.local_addr() + "transfer, out:{tx_bytes}, in:{rx_bytes}, {} <-> {:?}", + a_stream.local_addr().unwrap(), + b_stream.local_addr().unwrap(), ); stats_sender