Skip to content

Commit

Permalink
Implement fsync and handle write errors (#313)
Browse files Browse the repository at this point in the history
Implement `fsync` to allow users to complete a put request and receive confirmation that it succeeded or failed. If a file handle is released without a call to `fsync`, `release` will still complete the upload as before. 

Wrap the `UploadRequest` in the file handle in a new `UploadState` enum, in order to detect:
* on `release`, whether the request had been already completed by an `fsync` call,
* `write` is invoked after an `fsync`,
* `write` (or `fsync`) is invoked after a previous call failed.

Also adds support for put failures to FailureClient.

---------

Signed-off-by: Alessandro Passaro <[email protected]>
  • Loading branch information
passaro authored Jun 29, 2023
1 parent 5c1c831 commit c89f05d
Show file tree
Hide file tree
Showing 10 changed files with 588 additions and 85 deletions.
112 changes: 92 additions & 20 deletions mountpoint-s3-client/src/failure_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@ use crate::object_client::{
GetObjectError, HeadObjectError, HeadObjectResult, ListObjectsError, ObjectClientError, ObjectClientResult,
PutObjectError, PutObjectParams,
};
use crate::{ETag, ListObjectsResult, ObjectAttribute, ObjectClient};
use crate::{ETag, ListObjectsResult, ObjectAttribute, ObjectClient, PutObjectRequest, PutObjectResult};

// Wrapper for injecting failures into a get stream
pub struct FailureGetWrapper<Client: ObjectClient, GetWrapperState> {
state: GetWrapperState,
result_fn: fn(&mut GetWrapperState) -> Result<(), Client::ClientError>,
// Wrapper for injecting failures into a get stream or a put request
pub struct FailureRequestWrapper<Client: ObjectClient, RequestWrapperState> {
state: RequestWrapperState,
result_fn: fn(&mut RequestWrapperState) -> Result<(), Client::ClientError>,
}

#[allow(clippy::type_complexity)]
pub struct FailureClient<Client: ObjectClient, State, GetWrapperState> {
pub struct FailureClient<Client: ObjectClient, State, RequestWrapperState> {
pub client: Client,
pub state: Mutex<State>,
pub get_object_cb: fn(
Expand All @@ -33,7 +33,7 @@ pub struct FailureClient<Client: ObjectClient, State, GetWrapperState> {
Option<Range<u64>>,
Option<ETag>,
) -> Result<
FailureGetWrapper<Client, GetWrapperState>,
FailureRequestWrapper<Client, RequestWrapperState>,
ObjectClientError<GetObjectError, Client::ClientError>,
>,
pub head_object_cb:
Expand All @@ -46,6 +46,15 @@ pub struct FailureClient<Client: ObjectClient, State, GetWrapperState> {
usize,
&str,
) -> Result<(), ObjectClientError<ListObjectsError, Client::ClientError>>,
pub put_object_cb: fn(
&mut State,
&str,
&str,
&PutObjectParams,
) -> Result<
FailureRequestWrapper<Client, RequestWrapperState>,
ObjectClientError<PutObjectError, Client::ClientError>,
>,
}

#[async_trait]
Expand All @@ -56,7 +65,7 @@ where
GetWrapperState: Send + Sync + 'static,
{
type GetObjectResult = FailureGetResult<Client, GetWrapperState>;
type PutObjectRequest = Client::PutObjectRequest;
type PutObjectRequest = FailurePutObjectRequest<Client, GetWrapperState>;
type ClientError = Client::ClientError;

async fn delete_object(
Expand Down Expand Up @@ -127,8 +136,13 @@ where
key: &str,
params: &PutObjectParams,
) -> ObjectClientResult<Self::PutObjectRequest, PutObjectError, Self::ClientError> {
// TODO Add put fault injection hooks
self.client.put_object(bucket, key, params).await
let wrapper = (self.put_object_cb)(&mut *self.state.lock().unwrap(), bucket, key, &params.clone())?;
let request = self.client.put_object(bucket, key, params).await?;
Ok(FailurePutObjectRequest {
request,
state: wrapper.state,
result_fn: wrapper.result_fn,
})
}

async fn get_object_attributes(
Expand Down Expand Up @@ -164,31 +178,58 @@ impl<Client: ObjectClient, FailState> Stream for FailureGetResult<Client, FailSt
}
}

pub struct FailurePutObjectRequest<Client: ObjectClient, PutWrapperState> {
request: Client::PutObjectRequest,
state: PutWrapperState,
result_fn: fn(&mut PutWrapperState) -> Result<(), Client::ClientError>,
}

#[async_trait]
impl<Client: ObjectClient, PutWrapperState> PutObjectRequest for FailurePutObjectRequest<Client, PutWrapperState>
where
Client::PutObjectRequest: Send,
PutWrapperState: Send,
{
type ClientError = Client::ClientError;

async fn write(&mut self, slice: &[u8]) -> ObjectClientResult<(), PutObjectError, Self::ClientError> {
(self.result_fn)(&mut self.state)?;
self.request.write(slice).await
}

async fn complete(mut self) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError> {
(self.result_fn)(&mut self.state)?;
self.request.complete().await
}
}

/// A failure client that fails operations based on counts.
pub type CountdownFailureClient<Client> =
FailureClient<Client, CountdownFailureClientState<Client>, CountdownFailureGetState<Client>>;
FailureClient<Client, CountdownFailureClientState<Client>, CountdownFailureRequestState<Client>>;

pub type GetFailureMap<Client> = HashMap<
pub type RequestFailureMap<Client, RequestError> = HashMap<
usize,
Result<
(usize, <Client as ObjectClient>::ClientError),
ObjectClientError<GetObjectError, <Client as ObjectClient>::ClientError>,
ObjectClientError<RequestError, <Client as ObjectClient>::ClientError>,
>,
>;

#[allow(clippy::type_complexity)]
#[derive(Default)]
pub struct CountdownFailureClientState<Client: ObjectClient> {
get_count: usize,
get_results: GetFailureMap<Client>,
get_results: RequestFailureMap<Client, GetObjectError>,
head_count: usize,
head_failures: HashMap<usize, ObjectClientError<HeadObjectError, Client::ClientError>>,
list_count: usize,
list_failures: HashMap<usize, ObjectClientError<ListObjectsError, Client::ClientError>>,
put_count: usize,
put_results: RequestFailureMap<Client, PutObjectError>,
}

#[derive(Debug, Default)]
pub struct CountdownFailureGetState<Client: ObjectClient> {
pub struct CountdownFailureRequestState<Client: ObjectClient> {
count: usize,
fail_count: usize,
error: Option<Client::ClientError>,
Expand All @@ -203,12 +244,16 @@ pub fn countdown_failure_client<Client: ObjectClient>(
// returns error E on the n'th read request from that stream, otherwise reads from the underlying stream
// (Note: we could also define a failure client that tracks offsets, and returns an error when the offset
// reaches a specified threshold.)
get_results: GetFailureMap<Client>,
get_results: RequestFailureMap<Client, GetObjectError>,
// For HEAD and LIST, map entries are interpreted as follows:
// (k -> E) means inject error E on the k'th call to that operation
head_failures: HashMap<usize, ObjectClientError<HeadObjectError, Client::ClientError>>,
list_failures: HashMap<usize, ObjectClientError<ListObjectsError, Client::ClientError>>,
// TODO add put failures
// For PUT, map entries are interpreted as follows (operations are numbered starting at 1):
// (k -> Err(E) means return error E on the k'th PUT
// (k -> Ok((n, E))) means return a put request object on the k'th put that
// returns error E on the n'th write, otherwise writes to the underlying request.
put_results: RequestFailureMap<Client, PutObjectError>,
) -> CountdownFailureClient<Client> {
let state = Mutex::new(CountdownFailureClientState {
get_count: 0usize,
Expand All @@ -217,6 +262,8 @@ pub fn countdown_failure_client<Client: ObjectClient>(
head_failures,
list_count: 0usize,
list_failures,
put_count: 0usize,
put_results,
});
FailureClient {
client,
Expand All @@ -229,8 +276,8 @@ pub fn countdown_failure_client<Client: ObjectClient>(
} else {
(usize::MAX, None)
};
Ok(FailureGetWrapper {
state: CountdownFailureGetState {
Ok(FailureRequestWrapper {
state: CountdownFailureRequestState {
count: 0,
fail_count,
error,
Expand Down Expand Up @@ -261,6 +308,30 @@ pub fn countdown_failure_client<Client: ObjectClient>(
Ok(())
}
},
put_object_cb: |state, _bucket, _key, _params| {
state.put_count += 1;
let (fail_count, error) = if let Some(result) = state.put_results.remove(&state.put_count) {
let (fail_count, error) = result?;
(fail_count, Some(error))
} else {
(usize::MAX, None)
};
Ok(FailureRequestWrapper {
state: CountdownFailureRequestState {
count: 0,
fail_count,
error,
},
result_fn: |state| {
state.count += 1;
if state.count >= state.fail_count {
Err(state.error.take().unwrap())
} else {
Ok(())
}
},
})
},
}
}

Expand Down Expand Up @@ -299,7 +370,8 @@ mod tests {
Err(ObjectClientError::ClientError(MockClientError("no such bucket".into()))),
);

let fail_client = countdown_failure_client(client, get_failures, HashMap::new(), HashMap::new());
let fail_client =
countdown_failure_client(client, get_failures, HashMap::new(), HashMap::new(), HashMap::new());

let fail_set = HashSet::from([2, 4, 5]);
for i in 1..=6 {
Expand Down
2 changes: 1 addition & 1 deletion mountpoint-s3-client/src/object_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ impl PutObjectParams {
/// A streaming put request which allows callers to asynchronously write
/// the body of the request.
#[async_trait]
pub trait PutObjectRequest {
pub trait PutObjectRequest: Send {
type ClientError: std::error::Error + Send + Sync + 'static;

/// Write the given slice to the put request body.
Expand Down
Loading

1 comment on commit c89f05d

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 2.

Benchmark suite Current: c89f05d Previous: 62ac31d Ratio
random_read_small_file 1.6201171875 MiB/s 5.51171875 MiB/s 3.40

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.