-
Notifications
You must be signed in to change notification settings - Fork 60
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WIP: Release GIL around C functions #391
base: branch-0.13
Are you sure you want to change the base?
Changes from 7 commits
34b65dc
a603c54
7d3b137
4d97b4a
d808ba5
5cbc5e2
c293849
65721dd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -53,26 +53,27 @@ cdef create_future_from_comm_status(ucs_status_ptr_t status, | |
return ret | ||
|
||
|
||
cdef void _send_callback(void *request, ucs_status_t status): | ||
cdef void _send_callback(void *request, ucs_status_t status) nogil: | ||
cdef ucp_request *req = <ucp_request*> request | ||
if req.future == NULL: | ||
# This callback function was called before ucp_tag_send_nb() returned | ||
req.finished = True | ||
return | ||
cdef object future = <object> req.future | ||
cdef object log_str = <object> req.log_str | ||
if asyncio.get_event_loop().is_closed(): | ||
pass | ||
elif status == UCS_ERR_CANCELED: | ||
future.set_exception(UCXCanceled()) | ||
elif status != UCS_OK: | ||
msg = "Error sending%s " %(" \"%s\":" % log_str if log_str else ":") | ||
msg += (<object> ucs_status_string(status)).decode("utf-8") | ||
future.set_exception(UCXError(msg)) | ||
else: | ||
future.set_result(True) | ||
Py_DECREF(future) | ||
Py_DECREF(log_str) | ||
with gil: | ||
future = <object> req.future | ||
log_str = <object> req.log_str | ||
if asyncio.get_event_loop().is_closed(): | ||
pass | ||
elif status == UCS_ERR_CANCELED: | ||
future.set_exception(UCXCanceled()) | ||
elif status != UCS_OK: | ||
msg = "Error sending%s " %(" \"%s\":" % log_str if log_str else ":") | ||
msg += (<object> ucs_status_string(status)).decode("utf-8") | ||
future.set_exception(UCXError(msg)) | ||
else: | ||
future.set_result(True) | ||
Py_DECREF(future) | ||
Py_DECREF(log_str) | ||
ucp_request_reset(request) | ||
ucp_request_free(request) | ||
|
||
|
@@ -82,43 +83,47 @@ def tag_send(uintptr_t ucp_ep, buffer, size_t nbytes, | |
cdef ucp_ep_h ep = <ucp_ep_h><uintptr_t>ucp_ep | ||
cdef void *data = <void*><uintptr_t>(get_buffer_data(buffer, | ||
check_writable=False)) | ||
cdef ucp_send_callback_t _send_cb = <ucp_send_callback_t>_send_callback | ||
cdef ucs_status_ptr_t status = ucp_tag_send_nb(ep, | ||
data, | ||
nbytes, | ||
ucp_dt_make_contig(1), | ||
tag, | ||
_send_cb) | ||
cdef ucp_send_callback_t _send_cb | ||
cdef ucs_status_ptr_t status | ||
with nogil: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be with the gil since it's a callback executed by C ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Our Cython callbacks are acquiring the GIL before they call into Python. So we need to be sure to release the GIL beforehand. |
||
_send_cb = <ucp_send_callback_t>_send_callback | ||
status = ucp_tag_send_nb(ep, | ||
data, | ||
nbytes, | ||
ucp_dt_make_contig(1), | ||
tag, | ||
_send_cb) | ||
return create_future_from_comm_status(status, nbytes, pending_msg) | ||
|
||
|
||
cdef void _tag_recv_callback(void *request, ucs_status_t status, | ||
ucp_tag_recv_info_t *info): | ||
ucp_tag_recv_info_t *info) nogil: | ||
cdef ucp_request *req = <ucp_request*> request | ||
if req.future == NULL: | ||
# This callback function was called before ucp_tag_recv_nb() returned | ||
req.finished = True | ||
req.received = info.length | ||
return | ||
cdef object future = <object> req.future | ||
cdef object log_str = <object> req.log_str | ||
msg = "Error receiving%s " %(" \"%s\":" % log_str if log_str else ":") | ||
if asyncio.get_event_loop().is_closed(): | ||
pass | ||
elif status == UCS_ERR_CANCELED: | ||
future.set_exception(UCXCanceled()) | ||
elif status != UCS_OK: | ||
msg += (<object> ucs_status_string(status)).decode("utf-8") | ||
future.set_exception(UCXError(msg)) | ||
elif info.length != req.expected_receive: | ||
msg += "length mismatch: %d (got) != %d (expected)" % ( | ||
info.length, req.expected_receive | ||
) | ||
future.set_exception(UCXError(msg)) | ||
else: | ||
future.set_result(True) | ||
Py_DECREF(future) | ||
Py_DECREF(log_str) | ||
with gil: | ||
future = <object> req.future | ||
log_str = <object> req.log_str | ||
msg = "Error receiving%s " %(" \"%s\":" % log_str if log_str else ":") | ||
if asyncio.get_event_loop().is_closed(): | ||
pass | ||
elif status == UCS_ERR_CANCELED: | ||
future.set_exception(UCXCanceled()) | ||
elif status != UCS_OK: | ||
msg += (<object> ucs_status_string(status)).decode("utf-8") | ||
future.set_exception(UCXError(msg)) | ||
elif info.length != req.expected_receive: | ||
msg += "length mismatch: %d (got) != %d (expected)" % ( | ||
info.length, req.expected_receive | ||
) | ||
future.set_exception(UCXError(msg)) | ||
else: | ||
future.set_result(True) | ||
Py_DECREF(future) | ||
Py_DECREF(log_str) | ||
ucp_request_reset(request) | ||
ucp_request_free(request) | ||
|
||
|
@@ -128,59 +133,64 @@ def tag_recv(uintptr_t ucp_worker, buffer, size_t nbytes, | |
cdef ucp_worker_h worker = <ucp_worker_h><uintptr_t>ucp_worker | ||
cdef void *data = <void*><uintptr_t>(get_buffer_data(buffer, | ||
check_writable=True)) | ||
cdef ucp_tag_recv_callback_t _tag_recv_cb = ( | ||
<ucp_tag_recv_callback_t>_tag_recv_callback | ||
) | ||
cdef ucs_status_ptr_t status = ucp_tag_recv_nb(worker, | ||
data, | ||
nbytes, | ||
ucp_dt_make_contig(1), | ||
tag, | ||
-1, | ||
_tag_recv_cb) | ||
cdef ucp_tag_recv_callback_t _tag_recv_cb | ||
cdef ucs_status_ptr_t status | ||
with nogil: | ||
_tag_recv_cb = <ucp_tag_recv_callback_t>_tag_recv_callback | ||
status = ucp_tag_recv_nb(worker, | ||
data, | ||
nbytes, | ||
ucp_dt_make_contig(1), | ||
tag, | ||
-1, | ||
_tag_recv_cb) | ||
return create_future_from_comm_status(status, nbytes, pending_msg) | ||
|
||
|
||
def stream_send(uintptr_t ucp_ep, buffer, size_t nbytes, pending_msg=None): | ||
cdef ucp_ep_h ep = <ucp_ep_h><uintptr_t>ucp_ep | ||
cdef void *data = <void*><uintptr_t>(get_buffer_data(buffer, | ||
check_writable=False)) | ||
cdef ucp_send_callback_t _send_cb = <ucp_send_callback_t>_send_callback | ||
cdef ucs_status_ptr_t status = ucp_stream_send_nb(ep, | ||
data, | ||
nbytes, | ||
ucp_dt_make_contig(1), | ||
_send_cb, | ||
0) | ||
cdef ucp_send_callback_t _send_cb | ||
cdef ucs_status_ptr_t status | ||
with nogil: | ||
_send_cb = <ucp_send_callback_t>_send_callback | ||
status = ucp_stream_send_nb(ep, | ||
data, | ||
nbytes, | ||
ucp_dt_make_contig(1), | ||
_send_cb, | ||
0) | ||
return create_future_from_comm_status(status, nbytes, pending_msg) | ||
|
||
|
||
cdef void _stream_recv_callback(void *request, ucs_status_t status, | ||
size_t length): | ||
size_t length) nogil: | ||
cdef ucp_request *req = <ucp_request*> request | ||
if req.future == NULL: | ||
# This callback function was called before ucp_stream_recv_nb() returned | ||
req.finished = True | ||
req.received = length | ||
return | ||
cdef object future = <object> req.future | ||
cdef object log_str = <object> req.log_str | ||
msg = "Error receiving %s" %(" \"%s\":" % log_str if log_str else ":") | ||
if asyncio.get_event_loop().is_closed(): | ||
pass | ||
elif status == UCS_ERR_CANCELED: | ||
future.set_exception(UCXCanceled()) | ||
elif status != UCS_OK: | ||
msg += (<object> ucs_status_string(status)).decode("utf-8") | ||
future.set_exception(UCXError(msg)) | ||
elif length != req.expected_receive: | ||
msg += "length mismatch: %d (got) != %d (expected)" % ( | ||
length, req.expected_receive) | ||
future.set_exception(UCXError(msg)) | ||
else: | ||
future.set_result(True) | ||
Py_DECREF(future) | ||
Py_DECREF(log_str) | ||
with gil: | ||
future = <object> req.future | ||
log_str = <object> req.log_str | ||
msg = "Error receiving %s" %(" \"%s\":" % log_str if log_str else ":") | ||
if asyncio.get_event_loop().is_closed(): | ||
pass | ||
elif status == UCS_ERR_CANCELED: | ||
future.set_exception(UCXCanceled()) | ||
elif status != UCS_OK: | ||
msg += (<object> ucs_status_string(status)).decode("utf-8") | ||
future.set_exception(UCXError(msg)) | ||
elif length != req.expected_receive: | ||
msg += "length mismatch: %d (got) != %d (expected)" % ( | ||
length, req.expected_receive) | ||
future.set_exception(UCXError(msg)) | ||
else: | ||
future.set_result(True) | ||
Py_DECREF(future) | ||
Py_DECREF(log_str) | ||
ucp_request_reset(request) | ||
ucp_request_free(request) | ||
|
||
|
@@ -191,14 +201,15 @@ def stream_recv(uintptr_t ucp_ep, buffer, size_t nbytes, pending_msg=None): | |
check_writable=True)) | ||
cdef size_t length | ||
cdef ucp_request *req | ||
cdef ucp_stream_recv_callback_t _stream_recv_cb = ( | ||
<ucp_stream_recv_callback_t>_stream_recv_callback | ||
) | ||
cdef ucs_status_ptr_t status = ucp_stream_recv_nb(ep, | ||
data, | ||
nbytes, | ||
ucp_dt_make_contig(1), | ||
_stream_recv_cb, | ||
&length, | ||
0) | ||
cdef ucp_stream_recv_callback_t _stream_recv_cb | ||
cdef ucs_status_ptr_t status | ||
with nogil: | ||
_stream_recv_cb = <ucp_stream_recv_callback_t>_stream_recv_callback | ||
status = ucp_stream_recv_nb(ep, | ||
data, | ||
nbytes, | ||
ucp_dt_make_contig(1), | ||
_stream_recv_cb, | ||
&length, | ||
0) | ||
return create_future_from_comm_status(status, nbytes, pending_msg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs to be with the gil ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh , any call back needs to be with the gil, correct ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's right.