Getting Java gateway process exited before sending its port number during init_spark() #383

Open
swapkh91 opened this issue Oct 6, 2023 · 11 comments


@swapkh91

swapkh91 commented Oct 6, 2023

I'm running a test with RayDP. I have set up a Ray cluster on GKE using the Dockerfile below:

FROM rayproject/ray:2.7.0-py310-cpu

ARG HTTP_PROXY
ARG HTTPS_PROXY

# set http_proxy & https_proxy
ENV http_proxy=${HTTP_PROXY}
ENV https_proxy=${HTTPS_PROXY}

RUN pip install -U pandas 'dask[complete]'

# install java, create workdir and install raydp
# You could change the raydp to raydp-nightly if you want to try the master branch code
RUN sudo http_proxy=${HTTP_PROXY} https_proxy=${HTTPS_PROXY} apt-get update -y \
    && sudo apt-get install -y gcc \
#     && sudo http_proxy=${HTTP_PROXY} https_proxy=${HTTPS_PROXY} apt-get install -y openjdk-8-jdk \
    && sudo http_proxy=${HTTP_PROXY} https_proxy=${HTTPS_PROXY} apt-get install -y openjdk-11-jdk \
    && sudo mkdir /raydp \
    && sudo chown -R ray /raydp \
    && $HOME/anaconda3/bin/pip --no-cache-dir install raydp

WORKDIR /raydp

# unset http_proxy & https_proxy
ENV http_proxy=
ENV https_proxy=

I have port-forwarded the GKE pod and I'm able to connect to it using ray.init(address="ray://localhost:10001")

When I try to start Spark through RayDP with

spark = raydp.init_spark(app_name='RayDP Example2',
                         num_executors=2,
                         executor_cores=2,
                         executor_memory='4GB')

I get the following error:
Exception: Java gateway process exited before sending its port number

Full stack trace:

(RayDPSparkMaster pid=3236) Error occurred during initialization of VM
(RayDPSparkMaster pid=3236) agent library failed to init: instrument
(RayDPSparkMaster pid=3236) Error opening zip file or JAR manifest missing : /home/swapnesh/.local/lib/python3.10/site-packages/raydp/jars/raydp-agent-1.6.0.jar

Python version:	3.10.8
Ray version:	2.7.0
Dashboard:	http://10.0.0.26:8265
(RayDPSparkMaster pid=3236) Error occurred during initialization of VM
(RayDPSparkMaster pid=3236) agent library failed to init: instrument
(RayDPSparkMaster pid=3236) Error opening zip file or JAR manifest missing : /home/swapnesh/.local/lib/python3.10/site-packages/raydp/jars/raydp-agent-1.6.0.jar
---------------------------------------------------------------------------
RayTaskError                              Traceback (most recent call last)
/home/swapnesh/ray_example/raydp_test.ipynb Cell 6 line 1
----> 1 spark = raydp.init_spark(app_name='RayDP Example2',
      2                          num_executors=2,
      3                          executor_cores=2,
      4                          executor_memory='4GB')

File ~/.local/lib/python3.10/site-packages/raydp/context.py:215, in init_spark(app_name, num_executors, executor_cores, executor_memory, enable_hive, fault_tolerant_mode, placement_group_strategy, placement_group, placement_group_bundle_indexes, configs)
    207 try:
    208     _global_spark_context = _SparkContext(
    209         app_name, num_executors, executor_cores, executor_memory, enable_hive,
    210         fault_tolerant_mode,
   (...)
    213         placement_group_bundle_indexes,
    214         configs)
--> 215     return _global_spark_context.get_or_create_session()
    216 except:
    217     if _global_spark_context is not None:

File ~/.local/lib/python3.10/site-packages/raydp/context.py:121, in _SparkContext.get_or_create_session(self)
    119     return self._spark_session
    120 self._prepare_placement_group()
--> 121 spark_cluster = self._get_or_create_spark_cluster()
    122 self._spark_session = spark_cluster.get_spark_session()
    123 if self._fault_tolerant_mode:

File ~/.local/lib/python3.10/site-packages/raydp/context.py:86, in _SparkContext._get_or_create_spark_cluster(self)
     84 if self._spark_cluster is not None:
     85     return self._spark_cluster
---> 86 self._spark_cluster = SparkCluster(self._app_name,
     87                                    self._num_executors,
     88                                    self._executor_cores,
     89                                    self._executor_memory,
     90                                    self._enable_hive,
     91                                    self._configs)
     92 return self._spark_cluster

File ~/.local/lib/python3.10/site-packages/raydp/spark/ray_cluster.py:52, in SparkCluster.__init__(self, app_name, num_executors, executor_cores, executor_memory, enable_hive, configs)
     50 self._configs = configs
     51 self._prepare_spark_configs()
---> 52 self._set_up_master(resources=self._get_master_resources(self._configs), kwargs=None)
     53 self._spark_session: SparkSession = None

File ~/.local/lib/python3.10/site-packages/raydp/spark/ray_cluster.py:72, in SparkCluster._set_up_master(self, resources, kwargs)
     68 else:
     69     self._spark_master_handle = RayDPSparkMaster.options(name=spark_master_name) \
     70         .remote(self._configs)
---> 72 ray.get(self._spark_master_handle.start_up.remote())

File ~/.local/lib/python3.10/site-packages/ray/_private/auto_init_hook.py:24, in wrap_auto_init.<locals>.auto_init_wrapper(*args, **kwargs)
     21 @wraps(fn)
     22 def auto_init_wrapper(*args, **kwargs):
     23     auto_init_ray()
---> 24     return fn(*args, **kwargs)

File ~/.local/lib/python3.10/site-packages/ray/_private/client_mode_hook.py:102, in client_mode_hook.<locals>.wrapper(*args, **kwargs)
     98 if client_mode_should_convert():
     99     # Legacy code
    100     # we only convert init function if RAY_CLIENT_MODE=1
    101     if func.__name__ != "init" or is_client_mode_enabled_by_default:
--> 102         return getattr(ray, func.__name__)(*args, **kwargs)
    103 return func(*args, **kwargs)

File ~/.local/lib/python3.10/site-packages/ray/util/client/api.py:42, in _ClientAPI.get(self, vals, timeout)
     35 def get(self, vals, *, timeout=None):
     36     """get is the hook stub passed on to replace `ray.get`
     37 
     38     Args:
     39         vals: [Client]ObjectRef or list of these refs to retrieve.
     40         timeout: Optional timeout in milliseconds
     41     """
---> 42     return self.worker.get(vals, timeout=timeout)

File ~/.local/lib/python3.10/site-packages/ray/util/client/worker.py:434, in Worker.get(self, vals, timeout)
    432     op_timeout = max_blocking_operation_time
    433 try:
--> 434     res = self._get(to_get, op_timeout)
    435     break
    436 except GetTimeoutError:

File ~/.local/lib/python3.10/site-packages/ray/util/client/worker.py:462, in Worker._get(self, ref, timeout)
    460         logger.exception("Failed to deserialize {}".format(chunk.error))
    461         raise
--> 462     raise err
    463 if chunk.total_size > OBJECT_TRANSFER_WARNING_SIZE and log_once(
    464     "client_object_transfer_size_warning"
    465 ):
    466     size_gb = chunk.total_size / 2**30

RayTaskError: ray::RayDPSparkMaster.start_up() (pid=3236, ip=10.0.0.26, actor_id=df8587726cf63ef0ef08dc091a000000, repr=<raydp.spark.ray_cluster_master.RayDPSparkMaster object at 0x7829cbe16b00>)
  File "/home/swapnesh/.local/lib/python3.10/site-packages/raydp/spark/ray_cluster_master.py", line 68, in start_up
  File "/home/swapnesh/.local/lib/python3.10/site-packages/raydp/spark/ray_cluster_master.py", line 158, in _launch_gateway
Exception: Java gateway process exited before sending its port number

Libraries on my laptop:

ray version - 2.7.0
raydp version - 1.6.0
java version - openjdk version "11.0.20.1" 2023-08-24
@kira-lin
Collaborator

Hi @swapkh91,
Sorry for the late reply. Ray 2.7.0 was released recently and might not be compatible with RayDP 1.6.0. Can you try Ray 2.6?

@swapkh91
Author

swapkh91 commented Oct 17, 2023

Hi @swapkh91, Sorry for the late reply. Ray 2.7.0 was released recently and might not be compatible with RayDP 1.6.0. Can you try Ray 2.6?

@kira-lin thanks, I'll try and get back to you. Any limitation on the Java version?

@kira-lin
Collaborator

Sorry for the inconvenience. We have only tested Java 8; Java 11 should be fine though.

@swapkh91
Author

@kira-lin I tested it with Ray 2.6.2 and got the same error. I'll explain how I'm connecting; maybe there is some issue in the process.

The Ray cluster is on GKE.
I have port-forwarded it to my laptop through
kubectl port-forward --address 0.0.0.0 service/raycluster-autoscaler-head-svc 10001:10001

I then connect using

ray.init(address="ray://localhost:10001")
spark = raydp.init_spark(app_name='RayDP Example2',
                         num_executors=2,
                         executor_cores=2,
                         executor_memory='4GB')

This init_spark command then gives the above error.

I checked the logs through the dashboard:

:job_id:03000000
:actor_name:RayDPSparkMaster
Error opening zip file or JAR manifest missing : /home/swapnesh/.local/lib/python3.10/site-packages/raydp/jars/raydp-agent-1.6.0.jar

Why is it showing the JAR file path from my laptop? The file is present there though, I checked.

@kira-lin
Collaborator

Why is it showing the JAR file path from my laptop? The file is present there though, I checked.

Oops, this seems to be a bug. We'll try to fix it. For now, you can wrap init_spark and whatever you want to do with Spark in a remote actor; that should work (see the sketch below). Thanks for identifying this bug.
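
A minimal sketch of that workaround, assuming a Ray client connection like the one above; the app name and resource sizes are illustrative:

import ray
import raydp

@ray.remote
class SparkDriver:
    def __init__(self):
        # init_spark runs inside the actor, i.e. on a cluster node, so RayDP
        # resolves its JAR paths there instead of on the client machine.
        self.spark = raydp.init_spark(app_name="RayDP Example",
                                      num_executors=2,
                                      executor_cores=2,
                                      executor_memory="4GB")

    def count(self):
        return self.spark.range(100).count()

ray.init(address="ray://localhost:10001")
driver = SparkDriver.remote()
print(ray.get(driver.count.remote()))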

@swapkh91
Author

Why is it showing the JAR file path from my laptop? The file is present there though, I checked.

Oops, this seems to be a bug. We'll try to fix it. For now, you can wrap init_spark and whatever you want to do with Spark in a remote actor; that should work. Thanks for identifying this bug.

@kira-lin got it, I'll try that. Also, I noticed that raydp declares a dependency of ray >= 2.1.0, as seen here, so pip install raydp pulls in ray 2.7.1.
I then have to manually run pip install --force-reinstall ray==2.6.2 to downgrade.
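
One way to avoid the force-reinstall is to pin both packages in a single install so pip resolves them together (the version numbers here are just the ones from this thread):

pip install "raydp==1.6.0" "ray==2.6.2"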

@psr-ai

psr-ai commented Nov 28, 2023

Hey @swapkh91, I am also getting the same error. Did you find a solution for this?

@kira-lin
Collaborator

Hi @raiprabh,

For now, you can wrap init_spark and whatever you want to do with Spark in a remote actor; that should work. Thanks for identifying this bug.

You can try this solution. We don't have enough bandwidth to work on this project now, so you are welcome to submit a PR to fix this if you have a solution, @swapkh91. We just need to use the path on the remote machines.

@Mayurji

Mayurji commented Apr 17, 2024

@kira-lin, Is there any update on this issue?

@Blarc

Blarc commented Jun 24, 2024

I also get this error when running the following code:

if __name__ == "__main__":
    import ray
    import raydp

    ray.init(
        address="ray://localhost:10001"
    )

    spark = ray.remote(
        raydp.init_spark("NYCTAXI data processing",
                         num_executors=2,
                         executor_cores=1,
                         executor_memory="500M",
                         configs={"spark.shuffle.service.enabled": "true"})
    )

    data = ray.remote(
        spark.read.format("csv") \
            .option("header", "true") \
            .option("inferSchema", "true") \
            .load(NYC_TRAIN_CSV)
    )

It seems that wrapping the functions in ray.remote doesn't help?

@Blarc

Blarc commented Jun 28, 2024

The following worked for me:

import time

import ray
import raydp
import pandas as pd


@ray.remote
class PySparkDriver:
    def __init__(self):
        self.spark = raydp.init_spark("RayDP Example",
                                      num_executors=2,
                                      executor_cores=1,
                                      executor_memory="1GB")

    def foo(self):
        return self.spark.range(1000).repartition(10).count()


if __name__ == "__main__":
    ray.init(
        address="ray://localhost:10001"
    )

    driver = PySparkDriver.remote()
    print(ray.get(driver.foo.remote()))
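
This presumably works because init_spark now executes inside the actor, on a cluster node, so RayDP resolves its JAR paths there. The earlier ray.remote(raydp.init_spark(...)) attempt did not help because ray.remote wraps a callable (a function or class), not the result of a call: raydp.init_spark(...) was evaluated on the local client before Ray was ever involved, which is why the client-side JAR path kept appearing.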
