Skip to content

Commit

Permalink
Refactor: use impl Trait
Browse files Browse the repository at this point in the history
Requires Rust 1.26
  • Loading branch information
sorz committed Apr 17, 2018
1 parent aa617ca commit 70d1e1e
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 33 deletions.
13 changes: 6 additions & 7 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,28 +51,28 @@ pub trait Connectable {

impl NewClient {
pub fn from_socket(left: TcpStream, list: ServerList, handle: Handle)
-> Box<Future<Item=Self, Error=()>> {
-> impl Future<Item=Self, Error=()> {
let src_dest = future::result(left.peer_addr())
.join(future::result(get_original_dest(&left)))
.map_err(|err| warn!("fail to get original dest: {}", err));
Box::new(src_dest.map(move |(src, dest)| {
src_dest.map(move |(src, dest)| {
NewClient {
left, src, dest: dest.into(), list, handle,
}
}))
})
}
}

impl NewClient {
pub fn retrive_dest(self)
-> Box<Future<Item=NewClientWithData, Error=()>> {
-> impl Future<Item=NewClientWithData, Error=()> {
let NewClient { left, src, mut dest, list, handle } = self;
let wait = Duration::from_millis(500);
// try to read TLS ClientHello for
// 1. --remote-dns: parse host name from SNI
// 2. --n-parallel: need the whole request to be forwarded
let read = read_with_timeout(left, vec![0u8; 2048], wait, &handle);
let result = read.map(move |(left, mut data, len)| {
read.map(move |(left, mut data, len)| {
let (allow_parallel, pending_data) = if len == 0 {
info!("no tls request received before timeout");
(false, None)
Expand Down Expand Up @@ -101,8 +101,7 @@ impl NewClient {
client: NewClient { left, src, dest, list, handle },
allow_parallel, pending_data,
}
}).map_err(|err| warn!("fail to read hello from client: {}", err));
Box::new(result)
}).map_err(|err| warn!("fail to read hello from client: {}", err))
}

fn connect_server(self, n_parallel: usize, wait_response: bool,
Expand Down
11 changes: 6 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,23 +82,24 @@ fn main() {
let mut sock_file = None;
if let Some(http_addr) = args.value_of("web-bind") {
let monitor = monitor.clone();
let serv = if http_addr.starts_with("/") {
if http_addr.starts_with("/") {
let sock = AutoRemoveFile::new(&http_addr);
let incoming = UnixListener::bind(&sock, &handle)
.expect("fail to bind web server")
.incoming();
sock_file = Some(sock);
web::run_server(incoming, monitor, &handle)
let serv = web::run_server(incoming, monitor, &handle);
handle.spawn(serv);
} else {
// FIXME: remove duplicate code
let addr = http_addr.parse()
.expect("not a valid address of TCP socket");
let incoming = TcpListener::bind(&addr, &handle)
.expect("fail to bind web server")
.incoming();
web::run_server(incoming, monitor, &handle)
};
handle.spawn(serv);
let serv = web::run_server(incoming, monitor, &handle);
handle.spawn(serv);
}
info!("http run on {}", http_addr);
}

Expand Down
28 changes: 12 additions & 16 deletions src/monitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ impl Monitor {
/// Start monitoring delays.
/// Returned Future won't return unless error on timer.
pub fn monitor_delay(&self, probe: u64, handle: &Handle)
-> Box<Future<Item=(), Error=()>> {
-> impl Future<Item=(), Error=()> {
let handle = handle.clone();
let init = test_all(self.clone(), true, &handle);
let interval = Duration::from_secs(probe);
let update = init.and_then(move |monitor| {
init.and_then(move |monitor| {
future::loop_fn((monitor, handle), move |(monitor, handle)| {
let wait = Timeout::new(interval, &handle)
.expect("error on get timeout from reactor")
Expand All @@ -71,26 +71,24 @@ impl Monitor {
.map(|monitor| (monitor, handle))
}).and_then(|args| Ok(Loop::Continue(args)))
})
}).map_err(|_| ());
Box::new(update)
}).map_err(|_| ())
}

/// Start monitoring throughput.
/// Returned Future won't return unless error on timer.
pub fn monitor_throughput(&self, handle: &Handle)
-> Box<Future<Item=(), Error=()>> {
-> impl Future<Item=(), Error=()> {
let interval = Duration::from_secs(THROUGHPUT_INTERVAL_SECS);
let handle = handle.clone();
let lp = future::loop_fn(self.clone(), move |monitor| {
future::loop_fn(self.clone(), move |monitor| {
for (server, meter) in monitor.meters.borrow_mut().iter_mut() {
meter.add_sample(server.traffic());
}
Timeout::new(interval, &handle)
.expect("error on get timeout from reactor")
.map_err(|err| panic!("error on timer: {}", err))
.map(move |_| Loop::Continue(monitor))
});
Box::new(lp)
})
}

/// Return average throughputs of all servers in the recent monitor
Expand All @@ -115,7 +113,7 @@ fn info_stats(infos: &ServerList) -> String {
}

fn test_all(monitor: Monitor, init: bool, handle: &Handle)
-> Box<Future<Item=Monitor, Error=()>> {
-> impl Future<Item=Monitor, Error=()> {
debug!("testing all servers...");
let tests: Vec<_> = monitor.servers().into_iter().map(move |server| {
let test = alive_test(&server, handle).then(move |result| {
Expand All @@ -128,15 +126,14 @@ fn test_all(monitor: Monitor, init: bool, handle: &Handle)
});
Box::new(test) as Box<Future<Item=(), Error=()>>
}).collect();
let sort = future::join_all(tests).then(move |_| {
future::join_all(tests).then(move |_| {
monitor.resort();
Ok(monitor)
});
Box::new(sort)
})
}

fn alive_test(server: &ProxyServer, handle: &Handle)
-> Box<Future<Item=Duration, Error=io::Error>> {
-> impl Future<Item=Duration, Error=io::Error> {
let request = [
0, 17, // length
rand::random(), rand::random(), // transaction ID
Expand Down Expand Up @@ -168,10 +165,9 @@ fn alive_test(server: &ProxyServer, handle: &Handle)
}
});
let wait = query.select(timeout).map_err(|(err, _)| err);
let delay = wait.and_then(|(result, _)| match result {
wait.and_then(|(result, _)| match result {
Some(stream) => Ok(stream),
None => Err(io::Error::new(io::ErrorKind::TimedOut, "test timeout")),
}).inspect(move |t| debug!("[{}] delay {}ms", tag, t.millis()));
Box::new(delay)
}).inspect(move |t| debug!("[{}] delay {}ms", tag, t.millis()))
}

9 changes: 4 additions & 5 deletions src/web/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,12 @@ impl Service for StatusPages {
.with_header(ContentType::plaintext()),
};
debug!("{} {} [{}]", req.method(), req.path(), resp.status());
return Box::new(future::ok(resp));
Box::new(future::ok(resp))
}
}

pub fn run_server<I, S, A>(incoming: I, monitor: Monitor, handle: &Handle)
-> Box<Future<Item=(), Error=()>>
-> impl Future<Item=(), Error=()>
where I: Stream<Item=(S, A), Error=io::Error> + 'static,
S: AsyncRead + AsyncWrite + 'static,
A: Debug {
Expand All @@ -102,13 +102,12 @@ where I: Stream<Item=(S, A), Error=io::Error> + 'static,
let serve = Http::new()
.serve_incoming(incoming, new_service);
let handle = handle.clone();
let run = serve.for_each(move |conn| {
serve.for_each(move |conn| {
handle.spawn(
conn.map(|_| ())
.map_err(|err| info!("http: {}", err))
);
Ok(())
}).map_err(|err| error!("error on http server: {}", err));
Box::new(run)
}).map_err(|err| error!("error on http server: {}", err))
}

0 comments on commit 70d1e1e

Please sign in to comment.