Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RayDistributor for using Ray to distribute the calculations in tsfresh #1030

Open
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

TheaperDeng
Copy link

Ray is getting popular for building distributed applications and easy to fit into tsfresh by a RayDistributor.

Distributed tsfresh on Ray

This repo involves a new RayDistributor for tsfresh to use ray to distribute the calculations.

RayDistributor is a subclass of IterableDistributorBaseClass in tsfresh which follows the developing instruction in https://tsfresh.readthedocs.io/en/latest/text/tsfresh_on_a_cluster.html.

Quick Start

Use RayDistributor the same way as MultiprocessingDistributor, ClusterDaskDistributor or LocalDaskDistributor.

from tsfresh.utilities.distribution import RayDistributor

distributor = RayDistributor(n_workers=4)
# ...
extracted_features = extract_features(..., distributor=distributor)
# ...

Code change summary

  • add RayDistributor definition in tsfresh.utilities.distribution
  • add RayDistributor document in docs/text/tsfresh_on_a_cluster.rst
  • Update pre-commit-config to enable future development
  • Update test-requirements.txt for UT
  • munually test the UT and document generation locally

@TheaperDeng
Copy link
Author

@nils-braun It would be great to have some suggestions to avoid changing pre-commit-config version and to the PR itself

@nils-braun nils-braun self-requested a review June 28, 2023 07:29
Copy link
Collaborator

@nils-braun nils-braun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @TheaperDeng !
Really nice addition. I had a few comments but I am fine in general with the changes.

Two additional questions:

  • did you had the chance to run some speed tests? Is it faster/slower than other options?
  • I know that ray also has a feature for datasets, which would allow for data locality. Now, you need to move all data from the main node to the worker nodes. Did you had a look into this as well? Is this worth exploring?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@TheaperDeng - I have update the pre-commit file on the main branch to use the newest versions. Can you please merge in the newest changes and resolve the merge conflicts?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The structure of the test requirements changed on main to have a more "modern" or typical repository structure. Those changes will go into the setup.cfg file once you merge in newest main.

@@ -8,4 +8,6 @@ seaborn>=0.7.1
ipython>=5.3.0
notebook>=4.4.1
pandas-datareader>=0.5.0
ray>=2.5.0
protobuf<=3.20.3
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is that needed? Could you maybe add a comment because I do not see protobuf being used directly


ray.init(address=address, **rayinit_config)
self.n_workers = n_workers
self.cpu_per_worker = max(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not an expert in ray, but this assumes that you have a heterogeneous cluster where each machine has the same number of CPUs, or? Why is this setting needed at all? Does ray not use all CPUs of a machine by default (again, not an expert in ray!)

import ray

ray.init(address=address, **rayinit_config)
self.n_workers = n_workers
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The number of workers is defined by the ray cluster size when starting the cluster and can not be controlled by the user st this point, or? So the user needs to make sure to always pass in the correct number of workers according to the cluster. Can this also be retrieved from ray? We do something similar for dask.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If possible, I would recommend the following: as the number of worker property currently is not changing the cluster deployment, I would prefer if it is filled automatically. If this is not possible, we should remove the default value of 1 and maybe rename the parameter to make sure users know they need to set it to the number of cluster workers.
Now, it might look to users as if they can control the number of workers in ther cluster using this variable (which I think they can not)

download_robot_execution_failures()
df, y = load_robot_execution_failures()

Distributor = RayDistributor(address="ray://123.45.67.89:10001")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the number of workers is not retrieved automatically , don't you need to pass it here because the default is 1?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: can you use a lowercase distributor? The object is not a class but an instance.

download_robot_execution_failures()
df, y = load_robot_execution_failures()

Distributor = RayDistributor(n_workers=3)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I understood, this will not automatically start a ray cluster with 3 workers, or?

Ray is an optional dependency and users who needs to use Ray to distribute the calculations should install
it first by `pip install ray`.

Ray is a easy-to-use developing framework for distributed computing workload. Users could use it on single node or scale
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a few words on why a user would choose the ray distributor and not use any other distributors? I am totally fine with having it in the code-base, I just want to make sure users are not confused on what to choose. What are benefits compared to e.g. dask?
What understood, using ray allows to parallelize the computation but does not help for out-of-memory data. And it is of course useful if you already run a fay cluster

class LocalRayDistributorTestCase(DataTestCase):
def test_ray_cluster_extraction_one_worker(self):

Distributor = RayDistributor(n_workers=1)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same nit as before, can you use lowercase?


def test_ray_cluster_extraction_two_worker(self):

Distributor = RayDistributor(n_workers=2)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again the question: this is not creating a ray cluster with two workers, or? It is technically the same cluster as without this option - you just change the chunking.
Not sure if this is expected.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to actually start a 2-worker local ray cluster?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants