wr as new Nextflow backend #1088
This is a nice initiative, I'll be very happy to review and merge a pull request implementing support for this back-end engine.
@pditommaso Is there a way to get around needing a shared disk for Nextflow state?
Not easily. The bottom line is that temporary task outputs need to be shared with downstream tasks, therefore some kind of shared storage is required. For example, with AWS Batch NF uses S3 as the shared storage; I guess for OpenStack there's a similar object storage solution (and actually there is).
It depends. If wr provides a command line tool somewhat similar to qsub, bsub, etc., you can just create an executor extending the existing grid executor classes. If you are planning instead to use a low-level API, you can have a look at, and extend, one of the existing executors.
Nope, but I can advise you if needed.
That works for wr's purposes; wr provides easy mounting of S3, which could be used for job inputs/outputs. The problem I have is with Nextflow running on the "master" node needing to check the output written to S3 on a "worker" node before it, e.g., submits the next job. wr's S3 mount is not POSIX enough for what Nextflow on the "master" needs to do for this state checking (e.g. it caches S3 "directory" contents, so is not aware if a different process writes to that "directory"). Can job inputs/outputs be stored in S3, but job state be queried from the scheduler?
Yes, you will need to write your own version of TaskHandler.
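Roughly, the skeleton looks like this (the method names are the TaskHandler ones discussed in this thread; the wr-specific parts are placeholders and exact signatures may differ between Nextflow versions):

```groovy
import nextflow.processor.TaskHandler
import nextflow.processor.TaskRun
import nextflow.processor.TaskStatus

// Illustrative skeleton only: the wr calls are placeholders, and exact
// signatures may differ between Nextflow versions.
class WrTaskHandler extends TaskHandler {

    WrTaskHandler(TaskRun task) {
        super(task)
    }

    @Override
    void submit() {
        // hand the task command over to wr (e.g. via its REST API)
        status = TaskStatus.SUBMITTED
    }

    @Override
    boolean checkIfRunning() {
        // ask wr whether the job has started, instead of polling a shared file system
        return false
    }

    @Override
    boolean checkIfCompleted() {
        // ask wr for the exit status and mark the task completed when it is done
        return false
    }

    @Override
    void kill() {
        // tell wr to cancel the job
    }
}
```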
I have an initial basic but working implementation here: https://github.com/sb10/nextflow/blob/wr/modules/nextflow/src/main/groovy/nextflow/executor/WrExecutor.groovy I have some questions:

I've only had a quick glance at an example LSF test, and didn't really get it. How should I go about writing a complete test for this? Are end-to-end tests done, or only mocks?

I'm currently using the TaskPollingMonitor, but the ideal way of implementing this would be for Nextflow to listen on a websocket and for wr to tell it the moment that tasks change state. What would be the best approach to doing that? Implement my own TaskMonitor? Or is it even possible?

Currently, my implementation is only working running locally. I still haven't thought about how to handle input and output files being in S3. With existing Executors, how does a user start a workflow running when their input files are not available locally, but in an S3 bucket? And how do they specify that the output files go to S3 as well?
It looks like a good start.
I would suggest submitting a draft PR so I can reply to your comments.
The
What do you mean?
What do you mean by pure?
Rather than Groovy, you should look at the URLConnection Java API. Have a look at K8sClient.
Task-specific settings are accessible on
Mostly when the execution is aborted and the cleanup method is invoked.
Mocks would be a good start. Have a look at K8sClientTest.
I would keep this as a future enhancement.
This should be managed automatically by BashWrapperBuilder
PR created here: #1114
OK, I'll give it a try. But what is
If possible I don't want Nextflow to create any files of its own (only task commands should create workflow output files). If that's not possible, then at least I need the wrapper script it creates to not have any absolute paths in it, because those paths won't exist on other OpenStack instances that wr might create to run tasks.
For a scatter task,
Specifically, what state could a task be in when kill() is called? Any state? Currently my code only considers "started" and "complete" statuses, which actually comprise multiple underlying states, and miss some possibilities. For example, in wr a job can be "lost" if it was running on an instance that got deleted externally. The user confirms the situation and then tells wr the job is really dead, and can then choose to restart it. I imagine that, in the current situation with my Nextflow implementation, it would initially satisfy checkIfRunning(), but never satisfy checkIfCompleted(). Is something going to happen automatically in Nextflow to handle these situations, or should I either a) include "lost" as a "complete" status, or b) should users be checking wr's own status interface to deal with lost jobs?
It's not clear to me how this is done. What does a user have to do to enable this feature? The scenario I envisage is: Nextflow is run on a local machine from which wr has been deployed to OpenStack. This local machine does not have the S3 bucket mounted, and is completely different to the OpenStack servers that will be created to run tasks (in particular, different home directory location).
This results in For this to actually work, Is a scenario like this at all possible? Or what is the "Nextflow" way of doing this kind of thing?
Actually, it should work. TES executor uses the same method. The best thing to do is to create a unit test and debug it.
You can create a copy strategy that does nothing. Have a look at TesFileCopyStrategy
Check the
It's used to kill pending or running tasks when the workflow execution is aborted.
It strongly depends on the storage type used as the pipeline work dir. When an input file is stored in a file system different from the one used as the work dir, NF copies that file from the foreign file system to the work dir (for example, an input is on FTP and the work dir is an S3 bucket). This is controlled by the
Are there any guides or examples aimed at end-users for using Nextflow with data in S3?
There's nothing special, file paths need to start with
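For instance, a minimal DSL1 snippet along these lines (bucket and object names are made up here) is enough, assuming the standard s3:// scheme:

```groovy
// main.nf -- illustrative only; bucket and object names are made up
reads = Channel.fromPath('s3://my-bucket/data/sample.fastq.gz')

process countLines {
    input:
    file x from reads

    output:
    file 'count.txt'

    script:
    """
    zcat $x | wc -l > count.txt
    """
}
```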
I'm testing with this basic workflow:
If I change the input to be specified like:
Then it fails because it tries to contact Amazon S3. How do I tell Nextflow to use my own S3 service? I looked at https://www.nextflow.io/docs/latest/config.html#config-aws and created a
Assuming that can be resolved, what change do I make to my workflow so that the I'm hoping that instead of the current method of copying files from S3 to local disk as part of staging, I can make use of wr's S3 mount capabilities, avoiding the need for large local disks when working on large files in S3.
Here there's an example (and by the way, it seems to be by a guy at Sanger).
Tasks are implicitly assumed to be executed in a POSIX file system and therefore output files are created in the task work dir. Then, if the pipeline work dir is a remote storage, NF will copy that file to the remote work dir.
It should be possible by writing your own copy strategy.
How does a user specify this? Or how does Nextflow know? Anyway, I'll look into writing my own copy strategy.
Using the
Then, this decides if an input file is on a foreign storage, i.e. different from the current work dir.
It wasn't working because of my config file, but I'm not sure how to get the TesExecutor's way of doing it to work.
but
or any other combination I could think of. What is the correct way that people would expect to be able to specify executor options in their nextflow.config?
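For illustration, the sort of thing I tried looks like this (the wr-specific option names here are guesses on my part, not confirmed settings):

```groovy
// nextflow.config -- illustrative only; the wr executor name and its options
// are assumptions, only queueSize is a standard executor setting
process.executor = 'wr'

executor {
    $wr {
        queueSize = 100   // standard executor option, scoped to the wr executor
    }
}
```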
I've now implemented my WrTaskHandler as a BatchHandler, which speeds up getting task status. Is there an equivalent way to batch up the submit requests as well?
Do you mean to batch submit requests? Nope, there isn't a specific mechanism for this.
Yes, that's what I mean. I'm thinking I can add a BatchSubmitHandler trait to BatchHandler.groovy, which is identical to the BatchHandler trait but just with a different method name (batchSubmit() instead of batch()). I need this because a single batch() method can't handle the needs of batching both status requests and submit requests. Alternatively, I could pass my REST API client to my TaskMonitor and have the monitor class directly submit all the tasks in the pending queue to the client in one go. Or something else?
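As a sketch, the trait I have in mind would be little more than this (hypothetical, not existing Nextflow API; the context type is left generic here):

```groovy
/*
 * Hypothetical sketch of the trait described above. It mirrors BatchHandler
 * but uses a different method name, so a task handler can opt in to batched
 * submission independently of batched status checks.
 */
trait BatchSubmitHandler<C> {

    /** Invoked by the executor to pass in the shared collector for batched submissions */
    abstract void batchSubmit(C context)
}
```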
I think both approaches can work. I would suggest keeping it as an optimisation step, or just postponing it, because I need to merge some changes in the TaskPolling and Executor logic in master next week.
Getting back to S3, I've got my config file working by doing:
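Broadly it's the documented aws client scope pointed at my own endpoint; the values below are placeholders rather than the real settings:

```groovy
// nextflow.config -- placeholder values, not real credentials or endpoint
aws {
    accessKey = '<access-key>'
    secretKey = '<secret-key>'
    client {
        endpoint = 'https://my-s3-service.example.com'  // non-AWS S3 endpoint
        s3PathStyleAccess = true                        // often needed by non-AWS S3 services
    }
}
```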
And then using the normal But my
Is this expected, or am I not quite doing something correctly? Am I supposed to mount my sb10 bucket at
And if that's the case, how can my WrTaskHandler.submit() method know that it should request such a mount when submitting to wr?
One question at a time please, otherwise the thread becomes unreadable. You are getting that path because you are using the Provided that you will mount the inputs (in a container, I suppose) how are you expecting the script to reference these files?
The ideal way would be that I submit the command On an OpenStack node wr will create a unique working directory in I've been trying to get my head around
Let's put it this way: given a pipeline execution work dir as Then for each (s3) input file the stageInputFile method is invoked, to which is passed the source S3 path and the expected relative file name resolved against the task work dir (most of the time just the file name w/o path). Therefore it's the role of your custom file copy strategy to implement the logic to handle your use case. IMO it should be very similar to AwsBatchFileCopyStrategy.groovy with the difference that you don't need to copy the input files but only the output files to the targetDir (that is supposed to be the same as Entering Easter mode 😄👋👋
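In outline, something like this (a sketch only: the base class and signature follow the description above and may differ between Nextflow versions; the mount handling is a placeholder):

```groovy
import java.nio.file.Path
import nextflow.executor.SimpleFileCopyStrategy

// Sketch only: leave S3 inputs where they are (visible through wr's S3 mount)
// instead of copying them into the task work dir.
class WrFileCopyStrategy extends SimpleFileCopyStrategy {

    @Override
    String stageInputFile(Path path, String targetName) {
        // no download: the file is expected to be reachable via the mounted bucket
        return "# ${targetName} is expected at ${path} through the wr S3 mount"
    }
}
```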
Thanks. I now have tasks running successfully in OpenStack with data in S3, but my output file is not "unstaged" to be put in S3, so the workflow fails due to missing output files.
In particular, I don't know how to specify that I want Or should the workflow remain unchanged, and instead my CopyStrategy be changed to copy all output files to the same place the Edit: my
@PeteClapham Nextflow using wr as a backend does use wr's database for state, but my implementation still involves Nextflow creating at least 2 files and numerous folders per job. These files are bash scripts of what should be run. I just tried to benchmark a workflow that just did 1 million
Yes, the disk requirements are also proving to have an impact. Again, a database would help here. I'll send a local PM with more details.
@PeteClapham I don't think it's an issue in OpenStack though, since it will only be a few files and folders per machine and no shared disk. Of course S3 needs to cope with lots of files.
Absolutely, all platforms are going to be similarly limited regardless of the storage backend in use; some will cope for longer than others but the issue will remain. Ultimately all open/close operations come with some overhead. Physical spinning storage elements suffer more than SSD etc., but even then databases are more scalable at this type of transaction.
@PeteClapham We don't have a need to scale infinitely though, because we only have so much hardware on which to execute jobs. So if a workflow can fill a team's OpenStack quota without hitting performance issues, that's good enough. I think it's theoretically possible to make my Nextflow->wr implementation create no extra files on disk (just folders and the files created by the actual bioinformatic executables), but that may be a lot of extra work. Would be good to understand the details of real problems first.
@sb10 I suspect that infinite scaling co-exists with Nessie and Bigfoot; at least I've never seen the three of them in the same room together :) Realistically, we have numerous workloads with 20,000 - 50,000 jobs within a single array (on LSF) and similar numbers that are being pushed through cloud platforms. In both cases 10k related concurrent running tasks is not unusual to see, depending on platform contention. So this is not an unusual scaling question and certainly one we have been seeing locally for many years. We can expect to see customers looking to scale to greater numbers in the longer term, but this will depend upon the ability of the infrastructure and tools to provide the required support to make this happen, hence the comment above.
We have a local customer who has been hitting or reporting the above impact, so I'll pass on their details to allow a more rapid dialogue.
For clarity, holding state in small files and many artifacts on disk has been reported locally as significantly impacting scalability, quota management and run cleanup should a component or components fail. It may be that there are better working solutions around the current concerns and we would be happy to receive guidance from the NF team. Internally, we can look at optimisation options regarding WR and similar workflows. I suspect that there will be local optimisations arising as teams become more familiar with the tools in flight. Thanks to you both for helping look at this. It is much appreciated.
At some point, NF should implement a garbage collection algorithm able to remove files as soon as they are not needed any more: #452.
You can read the guide I wrote for users here: https://github.com/VertebrateResequencing/wr/wiki/Nextflow To try this out yourself for real, you can follow the guide up to the LSF section, but start wr in local mode with (To "install" wr, just download the latest version from https://github.com/VertebrateResequencing/wr/releases and extract it from the zip.) The PR has no integration tests with wr, btw. Is that a problem?
It looks good.
No, but it could be useful for you to set up a CI service for this.
Hi, I'm not sure how to set up a CI service for it. This code has been in use here for production work at scale for some months now. It would be good to get it integrated so users can use the latest version of Nextflow without losing wr's OpenStack scheduling.
I've rebased it on top of latest master. Tests pass locally, but your CI tests fail. I'm not sure why or how to debug. |
Is there a link to the error log, or can you post it here?
Are the WR tests failing? Weird, local tests pass. Are you using your own password/S3 buckets/etc.?
I don't think any actual S3 access is occurring in these tests. Sorry, I was running the tests wrong locally. When I run
Fixed and all tests now passing. I don't know how they passed before or what changed in between, but hopefully it's all correct now. |
Now the tests in CI are failing due to timing out after about 1.5hrs. |
It looks like one or more wr tests hang.
How do you debug what's happening in that CI service? If it doesn't hang locally, what can I do? |
The test report is uploaded to the Nextflow web site, but this is disabled on pull requests for security reasons. You may try to add a Timeout in your test classes. For example:
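A minimal Spock sketch, assuming the standard spock.lang.Timeout annotation (the class and feature names are just placeholders):

```groovy
import java.util.concurrent.TimeUnit
import spock.lang.Specification
import spock.lang.Timeout

class WrExecutorTest extends Specification {

    // fail the feature method after 60 seconds instead of hanging the whole CI build
    @Timeout(value = 60, unit = TimeUnit.SECONDS)
    def 'should submit a task to wr'() {
        expect:
        true   // placeholder assertion
    }
}
```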
Hi, getting back to working on this again. I've just added my files from #1148 to https://github.com/nextflow-io/nf-wr; it compiles with the command in the README.md, but I don't know how to run the tests or use it with Nextflow. Can you offer some guidance?
I got tests to work and they're now passing. But how does an end-user install latest Nextflow that uses nf-wr? |
Nice. Once packaged and uploaded to Maven the user will need to use the |
Thanks. I don't know anything about Maven. Have you got code somewhere that does this package and upload? Or do I just follow something like https://maven.apache.org/repository/guide-central-repository-upload.html ? What repository do I upload to? And can you give me the exact value I'd set |
Uploading to Maven central is quite tricky. I'll try to set up automated uploading with the CI build at my earliest convenience. |
Thanks. In the meantime, is there any way I can create a build for myself with latest Nextflow and nf-wr? I.e. point it at my local clone or .jar file instead of at Maven?
Let's move this to its own issue nextflow-io/nf-wr#1
New feature
I develop wr, which is a workflow runner like Nextflow but can also just be used as a backend scheduler. It can schedule to LSF and OpenStack right now.
The benefit to Nextflow users of going via wr instead of using Nextflow’s existing LSF or Kubernetes support is:
Usage scenario
Users with access to LSF or OpenStack clusters who want to run their Nextflow workflows efficiently and easily.
Suggest implementation
Since I don’t know Java well enough to understand how to implement this “correctly”, I wrote a simple bsub emulator in wr, which is what my tests so far have been based on. I submit the Nextflow command as a job to wr, turning on the bsub emulation, and configure Nextflow to use its existing LSF scheduler. While running under the emulation, Nextflow’s bsub calls actually call wr.
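On the Nextflow side, the emulation route needs nothing beyond the standard LSF executor setting:

```groovy
// nextflow.config -- standard setting; while wr's bsub emulation is active,
// the bsub calls issued by this executor are answered by wr
process.executor = 'lsf'
```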
Of course the proper way to do this would be to have Nextflow call wr directly (either via the wr command line, or its REST API). The possibly tricky thing with regard to having it work in OpenStack mode is having it tell wr about OpenStack-specific things like what image to use, what hardware flavour to use, and details on how to mount S3, etc. (the bsub emulation handles all of this).
Here's what I did for my LSF test...
echo_1000_sleep.nf:
nextflow.config:
install wr:
run:
Here's what I did to get it to work in OpenStack...
nextflow_install.sh:
put input files in S3:
~/.openstack_rc:
run:
The NFS share at /shared created by the --cloud_shared option is slow and limited in size; a better solution would be to set up your own high performance shared filesystem in OpenStack (e.g. GlusterFS), then add to nextflow_install.sh the commands to mount this share. Or even better, is there a way to have Nextflow not store state on disk? If it could just query wr for job completion status, that would be better.