Skip to content

Commit 96c4c0b

Browse files
tustvoldalamb
andauthored
Replace AsyncWrite with Upload trait and rename MultiPartStore to MultipartStore (#5458) (#5500)
* Replace AsyncWrite with Upload trait (#5458) * Make BufWriter abortable * Flesh out cloud implementations * Review feedback * Misc tweaks and fixes * Format * Replace multi-part with multipart * More docs * Clippy * Rename to MultipartUpload * Rename ChunkedUpload to WriteMultipart * Doc tweaks * Apply suggestions from code review Co-authored-by: Andrew Lamb <[email protected]> * Docs * Format --------- Co-authored-by: Andrew Lamb <[email protected]>
1 parent f41c2a4 commit 96c4c0b

18 files changed

+691
-883
lines changed

object_store/src/aws/mod.rs

+59-45
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,14 @@
1717

1818
//! An object store implementation for S3
1919
//!
20-
//! ## Multi-part uploads
20+
//! ## Multipart uploads
2121
//!
22-
//! Multi-part uploads can be initiated with the [ObjectStore::put_multipart] method.
23-
//! Data passed to the writer is automatically buffered to meet the minimum size
24-
//! requirements for a part. Multiple parts are uploaded concurrently.
22+
//! Multipart uploads can be initiated with the [ObjectStore::put_multipart] method.
2523
//!
2624
//! If the writer fails for any reason, you may have parts uploaded to AWS but not
27-
//! used that you may be charged for. Use the [ObjectStore::abort_multipart] method
28-
//! to abort the upload and drop those unneeded parts. In addition, you may wish to
29-
//! consider implementing [automatic cleanup] of unused parts that are older than one
30-
//! week.
25+
//! used that you will be charged for. [`MultipartUpload::abort`] may be invoked to drop
26+
//! these unneeded parts, however, it is recommended that you consider implementing
27+
//! [automatic cleanup] of unused parts that are older than some threshold.
3128
//!
3229
//! [automatic cleanup]: https://aws.amazon.com/blogs/aws/s3-lifecycle-management-update-support-for-multipart-uploads-and-delete-markers/
3330
@@ -38,18 +35,17 @@ use futures::{StreamExt, TryStreamExt};
3835
use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH};
3936
use reqwest::{Method, StatusCode};
4037
use std::{sync::Arc, time::Duration};
41-
use tokio::io::AsyncWrite;
4238
use url::Url;
4339

4440
use crate::aws::client::{RequestError, S3Client};
4541
use crate::client::get::GetClientExt;
4642
use crate::client::list::ListClientExt;
4743
use crate::client::CredentialProvider;
48-
use crate::multipart::{MultiPartStore, PartId, PutPart, WriteMultiPart};
44+
use crate::multipart::{MultipartStore, PartId};
4945
use crate::signer::Signer;
5046
use crate::{
51-
Error, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Path, PutMode,
52-
PutOptions, PutResult, Result,
47+
Error, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta,
48+
ObjectStore, Path, PutMode, PutOptions, PutResult, Result, UploadPart,
5349
};
5450

5551
static TAGS_HEADER: HeaderName = HeaderName::from_static("x-amz-tagging");
@@ -85,6 +81,7 @@ const STORE: &str = "S3";
8581

8682
/// [`CredentialProvider`] for [`AmazonS3`]
8783
pub type AwsCredentialProvider = Arc<dyn CredentialProvider<Credential = AwsCredential>>;
84+
use crate::client::parts::Parts;
8885
pub use credential::{AwsAuthorizer, AwsCredential};
8986

9087
/// Interface for [Amazon S3](https://aws.amazon.com/s3/).
@@ -211,25 +208,18 @@ impl ObjectStore for AmazonS3 {
211208
}
212209
}
213210

214-
async fn put_multipart(
215-
&self,
216-
location: &Path,
217-
) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
218-
let id = self.client.create_multipart(location).await?;
219-
220-
let upload = S3MultiPartUpload {
221-
location: location.clone(),
222-
upload_id: id.clone(),
223-
client: Arc::clone(&self.client),
224-
};
225-
226-
Ok((id, Box::new(WriteMultiPart::new(upload, 8))))
227-
}
228-
229-
async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> {
230-
self.client
231-
.delete_request(location, &[("uploadId", multipart_id)])
232-
.await
211+
async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
212+
let upload_id = self.client.create_multipart(location).await?;
213+
214+
Ok(Box::new(S3MultiPartUpload {
215+
part_idx: 0,
216+
state: Arc::new(UploadState {
217+
client: Arc::clone(&self.client),
218+
location: location.clone(),
219+
upload_id: upload_id.clone(),
220+
parts: Default::default(),
221+
}),
222+
}))
233223
}
234224

235225
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
@@ -319,30 +309,55 @@ impl ObjectStore for AmazonS3 {
319309
}
320310
}
321311

312+
#[derive(Debug)]
322313
struct S3MultiPartUpload {
314+
part_idx: usize,
315+
state: Arc<UploadState>,
316+
}
317+
318+
#[derive(Debug)]
319+
struct UploadState {
320+
parts: Parts,
323321
location: Path,
324322
upload_id: String,
325323
client: Arc<S3Client>,
326324
}
327325

328326
#[async_trait]
329-
impl PutPart for S3MultiPartUpload {
330-
async fn put_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<PartId> {
331-
self.client
332-
.put_part(&self.location, &self.upload_id, part_idx, buf.into())
327+
impl MultipartUpload for S3MultiPartUpload {
328+
fn put_part(&mut self, data: Bytes) -> UploadPart {
329+
let idx = self.part_idx;
330+
self.part_idx += 1;
331+
let state = Arc::clone(&self.state);
332+
Box::pin(async move {
333+
let part = state
334+
.client
335+
.put_part(&state.location, &state.upload_id, idx, data)
336+
.await?;
337+
state.parts.put(idx, part);
338+
Ok(())
339+
})
340+
}
341+
342+
async fn complete(&mut self) -> Result<PutResult> {
343+
let parts = self.state.parts.finish(self.part_idx)?;
344+
345+
self.state
346+
.client
347+
.complete_multipart(&self.state.location, &self.state.upload_id, parts)
333348
.await
334349
}
335350

336-
async fn complete(&self, completed_parts: Vec<PartId>) -> Result<()> {
337-
self.client
338-
.complete_multipart(&self.location, &self.upload_id, completed_parts)
339-
.await?;
340-
Ok(())
351+
async fn abort(&mut self) -> Result<()> {
352+
self.state
353+
.client
354+
.delete_request(&self.state.location, &[("uploadId", &self.state.upload_id)])
355+
.await
341356
}
342357
}
343358

344359
#[async_trait]
345-
impl MultiPartStore for AmazonS3 {
360+
impl MultipartStore for AmazonS3 {
346361
async fn create_multipart(&self, path: &Path) -> Result<MultipartId> {
347362
self.client.create_multipart(path).await
348363
}
@@ -377,7 +392,6 @@ mod tests {
377392
use crate::{client::get::GetClient, tests::*};
378393
use bytes::Bytes;
379394
use hyper::HeaderMap;
380-
use tokio::io::AsyncWriteExt;
381395

382396
const NON_EXISTENT_NAME: &str = "nonexistentname";
383397

@@ -542,9 +556,9 @@ mod tests {
542556
store.put(&locations[0], data.clone()).await.unwrap();
543557
store.copy(&locations[0], &locations[1]).await.unwrap();
544558

545-
let (_, mut writer) = store.put_multipart(&locations[2]).await.unwrap();
546-
writer.write_all(&data).await.unwrap();
547-
writer.shutdown().await.unwrap();
559+
let mut upload = store.put_multipart(&locations[2]).await.unwrap();
560+
upload.put_part(data.clone()).await.unwrap();
561+
upload.complete().await.unwrap();
548562

549563
for location in &locations {
550564
let res = store

object_store/src/azure/mod.rs

+47-33
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,15 @@
1919
//!
2020
//! ## Streaming uploads
2121
//!
22-
//! [ObjectStore::put_multipart] will upload data in blocks and write a blob from those
23-
//! blocks. Data is buffered internally to make blocks of at least 5MB and blocks
24-
//! are uploaded concurrently.
22+
//! [ObjectStore::put_multipart] will upload data in blocks and write a blob from those blocks.
2523
//!
26-
//! [ObjectStore::abort_multipart] is a no-op, since Azure Blob Store doesn't provide
27-
//! a way to drop old blocks. Instead unused blocks are automatically cleaned up
28-
//! after 7 days.
24+
//! Unused blocks will automatically be dropped after 7 days.
2925
use crate::{
30-
multipart::{MultiPartStore, PartId, PutPart, WriteMultiPart},
26+
multipart::{MultipartStore, PartId},
3127
path::Path,
3228
signer::Signer,
33-
GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions, PutResult,
34-
Result,
29+
GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta, ObjectStore,
30+
PutOptions, PutResult, Result, UploadPart,
3531
};
3632
use async_trait::async_trait;
3733
use bytes::Bytes;
@@ -40,7 +36,6 @@ use reqwest::Method;
4036
use std::fmt::Debug;
4137
use std::sync::Arc;
4238
use std::time::Duration;
43-
use tokio::io::AsyncWrite;
4439
use url::Url;
4540

4641
use crate::client::get::GetClientExt;
@@ -54,6 +49,8 @@ mod credential;
5449

5550
/// [`CredentialProvider`] for [`MicrosoftAzure`]
5651
pub type AzureCredentialProvider = Arc<dyn CredentialProvider<Credential = AzureCredential>>;
52+
use crate::azure::client::AzureClient;
53+
use crate::client::parts::Parts;
5754
pub use builder::{AzureConfigKey, MicrosoftAzureBuilder};
5855
pub use credential::AzureCredential;
5956

@@ -94,21 +91,15 @@ impl ObjectStore for MicrosoftAzure {
9491
self.client.put_blob(location, bytes, opts).await
9592
}
9693

97-
async fn put_multipart(
98-
&self,
99-
location: &Path,
100-
) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
101-
let inner = AzureMultiPartUpload {
102-
client: Arc::clone(&self.client),
103-
location: location.to_owned(),
104-
};
105-
Ok((String::new(), Box::new(WriteMultiPart::new(inner, 8))))
106-
}
107-
108-
async fn abort_multipart(&self, _location: &Path, _multipart_id: &MultipartId) -> Result<()> {
109-
// There is no way to drop blocks that have been uploaded. Instead, they simply
110-
// expire in 7 days.
111-
Ok(())
94+
async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
95+
Ok(Box::new(AzureMultiPartUpload {
96+
part_idx: 0,
97+
state: Arc::new(UploadState {
98+
client: Arc::clone(&self.client),
99+
location: location.clone(),
100+
parts: Default::default(),
101+
}),
102+
}))
112103
}
113104

114105
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
@@ -197,26 +188,49 @@ impl Signer for MicrosoftAzure {
197188
/// put_multipart_part -> PUT block
198189
/// complete -> PUT block list
199190
/// abort -> No equivalent; blocks are simply dropped after 7 days
200-
#[derive(Debug, Clone)]
191+
#[derive(Debug)]
201192
struct AzureMultiPartUpload {
202-
client: Arc<client::AzureClient>,
193+
part_idx: usize,
194+
state: Arc<UploadState>,
195+
}
196+
197+
#[derive(Debug)]
198+
struct UploadState {
203199
location: Path,
200+
parts: Parts,
201+
client: Arc<AzureClient>,
204202
}
205203

206204
#[async_trait]
207-
impl PutPart for AzureMultiPartUpload {
208-
async fn put_part(&self, buf: Vec<u8>, idx: usize) -> Result<PartId> {
209-
self.client.put_block(&self.location, idx, buf.into()).await
205+
impl MultipartUpload for AzureMultiPartUpload {
206+
fn put_part(&mut self, data: Bytes) -> UploadPart {
207+
let idx = self.part_idx;
208+
self.part_idx += 1;
209+
let state = Arc::clone(&self.state);
210+
Box::pin(async move {
211+
let part = state.client.put_block(&state.location, idx, data).await?;
212+
state.parts.put(idx, part);
213+
Ok(())
214+
})
215+
}
216+
217+
async fn complete(&mut self) -> Result<PutResult> {
218+
let parts = self.state.parts.finish(self.part_idx)?;
219+
220+
self.state
221+
.client
222+
.put_block_list(&self.state.location, parts)
223+
.await
210224
}
211225

212-
async fn complete(&self, parts: Vec<PartId>) -> Result<()> {
213-
self.client.put_block_list(&self.location, parts).await?;
226+
async fn abort(&mut self) -> Result<()> {
227+
// Nothing to do
214228
Ok(())
215229
}
216230
}
217231

218232
#[async_trait]
219-
impl MultiPartStore for MicrosoftAzure {
233+
impl MultipartStore for MicrosoftAzure {
220234
async fn create_multipart(&self, _: &Path) -> Result<MultipartId> {
221235
Ok(String::new())
222236
}

0 commit comments

Comments
 (0)