Skip to content

Commit

Permalink
add grpc envs (#1305)
Browse files Browse the repository at this point in the history
  • Loading branch information
BalaBalaYi authored Oct 24, 2024
1 parent 4e45ae8 commit 829b519
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 6 deletions.
18 changes: 13 additions & 5 deletions dlrover/python/elastic_agent/master_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
def retry_grpc_request(func):
def wrapper(self, *args, **kwargs):
retry = kwargs.get("retry", 10)
execption = None
exception = None
for i in range(retry):
try:
return func(self, *args, **kwargs)
Expand All @@ -39,15 +39,22 @@ def wrapper(self, *args, **kwargs):
logger.warning(
f"Retry {i} to {class_name}.{func_name} with failure",
)
execption = e
exception = e
time.sleep(5)
if execption:
logger.error(execption)
raise execption
if exception:
logger.error(exception)
raise exception

return wrapper


def init_grpc_env():
# At the cost of increased performance overhead, these provide greater
# stability in concurrent scenarios.
env_utils.set_env("GRPC_ENABLE_FORK_SUPPORT", "true")
env_utils.set_env("GRPC_POLL_STRATEGY", "poll")


class MasterClient(Singleton):
"""MasterClient provides some APIs connect with the master
service via gRPC call.
Expand Down Expand Up @@ -75,6 +82,7 @@ def __init__(self, master_addr, node_id, node_type, timeout=5):
f"Build master client with master_addr: {master_addr}, "
f"node_id: {node_id}, node_type: {node_type}."
)
init_grpc_env()
self._timeout = timeout
self._master_addr = master_addr
self._channel = grpc.build_channel(master_addr)
Expand Down
5 changes: 4 additions & 1 deletion dlrover/python/tests/test_master_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import time
import unittest

from dlrover.python.common import grpc
from dlrover.python.common import env_utils, grpc
from dlrover.python.common.constants import (
NodeStatus,
NodeType,
Expand All @@ -35,6 +35,9 @@ def tearDown(self):
self._master.stop()

def test_open_channel(self):
self.assertEqual(env_utils.get_env("GRPC_ENABLE_FORK_SUPPORT"), "true")
self.assertEqual(env_utils.get_env("GRPC_POLL_STRATEGY"), "poll")
self.assertEqual(self._master_client._timeout, 0.5)
self.assertEqual(self._master_client._timeout, 0.5)
self._master_client.close_channel()
self._master_client.open_channel()
Expand Down

0 comments on commit 829b519

Please sign in to comment.