Skip to content

Commit ba28c09

Browse files
committed
Support prefetching plugins metadata from an http api
Signed-off-by: Tom Sellman <[email protected]>
1 parent c8750a0 commit ba28c09

File tree

7 files changed

+497
-20
lines changed

7 files changed

+497
-20
lines changed

modules/nf-commons/build.gradle

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,5 +39,9 @@ dependencies {
3939
/* testImplementation inherited from top gradle build file */
4040
testImplementation(testFixtures(project(":nextflow")))
4141
testFixturesImplementation(project(":nextflow"))
42+
43+
testImplementation "org.apache.groovy:groovy-json:4.0.26" // needed by wiremock
44+
testImplementation ('com.github.tomakehurst:wiremock:3.0.0-beta-1') { exclude module: 'groovy-all' }
45+
testImplementation ('com.github.tomjankes:wiremock-groovy:0.2.0') { exclude module: 'groovy-all' }
4246
}
4347

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
package nextflow.plugin
2+
3+
import com.google.gson.Gson
4+
import dev.failsafe.Failsafe
5+
import dev.failsafe.FailsafeExecutor
6+
import dev.failsafe.Fallback
7+
import dev.failsafe.RetryPolicy
8+
import dev.failsafe.event.EventListener
9+
import dev.failsafe.event.ExecutionAttemptedEvent
10+
import dev.failsafe.function.CheckedSupplier
11+
import groovy.transform.CompileStatic
12+
import groovy.util.logging.Slf4j
13+
import nextflow.BuildInfo
14+
import org.pf4j.PluginRuntimeException
15+
import org.pf4j.update.FileDownloader
16+
import org.pf4j.update.FileVerifier
17+
import org.pf4j.update.PluginInfo
18+
import org.pf4j.update.SimpleFileDownloader
19+
import org.pf4j.update.verifier.CompoundVerifier
20+
21+
import java.net.http.HttpClient
22+
import java.net.http.HttpRequest
23+
import java.net.http.HttpResponse
24+
25+
/**
26+
* Represents an update repository served via an HTTP api.
27+
*
28+
* It implements PrefetchUpdateRepository so that all relevant
29+
* plugin metadata can be loaded with a single HTTP request, rather
30+
* than a request-per-plugin.
31+
*
32+
* Metadata is prefetched into memory when Nextflow starts and expires
33+
* upon termination (or when 'refresh()' is called).
34+
*/
35+
@Slf4j
36+
@CompileStatic
37+
class HttpPluginRepository implements PrefetchUpdateRepository {
38+
private final HttpClient client = HttpClient.newHttpClient()
39+
private final String id
40+
private final URI url
41+
42+
private Map<String, PluginInfo> plugins = new HashMap<>()
43+
44+
HttpPluginRepository(String id, URI url) {
45+
this.id = id
46+
this.url = url
47+
}
48+
49+
// NOTE ON PREFETCHING
50+
//
51+
// The prefetch mechanism is used to work around a limitation in the
52+
// UpdateRepository interface from pf4j.
53+
//
54+
// Specifically, p4fj expects that getPlugins() returns a Map<> of all
55+
// metadata about all plugins. To implement this for an HTTP repository
56+
// would require either downloading the entire contents of the remote
57+
// repository or implementing a lazy map and making an HTTP request for
58+
// each required plugin.
59+
//
60+
// Instead we can use the list of configured plugins to load all relevant
61+
// metadata in a single HTTP request at startup, and use this to populate
62+
// the map. Once the prefetch is complete, this repository will behave
63+
// like any other implementation of UpdateRepository.
64+
@Override
65+
void prefetch(List<PluginSpec> plugins) {
66+
if (plugins && !plugins.isEmpty()) {
67+
this.plugins = fetchMetadata(plugins)
68+
}
69+
}
70+
71+
@Override
72+
String getId() {
73+
return id
74+
}
75+
76+
@Override
77+
URL getUrl() {
78+
return url.toURL()
79+
}
80+
81+
@Override
82+
Map<String, PluginInfo> getPlugins() {
83+
if (plugins.isEmpty()) {
84+
log.warn "getPlugins() called before prefetch() - plugins map will be empty"
85+
return Map.of()
86+
}
87+
return Collections.unmodifiableMap(plugins)
88+
}
89+
90+
@Override
91+
PluginInfo getPlugin(String id) {
92+
return plugins.computeIfAbsent(id) { key -> fetchMetadataByIds([key]).get(key) }
93+
}
94+
95+
@Override
96+
void refresh() {
97+
plugins = fetchMetadataByIds(plugins.keySet())
98+
}
99+
100+
@Override
101+
FileDownloader getFileDownloader() {
102+
return new SimpleFileDownloader()
103+
}
104+
105+
@Override
106+
FileVerifier getFileVerifier() {
107+
return new CompoundVerifier()
108+
}
109+
110+
// ----------------------------------------------------------------------------
111+
// http handling
112+
113+
private Map<String, PluginInfo> fetchMetadataByIds(Collection<String> ids) {
114+
def specs = ids.collect(id -> new PluginSpec(id, null))
115+
return fetchMetadata(specs)
116+
}
117+
118+
private Map<String, PluginInfo> fetchMetadata(Collection<PluginSpec> specs) {
119+
final ordered = specs.sort(false)
120+
final CheckedSupplier<Map<String, PluginInfo>> supplier = () -> fetchMetadata0(ordered)
121+
return retry().get(supplier)
122+
}
123+
124+
private Map<String, PluginInfo> fetchMetadata0(List<PluginSpec> specs) {
125+
final gson = new Gson()
126+
127+
def reqBody = gson.toJson([
128+
'nextflowVersion': BuildInfo.version,
129+
'plugins' : specs
130+
])
131+
132+
def req = HttpRequest.newBuilder()
133+
.uri(url.resolve("plugins/collect"))
134+
.POST(HttpRequest.BodyPublishers.ofString(reqBody))
135+
.build()
136+
137+
def rep = client.send(req, HttpResponse.BodyHandlers.ofString())
138+
if (rep.statusCode() != 200) throw new PluginRuntimeException(errorMessage(rep, gson))
139+
140+
try {
141+
def repBody = gson.fromJson(rep.body(), FetchResponse)
142+
return repBody.plugins.collectEntries { p -> Map.entry(p.id, p) }
143+
} catch (Exception e) {
144+
log.info("Plugin metadata response body: '${rep.body()}'")
145+
throw new PluginRuntimeException("Failed to parse response body", e)
146+
}
147+
}
148+
149+
// create a retry executor using failsafe
150+
private static FailsafeExecutor retry() {
151+
EventListener<ExecutionAttemptedEvent> logAttempt = (ExecutionAttemptedEvent attempt) -> {
152+
log.debug("Retrying download of plugins metadata - attempt ${attempt.attemptCount}, ${attempt.lastFailure.message}", attempt.lastFailure)
153+
}
154+
Fallback fallback = Fallback.ofException { e ->
155+
e.lastFailure instanceof ConnectException
156+
? new ConnectException("Failed to download plugins metadata")
157+
: new PluginRuntimeException("Failed to download plugin metadata: ${e.lastFailure.message}")
158+
}
159+
final policy = RetryPolicy.builder()
160+
.withMaxAttempts(3)
161+
.handle(ConnectException)
162+
.onRetry(logAttempt)
163+
.build()
164+
return Failsafe.with(fallback, policy)
165+
}
166+
167+
private static String errorMessage(HttpResponse<String> rep, Gson gson) {
168+
try {
169+
def err = gson.fromJson(rep.body(), ErrorResponse)
170+
return "${err.type} - ${err.message}"
171+
} catch (Exception e) {
172+
return rep.body()
173+
}
174+
}
175+
176+
// ---------------------
177+
178+
/**
179+
* Response format object expected from repository
180+
*/
181+
private static class FetchResponse {
182+
List<PluginInfo> plugins
183+
}
184+
185+
private static class ErrorResponse {
186+
String type
187+
String message
188+
}
189+
}

modules/nf-commons/src/main/nextflow/plugin/PluginUpdater.groovy

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,11 @@ class PluginUpdater extends UpdateManager {
8989
result.add(new LocalUpdateRepository('downloaded', local))
9090
}
9191
else {
92-
result.add(new DefaultUpdateRepository('nextflow.io', remote))
92+
def remoteRepo = remote.path.endsWith('.json')
93+
? new DefaultUpdateRepository('nextflow.io', remote)
94+
: new HttpPluginRepository('registry', remote.toURI())
95+
96+
result.add(remoteRepo)
9397
result.addAll(customRepos())
9498
}
9599
return result
@@ -138,6 +142,18 @@ class PluginUpdater extends UpdateManager {
138142
return new DefaultUpdateRepository('uri', new URL(uri), fileName)
139143
}
140144

145+
/**
146+
* Prefetch metadata for plugins. This gives an opportunity for certain
147+
* repository types to perform some data-loading optimisations.
148+
*/
149+
void prefetchMetadata(List<PluginSpec> plugins) {
150+
for( def repo : this.@repositories ) {
151+
if( repo instanceof PrefetchUpdateRepository ) {
152+
repo.prefetch(plugins)
153+
}
154+
}
155+
}
156+
141157
/**
142158
* Resolve a plugin installing or updating the dependencies if necessary
143159
* and start the plugin

modules/nf-commons/src/main/nextflow/plugin/PluginsFacade.groovy

Lines changed: 40 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -50,22 +50,25 @@ class PluginsFacade implements PluginStateListener {
5050
private PluginUpdater updater
5151
private CustomPluginManager manager
5252
private DefaultPlugins defaultPlugins = DefaultPlugins.INSTANCE
53-
private String indexUrl = Plugins.DEFAULT_PLUGINS_REPO
53+
private String indexUrl
5454
private boolean embedded
5555

5656
PluginsFacade() {
5757
mode = getPluginsMode()
5858
root = getPluginsDir()
59+
indexUrl = getPluginsIndexUrl()
5960
offline = env.get('NXF_OFFLINE') == 'true'
6061
if( mode==DEV_MODE && root.toString()=='plugins' && !isRunningFromDistArchive() )
6162
root = detectPluginsDevRoot()
6263
System.setProperty('pf4j.mode', mode)
6364
}
6465

65-
PluginsFacade(Path root, String mode=PROD_MODE, boolean offline=false) {
66+
PluginsFacade(Path root, String mode=PROD_MODE, boolean offline=false,
67+
String indexUrl=Plugins.DEFAULT_PLUGINS_REPO) {
6668
this.mode = mode
6769
this.root = root
6870
this.offline = offline
71+
this.indexUrl = indexUrl
6972
System.setProperty('pf4j.mode', mode)
7073
}
7174

@@ -95,6 +98,30 @@ class PluginsFacade implements PluginStateListener {
9598
}
9699
}
97100

101+
protected String getPluginsIndexUrl() {
102+
final url = env.get('NXF_PLUGINS_INDEX_URL')
103+
if( url ) {
104+
log.trace "Detected NXF_PLUGINS_INDEX_URL=$url"
105+
if( url != Plugins.DEFAULT_PLUGINS_REPO ) {
106+
// warn that this is experimental behaviour
107+
log.warn """\
108+
=======================================================================
109+
= WARNING =
110+
= You are running this script using a custom plugins index url. =
111+
= =
112+
= ${url}
113+
= =
114+
= This is an experimental feature and should not be used in production. =
115+
=============================================================================
116+
""".stripIndent(true)
117+
}
118+
return url
119+
} else {
120+
log.trace "Using default plugins url"
121+
return Plugins.DEFAULT_PLUGINS_REPO
122+
}
123+
}
124+
98125
private boolean isNextflowDevRoot(File file) {
99126
file.name=='nextflow' && file.isDirectory() && new File(file, 'settings.gradle').isFile()
100127
}
@@ -320,27 +347,21 @@ class PluginsFacade implements PluginStateListener {
320347
new DefaultPluginManager()
321348
}
322349

323-
void start( String pluginId ) {
324-
if( isEmbedded() && defaultPlugins.hasPlugin(pluginId) ) {
325-
log.debug "Plugin 'start' is not required in embedded mode -- ignoring for plugin: $pluginId"
326-
return
327-
}
328-
329-
start(PluginSpec.parse(pluginId, defaultPlugins))
350+
void start(String pluginId) {
351+
start([PluginSpec.parse(pluginId, defaultPlugins)])
330352
}
331353

332-
void start(PluginSpec plugin) {
333-
if( isEmbedded() && defaultPlugins.hasPlugin(plugin.id) ) {
334-
log.debug "Plugin 'start' is not required in embedded mode -- ignoring for plugin: $plugin.id"
335-
return
354+
void start(List<PluginSpec> specs) {
355+
def split = specs.split { plugin -> isEmbedded() && defaultPlugins.hasPlugin(plugin.id) }
356+
def (skip, toStart) = [split[0], split[1]]
357+
if( !skip.isEmpty() ) {
358+
def skippedIds = skip.collect{ plugin -> plugin.id }
359+
log.debug "Plugin 'start' is not required in embedded mode -- ignoring for plugins: $skippedIds"
336360
}
337361

338-
updater.prepareAndStart(plugin.id, plugin.version)
339-
}
340-
341-
void start(List<PluginSpec> specs) {
342-
for( PluginSpec it : specs ) {
343-
start(it)
362+
updater.prefetchMetadata(toStart)
363+
for( PluginSpec plugin : toStart ) {
364+
updater.prepareAndStart(plugin.id, plugin.version)
344365
}
345366
}
346367

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package nextflow.plugin
2+
3+
import groovy.transform.CompileStatic
4+
import org.pf4j.update.UpdateRepository
5+
6+
/**
7+
* Extension to pf4j's UpdateRepository which supports pre-fetching
8+
* metadata for a specified set of plugins.
9+
*
10+
* This gives the ability to avoid downloading metadata for unused
11+
* plugins.
12+
*/
13+
@CompileStatic
14+
interface PrefetchUpdateRepository extends UpdateRepository {
15+
/**
16+
* This will be called when Nextflow starts, before
17+
* initialising the plugins.
18+
*/
19+
void prefetch(List<PluginSpec> plugins)
20+
}

0 commit comments

Comments
 (0)