@@ -460,7 +460,11 @@ fn recv_address(local_addr_rx: mpsc::Receiver<io::Result<SocketAddr>>) -> io::Re
460
460
}
461
461
462
462
fn serve < M : jsonrpc:: Metadata , S : jsonrpc:: Middleware < M > > (
463
- signals : ( oneshot:: Receiver < ( ) > , mpsc:: Sender < io:: Result < SocketAddr > > , oneshot:: Sender < ( ) > ) ,
463
+ signals : (
464
+ oneshot:: Receiver < ( ) > ,
465
+ mpsc:: Sender < io:: Result < SocketAddr > > ,
466
+ oneshot:: Sender < ( ) > ,
467
+ ) ,
464
468
executor : tokio:: runtime:: TaskExecutor ,
465
469
addr : SocketAddr ,
466
470
cors_domains : CorsDomains ,
@@ -476,89 +480,89 @@ fn serve<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>>(
476
480
max_request_body_size : usize ,
477
481
) {
478
482
let ( shutdown_signal, local_addr_tx, done_tx) = signals;
479
- executor. spawn ( future:: lazy ( move || {
480
- let handle = tokio:: reactor:: Handle :: default ( ) ;
481
-
482
- let bind = move || {
483
- let listener = match addr {
484
- SocketAddr :: V4 ( _) => net2:: TcpBuilder :: new_v4 ( ) ?,
485
- SocketAddr :: V6 ( _) => net2:: TcpBuilder :: new_v6 ( ) ?,
483
+ executor. spawn (
484
+ future:: lazy ( move || {
485
+ let handle = tokio:: reactor:: Handle :: default ( ) ;
486
+
487
+ let bind = move || {
488
+ let listener = match addr {
489
+ SocketAddr :: V4 ( _) => net2:: TcpBuilder :: new_v4 ( ) ?,
490
+ SocketAddr :: V6 ( _) => net2:: TcpBuilder :: new_v6 ( ) ?,
491
+ } ;
492
+ configure_port ( reuse_port, & listener) ?;
493
+ listener. reuse_address ( true ) ?;
494
+ listener. bind ( & addr) ?;
495
+ let listener = listener. listen ( 1024 ) ?;
496
+ let listener = tokio:: net:: TcpListener :: from_std ( listener, & handle) ?;
497
+ // Add current host to allowed headers.
498
+ // NOTE: we need to use `l.local_addr()` instead of `addr`
499
+ // it might be different!
500
+ let local_addr = listener. local_addr ( ) ?;
501
+
502
+ Ok ( ( listener, local_addr) )
486
503
} ;
487
- configure_port ( reuse_port, & listener) ?;
488
- listener. reuse_address ( true ) ?;
489
- listener. bind ( & addr) ?;
490
- let listener = listener. listen ( 1024 ) ?;
491
- let listener = tokio:: net:: TcpListener :: from_std ( listener, & handle) ?;
492
- // Add current host to allowed headers.
493
- // NOTE: we need to use `l.local_addr()` instead of `addr`
494
- // it might be different!
495
- let local_addr = listener. local_addr ( ) ?;
496
-
497
- Ok ( ( listener, local_addr) )
498
- } ;
499
504
500
- let bind_result = match bind ( ) {
501
- Ok ( ( listener, local_addr) ) => {
502
- // Send local address
503
- match local_addr_tx. send ( Ok ( local_addr) ) {
504
- Ok ( _) => futures:: future:: ok ( ( listener, local_addr) ) ,
505
- Err ( _) => {
506
- warn ! (
507
- "Thread {:?} unable to reach receiver, closing server" ,
508
- thread:: current( ) . name( )
509
- ) ;
510
- futures:: future:: err ( ( ) )
505
+ let bind_result = match bind ( ) {
506
+ Ok ( ( listener, local_addr) ) => {
507
+ // Send local address
508
+ match local_addr_tx. send ( Ok ( local_addr) ) {
509
+ Ok ( _) => futures:: future:: ok ( ( listener, local_addr) ) ,
510
+ Err ( _) => {
511
+ warn ! (
512
+ "Thread {:?} unable to reach receiver, closing server" ,
513
+ thread:: current( ) . name( )
514
+ ) ;
515
+ futures:: future:: err ( ( ) )
516
+ }
511
517
}
512
518
}
513
- }
514
- Err ( err) => {
515
- // Send error
516
- let _send_result = local_addr_tx. send ( Err ( err) ) ;
519
+ Err ( err) => {
520
+ // Send error
521
+ let _send_result = local_addr_tx. send ( Err ( err) ) ;
517
522
518
- futures:: future:: err ( ( ) )
519
- }
520
- } ;
523
+ futures:: future:: err ( ( ) )
524
+ }
525
+ } ;
521
526
522
- bind_result. and_then ( move |( listener, local_addr) | {
523
- let allowed_hosts = server_utils:: hosts:: update ( allowed_hosts, & local_addr) ;
524
-
525
- let mut http = server:: conn:: Http :: new ( ) ;
526
- http. keep_alive ( keep_alive) ;
527
- let tcp_stream = SuspendableStream :: new ( listener. incoming ( ) ) ;
528
-
529
- tcp_stream
530
- . for_each ( move |socket| {
531
- let service = ServerHandler :: new (
532
- jsonrpc_handler. clone ( ) ,
533
- cors_domains. clone ( ) ,
534
- cors_max_age,
535
- allowed_headers. clone ( ) ,
536
- allowed_hosts. clone ( ) ,
537
- request_middleware. clone ( ) ,
538
- rest_api,
539
- health_api. clone ( ) ,
540
- max_request_body_size,
541
- keep_alive,
542
- ) ;
543
- tokio:: spawn (
544
- http. serve_connection ( socket, service)
545
- . map_err ( |e| error ! ( "Error serving connection: {:?}" , e) ) ,
546
- ) ;
547
- Ok ( ( ) )
548
- } )
549
- . map_err ( |e| {
550
- warn ! ( "Incoming streams error, closing sever: {:?}" , e) ;
551
- } )
552
- . select ( shutdown_signal
553
- . map_err ( |e| {
554
- debug ! ( "Shutdown signaller dropped, closing server: {:?}" , e ) ;
555
- } ) )
556
- . map ( |_| ( ) )
557
- . map_err ( |_| ( ) )
527
+ bind_result. and_then ( move |( listener, local_addr) | {
528
+ let allowed_hosts = server_utils:: hosts:: update ( allowed_hosts, & local_addr) ;
529
+
530
+ let mut http = server:: conn:: Http :: new ( ) ;
531
+ http. keep_alive ( keep_alive) ;
532
+ let tcp_stream = SuspendableStream :: new ( listener. incoming ( ) ) ;
533
+
534
+ tcp_stream
535
+ . for_each ( move |socket| {
536
+ let service = ServerHandler :: new (
537
+ jsonrpc_handler. clone ( ) ,
538
+ cors_domains. clone ( ) ,
539
+ cors_max_age,
540
+ allowed_headers. clone ( ) ,
541
+ allowed_hosts. clone ( ) ,
542
+ request_middleware. clone ( ) ,
543
+ rest_api,
544
+ health_api. clone ( ) ,
545
+ max_request_body_size,
546
+ keep_alive,
547
+ ) ;
548
+ tokio:: spawn (
549
+ http. serve_connection ( socket, service)
550
+ . map_err ( |e| error ! ( "Error serving connection: {:?}" , e) ) ,
551
+ ) ;
552
+ Ok ( ( ) )
553
+ } )
554
+ . map_err ( |e| {
555
+ warn ! ( "Incoming streams error, closing sever: {:?}" , e) ;
556
+ } )
557
+ . select ( shutdown_signal. map_err ( |e| {
558
+ debug ! ( "Shutdown signaller dropped, closing server: {:?}" , e ) ;
559
+ } ) )
560
+ . map ( |_| ( ) )
561
+ . map_err ( |_| ( ) )
562
+ } )
558
563
} )
559
- } ) . and_then ( |_| {
560
- done_tx. send ( ( ) )
561
- } ) ) ;
564
+ . and_then ( |_| done_tx. send ( ( ) ) ) ,
565
+ ) ;
562
566
}
563
567
564
568
#[ cfg( unix) ]
@@ -586,12 +590,12 @@ pub struct CloseHandle(Arc<Mutex<Option<Vec<(Executor, oneshot::Sender<()>)>>>>)
586
590
impl CloseHandle {
587
591
/// Shutdown a running server
588
592
pub fn close ( self ) {
589
- if let Some ( executors) = self . 0 . lock ( ) . take ( ) {
590
- for ( executor, closer) in executors {
591
- executor. close ( ) ;
592
- let _ = closer. send ( ( ) ) ;
593
- }
594
- }
593
+ if let Some ( executors) = self . 0 . lock ( ) . take ( ) {
594
+ for ( executor, closer) in executors {
595
+ executor. close ( ) ;
596
+ let _ = closer. send ( ( ) ) ;
597
+ }
598
+ }
595
599
}
596
600
}
597
601
0 commit comments