Suggestion - Redesign RayEnvWorker for Improved Performance #1172

Open · 4 tasks done
destin-v opened this issue Jul 10, 2024 · 14 comments
Labels: performance issues (Slow execution or poor-quality results)

@destin-v commented Jul 10, 2024

  • I have marked all applicable categories:
        + [ ] exception-raising bug
        + [ ] RL algorithm bug
        + [ ] documentation request (i.e. "X is missing from the documentation.")
        + [x] new feature request
        + [x] design request (i.e. "X should be changed to Y.")
  • I have visited the source website
  • I have searched through the issue tracker for duplicates
  • I have mentioned version numbers, operating system and environment, where applicable:
     

Description

The issue with RayEnvWorker is that it is almost guaranteed to be slower than Tianshou's other vectorized environment workers because of how it is designed. RayEnvWorker (shown below) creates one environment on each Ray worker and calls send() and recv() on the environments to step or reset.

When a Ray worker receives a step or reset command, it returns an object reference to the head node. At that point RayEnvWorker calls ray.get to dereference the object. Every command sent involves serialization and every dereference involves deserialization, so the communication cost is high, while the actual processing done on the worker is trivial (recall we are only issuing a single step or reset command).

The bottom line is that RayEnvWorker should be rewritten to make heavy use of the CPUs and minimize communication costs.
 

Options

  1. One way to do this is by providing each RayEnvWorker with its own copy of the neural network so it can take actions without communicating with the head node. Once enough samples have been collected, use ray.get to pull the data over the network.

  2. An alternative is to implement a double-buffering approach where each worker hosts vectorized environments and takes a batch of actions as input to its step or reset functions. This makes more use of the CPUs and reduces the number of communication calls (see the sketch after the note below).

Note

Of the two approaches, the second one is preferred because you only need CPUs to build vectorized environments. The first approach requires that both CPUs and GPUs be available to the RayEnvWorker, which is more costly from a resource standpoint.
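
A minimal sketch of the second option, assuming Gymnasium's SyncVectorEnv runs inside each Ray actor. The names BatchedEnvActor and step_batch are illustrative only and are not part of Tianshou's API:

import gymnasium as gym
import numpy as np
import ray


@ray.remote
class BatchedEnvActor:
    """Hosts a vectorized env so a single remote call steps many envs at once."""

    def __init__(self, env_fns):
        # Each actor owns its own vectorized environment.
        self.venv = gym.vector.SyncVectorEnv(env_fns)

    def reset(self, seed=None):
        return self.venv.reset(seed=seed)

    def step_batch(self, actions: np.ndarray):
        # One serialization round trip covers len(env_fns) environment steps.
        return self.venv.step(actions)


if __name__ == "__main__":
    ray.init()
    n_actors, envs_per_actor = 4, 8
    actors = [
        BatchedEnvActor.remote([lambda: gym.make("CartPole-v1")] * envs_per_actor)
        for _ in range(n_actors)
    ]
    ray.get([a.reset.remote(seed=0) for a in actors])
    actions = np.zeros((n_actors, envs_per_actor), dtype=np.int64)
    # A single ray.get gathers one batched transition per actor.
    results = ray.get([a.step_batch.remote(actions[i]) for i, a in enumerate(actors)])
    ray.shutdown()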
 

RayEnvWorker

 

class RayEnvWorker(EnvWorker):
    """Ray worker used in RayVectorEnv."""
 
    def __init__(
        self,
        env_fn: Callable[[], ENV_TYPE],
    ) -> None:  # TODO: is ENV_TYPE actually correct?
        self.env = ray.remote(_SetAttrWrapper).options(num_cpus=0).remote(env_fn())  # type: ignore
        super().__init__(env_fn)
 
    def get_env_attr(self, key: str) -> Any:
        return ray.get(self.env.get_env_attr.remote(key))
 
    def set_env_attr(self, key: str, value: Any) -> None:
        ray.get(self.env.set_env_attr.remote(key, value))
 
    def reset(self, **kwargs: Any) -> Any:
        if "seed" in kwargs:
            super().seed(kwargs["seed"])
        return ray.get(self.env.reset.remote(**kwargs))
 
    @staticmethod
    def wait(  # type: ignore
        workers: list["RayEnvWorker"],
        wait_num: int,
        timeout: float | None = None,
    ) -> list["RayEnvWorker"]:
        results = [x.result for x in workers]
        ready_results, _ = ray.wait(results, num_returns=wait_num, timeout=timeout)
        return [workers[results.index(result)] for result in ready_results]
 
    def send(self, action: np.ndarray | None, **kwargs: Any) -> None:
        # self.result is actually a handle
        if action is None:
            self.result = self.env.reset.remote(**kwargs)
        else:
            self.result = self.env.step.remote(action)
 
    def recv(self) -> gym_new_venv_step_type:
        return ray.get(self.result)  # type: ignore
 
    def seed(self, seed: int | None = None) -> list[int] | None:
        super().seed(seed)
        try:
            return ray.get(self.env.seed.remote(seed))
        except (AttributeError, NotImplementedError):
            self.env.reset.remote(seed=seed)
            return None
 
    def render(self, **kwargs: Any) -> Any:
        return ray.get(self.env.render.remote(**kwargs))
 
    def close_env(self) -> None:
        ray.get(self.env.close.remote())
@MischaPanch (Collaborator) commented Jul 18, 2024

Thank you @destin-v! I'll look into it over the weekend and run some performance tests.

@MischaPanch MischaPanch added the performance issues Slow execution or poor-quality results label Jul 18, 2024
@MischaPanch MischaPanch self-assigned this Jul 18, 2024
@destin-v (Author)

@MischaPanch, to get optimal results from Ray I recommend providing a vectorized environment to each Ray remote worker. Running the vectorized step() function on the vectorized environments should take approximately 1 sec; the idea is to saturate the CPUs with work for at least 1 sec.

Then gather all of the Ray object references (sometimes called futures) into a list and perform a single ray.get() on that list. This deserializes the object references into their actual values.

If done properly, you will see a speedup because the number of serializations/deserializations decreases and CPU utilization increases.
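
A minimal sketch of the gather-then-dereference pattern described above, assuming each worker performs roughly one second's worth of stepping before returning. worker_rollout and n_steps are illustrative names, not Tianshou APIs, and time.sleep stands in for venv.step calls:

import time

import ray


@ray.remote
def worker_rollout(worker_id: int, n_steps: int) -> list[float]:
    """Stand-in for roughly one second of vectorized env stepping on one worker."""
    results = []
    for _ in range(n_steps):
        time.sleep(0.01)  # placeholder for venv.step(...)
        results.append(float(worker_id))
    return results


if __name__ == "__main__":
    ray.init()
    # Launch all workers first; each .remote() call returns an object reference
    # (a future) immediately, without blocking.
    futures = [worker_rollout.remote(i, n_steps=100) for i in range(8)]
    # A single ray.get on the whole list deserializes every result at once,
    # instead of paying one round trip per environment step.
    all_results = ray.get(futures)
    ray.shutdown()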

Let me know if you need any help.

@MischaPanch (Collaborator) commented Jul 28, 2024

Ok, I looked into this in more detail. You are right, the design is suboptimal. There is, however, the broader question of whether the Ray worker is important for Tianshou at all. The only reason I see for using it is to start training on a multi-node cluster. That is currently not the main focus of Tianshou; there are more important problems to fix before that. Several practical algorithms, like SAC, cannot be arbitrarily parallelized anyway. There is also RLlib for a tight integration with Ray (I had extremely bad experiences with it, but still).

Solving this issue would require quite some work. Is this feature important to you? Are you using multi-node training, and if not, is the current subproc vectorized env not sufficient for you?

@MischaPanch (Collaborator)

Related to #1133

@destin-v (Author)

I am interested in Tianshou's RayVecEnv because I have access to multi-node computing infrastructure, and I would be willing to contribute a new design for RayVecEnv. Due to multiple priorities I may not get to it in the near term, but looking at the code for RayVecEnv I think it will require a complete rewrite to get a double-buffering solution.

RLlib is heavily abstracted and inherits from many classes that are not needed for RL (e.g., Tune). This has led to coupling issues where bugs propagate across RLlib, making it difficult to fix or debug. Even though RLlib provides multi-node support, it has its own unique problems.

@MischaPanch (Collaborator)

@destin-v fully agree; in fact, RLlib's large number of problems is the main reason I am investing significant effort into Tianshou. I feel that Tianshou might strike the right balance: useful abstractions that are still not overwhelming for researchers, without being as prone to breakage and bugs as RLlib.

I also agree that RayVecEnv needs to be redesigned from the ground up, which is precisely why I was reluctant to do it myself now. If you want to collaborate on this, I'm happy to discuss, review, and participate to some extent. Let me know when you have time and let's come back to this issue then.

@MischaPanch MischaPanch assigned destin-v and unassigned MischaPanch Jul 29, 2024
@destin-v (Author) commented Aug 2, 2024

Description

A set of benchmarks for RayVecEnv showing the scaling efficiency across distributed nodes.

Experiment Setup

  • Each environment consists of a step() function that sleeps for x seconds (see the sketch after this list).
  • Each computing core is given a copy of the environment.
  • Each computing node has 48 CPU cores.
  • Each computing node is connected via Ray.
  • No stochastic gradient descent is being done. Only step() calls in RayVecEnv are counted.
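
The exact benchmark environment is not shown in this issue; a minimal sketch of such an environment, assuming the Gymnasium API, might look like the following (SleepEnv and step_duration are illustrative names):

import time

import gymnasium as gym
import numpy as np


class SleepEnv(gym.Env):
    """Dummy env whose step() sleeps for a fixed duration (illustrative only)."""

    def __init__(self, step_duration: float = 0.1):
        self.step_duration = step_duration
        self.observation_space = gym.spaces.Box(-1.0, 1.0, shape=(4,), dtype=np.float32)
        self.action_space = gym.spaces.Discrete(2)

    def reset(self, *, seed=None, options=None):
        super().reset(seed=seed)
        return self.observation_space.sample(), {}

    def step(self, action):
        time.sleep(self.step_duration)  # simulate x seconds of simulation work
        obs = self.observation_space.sample()
        return obs, 0.0, False, False, {}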

Hypothesis

  • Environments with fast step() functions will scale poorly because they make many communication calls per unit of work.
  • Environments with slow step() functions will scale better because they make fewer communication calls per unit of work.

Results

Note

  • SPS: Steps per Second
  • Number of Nodes designates individual computing machines. Each node has 48 cores, so the total Number of Cores is always Number of Nodes multiplied by 48.
  • Ideal SPS is Number of Cores divided by Step Duration, assuming Ray's communication costs are zero (a worked example follows the table below).
  • Efficiency is equal to measured SPS divided by Ideal SPS.
  • The Efficiency column is the single best metric to evaluate whether adding distributed computing will benefit your training process.
| Number of Nodes | Number of Cores | Step Duration (sec) | SPS | SPS/Core | Ideal SPS | Efficiency |
|---|---|---|---|---|---|---|
| 2 | 96 | 1 | 91.97 | 0.96 | 96 | 96% |
| 2 | 96 | 0.1 | 680.66 | 7.09 | 960 | 71% |
| 2 | 96 | 0.01 | 3,130.63 | 32.61 | 9,600 | 33% |
| 2 | 96 | 0.001 | 3,941.47 | 41.06 | 96,000 | 4% |
| 4 | 192 | 1 | 181.92 | 0.95 | 192 | 95% |
| 4 | 192 | 0.1 | 1,285.94 | 6.70 | 1,920 | 67% |
| 4 | 192 | 0.01 | 3,962.63 | 20.64 | 19,200 | 21% |
| 4 | 192 | 0.001 | 4,089.34 | 21.30 | 192,000 | 2% |
| 8 | 384 | 1 | 351.10 | 0.91 | 384 | 91% |
| 8 | 384 | 0.1 | 2,153.68 | 5.61 | 3,840 | 56% |
| 8 | 384 | 0.01 | 4,348.30 | 11.32 | 38,400 | 11% |
| 8 | 384 | 0.001 | 4,226.37 | 11.01 | 384,000 | 1% |
| 16 | 768 | 1 | 660.87 | 0.86 | 768 | 86% |
| 16 | 768 | 0.1 | 3,118.94 | 4.06 | 7,680 | 41% |
| 16 | 768 | 0.01 | 4,210.96 | 5.48 | 76,800 | 5% |
| 16 | 768 | 0.001 | 4,113.67 | 5.36 | 768,000 | 1% |
| 32 | 1,536 | 1 | 1,191.83 | 0.78 | 1,536 | 78% |
| 32 | 1,536 | 0.1 | 4,040.86 | 2.63 | 15,360 | 26% |
| 32 | 1,536 | 0.01 | 4,303.75 | 2.80 | 153,600 | 3% |
| 32 | 1,536 | 0.001 | 4,267.77 | 2.79 | 1,536,000 | 0% |
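
As a worked example of the definitions above: with 2 nodes × 48 cores = 96 cores and a step duration of 0.1 sec, Ideal SPS = 96 / 0.1 = 960, and the measured 680.66 SPS gives Efficiency ≈ 680.66 / 960 ≈ 71%, matching the second row of the table.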

Discussion

These results show that environments with slow steps (>1sec) will greatly benefit from RayVecEnv in its current state. But for environments that step faster (<0.01sec), the communication costs of Ray outweigh the benefits.

Note the key findings from the experimental results:

  • When Step Duration==0.001sec, Efficiency was always in the low single digits.
  • When Step Duration==1sec, Efficiency was always in the high double digits.

Since Efficiency is a measure of how close your process is to achieving the ideal speedup, it provides the best single metric for evaluating the performance of distributed computing.

Note

Environments like CartPole step very fast (<0.0001 sec), meaning there is likely no benefit to retrieving a single step of CartPole from a distributed core under the current RayVecEnv implementation. But if 1M steps of CartPole are aggregated on the remote core and then sent back to the head node over Ray, the communication efficiency would go way up.
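
A minimal sketch of that aggregation idea, using a random policy on the remote actor purely for illustration; RolloutActor, rollout, and n_steps are illustrative names, not Tianshou APIs:

import gymnasium as gym
import numpy as np
import ray


@ray.remote
class RolloutActor:
    """Collects many CartPole steps locally and returns them in one object."""

    def __init__(self):
        self.env = gym.make("CartPole-v1")

    def rollout(self, n_steps: int):
        obs, _ = self.env.reset(seed=0)
        observations, rewards = [], []
        for _ in range(n_steps):
            action = self.env.action_space.sample()  # random policy, illustration only
            obs, reward, terminated, truncated, _ = self.env.step(action)
            observations.append(obs)
            rewards.append(reward)
            if terminated or truncated:
                obs, _ = self.env.reset()
        # n_steps transitions cross the network in a single transfer.
        return np.asarray(observations), np.asarray(rewards)


if __name__ == "__main__":
    ray.init()
    actor = RolloutActor.remote()
    obs_batch, rew_batch = ray.get(actor.rollout.remote(100_000))
    ray.shutdown()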

@MischaPanch (Collaborator)

@destin-v thank you for this very thorough and clear evaluation! I will include these results in the docs for the next release, if you don't mind.

It is unfortunate that the overhead is so large. Envs that take a full second per step are kind of doomed: even with simple envs, agents need roughly 1e6 steps to converge to anything, and highly parallelizable algos will need even more. So having such a slow env in the first place is pretty much a no-go. Maybe some very slow envs take 0.1 seconds per step, but at least for research purposes that is still way too slow.

Your results just confirm to me that optimizing for multi-node scenarios should not be a focus of Tianshou for now; do you agree? I'm not saying it's generally irrelevant, just that in most situations a single node seems like the better way to go. Cloud providers offer VMs with 96 cores, and with AMD CPUs one can get even more.

@MischaPanch (Collaborator) commented Aug 2, 2024

Or do you think that with a redesign that allows aggregation we would get significant benefits from multi-node? Most off-policy algos don't aggregate very much before performing updates in the main process, but some high-throughput algorithms do (IMPALA, Ape-X).

We could consider implementing them together with a redesigned Ray worker. In this scenario, multi-node might become useful, and better supporting it would make some sense.

EDIT: After looking at the whole conversation again, I am even more convinced that this is not purely a worker-redesign issue, but rather tightly coupled to algorithms and their parametrizations. Not all parametrizations of all algos would be able to make use of either of the options that you presented. In fact, option one is very close to the IMPALA algorithm, if I'm not mistaken.

@destin-v (Author) commented Aug 2, 2024

I agree that any redesign of RayVecEnv would only benefit a few of the algorithms. Algorithms like IMPALA can parallelize data collection even while updates are happening because they use V-trace. There is also asynchronous PPO, which can take advantage of parallel data collection and updating.

I think scaling to multiple nodes gives users the ability to push the boundaries of state-of-the-art performance. At the moment Tianshou is designed for vertical scaling (scaling resources on a single node), but horizontal scaling (scaling resources across multiple nodes) will likely provide the best opportunities for improving performance.

I am still thinking through the best way to achieve a good solution for RayVecEnv.

Note

One upgrade that may provide a speed boost to all algorithms is double-buffered sampling. This simply has the worker perform a batch of steps on the environment while waiting for the next inference.
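
A minimal sketch of double-buffered sampling under these assumptions: two buffers of Ray env actors, a placeholder policy() running on the head node, and time.sleep standing in for environment and inference work. EnvActor and policy are illustrative names, not Tianshou APIs:

import time

import numpy as np
import ray


@ray.remote
class EnvActor:
    """Stand-in environment: step() sleeps to simulate simulation work."""

    def step(self, action: int):
        time.sleep(0.05)
        return np.random.rand(4), 0.0, False


def policy(obs) -> int:
    """Placeholder for neural-network inference on the head node."""
    time.sleep(0.01)
    return 0


if __name__ == "__main__":
    ray.init()
    # Two buffers of env actors: while one buffer's steps are in flight,
    # the head node processes the other buffer's results and relaunches it.
    buffers = [[EnvActor.remote() for _ in range(4)] for _ in range(2)]
    in_flight = [[actor.step.remote(0) for actor in buf] for buf in buffers]

    for iteration in range(10):
        active = iteration % 2                # alternate between the two buffers
        results = ray.get(in_flight[active])  # wait only for the active buffer
        actions = [policy(obs) for obs, _, _ in results]
        in_flight[active] = [                 # relaunch the active buffer while
            actor.step.remote(act)            # the other buffer keeps stepping
            for actor, act in zip(buffers[active], actions)
        ]
    ray.shutdown()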

@MischaPanch (Collaborator)

I generally agree. We can start looking into this and implementing some multi-node things. However, it's not going to be as simple as changing the workers. The Collector right now is not able to deal with receiving a batch of steps. Before attempting to build a Collector suitable for batched workers, it will likely be necessary to separate an EpisodeCollector out of the current logic (which can collect both steps and episodes), since special care needs to be taken when an episode finishes. Moreover, we currently don't even have a proper Collector interface...

I propose the following:

  1. You implement the double-buffered sampling. It's generally useful, safe, and should also help with multi-node things.
  2. I improve the collector design meanwhile.
  3. We make a new package experimental.multi_node where new things can be tested out, like BatchedStepWorker, IMPALA, and Ape-X.

Independently of that, the Tianshou core team is working on automated benchmarking for algos, which will help evaluate the results of step 3. Once those benchmarks are firmly established, we can move things out of experimental.

Wdyt @destin-v ?

@destin-v (Author) commented Aug 5, 2024

Sounds great, I'll take a look at the double-buffered sampling and send a pull request when it is ready.

@destin-v (Author)

I have been busy with papers and a conference, but I have not forgotten about this topic. Once I get through my presentations I will have more time to devote to this. My initial experiments have confirmed that buffering improves Steps per Second (SPS) and overall performance.

@destin-v (Author) commented Oct 8, 2024

I checked with my university employer and it appears that there are restrictions that prevent me from directly contributing to an open-source repository. However, I am able to publish papers and open-source code as a researcher for my university. I am working on a white paper describing ways to improve throughput in multi-node settings. When the paper is ready for release, I'll share more details.
