diff --git a/src/dyad/dtl/CMakeLists.txt b/src/dyad/dtl/CMakeLists.txt index 2648ecfb..b9e0d13c 100644 --- a/src/dyad/dtl/CMakeLists.txt +++ b/src/dyad/dtl/CMakeLists.txt @@ -9,8 +9,8 @@ set(FLUX_PRIVATE_HEADERS ${CMAKE_CURRENT_SOURCE_DIR}/flux_dtl.h) set(FLUX_PUBLIC_HEADERS ) # UCX implementation for DTL -set(UCX_DTL_SRC ${CMAKE_CURRENT_SOURCE_DIR}/ucx_dtl.c) -set(UCX_PRIVATE_HEADERS ${CMAKE_CURRENT_SOURCE_DIR}/ucx_dtl.h) +set(UCX_DTL_SRC ${CMAKE_CURRENT_SOURCE_DIR}/ucx_dtl.c ${CMAKE_CURRENT_SOURCE_DIR}/ucx_ep_cache.cpp) +set(UCX_PRIVATE_HEADERS ${CMAKE_CURRENT_SOURCE_DIR}/ucx_dtl.h ${CMAKE_CURRENT_SOURCE_DIR}/ucx_ep_cache.h) set(UCX_PUBLIC_HEADERS ) if(DYAD_DATA_PLANE STREQUAL "UCX") diff --git a/src/dyad/dtl/ucx_dtl.c b/src/dyad/dtl/ucx_dtl.c index 6359322d..cabb59d3 100644 --- a/src/dyad/dtl/ucx_dtl.c +++ b/src/dyad/dtl/ucx_dtl.c @@ -14,10 +14,6 @@ extern const base64_maps_t base64_maps_rfc4648; // Tag mask for UCX Tag send/recv #define DYAD_UCX_TAG_MASK UINT64_MAX -// Macro function used to simplify checking the status -// of UCX operations -#define UCX_STATUS_FAIL(status) (status != UCS_OK) - // Define a request struct to be used in handling // async UCX operations struct ucx_request { @@ -61,7 +57,7 @@ static void dyad_send_callback (void* req, ucs_status_t status) real_req->completed = 1; } -void dyad_ucx_ep_err_handler (void* arg, ucp_ep_h ep, ucs_status_t status) +static void dyad_ucx_ep_err_handler (void* arg, ucp_ep_h ep, ucs_status_t status) { flux_t* h = (flux_t*)arg; FLUX_LOG_ERR (h, "An error occured on the UCP endpoint (status = %d)\n", status); @@ -204,87 +200,6 @@ static dyad_rc_t ucx_free_buffer (dyad_perf_t* perf_handle, return rc; } -static inline dyad_rc_t ucx_connect (dyad_perf_t* perf_handle, - ucp_worker_h worker, - ucp_address_t* addr, - flux_t* h, - ucp_ep_h* ep) -{ - ucp_ep_params_t params; - ucs_status_t status = UCS_OK; - DYAD_PERF_REGION_BEGIN (perf_handle, "ucx_connect"); - params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS | UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE - | UCP_EP_PARAM_FIELD_ERR_HANDLER; - params.address = addr; - params.err_mode = UCP_ERR_HANDLING_MODE_PEER; - params.err_handler.cb = dyad_ucx_ep_err_handler; - params.err_handler.arg = (void*)h; - status = ucp_ep_create (worker, ¶ms, ep); - if (UCX_STATUS_FAIL (status)) { - FLUX_LOG_ERR (h, "ucp_ep_create failed with status %d\n", (int)status); - DYAD_PERF_REGION_END (perf_handle, "ucx_connect"); - return DYAD_RC_UCXCOMM_FAIL; - } - if (*ep == NULL) { - FLUX_LOG_ERR (h, "ucp_ep_create succeeded, but returned a NULL endpoint"); - DYAD_PERF_REGION_END (perf_handle, "ucx_connect"); - return DYAD_RC_UCXCOMM_FAIL; - } - DYAD_PERF_REGION_END (perf_handle, "ucx_connect"); - return DYAD_RC_OK; -} - -static inline dyad_rc_t ucx_disconnect (dyad_perf_t* perf_handle, ucp_worker_h worker, ucp_ep_h ep) -{ - dyad_rc_t rc = DYAD_RC_OK; - ucs_status_t status = UCS_OK; - ucs_status_ptr_t stat_ptr; - DYAD_PERF_REGION_BEGIN (perf_handle, "ucx_disconnect"); - if (ep != NULL) { - // ucp_tag_send_sync_nbx is the prefered version of this send - // since UCX 1.9 However, some systems (e.g., Lassen) may have - // an older verison This conditional compilation will use - // ucp_tag_send_sync_nbx if using UCX 1.9+, and it will use the - // deprecated ucp_tag_send_sync_nb if using UCX < 1.9. -#if UCP_API_VERSION >= UCP_VERSION(1, 10) - ucp_request_param_t close_params; - close_params.op_attr_mask = UCP_OP_ATTR_FIELD_FLAGS; - close_params.flags = UCP_EP_CLOSE_FLAG_FORCE; - stat_ptr = ucp_ep_close_nbx (ep, &close_params); -#else - // TODO change to FORCE if we decide to enable err handleing - // mode - stat_ptr = ucp_ep_close_nb (ep, UCP_EP_CLOSE_MODE_FORCE); -#endif - // Don't use dyad_ucx_request_wait here because ep_close behaves - // differently than other UCX calls - if (stat_ptr != NULL) { - // Endpoint close is in-progress. - // Wait until finished - if (UCS_PTR_IS_PTR (stat_ptr)) { - do { - ucp_worker_progress (worker); - status = ucp_request_check_status (stat_ptr); - } while (status == UCS_INPROGRESS); - ucp_request_free (stat_ptr); - } - // An error occurred during endpoint closure - // However, the endpoint can no longer be used - // Get the status code for reporting - else { - status = UCS_PTR_STATUS (stat_ptr); - } - if (UCX_STATUS_FAIL (status)) { - rc = DYAD_RC_UCXEP_FAIL; - goto ucx_disconnect_region_finish; - } - } - } -ucx_disconnect_region_finish: - DYAD_PERF_REGION_END (perf_handle, "ucx_disconnect"); - return rc; -} - static inline ucs_status_ptr_t ucx_send_no_wait (dyad_dtl_t* self, void* buf, size_t buflen) { ucs_status_ptr_t stat_ptr; @@ -566,6 +481,7 @@ dyad_rc_t dyad_dtl_ucx_init (dyad_dtl_t* self, dtl_handle->net_buf = NULL; dtl_handle->max_transfer_size = UCX_MAX_TRANSFER_SIZE; dtl_handle->ep = NULL; + dtl_handle->ep_cache = NULL; dtl_handle->local_address = NULL; dtl_handle->local_addr_len = 0; dtl_handle->remote_address = NULL; @@ -642,6 +558,13 @@ dyad_rc_t dyad_dtl_ucx_init (dyad_dtl_t* self, dtl_handle->local_address = worker_attrs.address; dtl_handle->local_addr_len = worker_attrs.address_length; + // Initialize endpoint cache + rc = dyad_ucx_ep_cache_init (&(dtl_handle->ep_cache)); + if (DYAD_IS_ERROR (rc)) { + FLUX_LOG_ERR (h, "Cannot create endpoint cache (err code = %d)", (int)rc); + goto error; + } + // Allocate a buffer of max transfer size using UCX ucx_allocate_buffer (self->perf_handle, dtl_handle->h, @@ -667,6 +590,7 @@ dyad_rc_t dyad_dtl_ucx_init (dyad_dtl_t* self, FLUX_LOG_ERR (h, "Warmup for UCX DTL failed"); goto error; } + dtl_handle->ep = NULL; DYAD_PERF_REGION_END (self->perf_handle, "dyad_dtl_ucx_init"); @@ -883,15 +807,41 @@ dyad_rc_t dyad_dtl_ucx_establish_connection (dyad_dtl_t* self) DYAD_PERF_REGION_BEGIN (self->perf_handle, "dyad_dtl_ucx_establish_connection"); if (comm_mode == DYAD_COMM_SEND) { FLUX_LOG_INFO (dtl_handle->h, "Create UCP endpoint for communication with consumer\n"); - rc = ucx_connect (self->perf_handle, - dtl_handle->ucx_worker, - dtl_handle->remote_address, - dtl_handle->h, - &(dtl_handle->ep)); + rc = dyad_ucx_ep_cache_find (dtl_handle->ep_cache, + dtl_handle->remote_address, + dtl_handle->remote_addr_len, + &(dtl_handle->ep)); if (DYAD_IS_ERROR (rc)) { - FLUX_LOG_ERR (dtl_handle->h, "Could not create UCP endpoint"); - goto dtl_ucx_establish_connection_region_finish; + rc = dyad_ucx_ep_cache_insert (dtl_handle->ep_cache, + dtl_handle->remote_address, + dtl_handle->remote_addr_len, + dtl_handle->ucx_worker, + dtl_handle->h); + if (DYAD_IS_ERROR (rc)) { + DYAD_LOG_ERR (dtl_handle, "Failed to create UCP endpoint"); + goto dtl_ucx_establish_connection_region_finish; + } + rc = dyad_ucx_ep_cache_find (dtl_handle->ep_cache, + dtl_handle->remote_address, + dtl_handle->remote_addr_len, + &(dtl_handle->ep)); + if (DYAD_IS_ERROR (rc)) { + DYAD_LOG_ERR (dtl_handle, + "Successfully created UCP endpoint, but can't retrieve it from cache " + "(return code = %d)", + (int)rc); + goto dtl_ucx_establish_connection_region_finish; + } } + // rc = ucx_connect (self->perf_handle, + // dtl_handle->ucx_worker, + // dtl_handle->remote_address, + // dtl_handle->h, + // &(dtl_handle->ep)); + // if (DYAD_IS_ERROR (rc)) { + // FLUX_LOG_ERR (dtl_handle->h, "Could not create UCP endpoint"); + // goto dtl_ucx_establish_connection_region_finish; + // } if (dtl_handle->debug) { ucp_ep_print_info (dtl_handle->ep, stderr); } @@ -967,20 +917,23 @@ dyad_rc_t dyad_dtl_ucx_close_connection (dyad_dtl_t* self) DYAD_PERF_REGION_BEGIN (self->perf_handle, "dyad_dtl_ucx_close_connection"); if (comm_mode == DYAD_COMM_SEND) { if (dtl_handle != NULL) { - rc = ucx_disconnect (self->perf_handle, dtl_handle->ucx_worker, dtl_handle->ep); - if (DYAD_IS_ERROR (rc)) { - FLUX_LOG_ERR (dtl_handle->h, - "Could not successfully close Endpoint! However, endpoint was " - "released."); - } + // TODO replace this code (either here or elsewhere) with LRU eviction + // rc = ucx_disconnect (self->perf_handle, dtl_handle->ucx_worker, dtl_handle->ep); + // if (DYAD_IS_ERROR (rc)) { + // FLUX_LOG_ERR (dtl_handle->h, + // "Could not successfully close Endpoint! However, endpoint was " + // "released."); + // } dtl_handle->ep = NULL; // Sender doesn't have a consumer address at this time // So, free the consumer address when closing the connection - if (dtl_handle->remote_address != NULL) { - free (dtl_handle->remote_address); - dtl_handle->remote_address = NULL; - dtl_handle->remote_addr_len = 0; - } + // NOTE: currently removing the deallocation of the remote address + // because it is still in use by the endpoint cache + // if (dtl_handle->remote_address != NULL) { + // free (dtl_handle->remote_address); + dtl_handle->remote_address = NULL; + dtl_handle->remote_addr_len = 0; + // } dtl_handle->comm_tag = 0; } FLUX_LOG_INFO (dtl_handle->h, "UCP endpoint close successful\n"); @@ -1020,6 +973,10 @@ dyad_rc_t dyad_dtl_ucx_finalize (dyad_dtl_t** self) dyad_dtl_ucx_close_connection (*self); dtl_handle->ep = NULL; } + if (dtl_handle->ep_cache != NULL) { + dyad_ucx_ep_cache_finalize (&(dtl_handle->ep_cache), dtl_handle->ucx_worker); + dtl_handle->ep_cache = NULL; + } // Release consumer address if not already released if (dtl_handle->local_address != NULL) { ucp_worker_release_address (dtl_handle->ucx_worker, dtl_handle->local_address); diff --git a/src/dyad/dtl/ucx_dtl.h b/src/dyad/dtl/ucx_dtl.h index cd078677..bf4a2e5f 100644 --- a/src/dyad/dtl/ucx_dtl.h +++ b/src/dyad/dtl/ucx_dtl.h @@ -2,6 +2,7 @@ #define DYAD_DTL_UCX_H #include +#include #include #include @@ -20,6 +21,7 @@ struct dyad_dtl_ucx { size_t remote_addr_len; ucp_ep_h ep; ucp_tag_t comm_tag; + ucx_ep_cache_h ep_cache; }; typedef struct dyad_dtl_ucx dyad_dtl_ucx_t; diff --git a/src/dyad/dtl/ucx_ep_cache.cpp b/src/dyad/dtl/ucx_ep_cache.cpp new file mode 100644 index 00000000..63d5230a --- /dev/null +++ b/src/dyad/dtl/ucx_ep_cache.cpp @@ -0,0 +1,237 @@ +#include +#include + +#include +#include +#include +#include + +using key_type = std::pair; +using cache_type = std::unordered_map; + +// Adapted from the following code: +// https://github.com/LLNL/wcs/blob/6590f592a69fe0c553ebf27a1bc348ff1fbbd813/src/utils/seed.hpp#L23 +namespace std +{ +template <> +struct hash { + using arg_t = key_type; + using result_t = size_t; + + result_t operator() (const arg_t& a) const + { + hash hasher; + result_t hashed_val = 0ul; + uint8_t* byte_buf = reinterpret_cast (a.first); + for (size_t i = 0ul; i < a.second; ++i) { + hashed_val = hashed_val * 31 + hasher (*(byte_buf + i)); + } + return hashed_val; + } +}; +} // namespace std + +static void dyad_ucx_ep_err_handler (void* arg, ucp_ep_h ep, ucs_status_t status) +{ + flux_t* h = (flux_t*)arg; + FLUX_LOG_ERR (h, "An error occured on the UCP endpoint (status = %d)\n", status); +} + +dyad_rc_t ucx_connect (dyad_perf_t* perf_handle, + ucp_worker_h worker, + const ucp_address_t* addr, + flux_t* h, + ucp_ep_h* ep) +{ + ucp_ep_params_t params; + ucs_status_t status = UCS_OK; + DYAD_PERF_REGION_BEGIN (perf_handle, "ucx_connect"); + params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS | UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE + | UCP_EP_PARAM_FIELD_ERR_HANDLER; + params.address = addr; + params.err_mode = UCP_ERR_HANDLING_MODE_PEER; + params.err_handler.cb = dyad_ucx_ep_err_handler; + params.err_handler.arg = (void*)h; + status = ucp_ep_create (worker, ¶ms, ep); + if (UCX_STATUS_FAIL (status)) { + FLUX_LOG_ERR (h, "ucp_ep_create failed with status %d\n", (int)status); + DYAD_PERF_REGION_END (perf_handle, "ucx_connect"); + return DYAD_RC_UCXCOMM_FAIL; + } + if (*ep == NULL) { + FLUX_LOG_ERR (h, "ucp_ep_create succeeded, but returned a NULL endpoint"); + DYAD_PERF_REGION_END (perf_handle, "ucx_connect"); + return DYAD_RC_UCXCOMM_FAIL; + } + DYAD_PERF_REGION_END (perf_handle, "ucx_connect"); + return DYAD_RC_OK; +} + +dyad_rc_t ucx_disconnect (dyad_perf_t* perf_handle, ucp_worker_h worker, ucp_ep_h ep) +{ + dyad_rc_t rc = DYAD_RC_OK; + ucs_status_t status = UCS_OK; + ucs_status_ptr_t stat_ptr; + DYAD_PERF_REGION_BEGIN (perf_handle, "ucx_disconnect"); + if (ep != NULL) { + // ucp_tag_send_sync_nbx is the prefered version of this send + // since UCX 1.9 However, some systems (e.g., Lassen) may have + // an older verison This conditional compilation will use + // ucp_tag_send_sync_nbx if using UCX 1.9+, and it will use the + // deprecated ucp_tag_send_sync_nb if using UCX < 1.9. +#if UCP_API_VERSION >= UCP_VERSION(1, 10) + ucp_request_param_t close_params; + close_params.op_attr_mask = UCP_OP_ATTR_FIELD_FLAGS; + close_params.flags = UCP_EP_CLOSE_FLAG_FORCE; + stat_ptr = ucp_ep_close_nbx (ep, &close_params); +#else + // TODO change to FORCE if we decide to enable err handleing + // mode + stat_ptr = ucp_ep_close_nb (ep, UCP_EP_CLOSE_MODE_FORCE); +#endif + // Don't use dyad_ucx_request_wait here because ep_close behaves + // differently than other UCX calls + if (stat_ptr != NULL) { + // Endpoint close is in-progress. + // Wait until finished + if (UCS_PTR_IS_PTR (stat_ptr)) { + do { + ucp_worker_progress (worker); + status = ucp_request_check_status (stat_ptr); + } while (status == UCS_INPROGRESS); + ucp_request_free (stat_ptr); + } + // An error occurred during endpoint closure + // However, the endpoint can no longer be used + // Get the status code for reporting + else { + status = UCS_PTR_STATUS (stat_ptr); + } + if (UCX_STATUS_FAIL (status)) { + rc = DYAD_RC_UCXEP_FAIL; + goto ucx_disconnect_region_finish; + } + } + } +ucx_disconnect_region_finish: + DYAD_PERF_REGION_END (perf_handle, "ucx_disconnect"); + return rc; +} + +dyad_rc_t dyad_ucx_ep_cache_init (ucx_ep_cache_h* cache) +{ + if (cache == nullptr || *cache != nullptr) { + return DYAD_RC_BADBUF; + } + *cache = reinterpret_cast (new (std::nothrow) cache_type ()); + if (*cache == nullptr) { + return DYAD_RC_SYSFAIL; + } + return DYAD_RC_OK; +} + +dyad_rc_t dyad_ucx_ep_cache_find (const ucx_ep_cache_h cache, + const ucp_address_t* addr, + const size_t addr_size, + ucp_ep_h* ep) +{ + dyad_rc_t rc = DYAD_RC_OK; + if (ep == nullptr || *ep != nullptr) { + return DYAD_RC_BADBUF; + } + try { + const cache_type* cpp_cache = reinterpret_cast (cache); + auto key = std::make_pair (const_cast (addr), + static_cast (addr_size)); + cache_type::const_iterator cache_it = cpp_cache->find (key); + if (cache_it == cpp_cache->cend ()) { + *ep = nullptr; + rc = DYAD_RC_NOTFOUND; + } else { + *ep = cache_it->second; + rc = DYAD_RC_OK; + } + } catch (...) { + *ep = nullptr; + rc = DYAD_RC_SYSFAIL; + } + return rc; +} + +dyad_rc_t dyad_ucx_ep_cache_insert (ucx_ep_cache_h cache, + const ucp_address_t* addr, + const size_t addr_size, + ucp_worker_h worker, + flux_t* h) +{ + dyad_rc_t rc = DYAD_RC_OK; + try { + cache_type* cpp_cache = reinterpret_cast (cache); + auto key = std::make_pair (const_cast (addr), + static_cast (addr_size)); + cache_type::const_iterator cache_it = cpp_cache->find (key); + if (cache_it != cpp_cache->cend ()) { + rc = DYAD_RC_OK; + } else { + ucp_ep_h ep; + rc = ucx_connect (nullptr, worker, addr, h, &ep); + if (!DYAD_IS_ERROR (rc)) { + cpp_cache->emplace (key, ep); + rc = DYAD_RC_OK; + } + } + } catch (...) { + rc = DYAD_RC_SYSFAIL; + } + return rc; +} + +static inline cache_type::iterator cache_remove_impl (cache_type* cache, + cache_type::iterator it, + ucp_worker_h worker) +{ + if (it != cache->end ()) { + ucx_disconnect (nullptr, worker, it->second); + // The UCP address was allocated with 'malloc' while unpacking + // the RPC message. So, we extract it from the key and free + // it after erasing the iterator + ucp_address_t* addr = it->first.first; + auto next_it = cache->erase (it); + free (addr); + return next_it; + } + return cache->end (); +} + +dyad_rc_t dyad_ucx_ep_cache_remove (ucx_ep_cache_h cache, + const ucp_address_t* addr, + const size_t addr_size, + ucp_worker_h worker) +{ + dyad_rc_t rc = DYAD_RC_OK; + try { + cache_type* cpp_cache = reinterpret_cast (cache); + auto key = std::make_pair (const_cast (addr), + static_cast (addr_size)); + cache_type::iterator cache_it = cpp_cache->find (key); + cache_remove_impl (cpp_cache, cache_it, worker); + rc = DYAD_RC_OK; + } catch (...) { + rc = DYAD_RC_SYSFAIL; + } + return rc; +} + +dyad_rc_t dyad_ucx_ep_cache_finalize (ucx_ep_cache_h* cache, ucp_worker_h worker) +{ + if (cache == nullptr || *cache == nullptr) { + return DYAD_RC_OK; + } + cache_type* cpp_cache = reinterpret_cast (*cache); + for (cache_type::iterator it = cpp_cache->begin (); it != cpp_cache->end ();) { + it = cache_remove_impl (cpp_cache, it, worker); + } + delete cpp_cache; + *cache = nullptr; + return DYAD_RC_OK; +} \ No newline at end of file diff --git a/src/dyad/dtl/ucx_ep_cache.h b/src/dyad/dtl/ucx_ep_cache.h new file mode 100644 index 00000000..5a38904a --- /dev/null +++ b/src/dyad/dtl/ucx_ep_cache.h @@ -0,0 +1,53 @@ +#ifndef DYAD_DTL_UCX_EP_CACHE_H +#define DYAD_DTL_UCX_EP_CACHE_H + +#include +#include +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +// Macro function used to simplify checking the status +// of UCX operations +#define UCX_STATUS_FAIL(status) (status != UCS_OK) + +typedef void* ucx_ep_cache_h; + +dyad_rc_t ucx_connect (dyad_perf_t* perf_handle, + ucp_worker_h worker, + const ucp_address_t* addr, + flux_t* h, + ucp_ep_h* ep); + +dyad_rc_t ucx_disconnect (dyad_perf_t* perf_handle, ucp_worker_h worker, ucp_ep_h ep); + +// NOTE: in future, add option to configure replacement strategy +dyad_rc_t dyad_ucx_ep_cache_init (ucx_ep_cache_h* cache); + +// NOTE: not positive if UCP addresses are 100% unique by worker +dyad_rc_t dyad_ucx_ep_cache_find (const ucx_ep_cache_h cache, + const ucp_address_t* addr, + const size_t addr_size, + ucp_ep_h* ep); + +dyad_rc_t dyad_ucx_ep_cache_insert (ucx_ep_cache_h cache, + const ucp_address_t* addr, + const size_t addr_size, + ucp_worker_h worker, + flux_t* h); + +dyad_rc_t dyad_ucx_ep_cache_remove (ucx_ep_cache_h cache, + const ucp_address_t* addr, + const size_t addr_size, + ucp_worker_h worker); + +dyad_rc_t dyad_ucx_ep_cache_finalize (ucx_ep_cache_h* cache, ucp_worker_h worker); + +#ifdef __cplusplus +} +#endif + +#endif /* DYAD_DTL_UCX_EP_CACHE_H */ \ No newline at end of file