Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include retry policy in Channel.fromSRA factory #5387

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions docs/reference/channel.md
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,25 @@ Available options:
`protocol`
: Allow choosing the protocol for the resulting remote URLs. Available choices: `ftp`, `http`, `https` (default: `ftp`).

`retryPolicy`
: Set a retry policy to apply when an SRA request fails with a retriable error.
The retry policy is set as a Map specifying the different policy properties.

Available retry policy properties:

| Property | Description | Default |
| ------------- |-------------------------------------------------| ------- |
| `delay` | Delay when retrying failed SRA requests. | `500ms` |
| `jitter` | Jitter value when retrying failed SRA requests. | `0.25` |
| `maxAttempts` | Max attempts when retrying failed SRA requests. | `3` |
| `maxDelay` | Max delay when retrying failed SRA requests. | `30s` |

The following code snippet shows an example for using the `Channel.fromSRA` factory method with a custom `retryPolicy`.

```groovy
channel.fromSRA(ids, retryPolicy: [delay: '250ms', maxAttempts: 5])
```

(channel-interval)=

## interval
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@

package nextflow.datasource

import dev.failsafe.Failsafe
import dev.failsafe.RetryPolicy
import dev.failsafe.event.EventListener
import dev.failsafe.event.ExecutionAttemptedEvent
import dev.failsafe.function.CheckedSupplier

import java.nio.file.NoSuchFileException
import java.nio.file.Path

Expand All @@ -34,6 +40,10 @@ import nextflow.extension.FilesEx
import nextflow.file.FileHelper
import nextflow.util.CacheHelper
import nextflow.util.Duration

import java.time.temporal.ChronoUnit
import java.util.function.Predicate

/**
* Query the NCBI SRA database and return the retrieved FASTQs to the specified
* target channel. Inspired by SRA-Explorer by Phil Ewels -- https://ewels.github.io/sra-explorer/
Expand All @@ -43,7 +53,9 @@ import nextflow.util.Duration
@Slf4j
class SraExplorer {

static public Map PARAMS = [apiKey:[String,GString], cache: Boolean, max: Integer, protocol: ['ftp','http','https']]
static public Map PARAMS = [apiKey:[String,GString], cache: Boolean, max: Integer, protocol: ['ftp','http','https'],
retryPolicy: Map]
final static List<Integer> RETRY_CODES = List.of(408, 429, 500, 502, 503, 504)

@ToString
static class SearchRecord {
Expand All @@ -67,6 +79,7 @@ class SraExplorer {
private List<String> missing = new ArrayList<>()
private Path cacheFolder
private String protocol = 'ftp'
private SraRetryConfig retryConfig = new SraRetryConfig()

String apiKey
boolean useCache = true
Expand Down Expand Up @@ -94,6 +107,8 @@ class SraExplorer {
maxResults = opts.max as int
if( opts.protocol )
protocol = opts.protocol as String
if( opts.retryPolicy )
retryConfig = new SraRetryConfig(opts.retryPolicy as Map)
}

DataflowWriteChannel apply() {
Expand Down Expand Up @@ -181,7 +196,7 @@ class SraExplorer {

protected Map makeDataRequest(String url) {
log.debug "SRA data request url=$url"
final text = new URL(url).getText()
final text = runWithRetry(()->getTextFormUrl(url))

log.trace "SRA data result:\n${pretty(text)?.indent()}"
def response = jsonSlurper.parseText(text)
Expand Down Expand Up @@ -220,7 +235,7 @@ class SraExplorer {

protected SearchRecord makeSearch(String url) {
log.debug "SRA search url=$url"
final text = new URL(url).getText()
final text = runWithRetry(()-> getTextFormUrl(url))

log.trace "SRA search result:\n${pretty(text)?.indent()}"
final response = jsonSlurper.parseText(text)
Expand Down Expand Up @@ -265,10 +280,14 @@ class SraExplorer {
return result
}

/**
 * Fetch the content found at the given location and return it as text.
 *
 * @param url The location to read, expressed as a URI string
 * @return The content retrieved from the location
 */
protected static String getTextFormUrl(String url) {
    final target = new URI(url).toURL()
    return target.text
}

protected String readRunUrl(String acc) {
final url = "https://www.ebi.ac.uk/ena/portal/api/filereport?result=read_run&fields=fastq_ftp&accession=$acc"
log.debug "SRA fetch ftp fastq url=$url"
String result = new URL(url).text.trim()
String result = runWithRetry(() -> getTextFormUrl(url)).trim()
log.trace "SRA fetch ftp fastq url result:\n${result?.indent()}"

if( result.indexOf('\n')==-1 ) {
Expand Down Expand Up @@ -330,6 +349,68 @@ class SraExplorer {
return url
}

/**
 * Build a Failsafe retry policy from the current SRA retry configuration
 * (backoff delay, max delay, max attempts and jitter).
 *
 * @param cond A predicate that tells whether a given failure should trigger a retry
 * @return The {@link dev.failsafe.RetryPolicy} instance
 */
protected <T> RetryPolicy<T> retryPolicy(Predicate<? extends Throwable> cond) {
    // backoff boundaries expressed in milliseconds
    final long initialDelayMillis = retryConfig.delay.toMillis()
    final long maxDelayMillis = retryConfig.maxDelay.toMillis()

    // logs every attempt that is going to be retried
    final EventListener<ExecutionAttemptedEvent> retryLogger = new EventListener<ExecutionAttemptedEvent>() {
        @Override
        void accept(ExecutionAttemptedEvent attempt) throws Throwable {
            log.debug("Retryable response error - attempt: ${attempt.attemptCount}; reason: ${attempt.lastFailure.message}")
        }
    }

    return RetryPolicy.<T>builder()
        .handleIf(cond)
        .withBackoff(initialDelayMillis, maxDelayMillis, ChronoUnit.MILLIS)
        .withMaxAttempts(retryConfig.maxAttempts)
        .withJitter(retryConfig.jitter)
        .onRetry(retryLogger)
        .build()
}

/**
 * Carry out the invocation of the specified action, retrying it with the policy
 * created by {@code retryPolicy} when it fails with a {@link java.io.IOException}
 * (thrown directly, or found as the immediate cause of the failure) whose message
 * reports one of the HTTP status codes listed in {@code RETRY_CODES}.
 *
 * @param action A {@link dev.failsafe.function.CheckedSupplier} instance modeling the action to be performed in a safe manner
 * @return The result of the supplied action
 */
protected <T> T runWithRetry(CheckedSupplier<T> action) {
    // define the retry condition
    final cond = new Predicate<? extends Throwable>() {
        @Override
        boolean test(Throwable t) {
            // the failure itself is a retryable I/O error
            if( t instanceof IOException && containsErrorCodes(t.message, RETRY_CODES))
                return true
            // the failure wraps a retryable I/O error
            if(t.cause instanceof IOException && containsErrorCodes(t.cause.message, RETRY_CODES))
                return true
            return false
        }
    }
    // create the retry policy; attempt logging is registered by `retryPolicy` itself,
    // so no listener needs to be declared here
    def policy = retryPolicy(cond)
    // apply the action with the retry policy
    return Failsafe.with(policy).get(action)
}

/**
 * Check whether an error message reports one of the given HTTP status codes.
 * The message is expected to follow the pattern
 * {@code Server returned HTTP response code: <code> for URL...}.
 *
 * @param message The error message to inspect
 * @param codes The HTTP status codes to look for
 * @return {@code true} when the message reports a status code included in {@code codes}, {@code false} otherwise
 */
static boolean containsErrorCodes(String message, List<Integer> codes){
    final found = (message =~ /Server returned HTTP response code: (\d+) for URL.*/)
    // no status code reported by the message
    if( !found.find() )
        return false
    final Integer status = Integer.valueOf(found.group(1))
    return codes.contains(status)
}

}


Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2013-2024, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package nextflow.datasource

import groovy.transform.CompileStatic
import groovy.transform.EqualsAndHashCode
import groovy.transform.ToString
import nextflow.util.Duration
/**
 * Models retry policy configuration for Sra queries
 *
 * @author Jorge Ejarque <[email protected]>
 */
@ToString(includePackage = false, includeNames = true)
@EqualsAndHashCode
@CompileStatic
class SraRetryConfig {
    /** Delay when retrying failed SRA requests */
    Duration delay = Duration.of('500ms')
    /** Max delay when retrying failed SRA requests */
    Duration maxDelay = Duration.of('30s')
    /** Max attempts when retrying failed SRA requests */
    int maxAttempts = 3
    /** Jitter value when retrying failed SRA requests */
    double jitter = 0.25

    /**
     * Create a configuration holding the default values
     */
    SraRetryConfig() {
        this(Collections.emptyMap())
    }

    /**
     * Create a configuration overriding the defaults with the entries found in the given map.
     * The recognised keys are {@code delay}, {@code maxDelay}, {@code maxAttempts} and {@code jitter};
     * keys that are missing keep their default value.
     *
     * @param config The map holding the retry policy properties; a null or empty map keeps all defaults
     */
    SraRetryConfig(Map config) {
        // nothing to override
        if( !config )
            return
        if( config.delay )
            delay = config.delay as Duration
        if( config.maxDelay )
            maxDelay = config.maxDelay as Duration
        if( config.maxAttempts )
            maxAttempts = config.maxAttempts as int
        // a numeric zero is a legitimate jitter value (no randomisation) but it is
        // falsy under Groovy truth, therefore any number must be accepted explicitly
        final jitterOpt = config.jitter
        if( jitterOpt instanceof Number || jitterOpt )
            jitter = jitterOpt as double
    }
}
20 changes: 20 additions & 0 deletions modules/nextflow/src/test/groovy/nextflow/ChannelTest.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -990,4 +990,24 @@ class ChannelTest extends Specification {

}

// Checks that `Channel.fromSRA` accepts the supported options, including
// the `retryPolicy` map, and returns a non-null result.
def 'should not fail when setting SRA correct properties' () {
given:
def id = 'SRR389222'
// only `maxAttempts` is overridden in the retry policy map
def retryPolicy = [maxAttempts: 2]

when:
def result = Channel.fromSRA(id, apiKey: '1234', retryPolicy: retryPolicy, cache: false, max: 10, protocol: 'http')
then:
result != null

}

// Checks that `Channel.fromSRA` rejects an option name it does not recognise
// (`incorrectKey`) by throwing an IllegalArgumentException.
def 'should fail when SRA incorrect property' () {
when:
def result = Channel.fromSRA('SRR389222', incorrectKey: '1234')
then:
thrown(IllegalArgumentException)
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package nextflow.datasource

import dev.failsafe.FailsafeException

import java.nio.file.Files
import java.nio.file.Path

Expand Down Expand Up @@ -242,4 +244,36 @@ class SraExplorerTest extends Specification {
result == '1bc'
}

// Data-driven check of `SraExplorer.containsErrorCodes`: the HTTP status code
// embedded in the exception message is matched against `SraExplorer.RETRY_CODES`.
def 'should detect retry errors' () {
given:
// message built with the "Server returned HTTP response code: <code> for URL" layout
def ex = new IOException("Server returned HTTP response code: " + ERROR +" for URL: https://dummy.url")

expect:
SraExplorer.containsErrorCodes(ex.getLocalizedMessage(), SraExplorer.RETRY_CODES) == EXPECTED

where:
// 404 is not listed in RETRY_CODES, 429 is
ERROR | EXPECTED
'404' | false
'429' | true

}
// Checks that `runWithRetry` invokes a failing action once per allowed attempt
// (`maxAttempts: 2`) and then throws a FailsafeException whose cause carries
// the original error message.
def 'should retry on errors' () {
given:
// 429 is one of the retryable status codes
def ex = new IOException("Server returned HTTP response code: 429 for URL: https://dummy.url")
def slurper = new SraExplorer(null, [retryPolicy: [maxAttempts: 2]])
// counts how many times the action is invoked
def retries = 0

when:
slurper.runWithRetry{
retries ++
throw ex
}

then:
def e = thrown(FailsafeException)
e.cause.message == ex.message
retries == 2
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2013-2024, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package nextflow.datasource

import nextflow.util.Duration
import spock.lang.Specification

/**
* Tests for {@link SraRetryConfig}: default values and overrides provided via a map
*
* @author Jorge Ejarque <[email protected]>
*/
class SraRetryConfigTest extends Specification {

def 'should create retry config'() {

expect:
// defaults applied by the no-arg constructor
new SraRetryConfig().delay == Duration.of('500ms')
new SraRetryConfig().maxDelay == Duration.of('30s')
new SraRetryConfig().maxAttempts == 3
new SraRetryConfig().jitter == 0.25d

and:
// each property can be overridden individually; string values are converted to the field type
new SraRetryConfig([maxAttempts: 20]).maxAttempts == 20
new SraRetryConfig([delay: '1s']).delay == Duration.of('1s')
new SraRetryConfig([maxDelay: '1m']).maxDelay == Duration.of('1m')
new SraRetryConfig([jitter: '0.5']).jitter == 0.5d

}
}
Loading