4
4
5
5
//! Tools for downloading blobs
6
6
7
- use anyhow:: { anyhow, Result } ;
7
+ use anyhow:: { anyhow, Context , Result } ;
8
8
use chrono:: { DateTime , FixedOffset , Utc } ;
9
9
use futures_util:: StreamExt ;
10
10
use reqwest:: header:: { CONTENT_LENGTH , LAST_MODIFIED } ;
11
- use std:: path:: Path ;
11
+ use ring:: digest:: { Context as DigestContext , Digest , SHA256 } ;
12
+ use std:: path:: { Path , PathBuf } ;
12
13
use std:: str:: FromStr ;
13
- use tokio:: io:: AsyncWriteExt ;
14
+ use tokio:: io:: { AsyncReadExt , AsyncWriteExt , BufReader } ;
14
15
15
- use crate :: progress:: Progress ;
16
+ use crate :: progress:: { NoProgress , Progress } ;
16
17
17
18
// Path to the blob S3 Bucket.
18
19
const S3_BUCKET : & str = "https://oxide-omicron-build.s3.amazonaws.com" ;
19
20
// Name for the directory component where downloaded blobs are stored.
20
21
pub ( crate ) const BLOB : & str = "blob" ;
21
22
23
+ #[ derive( Debug ) ]
24
+ pub enum Source < ' a > {
25
+ S3 ( & ' a PathBuf ) ,
26
+ Buildomat ( & ' a crate :: package:: PrebuiltBlob ) ,
27
+ }
28
+
29
+ impl < ' a > Source < ' a > {
30
+ pub ( crate ) fn get_url ( & self ) -> String {
31
+ match self {
32
+ Self :: S3 ( s) => format ! ( "{}/{}" , S3_BUCKET , s. to_string_lossy( ) ) ,
33
+ Self :: Buildomat ( spec) => {
34
+ format ! (
35
+ "https://buildomat.eng.oxide.computer/public/file/oxidecomputer/{}/{}/{}/{}" ,
36
+ spec. repo, spec. series, spec. commit, spec. artifact
37
+ )
38
+ }
39
+ }
40
+ }
41
+
42
+ async fn download_required (
43
+ & self ,
44
+ url : & str ,
45
+ client : & reqwest:: Client ,
46
+ destination : & Path ,
47
+ ) -> Result < bool > {
48
+ if !destination. exists ( ) {
49
+ return Ok ( true ) ;
50
+ }
51
+
52
+ match self {
53
+ Self :: S3 ( _) => {
54
+ // Issue a HEAD request to get the blob's size and last modified
55
+ // time. If these match what's on disk, assume the blob is
56
+ // current and don't re-download it.
57
+ let head_response = client
58
+ . head ( url)
59
+ . send ( )
60
+ . await ?
61
+ . error_for_status ( )
62
+ . with_context ( || format ! ( "HEAD failed for {}" , url) ) ?;
63
+ let headers = head_response. headers ( ) ;
64
+ let content_length = headers
65
+ . get ( CONTENT_LENGTH )
66
+ . ok_or_else ( || anyhow ! ( "no content length on {} HEAD response!" , url) ) ?;
67
+ let content_length: u64 = u64:: from_str ( content_length. to_str ( ) ?) ?;
68
+
69
+ // From S3, header looks like:
70
+ //
71
+ // "Last-Modified: Fri, 27 May 2022 20:50:17 GMT"
72
+ let last_modified = headers
73
+ . get ( LAST_MODIFIED )
74
+ . ok_or_else ( || anyhow ! ( "no last modified on {} HEAD response!" , url) ) ?;
75
+ let last_modified: DateTime < FixedOffset > =
76
+ chrono:: DateTime :: parse_from_rfc2822 ( last_modified. to_str ( ) ?) ?;
77
+
78
+ let metadata = tokio:: fs:: metadata ( & destination) . await ?;
79
+ let metadata_modified: DateTime < Utc > = metadata. modified ( ) ?. into ( ) ;
80
+
81
+ Ok ( metadata. len ( ) != content_length || metadata_modified != last_modified)
82
+ }
83
+ Self :: Buildomat ( blob_spec) => {
84
+ let digest = get_sha256_digest ( destination) . await ?;
85
+ let expected_digest = hex:: decode ( & blob_spec. sha256 ) ?;
86
+ Ok ( digest. as_ref ( ) != expected_digest)
87
+ }
88
+ }
89
+ }
90
+ }
91
+
22
92
// Downloads "source" from S3_BUCKET to "destination".
23
- pub async fn download ( progress : & impl Progress , source : & str , destination : & Path ) -> Result < ( ) > {
93
+ pub async fn download < ' a > (
94
+ progress : & impl Progress ,
95
+ source : & Source < ' a > ,
96
+ destination : & Path ,
97
+ ) -> Result < ( ) > {
24
98
let blob = destination
25
99
. file_name ( )
26
100
. ok_or_else ( || anyhow ! ( "missing blob filename" ) ) ?;
27
101
28
- let url = format ! ( "{}/{}" , S3_BUCKET , source) ;
102
+ let url = source. get_url ( ) ;
29
103
let client = reqwest:: Client :: new ( ) ;
30
-
31
- let head_response = client. head ( & url) . send ( ) . await ?. error_for_status ( ) ?;
32
- let headers = head_response. headers ( ) ;
33
-
34
- // From S3, header looks like:
35
- //
36
- // "Content-Length: 49283072"
37
- let content_length = headers
38
- . get ( CONTENT_LENGTH )
39
- . ok_or_else ( || anyhow ! ( "no content length on {} HEAD response!" , url) ) ?;
40
- let mut content_length: u64 = u64:: from_str ( content_length. to_str ( ) ?) ?;
41
-
42
- if destination. exists ( ) {
43
- // If destination exists, check against size and last modified time. If
44
- // both are the same, then return Ok
45
-
46
- // From S3, header looks like:
47
- //
48
- // "Last-Modified: Fri, 27 May 2022 20:50:17 GMT"
49
- let last_modified = headers
50
- . get ( LAST_MODIFIED )
51
- . ok_or_else ( || anyhow ! ( "no last modified on {} HEAD response!" , url) ) ?;
52
- let last_modified: DateTime < FixedOffset > =
53
- chrono:: DateTime :: parse_from_rfc2822 ( last_modified. to_str ( ) ?) ?;
54
- let metadata = tokio:: fs:: metadata ( & destination) . await ?;
55
- let metadata_modified: DateTime < Utc > = metadata. modified ( ) ?. into ( ) ;
56
-
57
- if metadata. len ( ) == content_length && metadata_modified == last_modified {
58
- return Ok ( ( ) ) ;
59
- }
104
+ if !source. download_required ( & url, & client, destination) . await ? {
105
+ return Ok ( ( ) ) ;
60
106
}
61
107
62
108
let response = client. get ( url) . send ( ) . await ?. error_for_status ( ) ?;
63
109
let response_headers = response. headers ( ) ;
64
110
65
111
// Grab update Content-Length from response headers, if present.
66
112
// We only use it as a hint for the progress so no need to fail.
67
- if let Some ( Ok ( Ok ( resp_len) ) ) = response_headers
113
+ let content_length = if let Some ( Ok ( Ok ( resp_len) ) ) = response_headers
68
114
. get ( CONTENT_LENGTH )
69
115
. map ( |c| c. to_str ( ) . map ( u64:: from_str) )
70
116
{
71
- content_length = resp_len;
72
- }
73
-
74
- // Store modified time from HTTPS response
75
- let last_modified = response_headers
76
- . get ( LAST_MODIFIED )
77
- . ok_or_else ( || anyhow ! ( "no last modified on GET response!" ) ) ?;
78
- let last_modified: DateTime < FixedOffset > =
79
- chrono:: DateTime :: parse_from_rfc2822 ( last_modified. to_str ( ) ?) ?;
117
+ Some ( resp_len)
118
+ } else {
119
+ None
120
+ } ;
121
+
122
+ // If the server advertised a last-modified time for the blob, save it here
123
+ // so that the downloaded blob's last-modified time can be set to it.
124
+ let last_modified = if let Some ( time) = response_headers. get ( LAST_MODIFIED ) {
125
+ Some ( chrono:: DateTime :: parse_from_rfc2822 ( time. to_str ( ) ?) ?)
126
+ } else {
127
+ None
128
+ } ;
80
129
81
130
// Write file bytes to destination
82
131
let mut file = tokio:: fs:: File :: create ( destination) . await ?;
83
132
84
133
// Create a sub-progress for the blob download
85
- let blob_progress = progress. sub_progress ( content_length) ;
134
+ let blob_progress = if let Some ( length) = content_length {
135
+ progress. sub_progress ( length)
136
+ } else {
137
+ Box :: new ( NoProgress )
138
+ } ;
86
139
blob_progress. set_message ( blob. to_string_lossy ( ) . into_owned ( ) . into ( ) ) ;
87
140
88
141
let mut stream = response. bytes_stream ( ) ;
@@ -107,14 +160,39 @@ pub async fn download(progress: &impl Progress, source: &str, destination: &Path
107
160
drop ( file) ;
108
161
109
162
// Set destination file's modified time based on HTTPS response
110
- filetime:: set_file_mtime (
111
- destination,
112
- filetime:: FileTime :: from_system_time ( last_modified. into ( ) ) ,
113
- ) ?;
163
+ if let Some ( last_modified) = last_modified {
164
+ filetime:: set_file_mtime (
165
+ destination,
166
+ filetime:: FileTime :: from_system_time ( last_modified. into ( ) ) ,
167
+ ) ?;
168
+ }
114
169
115
170
Ok ( ( ) )
116
171
}
117
172
173
+ async fn get_sha256_digest ( path : & Path ) -> Result < Digest > {
174
+ let mut reader = BufReader :: new (
175
+ tokio:: fs:: File :: open ( path)
176
+ . await
177
+ . with_context ( || format ! ( "could not open {path:?}" ) ) ?,
178
+ ) ;
179
+ let mut context = DigestContext :: new ( & SHA256 ) ;
180
+ let mut buffer = [ 0 ; 1024 ] ;
181
+
182
+ loop {
183
+ let count = reader
184
+ . read ( & mut buffer)
185
+ . await
186
+ . with_context ( || format ! ( "failed to read {path:?}" ) ) ?;
187
+ if count == 0 {
188
+ break ;
189
+ } else {
190
+ context. update ( & buffer[ ..count] ) ;
191
+ }
192
+ }
193
+ Ok ( context. finish ( ) )
194
+ }
195
+
118
196
#[ test]
119
197
fn test_converts ( ) {
120
198
let content_length = "1966080" ;
0 commit comments