21
21
use std:: io;
22
22
use std:: net:: { Shutdown , SocketAddr } ;
23
23
use std:: sync:: atomic:: { AtomicBool , AtomicU16 , Ordering } ;
24
- use std:: sync:: mpsc:: channel;
25
- use std:: sync:: { Arc , Mutex } ;
24
+ use std:: sync:: { mpsc, Arc , Mutex } ;
26
25
use std:: thread;
27
26
use std:: time:: Duration ;
28
27
@@ -31,7 +30,7 @@ use std::net::{TcpListener, TcpStream};
31
30
use crate :: args:: Args ;
32
31
use crate :: protocol:: communication:: { receive, send} ;
33
32
use crate :: protocol:: messaging:: { prepare_connect, prepare_connect_ready} ;
34
- use crate :: protocol:: results:: ServerDoneResult ;
33
+ use crate :: protocol:: results:: { IntervalResultBox , ServerDoneResult } ;
35
34
use crate :: stream:: { tcp, udp, TestStream } ;
36
35
use crate :: BoxResult ;
37
36
@@ -56,10 +55,7 @@ fn handle_client(
56
55
let mut parallel_streams: Vec < Arc < Mutex < ( dyn TestStream + Sync + Send ) > > > = Vec :: new ( ) ;
57
56
let mut parallel_streams_joinhandles = Vec :: new ( ) ;
58
57
59
- let ( results_tx, results_rx) : (
60
- std:: sync:: mpsc:: Sender < crate :: protocol:: results:: IntervalResultBox > ,
61
- std:: sync:: mpsc:: Receiver < crate :: protocol:: results:: IntervalResultBox > ,
62
- ) = channel ( ) ;
58
+ let ( results_tx, results_rx) = mpsc:: channel :: < IntervalResultBox > ( ) ;
63
59
64
60
//a closure used to pass results from stream-handlers to the client-communication stream
65
61
let mut forwarding_send_stream = stream. try_clone ( ) ?;
@@ -295,12 +291,12 @@ impl Drop for ClientThreadMonitor {
295
291
pub fn serve ( args : & Args ) -> BoxResult < ( ) > {
296
292
//config-parsing and pre-connection setup
297
293
let tcp_port_pool = Arc :: new ( Mutex :: new ( tcp:: receiver:: TcpPortPool :: new (
298
- args. tcp_port_pool . to_string ( ) ,
299
- args. tcp6_port_pool . to_string ( ) ,
294
+ & args. tcp_port_pool ,
295
+ & args. tcp6_port_pool ,
300
296
) ) ) ;
301
297
let udp_port_pool = Arc :: new ( Mutex :: new ( udp:: receiver:: UdpPortPool :: new (
302
- args. udp_port_pool . to_string ( ) ,
303
- args. udp6_port_pool . to_string ( ) ,
298
+ & args. udp_port_pool ,
299
+ & args. udp6_port_pool ,
304
300
) ) ) ;
305
301
306
302
let cpu_affinity_manager = Arc :: new ( Mutex :: new ( crate :: utils:: cpu_affinity:: CpuAffinityManager :: new ( & args. affinity ) ?) ) ;
@@ -317,53 +313,54 @@ pub fn serve(args: &Args) -> BoxResult<()> {
317
313
log:: info!( "server listening on {}" , listener. local_addr( ) ?) ;
318
314
319
315
while is_alive ( ) {
320
- match listener. accept ( ) {
321
- Ok ( ( mut stream, address) ) => {
322
- log:: info!( "connection from {}" , address) ;
323
-
324
- stream. set_nodelay ( true ) . expect ( "cannot disable Nagle's algorithm" ) ;
325
-
326
- #[ cfg( unix) ]
327
- {
328
- use crate :: protocol:: communication:: KEEPALIVE_DURATION ;
329
- let keepalive_parameters = socket2:: TcpKeepalive :: new ( ) . with_time ( KEEPALIVE_DURATION ) ;
330
- let raw_socket = socket2:: SockRef :: from ( & stream) ;
331
- raw_socket. set_tcp_keepalive ( & keepalive_parameters) ?;
332
- }
333
-
334
- let client_count = CLIENTS . fetch_add ( 1 , Ordering :: Relaxed ) + 1 ;
335
- if client_limit > 0 && client_count > client_limit {
336
- log:: warn!( "client-limit ({}) reached; disconnecting {}..." , client_limit, address. to_string( ) ) ;
337
- stream. shutdown ( Shutdown :: Both ) . unwrap_or_default ( ) ;
338
- CLIENTS . fetch_sub ( 1 , Ordering :: Relaxed ) ;
339
- } else {
340
- let c_cam = cpu_affinity_manager. clone ( ) ;
341
- let c_tcp_port_pool = tcp_port_pool. clone ( ) ;
342
- let c_udp_port_pool = udp_port_pool. clone ( ) ;
343
- let thread_builder = thread:: Builder :: new ( ) . name ( address. to_string ( ) ) ;
344
- thread_builder. spawn ( move || {
345
- //ensure the client is accounted-for even if the handler panics
346
- let _client_thread_monitor = ClientThreadMonitor {
347
- client_address : address. to_string ( ) ,
348
- } ;
349
-
350
- match handle_client ( & mut stream, c_cam, c_tcp_port_pool, c_udp_port_pool) {
351
- Ok ( _) => ( ) ,
352
- Err ( e) => log:: error!( "error in client-handler: {}" , e) ,
353
- }
354
-
355
- //in the event of panic, this will happen when the stream is dropped
356
- stream. shutdown ( Shutdown :: Both ) . unwrap_or_default ( ) ;
357
- } ) ?;
358
- }
359
- }
316
+ let ( mut stream, address) = match listener. accept ( ) {
317
+ Ok ( ( stream, address) ) => ( stream, address) ,
360
318
Err ( ref e) if e. kind ( ) == io:: ErrorKind :: WouldBlock => {
361
- //no pending clients
319
+ // no pending clients
362
320
thread:: sleep ( POLL_TIMEOUT ) ;
321
+ continue ;
363
322
}
364
323
Err ( e) => {
365
324
return Err ( Box :: new ( e) ) ;
366
325
}
326
+ } ;
327
+
328
+ log:: info!( "connection from {}" , address) ;
329
+
330
+ stream. set_nodelay ( true ) . expect ( "cannot disable Nagle's algorithm" ) ;
331
+
332
+ #[ cfg( unix) ]
333
+ {
334
+ use crate :: protocol:: communication:: KEEPALIVE_DURATION ;
335
+ let keepalive_parameters = socket2:: TcpKeepalive :: new ( ) . with_time ( KEEPALIVE_DURATION ) ;
336
+ let raw_socket = socket2:: SockRef :: from ( & stream) ;
337
+ raw_socket. set_tcp_keepalive ( & keepalive_parameters) ?;
338
+ }
339
+
340
+ let client_count = CLIENTS . fetch_add ( 1 , Ordering :: Relaxed ) + 1 ;
341
+ if client_limit > 0 && client_count > client_limit {
342
+ log:: warn!( "client-limit ({}) reached; disconnecting {}..." , client_limit, address. to_string( ) ) ;
343
+ stream. shutdown ( Shutdown :: Both ) . unwrap_or_default ( ) ;
344
+ CLIENTS . fetch_sub ( 1 , Ordering :: Relaxed ) ;
345
+ } else {
346
+ let c_cam = cpu_affinity_manager. clone ( ) ;
347
+ let c_tcp_port_pool = tcp_port_pool. clone ( ) ;
348
+ let c_udp_port_pool = udp_port_pool. clone ( ) ;
349
+ let thread_builder = thread:: Builder :: new ( ) . name ( address. to_string ( ) ) ;
350
+ thread_builder. spawn ( move || {
351
+ // ensure the client is accounted-for even if the handler panics
352
+ let _client_thread_monitor = ClientThreadMonitor {
353
+ client_address : address. to_string ( ) ,
354
+ } ;
355
+
356
+ match handle_client ( & mut stream, c_cam, c_tcp_port_pool, c_udp_port_pool) {
357
+ Ok ( _) => ( ) ,
358
+ Err ( e) => log:: error!( "error in client-handler: {}" , e) ,
359
+ }
360
+
361
+ //in the event of panic, this will happen when the stream is dropped
362
+ stream. shutdown ( Shutdown :: Both ) . unwrap_or_default ( ) ;
363
+ } ) ?;
367
364
}
368
365
}
369
366
0 commit comments