RayDistributor for using Ray to distribute the calculations in tsfresh #1030
base: main
Conversation
@nils-braun It would be great to have some suggestions on how to avoid changing the pre-commit-config version, and on the PR itself.
Thanks @TheaperDeng!
Really nice addition. I had a few comments but I am fine in general with the changes.
Two additional questions:
- did you have the chance to run some speed tests? Is it faster or slower than the other options?
- I know that ray also has a feature for datasets, which would allow for data locality. Right now, you need to move all data from the main node to the worker nodes. Did you have a look into this as well? Is it worth exploring? (See the sketch below for what this refers to.)
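For reference, the ray Datasets feature mentioned above looks roughly like this; a minimal sketch (not part of this PR), assuming a recent ray version with the ray.data API and a pandas DataFrame df shaped like the usual tsfresh input:

import ray
import ray.data

ray.init()

# build a distributed dataset from the pandas DataFrame; ray keeps the blocks
# on the worker nodes, so later transformations can run with data locality
ds = ray.data.from_pandas(df)

# map_batches runs the function on each block where the block lives,
# instead of shipping all rows from the driver to every task
ds = ds.map_batches(lambda batch: batch, batch_format="pandas")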
@TheaperDeng - I have updated the pre-commit file on the main branch to use the newest versions. Can you please merge in the newest changes and resolve the merge conflicts?
The structure of the test requirements changed on main to have a more "modern" or typical repository structure. Those changes will go into the setup.cfg file once you merge in the newest main.
@@ -8,4 +8,6 @@ seaborn>=0.7.1
 ipython>=5.3.0
 notebook>=4.4.1
 pandas-datareader>=0.5.0
+ray>=2.5.0
+protobuf<=3.20.3
Why is that needed? Could you maybe add a comment, because I do not see protobuf being used directly.
ray.init(address=address, **rayinit_config)
self.n_workers = n_workers
self.cpu_per_worker = max(
I am not an expert in ray, but this assumes that you have a homogeneous cluster where each machine has the same number of CPUs, or? Why is this setting needed at all? Does ray not use all CPUs of a machine by default (again, not an expert in ray!)?
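For context, a minimal check of ray's default behaviour (a sketch, not code from this PR): with no arguments, ray.init() starts a local instance that claims all CPUs of the machine, and the cluster's resources can be inspected afterwards:

import ray

ray.init()  # no arguments: ray detects and uses all CPUs of the local machine
print(ray.cluster_resources())    # e.g. {'CPU': 8.0, 'memory': ..., ...}
print(ray.available_resources())  # what is currently free
ray.shutdown()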
import ray

ray.init(address=address, **rayinit_config)
self.n_workers = n_workers
The number of workers is defined by the ray cluster size when starting the cluster and cannot be controlled by the user at this point, or? So the user needs to make sure to always pass in the correct number of workers according to the cluster. Can this also be retrieved from ray? We do something similar for dask.
If possible, I would recommend the following: as the number-of-workers property currently does not change the cluster deployment, I would prefer if it is filled automatically. If this is not possible, we should remove the default value of 1 and maybe rename the parameter to make sure users know they need to set it to the number of cluster workers.
Right now, it might look to users as if they can control the number of workers in their cluster using this variable (which I think they cannot).
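A sketch of the automatic filling suggested here (the variable names are illustrative, not from this PR): after connecting to the running cluster, ray can report the alive nodes and the total CPU count, similar to what is done for dask:

import ray

ray.init(address="auto")  # attach to the already running cluster

# derive the worker count from the cluster itself, not from a constructor argument
n_workers = len([node for node in ray.nodes() if node["Alive"]])
total_cpus = int(ray.cluster_resources().get("CPU", 0))
print(n_workers, total_cpus)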
download_robot_execution_failures()
df, y = load_robot_execution_failures()

Distributor = RayDistributor(address="ray://123.45.67.89:10001")
If the number of workers is not retrieved automatically, don't you need to pass it here, because the default is 1?
Nit: can you use a lowercase distributor? The object is not a class but an instance.
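For illustration, this is roughly how the docs example would read with the lowercase name (a sketch assembled from the snippets in this PR; the address is the placeholder from the example above, and the import path follows the PR's change summary):

from tsfresh import extract_features
from tsfresh.examples.robot_execution_failures import (
    download_robot_execution_failures,
    load_robot_execution_failures,
)
from tsfresh.utilities.distribution import RayDistributor  # added by this PR

download_robot_execution_failures()
df, y = load_robot_execution_failures()

# lowercase, because it is an instance and not a class
distributor = RayDistributor(address="ray://123.45.67.89:10001")

X = extract_features(
    df, column_id="id", column_sort="time", distributor=distributor
)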
download_robot_execution_failures()
df, y = load_robot_execution_failures()

Distributor = RayDistributor(n_workers=3)
As far as I understood, this will not automatically start a ray cluster with 3 workers, or?
Ray is an optional dependency and users who need to use Ray to distribute the calculations should install
it first via `pip install ray`.

Ray is an easy-to-use development framework for distributed computing workloads. Users can use it on a single node or scale
Could you add a few words on why a user would choose the ray distributor and not one of the other distributors? I am totally fine with having it in the code base, I just want to make sure users are not confused about what to choose. What are the benefits compared to e.g. dask?
From what I understood, using ray allows parallelizing the computation but does not help with out-of-memory data. And it is of course useful if you already run a ray cluster.
class LocalRayDistributorTestCase(DataTestCase):
    def test_ray_cluster_extraction_one_worker(self):

        Distributor = RayDistributor(n_workers=1)
Same nit as before, can you use lowercase?
    def test_ray_cluster_extraction_two_worker(self):

        Distributor = RayDistributor(n_workers=2)
Again the question: this is not creating a ray cluster with two workers, or? It is technically the same cluster as without this option - you just change the chunking.
Not sure if this is expected.
Is it possible to actually start a 2-worker local ray cluster?
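For what it is worth, ray ships a test utility that can start a multi-node cluster on a single machine; a sketch (not used in this PR, and ray.cluster_utils is primarily meant for testing):

import ray
from ray.cluster_utils import Cluster

# head node plus one extra worker node, all on the local machine
cluster = Cluster(initialize_head=True, head_node_args={"num_cpus": 1})
cluster.add_node(num_cpus=1)

ray.init(address=cluster.address)
print(len([node for node in ray.nodes() if node["Alive"]]))  # -> 2
ray.shutdown()
cluster.shutdown()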
Ray is getting popular for building distributed applications and is easy to fit into tsfresh via a RayDistributor.

Distributed tsfresh on Ray

This PR adds a new RayDistributor for tsfresh that uses ray to distribute the calculations. RayDistributor is a subclass of IterableDistributorBaseClass in tsfresh and follows the development instructions in https://tsfresh.readthedocs.io/en/latest/text/tsfresh_on_a_cluster.html.

Quick Start

Use RayDistributor the same way as MultiprocessingDistributor, ClusterDaskDistributor or LocalDaskDistributor.

Code change summary

- RayDistributor definition in tsfresh.utilities.distribution
- RayDistributor document in docs/text/tsfresh_on_a_cluster.rst
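A minimal end-to-end sketch of the quick start, assembled from the snippets above (local ray runtime; pass an address to attach to a remote cluster instead):

from tsfresh import extract_features
from tsfresh.examples.robot_execution_failures import (
    download_robot_execution_failures,
    load_robot_execution_failures,
)
from tsfresh.utilities.distribution import RayDistributor

download_robot_execution_failures()
df, y = load_robot_execution_failures()

distributor = RayDistributor(n_workers=2)

X = extract_features(
    df, column_id="id", column_sort="time", distributor=distributor
)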