@@ -7,6 +7,8 @@ use std::path::{Path, PathBuf};
7
7
use std:: time:: { Instant , Duration } ;
8
8
9
9
use bytesize:: ByteSize ;
10
+ use curl;
11
+ use curl_sys;
10
12
use curl:: easy:: { Easy , HttpVersion } ;
11
13
use curl:: multi:: { Multi , EasyHandle } ;
12
14
use lazycell:: LazyCell ;
@@ -255,11 +257,10 @@ pub struct PackageSet<'cfg> {
255
257
256
258
pub struct Downloads < ' a , ' cfg : ' a > {
257
259
set : & ' a PackageSet < ' cfg > ,
258
- pending : HashMap < usize , ( Download , EasyHandle ) > ,
260
+ pending : HashMap < usize , ( Download < ' cfg > , EasyHandle ) > ,
259
261
pending_ids : HashSet < PackageId > ,
260
- results : Vec < ( usize , CargoResult < ( ) > ) > ,
262
+ results : Vec < ( usize , Result < ( ) , curl :: Error > ) > ,
261
263
next : usize ,
262
- retry : Retry < ' cfg > ,
263
264
progress : RefCell < Option < Progress < ' cfg > > > ,
264
265
downloads_finished : usize ,
265
266
downloaded_bytes : u64 ,
@@ -268,15 +269,51 @@ pub struct Downloads<'a, 'cfg: 'a> {
268
269
success : bool ,
269
270
}
270
271
271
- struct Download {
272
+ struct Download < ' cfg > {
273
+ /// Token for this download, used as the key of the `Downloads::pending` map
274
+ /// and stored in `EasyHandle` as well.
272
275
token : usize ,
276
+
277
+ /// Package that we're downloading
273
278
id : PackageId ,
279
+
280
+ /// Actual downloaded data, updated throughout the lifetime of this download
274
281
data : RefCell < Vec < u8 > > ,
282
+
283
+ /// The URL that we're downloading from, cached here for error messages and
284
+ /// reenqueuing.
275
285
url : String ,
286
+
287
+ /// A descriptive string to print when we've finished downloading this crate
276
288
descriptor : String ,
289
+
290
+ /// Statistics updated from the progress callback in libcurl
277
291
total : Cell < u64 > ,
278
292
current : Cell < u64 > ,
293
+
294
+ /// The moment we started this transfer at
279
295
start : Instant ,
296
+
297
+ /// Last time we noticed that we got some more data from libcurl
298
+ updated_at : Cell < Instant > ,
299
+
300
+ /// Timeout management, both of timeout thresholds as well as whether or not
301
+ /// our connection has timed out (and accompanying message if it has).
302
+ ///
303
+ /// Note that timeout management is done manually here because we have a
304
+ /// `Multi` with a lot of active transfers but between transfers finishing
305
+ /// we perform some possibly slow synchronous work (like grabbing file
306
+ /// locks, extracting tarballs, etc). The default timers on our `Multi` keep
307
+ /// running during this work, but we don't want them to count towards timing
308
+ /// everythig out. As a result, we manage this manually and take the time
309
+ /// for synchronous work into account manually.
310
+ timeout : ops:: HttpTimeout ,
311
+ timed_out : Cell < Option < String > > ,
312
+ next_speed_check : Cell < Instant > ,
313
+ next_speed_check_bytes_threshold : Cell < u64 > ,
314
+
315
+ /// Logic used to track retrying this download if it's a spurious failure.
316
+ retry : Retry < ' cfg > ,
280
317
}
281
318
282
319
impl < ' cfg > PackageSet < ' cfg > {
@@ -329,7 +366,6 @@ impl<'cfg> PackageSet<'cfg> {
329
366
pending : HashMap :: new ( ) ,
330
367
pending_ids : HashSet :: new ( ) ,
331
368
results : Vec :: new ( ) ,
332
- retry : Retry :: new ( self . config ) ?,
333
369
progress : RefCell :: new ( Some ( Progress :: with_style (
334
370
"Downloading" ,
335
371
ProgressStyle :: Ratio ,
@@ -410,7 +446,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
410
446
debug ! ( "downloading {} as {}" , id, token) ;
411
447
assert ! ( self . pending_ids. insert( id. clone( ) ) ) ;
412
448
413
- let mut handle = ops:: http_handle ( self . set . config ) ?;
449
+ let ( mut handle, timeout ) = ops:: http_handle_and_timeout ( self . set . config ) ?;
414
450
handle. get ( true ) ?;
415
451
handle. url ( & url) ?;
416
452
handle. follow_location ( true ) ?; // follow redirects
@@ -448,14 +484,10 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
448
484
handle. progress ( true ) ?;
449
485
handle. progress_function ( move |dl_total, dl_cur, _, _| {
450
486
tls:: with ( |downloads| {
451
- let downloads = match downloads {
452
- Some ( d) => d,
453
- None => return false ,
454
- } ;
455
- let dl = & downloads. pending [ & token] . 0 ;
456
- dl. total . set ( dl_total as u64 ) ;
457
- dl. current . set ( dl_cur as u64 ) ;
458
- downloads. tick ( WhyTick :: DownloadUpdate ) . is_ok ( )
487
+ match downloads {
488
+ Some ( d) => d. progress ( token, dl_total as u64 , dl_cur as u64 ) ,
489
+ None => false ,
490
+ }
459
491
} )
460
492
} ) ?;
461
493
@@ -469,6 +501,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
469
501
self . set . config . shell ( ) . status ( "Downloading" , "crates ..." ) ?;
470
502
}
471
503
504
+ let now = Instant :: now ( ) ;
472
505
let dl = Download {
473
506
token,
474
507
data : RefCell :: new ( Vec :: new ( ) ) ,
@@ -478,6 +511,12 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
478
511
total : Cell :: new ( 0 ) ,
479
512
current : Cell :: new ( 0 ) ,
480
513
start : Instant :: now ( ) ,
514
+ updated_at : Cell :: new ( now) ,
515
+ timeout,
516
+ timed_out : Cell :: new ( None ) ,
517
+ next_speed_check : Cell :: new ( now) ,
518
+ next_speed_check_bytes_threshold : Cell :: new ( 0 ) ,
519
+ retry : Retry :: new ( self . set . config ) ?,
481
520
} ;
482
521
self . enqueue ( dl, handle) ?;
483
522
self . tick ( WhyTick :: DownloadStarted ) ?;
@@ -514,12 +553,35 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
514
553
// then we want to re-enqueue our request for another attempt and
515
554
// then we wait for another request to finish.
516
555
let ret = {
517
- self . retry . try ( || {
518
- result?;
556
+ let timed_out = & dl. timed_out ;
557
+ let url = & dl. url ;
558
+ dl. retry . try ( || {
559
+ if let Err ( e) = result {
560
+ // If this error is "aborted by callback" then that's
561
+ // probably because our progress callback aborted due to
562
+ // a timeout. We'll find out by looking at the
563
+ // `timed_out` field, looking for a descriptive message.
564
+ // If one is found we switch the error code (to ensure
565
+ // it's flagged as spurious) and then attach our extra
566
+ // information to the error.
567
+ if !e. is_aborted_by_callback ( ) {
568
+ return Err ( e. into ( ) )
569
+ }
570
+
571
+ return Err ( match timed_out. replace ( None ) {
572
+ Some ( msg) => {
573
+ let code = curl_sys:: CURLE_OPERATION_TIMEDOUT ;
574
+ let mut err = curl:: Error :: new ( code) ;
575
+ err. set_extra ( msg) ;
576
+ err
577
+ }
578
+ None => e,
579
+ } . into ( ) )
580
+ }
519
581
520
582
let code = handle. response_code ( ) ?;
521
583
if code != 200 && code != 0 {
522
- let url = handle. effective_url ( ) ?. unwrap_or ( & dl . url ) ;
584
+ let url = handle. effective_url ( ) ?. unwrap_or ( url) ;
523
585
return Err ( HttpNot200 {
524
586
code,
525
587
url : url. to_string ( ) ,
@@ -568,20 +630,39 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
568
630
let source = sources
569
631
. get_mut ( dl. id . source_id ( ) )
570
632
. ok_or_else ( || internal ( format ! ( "couldn't find source for `{}`" , dl. id) ) ) ?;
633
+ let start = Instant :: now ( ) ;
571
634
let pkg = source. finish_download ( & dl. id , data) ?;
635
+
636
+ // Assume that no time has passed while we were calling
637
+ // `finish_download`, update all speed checks and timeout limits of all
638
+ // active downloads to make sure they don't fire because of a slowly
639
+ // extracted tarball.
640
+ let finish_dur = start. elapsed ( ) ;
641
+ for ( dl, _) in self . pending . values_mut ( ) {
642
+ dl. updated_at . set ( dl. updated_at . get ( ) + finish_dur) ;
643
+ dl. next_speed_check . set ( dl. next_speed_check . get ( ) + finish_dur) ;
644
+ }
645
+
572
646
let slot = & self . set . packages [ & dl. id ] ;
573
647
assert ! ( slot. fill( pkg) . is_ok( ) ) ;
574
648
Ok ( slot. borrow ( ) . unwrap ( ) )
575
649
}
576
650
577
- fn enqueue ( & mut self , dl : Download , handle : Easy ) -> CargoResult < ( ) > {
651
+ fn enqueue ( & mut self , dl : Download < ' cfg > , handle : Easy ) -> CargoResult < ( ) > {
578
652
let mut handle = self . set . multi . add ( handle) ?;
653
+ let now = Instant :: now ( ) ;
579
654
handle. set_token ( dl. token ) ?;
655
+ dl. timed_out . set ( None ) ;
656
+ dl. updated_at . set ( now) ;
657
+ dl. current . set ( 0 ) ;
658
+ dl. total . set ( 0 ) ;
659
+ dl. next_speed_check . set ( now + dl. timeout . dur ) ;
660
+ dl. next_speed_check_bytes_threshold . set ( dl. timeout . low_speed_limit as u64 ) ;
580
661
self . pending . insert ( dl. token , ( dl, handle) ) ;
581
662
Ok ( ( ) )
582
663
}
583
664
584
- fn wait_for_curl ( & mut self ) -> CargoResult < ( usize , CargoResult < ( ) > ) > {
665
+ fn wait_for_curl ( & mut self ) -> CargoResult < ( usize , Result < ( ) , curl :: Error > ) > {
585
666
// This is the main workhorse loop. We use libcurl's portable `wait`
586
667
// method to actually perform blocking. This isn't necessarily too
587
668
// efficient in terms of fd management, but we should only be juggling
@@ -609,7 +690,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
609
690
let token = msg. token ( ) . expect ( "failed to read token" ) ;
610
691
let handle = & pending[ & token] . 1 ;
611
692
if let Some ( result) = msg. result_for ( & handle) {
612
- results. push ( ( token, result. map_err ( |e| e . into ( ) ) ) ) ;
693
+ results. push ( ( token, result) ) ;
613
694
} else {
614
695
debug ! ( "message without a result (?)" ) ;
615
696
}
@@ -619,11 +700,59 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
619
700
break Ok ( pair)
620
701
}
621
702
assert ! ( self . pending. len( ) > 0 ) ;
622
- self . set . multi . wait ( & mut [ ] , Duration :: new ( 60 , 0 ) )
703
+ let timeout = self . set . multi . get_timeout ( ) ?
704
+ . unwrap_or ( Duration :: new ( 5 , 0 ) ) ;
705
+ self . set . multi . wait ( & mut [ ] , timeout)
623
706
. chain_err ( || "failed to wait on curl `Multi`" ) ?;
624
707
}
625
708
}
626
709
710
+ fn progress ( & self , token : usize , total : u64 , cur : u64 ) -> bool {
711
+ let dl = & self . pending [ & token] . 0 ;
712
+ dl. total . set ( total) ;
713
+ let now = Instant :: now ( ) ;
714
+ if cur != dl. current . get ( ) {
715
+ dl. current . set ( cur) ;
716
+ dl. updated_at . set ( now) ;
717
+
718
+ if dl. current . get ( ) >= dl. next_speed_check_bytes_threshold . get ( ) {
719
+ dl. next_speed_check . set ( now + dl. timeout . dur ) ;
720
+ dl. next_speed_check_bytes_threshold . set (
721
+ dl. current . get ( ) + dl. timeout . low_speed_limit as u64 ,
722
+ ) ;
723
+ }
724
+ }
725
+ if !self . tick ( WhyTick :: DownloadUpdate ) . is_ok ( ) {
726
+ return false
727
+ }
728
+
729
+ // If we've spent too long not actually receiving any data we time out.
730
+ if now - dl. updated_at . get ( ) > dl. timeout . dur {
731
+ let msg = format ! ( "failed to download any data for `{}` within {}s" ,
732
+ dl. id,
733
+ dl. timeout. dur. as_secs( ) ) ;
734
+ dl. timed_out . set ( Some ( msg) ) ;
735
+ return false
736
+ }
737
+
738
+ // If we reached the point in time that we need to check our speed
739
+ // limit, see if we've transferred enough data during this threshold. If
740
+ // it fails this check then we fail because the download is going too
741
+ // slowly.
742
+ if now >= dl. next_speed_check . get ( ) {
743
+ assert ! ( dl. current. get( ) < dl. next_speed_check_bytes_threshold. get( ) ) ;
744
+ let msg = format ! ( "download of `{}` failed to transfer more \
745
+ than {} bytes in {}s",
746
+ dl. id,
747
+ dl. timeout. low_speed_limit,
748
+ dl. timeout. dur. as_secs( ) ) ;
749
+ dl. timed_out . set ( Some ( msg) ) ;
750
+ return false
751
+ }
752
+
753
+ true
754
+ }
755
+
627
756
fn tick ( & self , why : WhyTick ) -> CargoResult < ( ) > {
628
757
let mut progress = self . progress . borrow_mut ( ) ;
629
758
let progress = progress. as_mut ( ) . unwrap ( ) ;
0 commit comments