Skip to content

Commit

Permalink
Optimization 3: adds on-the-fly endpoint caching to minimize the time…
Browse files Browse the repository at this point in the history
… spent establishing connections
  • Loading branch information
ilumsden committed Jan 10, 2024
1 parent 8d32fb6 commit 9c2f02b
Show file tree
Hide file tree
Showing 5 changed files with 355 additions and 106 deletions.
4 changes: 2 additions & 2 deletions src/dyad/dtl/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ set(FLUX_PRIVATE_HEADERS ${CMAKE_CURRENT_SOURCE_DIR}/flux_dtl.h)
set(FLUX_PUBLIC_HEADERS )

# UCX implementation for DTL
set(UCX_DTL_SRC ${CMAKE_CURRENT_SOURCE_DIR}/ucx_dtl.c)
set(UCX_PRIVATE_HEADERS ${CMAKE_CURRENT_SOURCE_DIR}/ucx_dtl.h)
set(UCX_DTL_SRC ${CMAKE_CURRENT_SOURCE_DIR}/ucx_dtl.c ${CMAKE_CURRENT_SOURCE_DIR}/ucx_ep_cache.cpp)
set(UCX_PRIVATE_HEADERS ${CMAKE_CURRENT_SOURCE_DIR}/ucx_dtl.h ${CMAKE_CURRENT_SOURCE_DIR}/ucx_ep_cache.h)
set(UCX_PUBLIC_HEADERS )

if(DYAD_DATA_PLANE STREQUAL "UCX")
Expand Down
165 changes: 61 additions & 104 deletions src/dyad/dtl/ucx_dtl.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@ extern const base64_maps_t base64_maps_rfc4648;
// Tag mask for UCX Tag send/recv
#define DYAD_UCX_TAG_MASK UINT64_MAX

// Macro function used to simplify checking the status
// of UCX operations
#define UCX_STATUS_FAIL(status) (status != UCS_OK)

// Define a request struct to be used in handling
// async UCX operations
struct ucx_request {
Expand Down Expand Up @@ -61,7 +57,7 @@ static void dyad_send_callback (void* req, ucs_status_t status)
real_req->completed = 1;
}

void dyad_ucx_ep_err_handler (void* arg, ucp_ep_h ep, ucs_status_t status)
static void dyad_ucx_ep_err_handler (void* arg, ucp_ep_h ep, ucs_status_t status)
{
flux_t* h = (flux_t*)arg;
FLUX_LOG_ERR (h, "An error occured on the UCP endpoint (status = %d)\n", status);
Expand Down Expand Up @@ -204,87 +200,6 @@ static dyad_rc_t ucx_free_buffer (dyad_perf_t* perf_handle,
return rc;
}

static inline dyad_rc_t ucx_connect (dyad_perf_t* perf_handle,
ucp_worker_h worker,
ucp_address_t* addr,
flux_t* h,
ucp_ep_h* ep)
{
ucp_ep_params_t params;
ucs_status_t status = UCS_OK;
DYAD_PERF_REGION_BEGIN (perf_handle, "ucx_connect");
params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS | UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE
| UCP_EP_PARAM_FIELD_ERR_HANDLER;
params.address = addr;
params.err_mode = UCP_ERR_HANDLING_MODE_PEER;
params.err_handler.cb = dyad_ucx_ep_err_handler;
params.err_handler.arg = (void*)h;
status = ucp_ep_create (worker, &params, ep);
if (UCX_STATUS_FAIL (status)) {
FLUX_LOG_ERR (h, "ucp_ep_create failed with status %d\n", (int)status);
DYAD_PERF_REGION_END (perf_handle, "ucx_connect");
return DYAD_RC_UCXCOMM_FAIL;
}
if (*ep == NULL) {
FLUX_LOG_ERR (h, "ucp_ep_create succeeded, but returned a NULL endpoint");
DYAD_PERF_REGION_END (perf_handle, "ucx_connect");
return DYAD_RC_UCXCOMM_FAIL;
}
DYAD_PERF_REGION_END (perf_handle, "ucx_connect");
return DYAD_RC_OK;
}

static inline dyad_rc_t ucx_disconnect (dyad_perf_t* perf_handle, ucp_worker_h worker, ucp_ep_h ep)
{
dyad_rc_t rc = DYAD_RC_OK;
ucs_status_t status = UCS_OK;
ucs_status_ptr_t stat_ptr;
DYAD_PERF_REGION_BEGIN (perf_handle, "ucx_disconnect");
if (ep != NULL) {
// ucp_tag_send_sync_nbx is the prefered version of this send
// since UCX 1.9 However, some systems (e.g., Lassen) may have
// an older verison This conditional compilation will use
// ucp_tag_send_sync_nbx if using UCX 1.9+, and it will use the
// deprecated ucp_tag_send_sync_nb if using UCX < 1.9.
#if UCP_API_VERSION >= UCP_VERSION(1, 10)
ucp_request_param_t close_params;
close_params.op_attr_mask = UCP_OP_ATTR_FIELD_FLAGS;
close_params.flags = UCP_EP_CLOSE_FLAG_FORCE;
stat_ptr = ucp_ep_close_nbx (ep, &close_params);
#else
// TODO change to FORCE if we decide to enable err handleing
// mode
stat_ptr = ucp_ep_close_nb (ep, UCP_EP_CLOSE_MODE_FORCE);
#endif
// Don't use dyad_ucx_request_wait here because ep_close behaves
// differently than other UCX calls
if (stat_ptr != NULL) {
// Endpoint close is in-progress.
// Wait until finished
if (UCS_PTR_IS_PTR (stat_ptr)) {
do {
ucp_worker_progress (worker);
status = ucp_request_check_status (stat_ptr);
} while (status == UCS_INPROGRESS);
ucp_request_free (stat_ptr);
}
// An error occurred during endpoint closure
// However, the endpoint can no longer be used
// Get the status code for reporting
else {
status = UCS_PTR_STATUS (stat_ptr);
}
if (UCX_STATUS_FAIL (status)) {
rc = DYAD_RC_UCXEP_FAIL;
goto ucx_disconnect_region_finish;
}
}
}
ucx_disconnect_region_finish:
DYAD_PERF_REGION_END (perf_handle, "ucx_disconnect");
return rc;
}

static inline ucs_status_ptr_t ucx_send_no_wait (dyad_dtl_t* self, void* buf, size_t buflen)
{
ucs_status_ptr_t stat_ptr;
Expand Down Expand Up @@ -566,6 +481,7 @@ dyad_rc_t dyad_dtl_ucx_init (dyad_dtl_t* self,
dtl_handle->net_buf = NULL;
dtl_handle->max_transfer_size = UCX_MAX_TRANSFER_SIZE;
dtl_handle->ep = NULL;
dtl_handle->ep_cache = NULL;
dtl_handle->local_address = NULL;
dtl_handle->local_addr_len = 0;
dtl_handle->remote_address = NULL;
Expand Down Expand Up @@ -642,6 +558,13 @@ dyad_rc_t dyad_dtl_ucx_init (dyad_dtl_t* self,
dtl_handle->local_address = worker_attrs.address;
dtl_handle->local_addr_len = worker_attrs.address_length;

// Initialize endpoint cache
rc = dyad_ucx_ep_cache_init (&(dtl_handle->ep_cache));
if (DYAD_IS_ERROR (rc)) {
FLUX_LOG_ERR (h, "Cannot create endpoint cache (err code = %d)", (int)rc);
goto error;
}

// Allocate a buffer of max transfer size using UCX
ucx_allocate_buffer (self->perf_handle,
dtl_handle->h,
Expand All @@ -667,6 +590,7 @@ dyad_rc_t dyad_dtl_ucx_init (dyad_dtl_t* self,
FLUX_LOG_ERR (h, "Warmup for UCX DTL failed");
goto error;
}
dtl_handle->ep = NULL;

DYAD_PERF_REGION_END (self->perf_handle, "dyad_dtl_ucx_init");

Expand Down Expand Up @@ -883,15 +807,41 @@ dyad_rc_t dyad_dtl_ucx_establish_connection (dyad_dtl_t* self)
DYAD_PERF_REGION_BEGIN (self->perf_handle, "dyad_dtl_ucx_establish_connection");
if (comm_mode == DYAD_COMM_SEND) {
FLUX_LOG_INFO (dtl_handle->h, "Create UCP endpoint for communication with consumer\n");
rc = ucx_connect (self->perf_handle,
dtl_handle->ucx_worker,
dtl_handle->remote_address,
dtl_handle->h,
&(dtl_handle->ep));
rc = dyad_ucx_ep_cache_find (dtl_handle->ep_cache,
dtl_handle->remote_address,
dtl_handle->remote_addr_len,
&(dtl_handle->ep));
if (DYAD_IS_ERROR (rc)) {
FLUX_LOG_ERR (dtl_handle->h, "Could not create UCP endpoint");
goto dtl_ucx_establish_connection_region_finish;
rc = dyad_ucx_ep_cache_insert (dtl_handle->ep_cache,
dtl_handle->remote_address,
dtl_handle->remote_addr_len,
dtl_handle->ucx_worker,
dtl_handle->h);
if (DYAD_IS_ERROR (rc)) {
DYAD_LOG_ERR (dtl_handle, "Failed to create UCP endpoint");
goto dtl_ucx_establish_connection_region_finish;
}
rc = dyad_ucx_ep_cache_find (dtl_handle->ep_cache,
dtl_handle->remote_address,
dtl_handle->remote_addr_len,
&(dtl_handle->ep));
if (DYAD_IS_ERROR (rc)) {
DYAD_LOG_ERR (dtl_handle,
"Successfully created UCP endpoint, but can't retrieve it from cache "
"(return code = %d)",
(int)rc);
goto dtl_ucx_establish_connection_region_finish;
}
}
// rc = ucx_connect (self->perf_handle,
// dtl_handle->ucx_worker,
// dtl_handle->remote_address,
// dtl_handle->h,
// &(dtl_handle->ep));
// if (DYAD_IS_ERROR (rc)) {
// FLUX_LOG_ERR (dtl_handle->h, "Could not create UCP endpoint");
// goto dtl_ucx_establish_connection_region_finish;
// }
if (dtl_handle->debug) {
ucp_ep_print_info (dtl_handle->ep, stderr);
}
Expand Down Expand Up @@ -967,20 +917,23 @@ dyad_rc_t dyad_dtl_ucx_close_connection (dyad_dtl_t* self)
DYAD_PERF_REGION_BEGIN (self->perf_handle, "dyad_dtl_ucx_close_connection");
if (comm_mode == DYAD_COMM_SEND) {
if (dtl_handle != NULL) {
rc = ucx_disconnect (self->perf_handle, dtl_handle->ucx_worker, dtl_handle->ep);
if (DYAD_IS_ERROR (rc)) {
FLUX_LOG_ERR (dtl_handle->h,
"Could not successfully close Endpoint! However, endpoint was "
"released.");
}
// TODO replace this code (either here or elsewhere) with LRU eviction
// rc = ucx_disconnect (self->perf_handle, dtl_handle->ucx_worker, dtl_handle->ep);
// if (DYAD_IS_ERROR (rc)) {
// FLUX_LOG_ERR (dtl_handle->h,
// "Could not successfully close Endpoint! However, endpoint was "
// "released.");
// }
dtl_handle->ep = NULL;
// Sender doesn't have a consumer address at this time
// So, free the consumer address when closing the connection
if (dtl_handle->remote_address != NULL) {
free (dtl_handle->remote_address);
dtl_handle->remote_address = NULL;
dtl_handle->remote_addr_len = 0;
}
// NOTE: currently removing the deallocation of the remote address
// because it is still in use by the endpoint cache
// if (dtl_handle->remote_address != NULL) {
// free (dtl_handle->remote_address);
dtl_handle->remote_address = NULL;
dtl_handle->remote_addr_len = 0;
// }
dtl_handle->comm_tag = 0;
}
FLUX_LOG_INFO (dtl_handle->h, "UCP endpoint close successful\n");
Expand Down Expand Up @@ -1020,6 +973,10 @@ dyad_rc_t dyad_dtl_ucx_finalize (dyad_dtl_t** self)
dyad_dtl_ucx_close_connection (*self);
dtl_handle->ep = NULL;
}
if (dtl_handle->ep_cache != NULL) {
dyad_ucx_ep_cache_finalize (&(dtl_handle->ep_cache), dtl_handle->ucx_worker);
dtl_handle->ep_cache = NULL;
}
// Release consumer address if not already released
if (dtl_handle->local_address != NULL) {
ucp_worker_release_address (dtl_handle->ucx_worker, dtl_handle->local_address);
Expand Down
2 changes: 2 additions & 0 deletions src/dyad/dtl/ucx_dtl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define DYAD_DTL_UCX_H

#include <dyad/dtl/dyad_dtl_impl.h>
#include <dyad/dtl/ucx_ep_cache.h>
#include <stdlib.h>
#include <ucp/api/ucp.h>

Expand All @@ -20,6 +21,7 @@ struct dyad_dtl_ucx {
size_t remote_addr_len;
ucp_ep_h ep;
ucp_tag_t comm_tag;
ucx_ep_cache_h ep_cache;
};

typedef struct dyad_dtl_ucx dyad_dtl_ucx_t;
Expand Down
Loading

0 comments on commit 9c2f02b

Please sign in to comment.