diff --git a/src/client/mod.rs b/src/client/mod.rs index 04e9503..13a6966 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -51,28 +51,28 @@ pub trait Connectable { impl NewClient { pub fn from_socket(left: TcpStream, list: ServerList, handle: Handle) - -> Box> { + -> impl Future { let src_dest = future::result(left.peer_addr()) .join(future::result(get_original_dest(&left))) .map_err(|err| warn!("fail to get original dest: {}", err)); - Box::new(src_dest.map(move |(src, dest)| { + src_dest.map(move |(src, dest)| { NewClient { left, src, dest: dest.into(), list, handle, } - })) + }) } } impl NewClient { pub fn retrive_dest(self) - -> Box> { + -> impl Future { let NewClient { left, src, mut dest, list, handle } = self; let wait = Duration::from_millis(500); // try to read TLS ClientHello for // 1. --remote-dns: parse host name from SNI // 2. --n-parallel: need the whole request to be forwarded let read = read_with_timeout(left, vec![0u8; 2048], wait, &handle); - let result = read.map(move |(left, mut data, len)| { + read.map(move |(left, mut data, len)| { let (allow_parallel, pending_data) = if len == 0 { info!("no tls request received before timeout"); (false, None) @@ -101,8 +101,7 @@ impl NewClient { client: NewClient { left, src, dest, list, handle }, allow_parallel, pending_data, } - }).map_err(|err| warn!("fail to read hello from client: {}", err)); - Box::new(result) + }).map_err(|err| warn!("fail to read hello from client: {}", err)) } fn connect_server(self, n_parallel: usize, wait_response: bool, diff --git a/src/main.rs b/src/main.rs index 1141c43..aedafbb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -82,13 +82,14 @@ fn main() { let mut sock_file = None; if let Some(http_addr) = args.value_of("web-bind") { let monitor = monitor.clone(); - let serv = if http_addr.starts_with("/") { + if http_addr.starts_with("/") { let sock = AutoRemoveFile::new(&http_addr); let incoming = UnixListener::bind(&sock, &handle) .expect("fail to bind web server") .incoming(); sock_file = Some(sock); - web::run_server(incoming, monitor, &handle) + let serv = web::run_server(incoming, monitor, &handle); + handle.spawn(serv); } else { // FIXME: remove duplicate code let addr = http_addr.parse() @@ -96,9 +97,9 @@ fn main() { let incoming = TcpListener::bind(&addr, &handle) .expect("fail to bind web server") .incoming(); - web::run_server(incoming, monitor, &handle) - }; - handle.spawn(serv); + let serv = web::run_server(incoming, monitor, &handle); + handle.spawn(serv); + } info!("http run on {}", http_addr); } diff --git a/src/monitor/mod.rs b/src/monitor/mod.rs index 3e657f7..7461ed2 100644 --- a/src/monitor/mod.rs +++ b/src/monitor/mod.rs @@ -57,11 +57,11 @@ impl Monitor { /// Start monitoring delays. /// Returned Future won't return unless error on timer. pub fn monitor_delay(&self, probe: u64, handle: &Handle) - -> Box> { + -> impl Future { let handle = handle.clone(); let init = test_all(self.clone(), true, &handle); let interval = Duration::from_secs(probe); - let update = init.and_then(move |monitor| { + init.and_then(move |monitor| { future::loop_fn((monitor, handle), move |(monitor, handle)| { let wait = Timeout::new(interval, &handle) .expect("error on get timeout from reactor") @@ -71,17 +71,16 @@ impl Monitor { .map(|monitor| (monitor, handle)) }).and_then(|args| Ok(Loop::Continue(args))) }) - }).map_err(|_| ()); - Box::new(update) + }).map_err(|_| ()) } /// Start monitoring throughput. /// Returned Future won't return unless error on timer. pub fn monitor_throughput(&self, handle: &Handle) - -> Box> { + -> impl Future { let interval = Duration::from_secs(THROUGHPUT_INTERVAL_SECS); let handle = handle.clone(); - let lp = future::loop_fn(self.clone(), move |monitor| { + future::loop_fn(self.clone(), move |monitor| { for (server, meter) in monitor.meters.borrow_mut().iter_mut() { meter.add_sample(server.traffic()); } @@ -89,8 +88,7 @@ impl Monitor { .expect("error on get timeout from reactor") .map_err(|err| panic!("error on timer: {}", err)) .map(move |_| Loop::Continue(monitor)) - }); - Box::new(lp) + }) } /// Return average throughputs of all servers in the recent monitor @@ -115,7 +113,7 @@ fn info_stats(infos: &ServerList) -> String { } fn test_all(monitor: Monitor, init: bool, handle: &Handle) - -> Box> { + -> impl Future { debug!("testing all servers..."); let tests: Vec<_> = monitor.servers().into_iter().map(move |server| { let test = alive_test(&server, handle).then(move |result| { @@ -128,15 +126,14 @@ fn test_all(monitor: Monitor, init: bool, handle: &Handle) }); Box::new(test) as Box> }).collect(); - let sort = future::join_all(tests).then(move |_| { + future::join_all(tests).then(move |_| { monitor.resort(); Ok(monitor) - }); - Box::new(sort) + }) } fn alive_test(server: &ProxyServer, handle: &Handle) - -> Box> { + -> impl Future { let request = [ 0, 17, // length rand::random(), rand::random(), // transaction ID @@ -168,10 +165,9 @@ fn alive_test(server: &ProxyServer, handle: &Handle) } }); let wait = query.select(timeout).map_err(|(err, _)| err); - let delay = wait.and_then(|(result, _)| match result { + wait.and_then(|(result, _)| match result { Some(stream) => Ok(stream), None => Err(io::Error::new(io::ErrorKind::TimedOut, "test timeout")), - }).inspect(move |t| debug!("[{}] delay {}ms", tag, t.millis())); - Box::new(delay) + }).inspect(move |t| debug!("[{}] delay {}ms", tag, t.millis())) } diff --git a/src/web/mod.rs b/src/web/mod.rs index d583c54..4d4d58e 100644 --- a/src/web/mod.rs +++ b/src/web/mod.rs @@ -82,12 +82,12 @@ impl Service for StatusPages { .with_header(ContentType::plaintext()), }; debug!("{} {} [{}]", req.method(), req.path(), resp.status()); - return Box::new(future::ok(resp)); + Box::new(future::ok(resp)) } } pub fn run_server(incoming: I, monitor: Monitor, handle: &Handle) - -> Box> + -> impl Future where I: Stream + 'static, S: AsyncRead + AsyncWrite + 'static, A: Debug { @@ -102,13 +102,12 @@ where I: Stream + 'static, let serve = Http::new() .serve_incoming(incoming, new_service); let handle = handle.clone(); - let run = serve.for_each(move |conn| { + serve.for_each(move |conn| { handle.spawn( conn.map(|_| ()) .map_err(|err| info!("http: {}", err)) ); Ok(()) - }).map_err(|err| error!("error on http server: {}", err)); - Box::new(run) + }).map_err(|err| error!("error on http server: {}", err)) }