Using SCOOP for distributed parallel processing #111

Closed · wants to merge 45 commits
Conversation

@rwest (Member) commented Apr 5, 2013

SCOOP (Scalable Concurrent Operations in Python) is a distributed task module allowing concurrent parallel programming on various environments, from heterogeneous grids to supercomputers.
http://code.google.com/p/scoop/

This branch implements it for bits of RMG-Py.

Do a pip install scoop, check out this branch, then try it with python -m scoop -vv rmg.py path/to/input.py, or see the queue submission file in examples/rmg/scoop/ for running on a multi-computer cluster.

I don't think this is ready to be merged yet, but am making a pull request as somewhere to store discussions.

So far:

  • Database cache saving (in the main thread) and loading (in each worker)
  • Functioning on a multi-core machine
  • Functioning across several nodes on a cluster (via LSF queuing system)
  • Implement for Thermo estimation.
  • Implement for Pressure dependence calculations.
  • Implement for Reactor simulations.
  • Get logging information to end up where you expect it.

Until we implement QMThermo, parallelizing thermo estimation alone doesn't save an awful lot of time.
Implementing the reactor simulations and PDep calculations may be more difficult, or at least involve more pickling and passing. E.g. can we unpickle an updated network after a remotely executed PDep calc, or will all the reactions and species point to the wrong things?

…nModel

This is so we can do parallelization etc. within this method.
For now it's very simple and just the same as before.
Run as python -m scoop -vv rmg.py path/to/input.py

This is VERY slow, as we're pickling, passing, and unpickling
the entire database every time we try to evaluate a species.

…but it works :)
This is the start of a framework for sharing the database
across multiple workers with scoop.

Saving is a method of rmgpy.data.rmg.
Loading is done when needed.

The filename used to store the database pickle is set via the environment variable
RMG_DB_FILE.

Call it with something like:
 RMG_DB_FILE=$PWD/database.pkl python -m scoop -vv rmg.py path/to/input.py
The example is a copy of the methylformate example.
The lsf.sh script should be submitted to the LSF queuing system.
This is the system used on "Venture" at Northeastern University.

Submit with "bsub < lsf.sh".

Apparently Lava is related, so it may work for that also.
http://en.wikipedia.org/wiki/Platform_LSF

Equivalent ones should probably be written for PBS, etc.

NB. the RMG_DB_FILE environment variable is required.
This function returns the checksum (hash) of a list of files/folders,
e.g.
  hash = rmgpy.utilities.path_checksum(['path/to/database'])

This will be useful for checking whether things have changed, for cache validation.
We hash a bunch of metadata to try to be sure that the cached database is the
same as what would be loaded if you loaded it from scratch. Hopefully I have
included everything that matters.
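For illustration, a minimal sketch of such a metadata checksum (the actual rmgpy.utilities.path_checksum may hash different fields):
```
import hashlib
import os

def path_checksum(paths):
    """Hash the metadata (relative path, size, modification time) of every file
    under the given paths and return a hex digest. A sketch only; the real
    rmgpy.utilities.path_checksum may include different metadata."""
    checksum = hashlib.md5()
    for path in sorted(paths):
        for root, dirs, files in sorted(os.walk(path)):
            for name in sorted(files):
                filepath = os.path.join(root, name)
                info = os.stat(filepath)
                entry = '%s:%d:%d' % (os.path.relpath(filepath, path),
                                      info.st_size, int(info.st_mtime))
                checksum.update(entry.encode('utf-8'))
    return checksum.hexdigest()
```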
We pass around the location and hash of the database cache file so
that each worker can load it and check it has the right version.

This removes the need for the "RMG_DB_FILE" environment variable.
@rwest (Member, Author) commented Apr 9, 2013

Here is the profile from the scoop example job run on 8 processors across 4 machines.
[Attached image: RMG profile graph]

I'm not sure if the thermo estimation is really taking only 1% of the time, or if it's just no longer being counted properly.
Clearly for this example case, updateUnimolecularReactionNetworks is the next target.

…mporary solution to avoid attribute errors from cclib during parsing.
Adding standalone thermoEstimator to scoop2 branch.
Added a generator to divide species list into chunks (100 species) so that output.txt is written once a chunk is calculated.
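For reference, such a chunking generator can be as simple as this sketch (names are illustrative, not the actual RMG-Py code):
```
def chunks(species_list, chunk_size=100):
    """Yield successive chunks of at most chunk_size species from the list."""
    for i in range(0, len(species_list), chunk_size):
        yield species_list[i:i + chunk_size]
```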
Updated scoop from 0.62 to the development version of 0.7RC1. With this new version of scoop you can pass environment variables to workers through bash scripts (prolog.sh).
Fixed the wrong usage of futures.map. Interestingly, the older version was working correctly even with this bug. There are many debug logging statements that should be deleted.
keceli and others added 16 commits November 15, 2013 10:19
All arguments are now optional.

I was having problems with importing gprof2dot:
```
from external.gprof2dot import gprof2dot
```
works on Pharos with python 2.6
```
from external import gprof2dot
```
works on my Mac with python 2.7. There should be a better way of handling it.
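One possible way of handling both layouts (a sketch, not necessarily what ended up in the branch):
```
try:
    from external.gprof2dot import gprof2dot  # layout that works with Python 2.6 on Pharos
except ImportError:
    from external import gprof2dot            # layout that works with Python 2.7 on the Mac
```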
To reduce the database pickle size, thermoEstimator now only loads thermo libraries.
Originally, RMG stops if any two species in the initial species list are identical. Since this can happen frequently when using thermoEstimator with a large list of species, I changed it so that RMG ignores the duplicate and continues execution.
Optional positional argument is added to change the chunk size.
I am still not sure of the best way to do it.
QM calculations fail for linear molecules with an error '''only length-1 arrays can be converted to Python scalars'''.
The problem is that we only need a single rotational constant for a linear rotor, while three of them were being passed.
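The fix amounts to something like this sketch (variable names are illustrative, not the actual cclib/RMG-Py code):
```
# rot_constants holds the three rotational constants parsed from the QM output.
# A linear rotor takes a single rotational constant, so pass only one of them.
if is_linear:
    rot_constants = rot_constants[:1]
```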
There are still some issues with positioning of the lone pairs.  For instance
1 N 1 1 {2,S} {3,S}
2 H 0 0 {1,S}
3 H 0 0 {1,S}

will draw the lone pair on the hydrogen rather than the N. But for now, we can
avoid drawing lone pairs on oxygens and such where it is not needed.
The format syntax where you omit the field numbers
was introduced in Python 2.7.
There is a bug in pickling of the solvation groups, and there is no reason to load them by default.
Scoop now with QMthermo and many updates from Master, from @keceli.
rwest referenced this pull request Aug 8, 2014
A recent run spent quite a lot of time generating thermo data, and this seems
reasonably parallelizable (each species is independent). 
The overheads may still be large, but this is a starting point for experimenting.
@nickvandewiele (Contributor) commented:

MOPAC is programmed to use all available processors, so multithreading RMG-Py routines that call MOPAC will suffer from CPU overloading when this is not dealt with explicitly.
See:
92093d5#commitcomment-7325472
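One explicit way to deal with it would be to cap the thread count inside each worker before MOPAC is launched, along the lines of this sketch (the cores/workers counts are assumed values for illustration):
```
import os

# Limit the threads MOPAC may spawn from within this SCOOP worker,
# so that several workers on one node do not oversubscribe the CPUs.
cores_per_node = 8      # assumption for illustration
workers_per_node = 8    # assumption for illustration
os.environ['OMP_NUM_THREADS'] = str(max(1, cores_per_node // workers_per_node))
```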

@nickvandewiele (Contributor) commented:

In the example file on:
https://github.com/GreenGroup/RMG-Py/blob/scoop/examples/thermoEstimator/scoop/sge.sh

the singlenode keyword is used with the -pe parallel environment flag for the SGE script on Pharos. As far as I understand, this will force the scheduler to only pick a single computer (node) with 48 processors on it, which boils down to multithreading on a single node.

I want to parallelize this across multiple computers, rather than just multiple processors. Has anyone tried to run this across multiple computers?

Scoop provides a way to specify a list of host computers that you want to work on through the --hostfile flag. Since this conflicts with the automated assignment of computers by the scheduler for my job on Pharos, I want to avoid that.

From what I can read on the Pharos wiki, the SGE on Pharos provides MPI functionality (e.g. -pe mpich2_fill script) for multi-node computing. Has anyone, maybe @rwest or @keceli , had the chance yet to play around with this in the framework of RMG-Py?

@keceli commented Aug 11, 2014

To run rmg-scoop across nodes I have used:

#$ -pe mpich2_fill 240
hosts=$(cat $PE_HOSTFILE | awk '{printf "%s ", $1}')
echo $hosts $NSLOTS > hostfile
python -m scoop.__main__ -vv --tunnel --prolog $RMGpy/examples/rmg/scoop/prolog.sh  -n $NSLOTS $RMGpy/thermoEstimator.py input.py > std.out

And prolog.sh contains only source ~/.bash_profile, to pass some environment variables.

@nickvandewiele (Contributor) commented:

I posted this some time ago on the scoop-user mailing list, but nobody seems to have responded. Maybe you have some inspiration that could help me out.

Here's my problem:

I want to create a scheme in which parallelism is done on 2 levels:

  1. scoop runs multiple computers in parallel
  2. the CPUs in each computer will run MOPAC in parallel as well.

This is how I implemented it on Pharos. The example requests 48 CPUs that will be split in 6 workers, thus providing 8 CPUs to each worker.

I started by requesting 48 CPUs to SGE, and specified that each node provides 8 CPUs, which leads to SGE assigning 6 nodes. cat $PE_HOSTFILE | awk '{printf "%s ", $1}' shows that this is done correctly:

node28:8
node63:8
node39:8
node56:8
node47:8
node61:8

Next, I create a hostfile with the names of the hosts and append "1" to only allow 1 worker call per node. The hostfile looks like this:

node28.cluster node63.cluster node39.cluster node56.cluster node47.cluster node61.cluster 1

Next, I tell SCOOP to only deploy 6 worker calls, through the -n option:

python -m scoop -n 6 script.py

What happens next is that SCOOP uses only 1 node, and sends the 6 worker functions to this node, instead of distributing the 6 workers to the 6 available nodes:

[2014-08-12 09:45:27,798] launcher  INFO    Worker distribution:
[2014-08-12 09:45:27,799] launcher  INFO       node28.cluster:  5 + origin

I read that the -n flag uses a round-robin algorithm for handing out worker jobs to CPUs, i.e. it gives each node the same number of worker jobs by iterating over all the nodes (as opposed to giving all the jobs to a single node until it is fully occupied).

As a result, my observation seems to contradict this statement...

Any inspiration for experiments that would help me understand what's going on?

@keceli commented Aug 13, 2014

Maybe the easiest way to debug is to do it interactively. You can reserve 2 nodes, log into one, and try to deploy your jobs while avoiding the scheduler. Once it works, you may want to try assigning threads to MOPAC depending on the molecule size to get a better work balance.

By the way, how much speedup do you get when you run MOPAC on 8 cores?


@nickvandewiele (Contributor) commented:

OK, I figured out why the workers were not equally distributed among the reserved nodes.

A hostfile in which the hostnames of the nodes are all on one line is not interpreted by SCOOP the way I thought it would be.

So this is wrong:

node28.cluster node63.cluster node39.cluster node56.cluster node47.cluster node61.cluster 1

Instead, each hostname should be on a separate line:

node28.cluster 1
node63.cluster 1
node39.cluster 1
node56.cluster 1
node47.cluster 1
node61.cluster 1
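For reference, a hostfile like this can be generated from the SGE-provided $PE_HOSTFILE with a short script, e.g. this sketch (the output filename and the hostname suffix handling are assumptions):
```
import os

# Write a SCOOP hostfile with one hostname per line, limited to one
# worker call per node, based on the SGE $PE_HOSTFILE contents.
with open(os.environ['PE_HOSTFILE']) as pe_hostfile:
    hostnames = [line.split()[0] for line in pe_hostfile]
with open('hostfile', 'w') as out:
    for hostname in hostnames:
        out.write('%s 1\n' % hostname)  # may need a '.cluster' suffix on Pharos
```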

@nickvandewiele (Contributor) commented:

The second thing I did wrong was not pointing SCOOP to the hostfile that contains the list of hostnames.
You need to explicitly provide the filename (in this case the filename is hostfile):

python -m scoop --hostfile hostfile -n 6 script.py

In addition, the flag -n 6 is optional; it is only needed when the number of workers you want to launch does not correspond to the sum of the cores listed for each node in the hostfile.

@nickvandewiele (Contributor) commented:

As parallelizing MOPAC jobs seems to scale pretty well over processors and over nodes, I am focusing on Gaussian jobs now.

After some days of testing, I figured out that g03 is a bit different:
a g03 input file needs to have a %nproc flag that specifies the number of processors to use. E.g.:

%chk=checkpoint
%mem=6MW
%nproc=4
# pm3 opt=(verytight,gdiis) freq IOP(2/16=3)

 InChI=1/C10H16O2/c1-7(12-11-2)9-5-3-8-4-6-10(8)9/h6-9H,3-5H2,1-2H3

0  1
C          -1.27790        -0.86310         0.07550
...

If the %nproc flag is not used, g03 defaults to only 1 processor. Hence, contrary to MOPAC, the variable $OMP_NUM_THREADS is not used to set the number of processors for a g03 job. Moreover, contrary to what I find on the internet, $OMP_NUM_THREADS cannot be used as an alternative way to set the number of CPUs on Pharos. This is quite annoying, since you need to modify each and every g03 input file if you want to change the number of CPUs used for the calculation.
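One workaround would be a small helper that rewrites the Link 0 section of each input file before submission, e.g. this sketch (a hypothetical helper, not part of RMG-Py):
```
def set_nproc(input_path, nproc):
    """Insert or replace the %nproc Link 0 line in a Gaussian 03 input file."""
    with open(input_path) as gaussian_file:
        lines = [line for line in gaussian_file
                 if not line.lower().startswith('%nproc')]
    # Link 0 commands (%...) must precede the route section (the line starting with #)
    for i, line in enumerate(lines):
        if line.lstrip().startswith('#'):
            lines.insert(i, '%%nproc=%d\n' % nproc)
            break
    with open(input_path, 'w') as gaussian_file:
        gaussian_file.writelines(lines)
```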

@nickvandewiele (Contributor) commented:

Also: when I increase the value of OMP_NUM_THREADS above 4 on Harpertown, i.e. nodes 01-64 of Pharos (where g03 is installed), the scheduler complains that I shouldn't do that:

OMP_NUM_THREADS or NCPUS value (5) is invalid
