diff --git a/src/binaries.rs b/src/binaries.rs index ba22e93..4d5ef55 100644 --- a/src/binaries.rs +++ b/src/binaries.rs @@ -7,7 +7,7 @@ use tokio::{fs::File as AsyncFile, io::AsyncSeekExt}; use tokio_util::compat::FuturesAsyncReadCompatExt; use tracing::{info_span, instrument, Instrument}; -use crate::retry::{retry_large_request, retry_request}; +use crate::retry::retry_request; #[instrument(skip(client))] pub async fn download_binaries( @@ -28,7 +28,7 @@ pub async fn download_binaries( let mut binaries = HashMap::new(); for binary in binary_list.binaries { - let mut dest = retry_large_request(|| { + let mut dest = retry_request(|| { let binary = binary.clone(); let client = client.clone(); async move { diff --git a/src/monitor.rs b/src/monitor.rs index 2880504..9b2d861 100644 --- a/src/monitor.rs +++ b/src/monitor.rs @@ -11,7 +11,7 @@ use tokio::{ }; use tracing::{debug, instrument}; -use crate::retry::{retry_large_request, retry_request}; +use crate::retry::retry_request; #[derive(Debug)] pub enum PackageCompletion { @@ -245,7 +245,7 @@ impl ObsMonitor { pub async fn download_build_log(&self) -> Result { const LOG_LEN_TO_CHECK_FOR_MD5: u64 = 2500; - let mut file = retry_large_request(|| async { + let mut file = retry_request(|| async { let mut file = AsyncFile::from_std( tempfile::tempfile().wrap_err("Failed to create tempfile to build log")?, ); diff --git a/src/retry.rs b/src/retry.rs index 79cff39..d7f2ca2 100644 --- a/src/retry.rs +++ b/src/retry.rs @@ -8,6 +8,8 @@ use open_build_service_api as obs; use tokio::sync::Mutex; use tracing::instrument; +const INITIAL_INTERVAL: Duration = Duration::from_millis(300); + fn is_client_error(err: &(dyn std::error::Error + 'static)) -> bool { err.downcast_ref::() .and_then(|e| e.status()) @@ -30,7 +32,8 @@ fn is_caused_by_client_error(report: &Report) -> bool { }) } -async fn retry_request_impl(backoff_limit: Duration, func: Func) -> Result +#[instrument(skip(func))] +pub async fn retry_request(func: Func) -> Result where Fut: Future>, Func: FnMut() -> Fut, @@ -39,7 +42,8 @@ where let func = Arc::new(Mutex::new(func)); backoff::future::retry( ExponentialBackoff { - max_elapsed_time: Some(backoff_limit), + max_elapsed_time: None, + initial_interval: INITIAL_INTERVAL, ..Default::default() }, move || { @@ -60,32 +64,11 @@ where .await } -#[instrument(skip(func))] -pub async fn retry_request(func: Func) -> Result -where - Fut: Future>, - Func: FnMut() -> Fut, - E: Into, -{ - const BACKOFF_LIMIT: Duration = Duration::from_secs(10 * 60); // 10 minutes - retry_request_impl(BACKOFF_LIMIT, func).await -} - -#[instrument(skip(func))] -pub async fn retry_large_request(func: Func) -> Result -where - Fut: Future>, - Func: FnMut() -> Fut, - E: Into, -{ - const BACKOFF_LIMIT: Duration = Duration::from_secs(60 * 60); // 1 hour - retry_request_impl(BACKOFF_LIMIT, func).await -} - #[cfg(test)] mod tests { use claim::*; use open_build_service_api as obs; + use rstest::*; use wiremock::{ matchers::{method, path_regex}, Mock, MockServer, ResponseTemplate, @@ -95,10 +78,12 @@ mod tests { use super::*; - const LIMIT: Duration = Duration::from_secs(1); + fn wrap_in_io_error(err: obs::Error) -> std::io::Error { + std::io::Error::new(std::io::ErrorKind::Other, err) + } - #[tokio::test] - async fn test_retry_on_non_client_errors() { + #[fixture] + async fn server() -> MockServer { let server = MockServer::start().await; Mock::given(method("GET")) @@ -116,6 +101,13 @@ mod tests { .mount(&server) .await; + server + } + + #[rstest] + #[tokio::test] + async fn test_retry_on_non_client_errors(server: impl Future) { + let server = server.await; let client = obs::Client::new( server.uri().parse().unwrap(), TEST_USER.to_owned(), @@ -124,50 +116,89 @@ mod tests { let mut attempts = 0; assert_err!( - retry_request_impl(LIMIT, || { - attempts += 1; - async { client.project("500".to_owned()).meta().await } - }) + tokio::time::timeout( + Duration::from_millis(2000), + retry_request(|| { + attempts += 1; + async { client.project("500".to_owned()).meta().await } + }) + ) .await ); assert_gt!(attempts, 1); + } + + #[rstest] + #[tokio::test] + async fn test_retry_on_nested_non_client_errors(server: impl Future) { + let server = server.await; + let client = obs::Client::new( + server.uri().parse().unwrap(), + TEST_USER.to_owned(), + TEST_PASS.to_owned(), + ); let mut attempts = 0; assert_err!( - retry_request_impl(LIMIT, || { - attempts += 1; - async { - client - .project("500".to_owned()) - .meta() - .await - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) - } - }) + tokio::time::timeout( + Duration::from_millis(2000), + retry_request(|| { + attempts += 1; + async { + client + .project("500".to_owned()) + .meta() + .await + .map_err(wrap_in_io_error) + } + }) + ) .await ); assert_gt!(attempts, 1); + } - attempts = 0; + #[rstest] + #[tokio::test] + async fn test_no_retry_on_client_errors(server: impl Future) { + let server = server.await; + let client = obs::Client::new( + server.uri().parse().unwrap(), + TEST_USER.to_owned(), + TEST_PASS.to_owned(), + ); + + let mut attempts = 0; assert_err!( - retry_request_impl(LIMIT, || { + retry_request(|| { attempts += 1; async { client.project("403".to_owned()).meta().await } }) .await ); assert_eq!(attempts, 1); + } - attempts = 0; + #[rstest] + #[tokio::test] + async fn test_no_retry_on_nested_client_errors(server: impl Future) { + let server = server.await; + let client = obs::Client::new( + server.uri().parse().unwrap(), + TEST_USER.to_owned(), + TEST_PASS.to_owned(), + ); + + let mut attempts = 0; assert_err!( - retry_request_impl(LIMIT, || { + retry_request(|| { attempts += 1; async { client .project("403".to_owned()) .meta() .await - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) + .map_err(wrap_in_io_error) } }) .await diff --git a/src/upload.rs b/src/upload.rs index c8d85cf..79516c2 100644 --- a/src/upload.rs +++ b/src/upload.rs @@ -10,11 +10,7 @@ use md5::{Digest, Md5}; use open_build_service_api as obs; use tracing::{debug, info_span, instrument, trace, Instrument}; -use crate::{ - artifacts::ArtifactDirectory, - dsc::Dsc, - retry::{retry_large_request, retry_request}, -}; +use crate::{artifacts::ArtifactDirectory, dsc::Dsc, retry::retry_request}; type Md5String = String; @@ -234,7 +230,7 @@ impl ObsDscUploader { debug!("Uploading file"); let file = artifacts.get_file(root.join(filename).as_str()).await?; - retry_large_request(|| { + retry_request(|| { file.try_clone().then(|file| async { let file = file.wrap_err("Failed to clone file")?; self.client