diff --git a/src/dyad/common/dyad_rc.h b/src/dyad/common/dyad_rc.h index bb3e67eb..979e6519 100644 --- a/src/dyad/common/dyad_rc.h +++ b/src/dyad/common/dyad_rc.h @@ -38,9 +38,10 @@ enum dyad_core_return_codes { DYAD_RC_UCXINIT_FAIL = -14, // UCX initialization failed DYAD_RC_UCXWAIT_FAIL = -15, // UCX wait (either custom or // 'ucp_worker_wait') failed - DYAD_RC_UCXCOMM_FAIL = -16, // UCX communication routine failed - DYAD_RC_UCXMMAP_FAIL = -17, // Failed to perform operations with ucp_mem_map - DYAD_RC_RPC_FINISHED = -18, // The Flux RPC responded with ENODATA (i.e., + DYAD_RC_UCXEP_FAIL = -16, // An operation on a ucp_ep_h failed + DYAD_RC_UCXCOMM_FAIL = -17, // UCX communication routine failed + DYAD_RC_UCXMMAP_FAIL = -18, // Failed to perform operations with ucp_mem_map + DYAD_RC_RPC_FINISHED = -19, // The Flux RPC responded with ENODATA (i.e., // end of stream) sooner than expected DYAD_RC_BAD_B64DECODE = -18, // Decoding of data w/ base64 failed DYAD_RC_BAD_COMM_MODE = -19, // Invalid comm mode provided to DTL diff --git a/src/dyad/dtl/ucx_dtl.c b/src/dyad/dtl/ucx_dtl.c index e882db82..2104c0c5 100644 --- a/src/dyad/dtl/ucx_dtl.c +++ b/src/dyad/dtl/ucx_dtl.c @@ -1,3 +1,4 @@ +#include #include #include #include @@ -35,6 +36,7 @@ static void dyad_ucx_request_init (void* request) // Define a function that ucp_tag_msg_recv_nbx will use // as a callback to signal the completion of the async receive +// TODO(Ian): See if we can get msg size from recv_info #if UCP_API_VERSION >= UCP_VERSION(1, 10) static void dyad_recv_callback (void* request, ucs_status_t status, @@ -202,6 +204,292 @@ static dyad_rc_t ucx_free_buffer (dyad_perf_t* perf_handle, return rc; } +static inline dyad_rc_t ucx_connect (dyad_perf_t* perf_handle, + ucp_worker_h worker, + ucp_address_t* addr, + flux_t* h, + ucp_ep_h ep) +{ + ucp_ep_params_t params; + ucs_status_t status = UCS_OK; + DYAD_PERF_REGION_BEGIN (perf_handle, "ucx_connect"); + params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS | UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE + | UCP_EP_PARAM_FIELD_ERR_HANDLER; + params.address = addr; + params.err_mode = UCP_ERR_HANDLING_MODE_PEER; + params.err_handler.cb = dyad_ucx_ep_err_handler; + params.err_handler.arg = (void*)h; + status = ucp_ep_create (worker, ¶ms, &ep); + if (UCX_STATUS_FAIL (status)) { + FLUX_LOG_ERR (h, "ucp_ep_create failed with status %d\n", (int)status); + DYAD_PERF_REGION_END (perf_handle, "ucx_connect"); + return DYAD_RC_UCXCOMM_FAIL; + } + DYAD_PERF_REGION_END (perf_handle, "ucx_connect"); + return DYAD_RC_OK; +} + +static inline dyad_rc_t ucx_disconnect (dyad_perf_t* perf_handle, ucp_worker_h worker, ucp_ep_h ep) +{ + dyad_rc_t rc = DYAD_RC_OK; + ucs_status_t status = UCS_OK; + ucs_status_ptr_t stat_ptr; + DYAD_PERF_REGION_BEGIN (perf_handle, "ucx_disconnect"); + if (ep != NULL) { + // ucp_tag_send_sync_nbx is the prefered version of this send + // since UCX 1.9 However, some systems (e.g., Lassen) may have + // an older verison This conditional compilation will use + // ucp_tag_send_sync_nbx if using UCX 1.9+, and it will use the + // deprecated ucp_tag_send_sync_nb if using UCX < 1.9. +#if UCP_API_VERSION >= UCP_VERSION(1, 10) + ucp_request_param_t close_params; + close_params.op_attr_mask = UCP_OP_ATTR_FIELD_FLAGS; + close_params.flags = UCP_EP_CLOSE_FLAG_FORCE; + stat_ptr = ucp_ep_close_nbx (ep, &close_params); +#else + // TODO change to FORCE if we decide to enable err handleing + // mode + stat_ptr = ucp_ep_close_nb (ep, UCP_EP_CLOSE_MODE_FORCE); +#endif + // Don't use dyad_ucx_request_wait here because ep_close behaves + // differently than other UCX calls + if (stat_ptr != NULL) { + // Endpoint close is in-progress. + // Wait until finished + if (UCS_PTR_IS_PTR (stat_ptr)) { + do { + ucp_worker_progress (worker); + status = ucp_request_check_status (stat_ptr); + } while (status == UCS_INPROGRESS); + ucp_request_free (stat_ptr); + } + // An error occurred during endpoint closure + // However, the endpoint can no longer be used + // Get the status code for reporting + else { + status = UCS_PTR_STATUS (stat_ptr); + } + if (UCX_STATUS_FAIL (status)) { + rc = DYAD_RC_UCXEP_FAIL; + goto ucx_disconnect_region_finish; + } + } + } +ucx_disconnect_region_finish: + DYAD_PERF_REGION_END (perf_handle, "ucx_disconnect"); + return rc; +} + +static inline ucs_status_ptr_t ucx_send_no_wait (dyad_dtl_t* self, void* buf, size_t buflen) +{ + ucs_status_ptr_t stat_ptr; + dyad_ucx_request_t* req = NULL; + dyad_dtl_ucx_t* dtl_handle = self->private.ucx_dtl_handle; + dyad_perf_t* perf_handle = self->perf_handle; + if (dtl_handle->ep == NULL) { + FLUX_LOG_INFO (dtl_handle->h, + "UCP endpoint was not created prior to invoking " + "send!\n"); + stat_ptr = (void*)UCS_ERR_NOT_CONNECTED; + goto ucx_send_no_wait_done; + } + // ucp_tag_send_sync_nbx is the prefered version of this send since UCX 1.9 + // However, some systems (e.g., Lassen) may have an older verison + // This conditional compilation will use ucp_tag_send_sync_nbx if using + // UCX 1.9+, and it will use the deprecated ucp_tag_send_sync_nb if using + // UCX < 1.9. + DYAD_PERF_REGION_BEGIN (perf_handle, "ucp_tag_send"); +#if UCP_API_VERSION >= UCP_VERSION(1, 10) + ucp_request_param_t params; + params.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK; + params.cb.send = dyad_send_callback; + FLUX_LOG_INFO (dtl_handle->h, "Sending data to consumer with ucp_tag_send_nbx\n"); + stat_ptr = ucp_tag_send_nbx (dtl_handle->ep, buf, buflen, dtl_handle->comm_tag, ¶ms); +#else + FLUX_LOG_INFO (dtl_handle->h, + "Sending %lu bytes of data to consumer with " + "ucp_tag_send_nb\n", + buflen); + stat_ptr = ucp_tag_send_nb (dtl_handle->ep, + buf, + buflen, + UCP_DATATYPE_CONTIG, + dtl_handle->comm_tag, + dyad_send_callback); +#endif + DYAD_PERF_REGION_END (perf_handle, "ucp_tag_send"); +ucx_send_no_wait_done: + return stat_ptr; +} + +static inline ucs_status_ptr_t ucx_recv_no_wait (dyad_dtl_t* self, void** buf, size_t* buflen) +{ + dyad_rc_t rc = DYAD_RC_OK; + ucs_status_t status = UCS_OK; + ucp_tag_message_h msg = NULL; + ucp_tag_recv_info_t msg_info; + ucs_status_ptr_t stat_ptr = NULL; + dyad_dtl_ucx_t* dtl_handle = self->private.ucx_dtl_handle; + dyad_perf_t* perf_handle = self->perf_handle; + FLUX_LOG_INFO (dtl_handle->h, "Poll UCP for incoming data\n"); + // TODO: replace this loop with a resiliency response over RPC + // TODO(Ian): explore whether removing probe makes the overall + // recv faster or not + DYAD_PERF_REGION_BEGIN (perf_handle, "ucp_tag_probe"); + do { + ucp_worker_progress (dtl_handle->ucx_worker); + msg = ucp_tag_probe_nb (dtl_handle->ucx_worker, + dtl_handle->comm_tag, + DYAD_UCX_TAG_MASK, + 1, // Remove the message from UCP tracking + // Requires calling ucp_tag_msg_recv_nb + // with the ucp_tag_message_h to retrieve message + &msg_info); + } while (msg == NULL); + DYAD_PERF_REGION_END (perf_handle, "ucp_tag_probe"); + // TODO: This version of the polling code is not supposed to spin-lock, + // unlike the code above. Currently, it does not work. Once it starts + // working, we can replace the code above with a version of this code. + // + // while (true) + // { + // // Probe the tag recv event at the top + // // of the worker's queue + // FLUX_LOG_INFO (dtl_handle->h, "Probe UCP worker with tag %lu\n", + // dtl_handle->comm_tag); msg = ucp_tag_probe_nb( + // dtl_handle->ucx_worker, + // dtl_handle->comm_tag, + // DYAD_UCX_TAG_MASK, + // 1, // Remove the message from UCP tracking + // // Requires calling ucp_tag_msg_recv_nb + // // with the ucp_tag_message_h to retrieve message + // &msg_info + // ); + // // If data has arrived from the producer plugin, + // // break the loop + // if (msg != NULL) + // { + // FLUX_LOG_INFO (dtl_handle->h, "Data has arrived, so end + // polling\n"); break; + // } + // // If data has not arrived, check if there are + // // any other events in the worker's queue. + // // If so, start the loop over to handle the next event + // else if (ucp_worker_progress(dtl_handle->ucx_worker)) + // { + // FLUX_LOG_INFO (dtl_handle->h, "Progressed UCP worker to check if + // any other UCP events are available\n"); continue; + // } + // // No other events are queued. So, we will wait on new + // // events to come in. By using 'ucp_worker_wait' for this, + // // we let the OS do other work in the meantime (no spin locking). + // FLUX_LOG_INFO (dtl_handle->h, "Launch pre-emptable wait until UCP + // worker gets new events\n"); status = + // ucp_worker_wait(dtl_handle->ucx_worker); + // // If the wait fails, log an error + // if (UCX_STATUS_FAIL(status)) + // { + // FLUX_LOG_ERR (dtl_handle->h, "Could not wait on the message from + // the producer plugin\n"); return DYAD_RC_UCXWAIT_FAIL; + // } + // } + // The metadata retrived from the probed tag recv event contains + // the size of the data to be sent. + // So, use that size to allocate a buffer + FLUX_LOG_INFO (dtl_handle->h, + "Got message with tag %lu and size %lu\n", + msg_info.sender_tag, + msg_info.length); + *buflen = msg_info.length; + rc = self->get_buffer (self, *buflen, buf); + if (DYAD_IS_ERROR (rc)) { + *buf = NULL; + *buflen = 0; + stat_ptr = (ucs_status_ptr_t)UCS_ERR_NO_MEMORY; + goto ucx_recv_no_wait_done; + } + FLUX_LOG_INFO (dtl_handle->h, "Receive data using async UCX operation\n"); + DYAD_PERF_REGION_BEGIN (perf_handle, "ucp_tag_msg_recv"); +#if UCP_API_VERSION >= UCP_VERSION(1, 10) + // Define the settings for the recv operation + // + // The settings enabled are: + // * Define callback for the recv because it is async + // * Restrict memory buffers to host-only since we aren't directly + // dealing with GPU memory + ucp_request_param_t recv_params; + // TODO consider enabling UCP_OP_ATTR_FIELD_MEMH to speedup + // the recv operation if using RMA behind the scenes + recv_params.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_MEMORY_TYPE; + recv_params.cb.recv = dyad_recv_callback; + // Constraining to Host memory (as opposed to GPU memory) + // allows UCX to potentially perform some optimizations + recv_params.memory_type = UCS_MEMORY_TYPE_HOST; + // Perform the async recv operation using the probed tag recv event + stat_ptr = ucp_tag_msg_recv_nbx (dtl_handle->ucx_worker, *buf, *buflen, msg, &recv_params); +#else + stat_ptr = ucp_tag_msg_recv_nb (dtl_handle->ucx_worker, + *buf, + *buflen, + UCP_DATATYPE_CONTIG, + msg, + dyad_recv_callback); +#endif + DYAD_PERF_REGION_END (perf_handle, "ucp_tag_msg_recv"); +ucx_recv_no_wait_done: + return stat_ptr; +} + +static dyad_rc_t ucx_warmup (dyad_dtl_t* self) +{ + dyad_rc_t rc = DYAD_RC_OK; + void* send_buf = NULL; + void* recv_buf = NULL; + size_t recv_buf_len = 0; + ucs_status_ptr_t send_stat_ptr = NULL; + ucs_status_ptr_t recv_stat_ptr = NULL; + ucs_status_t send_status = UCS_OK; + ucs_status_t recv_status = UCS_OK; + ucp_ep_h warmup_ep = NULL; + DYAD_PERF_REGION_BEGIN (self->perf_handle, "ucx_warmup"); + rc = dyad_dtl_ucx_get_buffer (self, 1, &send_buf); + if (DYAD_IS_ERROR (rc)) { + goto warmup_region_done; + } + recv_buf = malloc (1); + if (recv_buf == NULL) { + dyad_dtl_ucx_return_buffer (self, &send_buf); + rc = DYAD_RC_SYSFAIL; + goto warmup_region_done; + } + rc = ucx_connect (self->perf_handle, + self->private.ucx_dtl_handle->ucx_worker, + self->private.ucx_dtl_handle->consumer_address, + self->private.ucx_dtl_handle->h, + self->private.ucx_dtl_handle->ep); + send_stat_ptr = ucx_send_no_wait (self, send_buf, 1); + recv_stat_ptr = ucx_recv_no_wait (self, &recv_buf, &recv_buf_len); + recv_status = + dyad_ucx_request_wait (self->private.ucx_dtl_handle, recv_stat_ptr, self->perf_handle); + send_status = + dyad_ucx_request_wait (self->private.ucx_dtl_handle, send_stat_ptr, self->perf_handle); + ucx_disconnect (self->perf_handle, + self->private.ucx_dtl_handle->ucx_worker, + self->private.ucx_dtl_handle->ep); + dyad_dtl_ucx_return_buffer (self, &send_buf); + if (UCX_STATUS_FAIL (recv_status) || UCX_STATUS_FAIL (send_status)) { + rc = DYAD_RC_UCXCOMM_FAIL; + goto warmup_region_done; + } + assert (recv_buf_len == 1); + free (recv_buf); + rc = DYAD_RC_OK; + +warmup_region_done: + DYAD_PERF_REGION_END (self->perf_handle, "ucx_warmup"); + return rc; +} + dyad_rc_t dyad_dtl_ucx_init (dyad_dtl_t* self, dyad_dtl_mode_t mode, dyad_dtl_comm_mode_t comm_mode, @@ -329,7 +617,14 @@ dyad_rc_t dyad_dtl_ucx_init (dyad_dtl_t* self, self->recv = dyad_dtl_ucx_recv; self->close_connection = dyad_dtl_ucx_close_connection; + ucx_warmup (self); + // Unset the address on the server because it's not needed + if (dtl_handle->comm_mode == DYAD_COMM_SEND) { + dtl_handle->consumer_address = NULL; + dtl_handle->addr_len = 0; + } DYAD_PERF_REGION_END (self->perf_handle, "dyad_dtl_ucx_init"); + return DYAD_RC_OK; error:; @@ -531,43 +826,34 @@ dyad_rc_t dyad_dtl_ucx_return_buffer (dyad_dtl_t* self, void** data_buf) dyad_rc_t dyad_dtl_ucx_establish_connection (dyad_dtl_t* self) { dyad_rc_t rc = DYAD_RC_OK; - ucp_ep_params_t params; - ucs_status_t status = UCS_OK; dyad_dtl_ucx_t* dtl_handle = self->private.ucx_dtl_handle; dyad_dtl_comm_mode_t comm_mode = dtl_handle->comm_mode; DYAD_PERF_REGION_BEGIN (self->perf_handle, "dyad_dtl_ucx_establish_connection"); if (comm_mode == DYAD_COMM_SEND) { - params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS | UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE - | UCP_EP_PARAM_FIELD_ERR_HANDLER; - params.address = dtl_handle->consumer_address; - params.err_mode = UCP_ERR_HANDLING_MODE_PEER; - params.err_handler.cb = dyad_ucx_ep_err_handler; - params.err_handler.arg = (void*)dtl_handle->h; FLUX_LOG_INFO (dtl_handle->h, "Create UCP endpoint for communication with consumer\n"); - status = ucp_ep_create (dtl_handle->ucx_worker, ¶ms, &dtl_handle->ep); - if (status != UCS_OK) { - FLUX_LOG_ERR (dtl_handle->h, "ucp_ep_create failed with status %d\n", (int)status); - rc = DYAD_RC_UCXCOMM_FAIL; + rc = ucx_connect (self->perf_handle, + dtl_handle->ucx_worker, + dtl_handle->consumer_address, + dtl_handle->h, + dtl_handle->ep); + if (DYAD_IS_ERROR (rc)) { + FLUX_LOG_ERR (dtl_handle->h, "Could not create UCP endpoint"); goto dtl_ucx_establish_connection_region_finish; } if (dtl_handle->debug) { ucp_ep_print_info (dtl_handle->ep, stderr); } rc = DYAD_RC_OK; - goto dtl_ucx_establish_connection_region_finish; } else if (comm_mode == DYAD_COMM_RECV) { FLUX_LOG_INFO (dtl_handle->h, "No explicit connection establishment needed for UCX " "receiver\n"); rc = DYAD_RC_OK; - goto dtl_ucx_establish_connection_region_finish; } else { FLUX_LOG_ERR (dtl_handle->h, "Invalid communication mode: %d\n", comm_mode); // TODO create new RC for this rc = DYAD_RC_BAD_COMM_MODE; - goto dtl_ucx_establish_connection_region_finish; } - rc = DYAD_RC_OK; dtl_ucx_establish_connection_region_finish: DYAD_PERF_REGION_END (self->perf_handle, "dyad_dtl_ucx_establish_connection"); return rc; @@ -578,49 +864,18 @@ dyad_rc_t dyad_dtl_ucx_send (dyad_dtl_t* self, void* buf, size_t buflen) dyad_rc_t rc = DYAD_RC_OK; ucs_status_ptr_t stat_ptr; ucs_status_t status = UCS_OK; - dyad_ucx_request_t* req = NULL; - dyad_dtl_ucx_t* dtl_handle = self->private.ucx_dtl_handle; DYAD_PERF_REGION_BEGIN (self->perf_handle, "dyad_dtl_ucx_send"); - if (dtl_handle->ep == NULL) { - FLUX_LOG_INFO (dtl_handle->h, - "UCP endpoint was not created prior to invoking " - "send!\n"); - rc = DYAD_RC_UCXCOMM_FAIL; - goto dtl_ucx_send_region_finish; - } - // ucp_tag_send_sync_nbx is the prefered version of this send since UCX 1.9 - // However, some systems (e.g., Lassen) may have an older verison - // This conditional compilation will use ucp_tag_send_sync_nbx if using - // UCX 1.9+, and it will use the deprecated ucp_tag_send_sync_nb if using - // UCX < 1.9. - DYAD_PERF_REGION_BEGIN (self->perf_handle, "ucp_tag_send"); -#if UCP_API_VERSION >= UCP_VERSION(1, 10) - ucp_request_param_t params; - params.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK; - params.cb.send = dyad_send_callback; - FLUX_LOG_INFO (dtl_handle->h, "Sending data to consumer with ucp_tag_send_nbx\n"); - stat_ptr = ucp_tag_send_nbx (dtl_handle->ep, buf, buflen, dtl_handle->comm_tag, ¶ms); -#else - FLUX_LOG_INFO (dtl_handle->h, - "Sending %lu bytes of data to consumer with " - "ucp_tag_send_nb\n", - buflen); - stat_ptr = ucp_tag_send_nb (dtl_handle->ep, - buf, - buflen, - UCP_DATATYPE_CONTIG, - dtl_handle->comm_tag, - dyad_send_callback); -#endif - DYAD_PERF_REGION_END (self->perf_handle, "ucp_tag_send"); - FLUX_LOG_INFO (dtl_handle->h, "Processing UCP send request\n"); - status = dyad_ucx_request_wait (dtl_handle, stat_ptr, self->perf_handle); + stat_ptr = ucx_send_no_wait (self, buf, buflen); + FLUX_LOG_INFO (self->private.ucx_dtl_handle->h, "Processing UCP send request\n"); + status = dyad_ucx_request_wait (self->private.ucx_dtl_handle, stat_ptr, self->perf_handle); if (status != UCS_OK) { - FLUX_LOG_ERR (dtl_handle->h, "UCP Tag Send failed (status = %d)!\n", (int)status); + FLUX_LOG_ERR (self->private.ucx_dtl_handle->h, + "UCP Tag Send failed (status = %d)!\n", + (int)status); rc = DYAD_RC_UCXCOMM_FAIL; goto dtl_ucx_send_region_finish; } - FLUX_LOG_INFO (dtl_handle->h, "Data send with UCP succeeded\n"); + FLUX_LOG_INFO (self->private.ucx_dtl_handle->h, "Data send with UCP succeeded\n"); rc = DYAD_RC_OK; dtl_ucx_send_region_finish: DYAD_PERF_REGION_END (self->perf_handle, "dyad_dtl_ucx_send"); @@ -630,128 +885,24 @@ dyad_rc_t dyad_dtl_ucx_send (dyad_dtl_t* self, void* buf, size_t buflen) dyad_rc_t dyad_dtl_ucx_recv (dyad_dtl_t* self, void** buf, size_t* buflen) { dyad_rc_t rc = DYAD_RC_OK; - ucs_status_t status = UCS_OK; - ucp_tag_message_h msg = NULL; - ucp_tag_recv_info_t msg_info; ucs_status_ptr_t stat_ptr = NULL; - dyad_dtl_ucx_t* dtl_handle = self->private.ucx_dtl_handle; DYAD_PERF_REGION_BEGIN (self->perf_handle, "dyad_dtl_ucx_recv"); - FLUX_LOG_INFO (dtl_handle->h, "Poll UCP for incoming data\n"); - // TODO replace this loop with a resiliency response over RPC - DYAD_PERF_REGION_BEGIN (self->perf_handle, "ucp_tag_probe"); - do { - ucp_worker_progress (dtl_handle->ucx_worker); - msg = ucp_tag_probe_nb (dtl_handle->ucx_worker, - dtl_handle->comm_tag, - DYAD_UCX_TAG_MASK, - 1, // Remove the message from UCP tracking - // Requires calling ucp_tag_msg_recv_nb - // with the ucp_tag_message_h to retrieve message - &msg_info); - } while (msg == NULL); - DYAD_PERF_REGION_END (self->perf_handle, "ucp_tag_probe"); - // TODO: This version of the polling code is not supposed to spin-lock, - // unlike the code above. Currently, it does not work. Once it starts - // working, we can replace the code above with a version of this code. - // - // while (true) - // { - // // Probe the tag recv event at the top - // // of the worker's queue - // FLUX_LOG_INFO (dtl_handle->h, "Probe UCP worker with tag %lu\n", - // dtl_handle->comm_tag); msg = ucp_tag_probe_nb( - // dtl_handle->ucx_worker, - // dtl_handle->comm_tag, - // DYAD_UCX_TAG_MASK, - // 1, // Remove the message from UCP tracking - // // Requires calling ucp_tag_msg_recv_nb - // // with the ucp_tag_message_h to retrieve message - // &msg_info - // ); - // // If data has arrived from the producer plugin, - // // break the loop - // if (msg != NULL) - // { - // FLUX_LOG_INFO (dtl_handle->h, "Data has arrived, so end - // polling\n"); break; - // } - // // If data has not arrived, check if there are - // // any other events in the worker's queue. - // // If so, start the loop over to handle the next event - // else if (ucp_worker_progress(dtl_handle->ucx_worker)) - // { - // FLUX_LOG_INFO (dtl_handle->h, "Progressed UCP worker to check if - // any other UCP events are available\n"); continue; - // } - // // No other events are queued. So, we will wait on new - // // events to come in. By using 'ucp_worker_wait' for this, - // // we let the OS do other work in the meantime (no spin locking). - // FLUX_LOG_INFO (dtl_handle->h, "Launch pre-emptable wait until UCP - // worker gets new events\n"); status = - // ucp_worker_wait(dtl_handle->ucx_worker); - // // If the wait fails, log an error - // if (UCX_STATUS_FAIL(status)) - // { - // FLUX_LOG_ERR (dtl_handle->h, "Could not wait on the message from - // the producer plugin\n"); return DYAD_RC_UCXWAIT_FAIL; - // } - // } - // The metadata retrived from the probed tag recv event contains - // the size of the data to be sent. - // So, use that size to allocate a buffer - FLUX_LOG_INFO (dtl_handle->h, - "Got message with tag %lu and size %lu\n", - msg_info.sender_tag, - msg_info.length); - *buflen = msg_info.length; - rc = self->get_buffer (self, *buflen, buf); - if (DYAD_IS_ERROR (rc)) { - *buf = NULL; - *buflen = 0; - goto dtl_ucx_recv_region_finish; - } - FLUX_LOG_INFO (dtl_handle->h, "Receive data using async UCX operation\n"); - DYAD_PERF_REGION_BEGIN (self->perf_handle, "ucp_tag_msg_recv"); -#if UCP_API_VERSION >= UCP_VERSION(1, 10) - // Define the settings for the recv operation - // - // The settings enabled are: - // * Define callback for the recv because it is async - // * Restrict memory buffers to host-only since we aren't directly - // dealing with GPU memory - ucp_request_param_t recv_params; - // TODO consider enabling UCP_OP_ATTR_FIELD_MEMH to speedup - // the recv operation if using RMA behind the scenes - recv_params.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_MEMORY_TYPE; - recv_params.cb.recv = dyad_recv_callback; - // Constraining to Host memory (as opposed to GPU memory) - // allows UCX to potentially perform some optimizations - recv_params.memory_type = UCS_MEMORY_TYPE_HOST; - // Perform the async recv operation using the probed tag recv event - stat_ptr = ucp_tag_msg_recv_nbx (dtl_handle->ucx_worker, *buf, *buflen, msg, &recv_params); -#else - stat_ptr = ucp_tag_msg_recv_nb (dtl_handle->ucx_worker, - *buf, - *buflen, - UCP_DATATYPE_CONTIG, - msg, - dyad_recv_callback); -#endif - DYAD_PERF_REGION_END (self->perf_handle, "ucp_tag_msg_recv"); + ucs_status_t status = UCS_OK; // Wait on the recv operation to complete - FLUX_LOG_INFO (dtl_handle->h, "Wait for UCP recv operation to complete\n"); + stat_ptr = ucx_recv_no_wait (self, buf, buflen); + FLUX_LOG_INFO (self->private.ucx_dtl_handle->h, "Wait for UCP recv operation to complete\n"); status = dyad_ucx_request_wait (self->private.ucx_dtl_handle, stat_ptr, self->perf_handle); // If the recv operation failed, log an error, free the data buffer, // and set the buffer pointer to NULL if (UCX_STATUS_FAIL (status)) { - FLUX_LOG_ERR (dtl_handle->h, "UCX recv failed!\n"); + FLUX_LOG_ERR (self->private.ucx_dtl_handle->h, "UCX recv failed!\n"); free (*buf); *buf = NULL; rc = DYAD_RC_UCXCOMM_FAIL; goto dtl_ucx_recv_region_finish; } - FLUX_LOG_INFO (dtl_handle->h, "Data receive using UCX is successful\n"); - FLUX_LOG_INFO (dtl_handle->h, "Received %lu bytes from producer\n", *buflen); + FLUX_LOG_INFO (self->private.ucx_dtl_handle->h, "Data receive using UCX is successful\n"); + FLUX_LOG_INFO (self->private.ucx_dtl_handle->h, "Received %lu bytes from producer\n", *buflen); rc = DYAD_RC_OK; dtl_ucx_recv_region_finish: DYAD_PERF_REGION_END (self->perf_handle, "dyad_dtl_ucx_recv"); @@ -761,59 +912,18 @@ dyad_rc_t dyad_dtl_ucx_recv (dyad_dtl_t* self, void** buf, size_t* buflen) dyad_rc_t dyad_dtl_ucx_close_connection (dyad_dtl_t* self) { dyad_rc_t rc = DYAD_RC_OK; - ucs_status_t status = UCS_OK; - ucs_status_ptr_t stat_ptr; dyad_dtl_ucx_t* dtl_handle = self->private.ucx_dtl_handle; dyad_dtl_comm_mode_t comm_mode = dtl_handle->comm_mode; DYAD_PERF_REGION_BEGIN (self->perf_handle, "dyad_dtl_ucx_close_connection"); if (comm_mode == DYAD_COMM_SEND) { if (dtl_handle != NULL) { - if (dtl_handle->ep != NULL) { - // ucp_tag_send_sync_nbx is the prefered version of this send - // since UCX 1.9 However, some systems (e.g., Lassen) may have - // an older verison This conditional compilation will use - // ucp_tag_send_sync_nbx if using UCX 1.9+, and it will use the - // deprecated ucp_tag_send_sync_nb if using UCX < 1.9. - FLUX_LOG_INFO (dtl_handle->h, "Start async closing of UCP endpoint\n"); -#if UCP_API_VERSION >= UCP_VERSION(1, 10) - ucp_request_param_t close_params; - close_params.op_attr_mask = UCP_OP_ATTR_FIELD_FLAGS; - close_params.flags = UCP_EP_CLOSE_FLAG_FORCE; - stat_ptr = ucp_ep_close_nbx (dtl_handle->ep, &close_params); -#else - // TODO change to FORCE if we decide to enable err handleing - // mode - stat_ptr = ucp_ep_close_nb (dtl_handle->ep, UCP_EP_CLOSE_MODE_FORCE); -#endif - FLUX_LOG_INFO (dtl_handle->h, "Wait for endpoint closing to finish\n"); - // Don't use dyad_ucx_request_wait here because ep_close behaves - // differently than other UCX calls - if (stat_ptr != NULL) { - // Endpoint close is in-progress. - // Wait until finished - if (UCS_PTR_IS_PTR (stat_ptr)) { - do { - ucp_worker_progress (dtl_handle->ucx_worker); - status = ucp_request_check_status (stat_ptr); - } while (status == UCS_INPROGRESS); - ucp_request_free (stat_ptr); - } - // An error occurred during endpoint closure - // However, the endpoint can no longer be used - // Get the status code for reporting - else { - status = UCS_PTR_STATUS (stat_ptr); - } - if (status != UCS_OK) { - FLUX_LOG_ERR (dtl_handle->h, - "Could not successfully close Endpoint " - "(status = %d)! However, endpoint was " - "released.\n", - status); - } - } - dtl_handle->ep = NULL; + rc = ucx_disconnect (self->perf_handle, dtl_handle->ucx_worker, dtl_handle->ep); + if (DYAD_IS_ERROR (rc)) { + FLUX_LOG_ERR (dtl_handle->h, + "Could not successfully close Endpoint! However, endpoint was " + "released."); } + dtl_handle->ep = NULL; // Sender doesn't have a consumer address at this time // So, free the consumer address when closing the connection if (dtl_handle->consumer_address != NULL) { diff --git a/src/dyad/wrapper/wrapper.c b/src/dyad/wrapper/wrapper.c index 3f334330..cd00ea20 100644 --- a/src/dyad/wrapper/wrapper.c +++ b/src/dyad/wrapper/wrapper.c @@ -34,11 +34,10 @@ using namespace std; // std::clock () #endif // defined(__cplusplus) #include +#include #include #include // dirname #include - -#include // #include "wrapper.h" #include #include @@ -48,6 +47,10 @@ using namespace std; // std::clock () extern "C" { #endif +// Note: +// To ensure we don't have multiple initialization, we need the following: +// 1) The DYAD context (ctx below) must be static +// 2) The DYAD context should be on the heap (done w/ malloc in dyad_init) static __thread dyad_ctx_t *ctx = NULL; static void dyad_wrapper_init (void) __attribute__ ((constructor)); static void dyad_wrapper_fini (void) __attribute__ ((destructor)); @@ -174,9 +177,7 @@ DYAD_DLL_EXPORTED FILE *fopen (const char *path, const char *mode) } if (!(ctx && ctx->h) || (ctx && !ctx->reenter) || !path) { - IPRINTF (ctx, - "DYAD_SYNC: fopen sync not applicable for \"%s\".\n", - ((path) ? path : "")); + IPRINTF (ctx, "DYAD_SYNC: fopen sync not applicable for \"%s\".\n", ((path) ? path : "")); goto real_call; } @@ -251,9 +252,7 @@ real_call:; // semicolon here to avoid the error int wronly = is_wronly (fd); if (wronly == -1) { - DPRINTF (ctx, - "Failed to check the mode of the file with fcntl: %s\n", - strerror (errno)); + DPRINTF (ctx, "Failed to check the mode of the file with fcntl: %s\n", strerror (errno)); } if (to_sync && wronly == 1) { @@ -334,9 +333,7 @@ real_call:; int wronly = is_wronly (fd); if (wronly == -1) { - DPRINTF (ctx, - "Failed to check the mode of the file with fcntl: %s\n", - strerror (errno)); + DPRINTF (ctx, "Failed to check the mode of the file with fcntl: %s\n", strerror (errno)); } if (to_sync && wronly == 1) {