diff --git a/docs/reference/channel.md b/docs/reference/channel.md index c43cac2d2e..b2a5938482 100644 --- a/docs/reference/channel.md +++ b/docs/reference/channel.md @@ -318,6 +318,25 @@ Available options: `protocol` : Allow choosing the protocol for the resulting remote URLs. Available choices: `ftp`, `http`, `https` (default: `ftp`). +`retryPolicy` +: Set a retry policy in case of the SRA request fail with a retriable error. +The retry policy is set as a Map specifying the different policy properties. + +Available retry policy properties: + +| Property | Description | Default | +| ------------- |-------------------------------------------------| ------- | +| `delay` | Delay when retrying failed SRA requests. | `500ms` | +| `jitter` | Jitter value when retrying failed SRA requests. | `0.25` | +| `maxAttempts` | Max attempts when retrying failed SRA requests. | `3` | +| `maxDelay` | Max delay when retrying failed SRA requests. | `30s` | + +The following code snippet shows an example for using the `Channel.fromSRA` factory method with a custom `retryPolicy`. + + ```groovy + channel.fromSRA(ids, retryPolicy: [delay: '250ms', maxAttempts: 5]) + ``` + (channel-interval)= ## interval diff --git a/modules/nextflow/src/main/groovy/nextflow/datasource/SraExplorer.groovy b/modules/nextflow/src/main/groovy/nextflow/datasource/SraExplorer.groovy index 8c0f118beb..a4c98dabe5 100644 --- a/modules/nextflow/src/main/groovy/nextflow/datasource/SraExplorer.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/datasource/SraExplorer.groovy @@ -16,6 +16,12 @@ package nextflow.datasource +import dev.failsafe.Failsafe +import dev.failsafe.RetryPolicy +import dev.failsafe.event.EventListener +import dev.failsafe.event.ExecutionAttemptedEvent +import dev.failsafe.function.CheckedSupplier + import java.nio.file.NoSuchFileException import java.nio.file.Path @@ -34,6 +40,10 @@ import nextflow.extension.FilesEx import nextflow.file.FileHelper import nextflow.util.CacheHelper import nextflow.util.Duration + +import java.time.temporal.ChronoUnit +import java.util.function.Predicate + /** * Query NCBI SRA database and returns the retrieved FASTQs to the specified * target channel. Inspired to SRA-Explorer by Phil Ewels -- https://ewels.github.io/sra-explorer/ @@ -43,7 +53,9 @@ import nextflow.util.Duration @Slf4j class SraExplorer { - static public Map PARAMS = [apiKey:[String,GString], cache: Boolean, max: Integer, protocol: ['ftp','http','https']] + static public Map PARAMS = [apiKey:[String,GString], cache: Boolean, max: Integer, protocol: ['ftp','http','https'], + retryPolicy: Map] + final static List RETRY_CODES = List.of(408, 429, 500, 502, 503, 504) @ToString static class SearchRecord { @@ -67,6 +79,7 @@ class SraExplorer { private List missing = new ArrayList<>() private Path cacheFolder private String protocol = 'ftp' + private SraRetryConfig retryConfig = new SraRetryConfig() String apiKey boolean useCache = true @@ -94,6 +107,8 @@ class SraExplorer { maxResults = opts.max as int if( opts.protocol ) protocol = opts.protocol as String + if( opts.retryPolicy ) + retryConfig = new SraRetryConfig(opts.retryPolicy as Map) } DataflowWriteChannel apply() { @@ -181,7 +196,7 @@ class SraExplorer { protected Map makeDataRequest(String url) { log.debug "SRA data request url=$url" - final text = new URL(url).getText() + final text = runWithRetry(()->getTextFormUrl(url)) log.trace "SRA data result:\n${pretty(text)?.indent()}" def response = jsonSlurper.parseText(text) @@ -220,7 +235,7 @@ class SraExplorer { protected SearchRecord makeSearch(String url) { log.debug "SRA search url=$url" - final text = new URL(url).getText() + final text = runWithRetry(()-> getTextFormUrl(url)) log.trace "SRA search result:\n${pretty(text)?.indent()}" final response = jsonSlurper.parseText(text) @@ -265,10 +280,14 @@ class SraExplorer { return result } + protected static String getTextFormUrl(String url) { + new URI(url).toURL().getText() + } + protected String readRunUrl(String acc) { final url = "https://www.ebi.ac.uk/ena/portal/api/filereport?result=read_run&fields=fastq_ftp&accession=$acc" log.debug "SRA fetch ftp fastq url=$url" - String result = new URL(url).text.trim() + String result = runWithRetry(() -> getTextFormUrl(url)).trim() log.trace "SRA fetch ftp fastq url result:\n${result?.indent()}" if( result.indexOf('\n')==-1 ) { @@ -330,6 +349,68 @@ class SraExplorer { return url } + /** + * Creates a retry policy using the SRA retry configuration + * + * @param cond A predicate that determines when a retry should be triggered + * @return The {@link dev.failsafe.RetryPolicy} instance + */ + protected RetryPolicy retryPolicy(Predicate cond) { + final EventListener listener = new EventListener() { + @Override + void accept(ExecutionAttemptedEvent event) throws Throwable { + log.debug("Retryable response error - attempt: ${event.attemptCount}; reason: ${event.lastFailure.message}") + } + } + return RetryPolicy.builder() + .handleIf(cond) + .withBackoff(retryConfig.delay.toMillis(), retryConfig.maxDelay.toMillis(), ChronoUnit.MILLIS) + .withMaxAttempts(retryConfig.maxAttempts) + .withJitter(retryConfig.jitter) + .onRetry(listener) + .build() + } + + /** + * Carry out the invocation of the specified action using a retry policy + * when {@link java.io.IOException} is returned containing an error code. + * + * @param action A {@link dev.failsafe.function.CheckedSupplier} instance modeling the action to be performed in a safe manner + * @return The result of the supplied action + */ + protected T runWithRetry(CheckedSupplier action) { + // define listener + final listener = new EventListener() { + @Override + void accept(ExecutionAttemptedEvent event) throws Throwable { + log.debug("Retryable response error - attempt: ${event.attemptCount}; reason: ${event.lastFailure.message}") + } + } + // define the retry condition + final cond = new Predicate() { + @Override + boolean test(Throwable t) { + if( t instanceof IOException && containsErrorCodes(t.message, RETRY_CODES)) + return true + if(t.cause instanceof IOException && containsErrorCodes(t.cause.message, RETRY_CODES)) + return true + return false + } + } + // create the retry policy + def policy = retryPolicy(cond) + // apply the action with + return Failsafe.with(policy).get(action) + } + + static boolean containsErrorCodes(String message, List codes){ + def pattern = /Server returned HTTP response code: (\d+) for URL.*/ + def matcher = (message =~ pattern) + def httpCode = matcher ? matcher[0][1] as Integer : null + return httpCode != null && codes.contains(httpCode) + + } + } diff --git a/modules/nextflow/src/main/groovy/nextflow/datasource/SraRetryConfig.groovy b/modules/nextflow/src/main/groovy/nextflow/datasource/SraRetryConfig.groovy new file mode 100644 index 0000000000..f752a54a3d --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/datasource/SraRetryConfig.groovy @@ -0,0 +1,52 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package nextflow.datasource + +import groovy.transform.CompileStatic +import groovy.transform.EqualsAndHashCode +import groovy.transform.ToString +import nextflow.util.Duration +/** + * Models retry policy configuration for Sra queries + * + * @author Jorge Ejarque + */ +@ToString(includePackage = false, includeNames = true) +@EqualsAndHashCode +@CompileStatic +class SraRetryConfig { + Duration delay = Duration.of('500ms') + Duration maxDelay = Duration.of('30s') + int maxAttempts = 3 + double jitter = 0.25 + + SraRetryConfig() { + this(Collections.emptyMap()) + } + + SraRetryConfig(Map config) { + if( config.delay ) + delay = config.delay as Duration + if( config.maxDelay ) + maxDelay = config.maxDelay as Duration + if( config.maxAttempts ) + maxAttempts = config.maxAttempts as int + if( config.jitter ) + jitter = config.jitter as double + } +} diff --git a/modules/nextflow/src/test/groovy/nextflow/ChannelTest.groovy b/modules/nextflow/src/test/groovy/nextflow/ChannelTest.groovy index e954c4bbcc..ccd2fd3ea4 100644 --- a/modules/nextflow/src/test/groovy/nextflow/ChannelTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/ChannelTest.groovy @@ -990,4 +990,24 @@ class ChannelTest extends Specification { } + def 'should not fail when setting SRA correct properties' () { + given: + def id = 'SRR389222' + def retryPolicy = [maxAttempts: 2] + + when: + def result = Channel.fromSRA(id, apiKey: '1234', retryPolicy: retryPolicy, cache: false, max: 10, protocol: 'http') + then: + result != null + + } + + def 'should fail when SRA incorrect property' () { + when: + def result = Channel.fromSRA('SRR389222', incorrectKey: '1234') + then: + thrown(IllegalArgumentException) + } + + } diff --git a/modules/nextflow/src/test/groovy/nextflow/datasource/SraExplorerTest.groovy b/modules/nextflow/src/test/groovy/nextflow/datasource/SraExplorerTest.groovy index d251873a3a..585cc5f0ff 100644 --- a/modules/nextflow/src/test/groovy/nextflow/datasource/SraExplorerTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/datasource/SraExplorerTest.groovy @@ -16,6 +16,8 @@ package nextflow.datasource +import dev.failsafe.FailsafeException + import java.nio.file.Files import java.nio.file.Path @@ -242,4 +244,36 @@ class SraExplorerTest extends Specification { result == '1bc' } + def 'should detect retry errors' () { + given: + def ex = new IOException("Server returned HTTP response code: " + ERROR +" for URL: https://dummy.url") + + expect: + SraExplorer.containsErrorCodes(ex.getLocalizedMessage(), SraExplorer.RETRY_CODES) == EXPECTED + + where: + ERROR | EXPECTED + '404' | false + '429' | true + + } + def 'should retry on errors' () { + given: + def ex = new IOException("Server returned HTTP response code: 429 for URL: https://dummy.url") + def slurper = new SraExplorer(null, [retryPolicy: [maxAttempts: 2]]) + def retries = 0 + + when: + slurper.runWithRetry{ + retries ++ + throw ex + } + + then: + def e = thrown(FailsafeException) + e.cause.message == ex.message + retries == 2 + } + + } diff --git a/modules/nextflow/src/test/groovy/nextflow/datasource/SraRetryConfigTest.groovy b/modules/nextflow/src/test/groovy/nextflow/datasource/SraRetryConfigTest.groovy new file mode 100644 index 0000000000..61fc9c1339 --- /dev/null +++ b/modules/nextflow/src/test/groovy/nextflow/datasource/SraRetryConfigTest.groovy @@ -0,0 +1,44 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package nextflow.datasource + +import nextflow.util.Duration +import spock.lang.Specification + +/** + * + * @author Jorge Ejarque + */ +class SraRetryConfigTest extends Specification { + + def 'should create retry config'() { + + expect: + new SraRetryConfig().delay == Duration.of('500ms') + new SraRetryConfig().maxDelay == Duration.of('30s') + new SraRetryConfig().maxAttempts == 3 + new SraRetryConfig().jitter == 0.25d + + and: + new SraRetryConfig([maxAttempts: 20]).maxAttempts == 20 + new SraRetryConfig([delay: '1s']).delay == Duration.of('1s') + new SraRetryConfig([maxDelay: '1m']).maxDelay == Duration.of('1m') + new SraRetryConfig([jitter: '0.5']).jitter == 0.5d + + } +}