Skip to content
This repository has been archived by the owner on Jun 21, 2024. It is now read-only.

Commit

Permalink
refactor: Read chunks until n bytes instead of take
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasfarias committed May 2, 2024
1 parent ced6bef commit b74e645
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 4 deletions.
11 changes: 11 additions & 0 deletions hook-worker/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,17 @@ pub enum WebhookRequestError {
},
}

/// Enumeration of errors that can occur while handling a `reqwest::Response`.
/// Currently, not consumed anywhere. Grouped here to support a common error type for
/// `utils::first_n_bytes_of_response`.
#[derive(Error, Debug)]
pub enum WebhookResponseError {
#[error("failed to parse a response as UTF8")]
ParseUTF8StringError(#[from] std::str::Utf8Error),
#[error("error while iterating over response body chunks")]
StreamIterationError(#[from] reqwest::Error),
}

/// Implement display of `WebhookRequestError` by appending to the underlying `reqwest::Error`
/// any response message if available.
impl fmt::Display for WebhookRequestError {
Expand Down
22 changes: 18 additions & 4 deletions hook-worker/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,30 @@
use crate::error::WebhookResponseError;
use futures::StreamExt;
use reqwest::Response;

pub async fn first_n_bytes_of_response(
response: Response,
n: usize,
) -> Result<String, reqwest::Error> {
let mut body = response.bytes_stream().take(n);
let mut buffer = String::new();
) -> Result<String, WebhookResponseError> {
let mut body = response.bytes_stream();
let mut buffer = String::with_capacity(n);

while let Some(chunk) = body.next().await {
if buffer.len() >= n {
// Early return before reading next chunk.
break;
}

let chunk = chunk?;
buffer.push_str(std::str::from_utf8(&chunk).unwrap());
let chunk_str = std::str::from_utf8(&chunk)?;
if let Some(partial_chunk_str) =
chunk_str.get(0..std::cmp::min(n - buffer.len(), chunk_str.len()))
{
buffer.push_str(&partial_chunk_str);
} else {
// For whatever reason, we are out of bounds, give up.
break;
}
}

Ok(buffer)
Expand Down
30 changes: 30 additions & 0 deletions hook-worker/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ async fn send_webhook(
Err(WebhookError::Request(
WebhookRequestError::RetryableRequestError {
error: err,
// TODO: Make amount of bytes configurable.
response: first_n_bytes_of_response(response, 10 * 1024).await.ok(),
retry_after,
},
Expand Down Expand Up @@ -636,4 +637,33 @@ mod tests {
));
}
}

#[sqlx::test(migrations = "../migrations")]
async fn test_error_message_contains_up_to_n_bytes_of_response_body(_pg: PgPool) {
let method = HttpMethod::POST;
let url = "http://localhost:18081/fail";
let headers = collections::HashMap::new();
// This is double the current hardcoded amount of bytes.
// TODO: Make this configurable and chage it here too.
let body = (0..20 * 1024).map(|_| "a").collect::<Vec<_>>().concat();
let client = reqwest::Client::new();

let err = send_webhook(client, &method, url, &headers, body.to_owned())
.await
.err()
.expect("request didn't fail when it should have failed");

assert!(matches!(err, WebhookError::Request(..)));
if let WebhookError::Request(request_error) = err {
assert_eq!(request_error.status(), Some(StatusCode::BAD_REQUEST));
assert!(request_error.to_string().contains(&body[0..10 * 1024]));
// The 81 bytes account for the reqwest erorr message as described below.
assert_eq!(request_error.to_string().len(), 10 * 1024 + 81);
// This is the display implementation of reqwest. Just checking it is still there.
// See: https://github.com/seanmonstar/reqwest/blob/master/src/error.rs
assert!(request_error.to_string().contains(
"HTTP status client error (400 Bad Request) for url (http://localhost:18081/fail)"
));
}
}
}

0 comments on commit b74e645

Please sign in to comment.