Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add socket close v2 #1168

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 104 additions & 66 deletions dlrover/python/common/multi_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,22 @@
Args:
path (str): a file path.
"""
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
path_dir = os.path.dirname(path)
os.makedirs(path_dir, exist_ok=True)
if os.path.exists(path):
os.unlink(path)
server.bind(path)
server.listen(0)
server = None
try:
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
path_dir = os.path.dirname(path)
os.makedirs(path_dir, exist_ok=True)
if os.path.exists(path):
os.unlink(path)
server.bind(path)
server.listen(0)
except OSError as e:
logger.error(

Check warning on line 76 in dlrover/python/common/multi_process.py

View check run for this annotation

Codecov / codecov/patch

dlrover/python/common/multi_process.py#L75-L76

Added lines #L75 - L76 were not covered by tests
f"An error occurred while creating the socket server: {e}"
)
if server:
server.close()
raise

Check warning on line 81 in dlrover/python/common/multi_process.py

View check run for this annotation

Codecov / codecov/patch

dlrover/python/common/multi_process.py#L79-L81

Added lines #L79 - L81 were not covered by tests
return server


Expand Down Expand Up @@ -212,13 +221,13 @@

@retry_socket
def _request(self, request: SocketRequest):
"""Create a socket client to requet the shared object."""
"""Create a socket client to request the shared object."""
client = _create_socket_client(self._socket_file)
message = pickle.dumps(request)
_socket_send(client, message)
recv_data = _socket_recv(client)
rcv_data = _socket_recv(client)
client.close()
response: LockAcquireResponse = pickle.loads(recv_data)
response: LockAcquireResponse = pickle.loads(rcv_data)
return response


Expand All @@ -242,26 +251,35 @@
else:
self._lock = None

def _deal_shared_lock_msg(self, connection):
try:
recv_data = _socket_recv(connection)
msg: SocketRequest = pickle.loads(recv_data)
if msg.method == "acquire":
response = LockAcquireResponse()
response.acquired = self.acquire(**msg.args)
elif msg.method == "locked":
response = LockedResponse()
response.locked = self.locked()

Check warning on line 263 in dlrover/python/common/multi_process.py

View check run for this annotation

Codecov / codecov/patch

dlrover/python/common/multi_process.py#L262-L263

Added lines #L262 - L263 were not covered by tests
elif msg.method == "release":
self.release()
response.status = SUCCESS_CODE
except Exception:
response = SocketResponse()
response.status = ERROR_CODE
send_data = pickle.dumps(response)
_socket_send(connection, send_data)

def _sync(self):
while True:
connection, _ = self._server.accept()
try:
recv_data = _socket_recv(connection)
msg: SocketRequest = pickle.loads(recv_data)
if msg.method == "acquire":
response = LockAcquireResponse()
response.acquired = self.acquire(**msg.args)
elif msg.method == "locked":
response = LockedResponse()
response.locked = self.locked()
elif msg.method == "release":
self.release()
response.status = SUCCESS_CODE
except Exception:
response = SocketResponse()
response.status = ERROR_CODE
send_data = pickle.dumps(response)
_socket_send(connection, send_data)
connection, _ = self._server.accept()
try:
self._deal_shared_lock_msg(connection)
finally:
connection.close()
except Exception as e:
logger.error(f"An error in SharedLock occurred: {e}")

Check warning on line 282 in dlrover/python/common/multi_process.py

View check run for this annotation

Codecov / codecov/patch

dlrover/python/common/multi_process.py#L281-L282

Added lines #L281 - L282 were not covered by tests

def acquire(self, blocking=True):
"""
Expand Down Expand Up @@ -363,30 +381,40 @@
else:
self._queue = None

def _deal_shared_queue_msg(self, connection):
try:
rcv_data = _socket_recv(connection)
msg: SocketRequest = pickle.loads(rcv_data)
response = SocketResponse()
if msg.method == "put":
self.put(**msg.args)
elif msg.method == "get":
response = QueueGetResponse()
response.obj = self.get(**msg.args)
elif msg.method == "qsize":
response = QueueSizeResponse()
response.size = self.qsize()
elif msg.method == "empty":
response = QueueEmptyResponse()
response.empty = self.empty()

Check warning on line 399 in dlrover/python/common/multi_process.py

View check run for this annotation

Codecov / codecov/patch

dlrover/python/common/multi_process.py#L397-L399

Added lines #L397 - L399 were not covered by tests
response.status = SUCCESS_CODE
except Exception:
response = SocketResponse()
response.status = ERROR_CODE

Check warning on line 403 in dlrover/python/common/multi_process.py

View check run for this annotation

Codecov / codecov/patch

dlrover/python/common/multi_process.py#L401-L403

Added lines #L401 - L403 were not covered by tests

message = pickle.dumps(response)
_socket_send(connection, message)

def _sync(self):
while True:
connection, _ = self._server.accept()
try:
recv_data = _socket_recv(connection)
msg: SocketRequest = pickle.loads(recv_data)
response = SocketResponse()
if msg.method == "put":
self.put(**msg.args)
elif msg.method == "get":
response = QueueGetResponse()
response.obj = self.get(**msg.args)
elif msg.method == "qsize":
response = QueueSizeResponse()
response.size = self.qsize()
elif msg.method == "empty":
response = QueueEmptyResponse()
response.empty = self.empty()
response.status = SUCCESS_CODE
except Exception:
response = SocketResponse()
response.status = ERROR_CODE
message = pickle.dumps(response)
_socket_send(connection, message)
connection, _ = self._server.accept()
try:
self._deal_shared_queue_msg(connection)
finally:
connection.close()
except Exception as e:
logger.error(f"An error in SharedQueue occurred: {e}")

Check warning on line 417 in dlrover/python/common/multi_process.py

View check run for this annotation

Codecov / codecov/patch

dlrover/python/common/multi_process.py#L416-L417

Added lines #L416 - L417 were not covered by tests

def put(self, obj, block=True, timeout=None):
"""Put an object into the queue."""
Expand Down Expand Up @@ -471,28 +499,38 @@
name=f"shard_dict_{name}", create=self._create
)

def _deal_shared_dict_msg(self, connection):
try:
rcv_data = _socket_recv(connection)
msg: SocketRequest = pickle.loads(rcv_data)
response = DictMessage()
if msg.method == "set":
self.set(**msg.args)
elif msg.method == "get":
response = DictMessage()
response.meta_dict = self.get(**msg.args)
response.status = SUCCESS_CODE
except Exception as e:
response = SocketResponse()
response.status = ERROR_CODE
logger.error(e)

Check warning on line 516 in dlrover/python/common/multi_process.py

View check run for this annotation

Codecov / codecov/patch

dlrover/python/common/multi_process.py#L513-L516

Added lines #L513 - L516 were not covered by tests
finally:
if not self._shared_queue.empty():
self._shared_queue.get(1)
message = pickle.dumps(response)
_socket_send(connection, message)

def _sync(self):
while True:
connection, _ = self._server.accept()
try:
recv_data = _socket_recv(connection)
msg: SocketRequest = pickle.loads(recv_data)
response = DictMessage()
if msg.method == "set":
self.set(**msg.args)
elif msg.method == "get":
response = DictMessage()
response.meta_dict = self.get(**msg.args)
response.status = SUCCESS_CODE
connection, _ = self._server.accept()
try:
self._deal_shared_dict_msg(connection)
finally:
# Make sure the connection is closed
connection.close()
except Exception as e:
response = SocketResponse()
response.status = ERROR_CODE
logger.error(e)
finally:
if not self._shared_queue.empty():
self._shared_queue.get(1)
message = pickle.dumps(response)
_socket_send(connection, message)
logger.error(f"An error in SharedDict occurred: {e}")

Check warning on line 533 in dlrover/python/common/multi_process.py

View check run for this annotation

Codecov / codecov/patch

dlrover/python/common/multi_process.py#L533

Added line #L533 was not covered by tests

def set(self, new_dict):
"""
Expand Down
Loading