@@ -21,6 +21,7 @@ use futures_channel::oneshot;
21
21
use futures_util:: future:: { self , FutureExt , TryFuture , TryFutureExt } ;
22
22
use support:: TokioIo ;
23
23
use tokio:: net:: TcpStream ;
24
+
24
25
mod support;
25
26
26
27
fn s ( buf : & [ u8 ] ) -> & str {
@@ -2009,6 +2010,74 @@ mod conn {
2009
2010
. expect_err ( "client should be closed" ) ;
2010
2011
}
2011
2012
2013
+ #[ tokio:: test]
2014
+ async fn http2_connect_detect_close ( ) {
2015
+ // Regression test for failure to fully close connections when using HTTP2 CONNECT
2016
+ // We send a request, read/write some data, then drop the client connection.
2017
+ use futures_util:: future;
2018
+ let ( listener, addr) = setup_tk_test_server ( ) . await ;
2019
+ let ( tx, rx) = oneshot:: channel :: < ( ) > ( ) ;
2020
+ const BODY : & [ u8 ] = b"hello world" ;
2021
+ tokio:: task:: spawn ( async move {
2022
+ use hyper:: server:: conn:: http2;
2023
+ use hyper:: service:: service_fn;
2024
+
2025
+ let res = listener. accept ( ) . await ;
2026
+ let ( stream, _) = res. unwrap ( ) ;
2027
+ let stream = TokioIo :: new ( stream) ;
2028
+
2029
+ let service = service_fn ( |req : Request < hyper:: body:: Incoming > | {
2030
+ tokio:: task:: spawn ( async move {
2031
+ let io = & mut TokioIo :: new ( hyper:: upgrade:: on ( req) . await . unwrap ( ) ) ;
2032
+ let mut buf: [ u8 ; BODY . len ( ) ] = [ 0 ; BODY . len ( ) ] ;
2033
+ io. read_exact ( & mut buf) . await . unwrap ( ) ;
2034
+ io. write_all ( BODY ) . await . unwrap ( ) ;
2035
+ } ) ;
2036
+
2037
+ future:: ok :: < _ , hyper:: Error > ( Response :: new ( Empty :: < Bytes > :: new ( ) ) )
2038
+ } ) ;
2039
+
2040
+ tokio:: task:: spawn ( async move {
2041
+ let conn = http2:: Builder :: new ( TokioExecutor ) . serve_connection ( stream, service) ;
2042
+ conn. await . unwrap ( ) ;
2043
+ drop ( tx) ;
2044
+ } ) ;
2045
+ } ) ;
2046
+
2047
+ let io = tcp_connect ( & addr) . await . expect ( "tcp connect" ) ;
2048
+ let ( mut client, conn) = conn:: http2:: Builder :: new ( TokioExecutor )
2049
+ . handshake ( io)
2050
+ . await
2051
+ . expect ( "http handshake" ) ;
2052
+
2053
+ tokio:: task:: spawn ( async move {
2054
+ conn. await . expect ( "client conn" ) ;
2055
+ } ) ;
2056
+
2057
+ // Sanity check that client is ready
2058
+ future:: poll_fn ( |ctx| client. poll_ready ( ctx) )
2059
+ . await
2060
+ . expect ( "client poll ready sanity" ) ;
2061
+
2062
+ let req = Request :: builder ( )
2063
+ . method ( Method :: CONNECT )
2064
+ . uri ( format ! ( "{}" , addr) )
2065
+ . body ( Empty :: < Bytes > :: new ( ) )
2066
+ . expect ( "request builder" ) ;
2067
+
2068
+ let resp = client. send_request ( req) . await . expect ( "req1 send" ) ;
2069
+ assert_eq ! ( resp. status( ) , 200 ) ;
2070
+ let io = & mut TokioIo :: new ( hyper:: upgrade:: on ( resp) . await . unwrap ( ) ) ;
2071
+
2072
+ let mut buf: [ u8 ; BODY . len ( ) ] = [ 0 ; BODY . len ( ) ] ;
2073
+ io. write_all ( BODY ) . await . unwrap ( ) ;
2074
+ io. read_exact ( & mut buf) . await . unwrap ( ) ;
2075
+ drop ( client) ;
2076
+ let _ = tokio:: time:: timeout ( Duration :: from_secs ( 1 ) , rx)
2077
+ . await
2078
+ . unwrap ( ) ;
2079
+ }
2080
+
2012
2081
#[ tokio:: test]
2013
2082
async fn http2_keep_alive_detects_unresponsive_server ( ) {
2014
2083
let ( listener, addr) = setup_tk_test_server ( ) . await ;
0 commit comments