Skip to content

Commit

Permalink
Make sure threads waiting on continuation request execute continuatio…
Browse files Browse the repository at this point in the history
…ns while waiting

No other threads should execute them if the continuation request is limited
to polling only.

Signed-off-by: Joseph Schuchart <[email protected]>
  • Loading branch information
devreal committed Apr 18, 2022
1 parent aa9677d commit e014717
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 84 deletions.
103 changes: 50 additions & 53 deletions ompi/mpiext/continue/c/continuation.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ struct ompi_cont_request_t {
ompi_request_t super;
opal_atomic_lock_t cont_lock; /**< Lock used completing/restarting the cont request */
bool cont_enqueue_complete; /**< Whether to enqueue immediately complete requests */
bool cont_in_wait; /**< Whether the continuation request is currently waited on */
opal_atomic_int32_t cont_num_active; /**< The number of active continuations registered with a continuation request */
uint32_t continue_max_poll; /**< max number of local continuations to execute at once */
opal_list_t *cont_complete_list; /**< List of complete continuations to be invoked during test */
ompi_wait_sync_t *sync; /**< Sync object this continuation request is attached to */
};

static void ompi_cont_request_construct(ompi_cont_request_t* cont_req)
Expand All @@ -98,10 +98,10 @@ static void ompi_cont_request_construct(ompi_cont_request_t* cont_req)
cont_req->super.req_status = ompi_status_empty; /* always returns MPI_SUCCESS */
opal_atomic_lock_init(&cont_req->cont_lock, false);
cont_req->cont_enqueue_complete = false;
cont_req->cont_in_wait = false;
cont_req->cont_num_active = 0;
cont_req->continue_max_poll = UINT32_MAX;
cont_req->cont_complete_list = NULL;
cont_req->sync = NULL;
}

static void ompi_cont_request_destruct(ompi_cont_request_t* cont_req)
Expand Down Expand Up @@ -156,10 +156,13 @@ static opal_mutex_t request_cont_lock;
*/
static bool progress_callback_registered = false;

/**
* Thread-local list of continuation requests that should be progressed.
*/
static opal_thread_local opal_list_t *thread_progress_list = NULL;
/**
 * List wrapper for lists that are constructed on first use rather than
 * at declaration: the embedded list may only be used once is_initialized
 * has been set to true (the code that registers a continuation request for
 * progress runs OBJ_CONSTRUCT on the list and then sets the flag).
 */
struct lazy_list_s {
opal_list_t list; /**< the wrapped list; not valid until it has been constructed */
bool is_initialized; /**< whether list has been constructed */
};
typedef struct lazy_list_s lazy_list_t;

static opal_thread_local lazy_list_t thread_progress_list = { .is_initialized = false };

static inline
void ompi_continue_cont_req_release(ompi_cont_request_t *cont_req,
Expand All @@ -179,6 +182,10 @@ void ompi_continue_cont_req_release(ompi_cont_request_t *cont_req,
/* signal that all continuations were found complete */
ompi_request_complete(&cont_req->super, true);
}
if (NULL != cont_req->sync) {
/* release the sync object */
OPAL_THREAD_ADD_FETCH32(&cont_req->sync->num_req_need_progress, -1);
}
if (take_lock && using_threads) {
opal_atomic_unlock(&cont_req->cont_lock);
}
Expand All @@ -191,12 +198,7 @@ void ompi_continue_cont_release(ompi_continuation_t *cont)
ompi_cont_request_t *cont_req = cont->cont_req;
assert(OMPI_REQUEST_CONT == cont_req->super.req_type);

/* if a thread is waiting on the request, we got here when
* the thread started executing the continuations, so the continuation
* request is complete already */
if (!cont_req->cont_in_wait) {
ompi_continue_cont_req_release(cont_req, 1, true);
}
ompi_continue_cont_req_release(cont_req, 1, true);
OBJ_RELEASE(cont_req);

#ifdef OPAL_ENABLE_DEBUG
Expand Down Expand Up @@ -240,9 +242,13 @@ int ompi_continue_progress_n(const uint32_t max)
in_progress = 1;

const bool using_threads = opal_using_threads();
if (NULL != thread_progress_list) {

/* execute thread-local continuations first
* (e.g., from continuation requests the current thread is waiting on) */
lazy_list_t *tl_list = &thread_progress_list;
if (tl_list->is_initialized) {
ompi_cont_request_t *cont_req;
OPAL_LIST_FOREACH(cont_req, thread_progress_list, ompi_cont_request_t) {
OPAL_LIST_FOREACH(cont_req, &tl_list->list, ompi_cont_request_t) {
ompi_continuation_t *cb;
if (opal_list_is_empty(cont_req->cont_complete_list)) continue;
while (max > completed) {
Expand Down Expand Up @@ -289,6 +295,12 @@ static int ompi_continue_progress_callback()
return ompi_continue_progress_n(1);
}

/**
 * Progress callback installed as progress_cb on the sync object of a
 * waiting thread (see ompi_continue_register_request_progress).
 *
 * Calls ompi_continue_progress_n() with UINT32_MAX as the limit, i.e.,
 * without a practical bound on the number of continuations executed per
 * invocation.
 *
 * @return the value returned by ompi_continue_progress_n().
 */
static int ompi_continue_wait_progress_callback()
{
return ompi_continue_progress_n(UINT32_MAX);
}


int ompi_continue_progress_request(ompi_request_t *req)
{
if (in_progress) return 0;
Expand Down Expand Up @@ -329,60 +341,53 @@ int ompi_continue_progress_request(ompi_request_t *req)


/**
* Register the provided continuation request to be included in the
* global progress loop (used while a thread is waiting for the continuation
* request to complete).
* We move all local continuations into the global continuation list
* and mark the continuation request such that future continuations
* are directly put into the global continuations list.
* Once the wait completed (i.e., all continuations registered with the
* continuation request) we unmark it (see ompi_continue_deregister_request_progress).
* Register the continuation request so that it will be progressed even if
* it is poll-only and the thread is waiting on the provided sync object.
*/
int ompi_continue_register_request_progress(ompi_request_t *req)
int ompi_continue_register_request_progress(ompi_request_t *req, ompi_wait_sync_t *sync)
{
ompi_cont_request_t *cont_req = (ompi_cont_request_t *)req;

if (NULL == cont_req->cont_complete_list) return OMPI_SUCCESS;

opal_atomic_lock(&cont_req->cont_lock);
lazy_list_t *cont_req_list = &thread_progress_list;

cont_req->cont_in_wait = true;

ompi_continue_cont_req_release(cont_req, opal_list_get_size(cont_req->cont_complete_list), false);
/* check that the thread-local list is initialized */
if (!cont_req_list->is_initialized) {
OBJ_CONSTRUCT(&cont_req_list->list, opal_list_t);
cont_req_list->is_initialized = true;
}

opal_atomic_unlock(&cont_req->cont_lock);
/* add the continuation request to the thread-local list */
opal_list_append(&cont_req_list->list, &cont_req->super.super.super);

if (NULL == thread_progress_list) {
thread_progress_list = OBJ_NEW(opal_list_t);
/* register with the sync object */
if (NULL != sync) {
sync->num_req_need_progress++;
sync->progress_cb = &ompi_continue_wait_progress_callback;
}

/* enqueue the continuation request to allow for progress by this thread */
opal_list_append(thread_progress_list, &req->super.super);
cont_req->sync = sync;

return OMPI_SUCCESS;
}

/**
* Remove the continuation request from being progressed by the global progress
* loop (after a wait completes).
* Remove the poll-only continuation request from the thread's progress list after
* it has completed.
*/
int ompi_continue_deregister_request_progress(ompi_request_t *req)
{
ompi_cont_request_t *cont_req = (ompi_cont_request_t *)req;

if (NULL == cont_req->cont_complete_list) return OMPI_SUCCESS;

/* make sure we execute all outstanding continuations */
uint32_t tmp_max_poll = cont_req->continue_max_poll;
cont_req->continue_max_poll = UINT32_MAX;
ompi_continue_progress_request(req);
cont_req->continue_max_poll = tmp_max_poll;

cont_req->cont_in_wait = false;

/* let the sync know we're done, it may suspend the thread now */
if (NULL != cont_req->sync) {
cont_req->sync->num_req_need_progress--;
}

/* remove the continuation request from the thread-local progress list */
opal_list_remove_item(thread_progress_list, &req->super.super);
opal_list_remove_item(&thread_progress_list.list, &req->super.super);

return OMPI_SUCCESS;
}
Expand Down Expand Up @@ -439,13 +444,6 @@ ompi_continue_enqueue_runnable(ompi_continuation_t *cont)
if (NULL != cont_req->cont_complete_list) {
opal_atomic_lock(&cont_req->cont_lock);
opal_list_append(cont_req->cont_complete_list, &cont->super.super);
if (cont_req->cont_in_wait) {
/* if a thread is waiting for this request to complete, signal completions
* the continuations will be executed at the end of the wait
* but we need to ensure that the request is marked complete first
*/
ompi_continue_cont_req_release(cont_req, 1, false);
}
opal_atomic_unlock(&cont_req->cont_lock);
} else {
OPAL_THREAD_LOCK(&request_cont_lock);
Expand Down Expand Up @@ -601,15 +599,14 @@ int ompi_continue_attach(
requests[i] = MPI_REQUEST_NULL;
}
}

}
}

assert(count >= num_registered);
int num_complete = count - num_registered;
int32_t last_num_active = OPAL_THREAD_ADD_FETCH32(&cont->num_active,
-num_complete);
if (0 == last_num_active && 0 == num_registered) {
if (0 == last_num_active) {
if (cont_req->cont_enqueue_complete) {
/* enqueue for later processing */
ompi_continue_enqueue_runnable(cont);
Expand Down
14 changes: 8 additions & 6 deletions ompi/mpiext/continue/c/continuation.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include "ompi/mpiext/continue/c/mpiext_continue_c.h"


struct ompi_request_t;

BEGIN_C_DECLS

/**
Expand All @@ -38,18 +40,18 @@ int ompi_continuation_fini(void);
* Register a request with local completion list for progressing through
* the progress engine.
*/
int ompi_continue_register_request_progress(ompi_request_t *cont_req);
int ompi_continue_register_request_progress(struct ompi_request_t *cont_req, ompi_wait_sync_t *sync);

/**
* Deregister a request with local completion list from progressing through
* the progress engine.
*/
int ompi_continue_deregister_request_progress(ompi_request_t *cont_req);
int ompi_continue_deregister_request_progress(struct ompi_request_t *cont_req);

/**
* Progress a continuation request that has local completions.
*/
int ompi_continue_progress_request(ompi_request_t *cont_req);
int ompi_continue_progress_request(struct ompi_request_t *cont_req);

/**
* Attach a continuation to a set of operations represented by \c requests.
Expand All @@ -60,9 +62,9 @@ int ompi_continue_progress_request(ompi_request_t *cont_req);
* can be used to query for and progress outstanding continuations.
*/
int ompi_continue_attach(
ompi_request_t *cont_req,
struct ompi_request_t *cont_req,
int count,
ompi_request_t *requests[],
struct ompi_request_t *requests[],
MPIX_Continue_cb_function *cont_cb,
void *cont_data,
ompi_status_public_t statuses[]);
Expand All @@ -71,7 +73,7 @@ int ompi_continue_attach(
/**
* Allocate a new continuation request.
*/
int ompi_continue_allocate_request(ompi_request_t **cont_req, ompi_info_t *info);
int ompi_continue_allocate_request(struct ompi_request_t **cont_req, ompi_info_t *info);

END_C_DECLS

Expand Down
33 changes: 9 additions & 24 deletions ompi/request/req_wait.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,23 +41,8 @@ int ompi_request_default_wait(
{
ompi_request_t *req = *req_ptr;

#if OMPI_HAVE_MPI_EXT_CONTINUE
if (OMPI_REQUEST_CONT == req->req_type) {
/* let the continuations be processed as part of the global progress loop
* while we're waiting for their completion */
ompi_continue_register_request_progress(req);
}
#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */


ompi_request_wait_completion(req);

#if OMPI_HAVE_MPI_EXT_CONTINUE
if (OMPI_REQUEST_CONT == req->req_type) {
ompi_continue_deregister_request_progress(req);
}
#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */

#if OPAL_ENABLE_FT_MPI
/* Special case for MPI_ANY_SOURCE */
if( MPI_ERR_PROC_FAILED_PENDING == req->req_status.MPI_ERROR ) {
Expand Down Expand Up @@ -144,13 +129,6 @@ int ompi_request_default_wait_any(size_t count,

request = requests[i];

#if OMPI_HAVE_MPI_EXT_CONTINUE
if (OMPI_REQUEST_CONT == request->req_type) {
have_cont_req = true;
ompi_continue_register_request_progress(request);
}
#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */

/* Check for null or completed persistent request. For
* MPI_REQUEST_NULL, the req_state is always OMPI_REQUEST_INACTIVE.
*/
Expand All @@ -167,6 +145,13 @@ int ompi_request_default_wait_any(size_t count,
}
}

#if OMPI_HAVE_MPI_EXT_CONTINUE
if (OMPI_REQUEST_CONT == request->req_type) {
have_cont_req = true;
ompi_continue_register_request_progress(request, &sync);
}
#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */

#if OPAL_ENABLE_FT_MPI
if(OPAL_UNLIKELY( ompi_request_is_failed(request) )) {
completed = i;
Expand Down Expand Up @@ -319,7 +304,7 @@ int ompi_request_default_wait_all( size_t count,

#if OMPI_HAVE_MPI_EXT_CONTINUE
if (OMPI_REQUEST_CONT == request->req_type) {
ompi_continue_register_request_progress(request);
ompi_continue_register_request_progress(request, &sync);
}
#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */

Expand Down Expand Up @@ -590,7 +575,7 @@ int ompi_request_default_wait_some(size_t count,

#if OMPI_HAVE_MPI_EXT_CONTINUE
if (OMPI_REQUEST_CONT == request->req_type) {
ompi_continue_register_request_progress(request);
ompi_continue_register_request_progress(request, &sync);
}
#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */

Expand Down
30 changes: 30 additions & 0 deletions ompi/request/request.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@
#include "ompi/constants.h"
#include "ompi/runtime/params.h"

#if OMPI_HAVE_MPI_EXT_CONTINUE
#include "ompi/mpiext/continue/c/continuation.h"
#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */

BEGIN_C_DECLS

/**
Expand Down Expand Up @@ -465,7 +469,20 @@ static inline void ompi_request_wait_completion(ompi_request_t *req)
WAIT_SYNC_INIT(&sync, 1);

if (OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&req->req_complete, &_tmp_ptr, &sync)) {
#if OMPI_HAVE_MPI_EXT_CONTINUE
if (OMPI_REQUEST_CONT == req->req_type) {
/* let the continuations be processed as part of the global progress loop
* while we're waiting for their completion */
ompi_continue_register_request_progress(req, &sync);
}
#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */
SYNC_WAIT(&sync);

#if OMPI_HAVE_MPI_EXT_CONTINUE
if (OMPI_REQUEST_CONT == req->req_type) {
ompi_continue_deregister_request_progress(req);
}
#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */
} else {
/* completed before we had a chance to swap in the sync object */
WAIT_SYNC_SIGNALLED(&sync);
Expand All @@ -487,6 +504,13 @@ static inline void ompi_request_wait_completion(ompi_request_t *req)
}
opal_atomic_rmb();
} else {
#if OMPI_HAVE_MPI_EXT_CONTINUE
if (OMPI_REQUEST_CONT == req->req_type) {
/* let the continuations be processed as part of the global progress loop
* while we're waiting for their completion */
ompi_continue_register_request_progress(req, NULL);
}
#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */
while(!REQUEST_COMPLETE(req)) {
opal_progress();
#if OPAL_ENABLE_FT_MPI
Expand All @@ -497,6 +521,12 @@ static inline void ompi_request_wait_completion(ompi_request_t *req)
}
#endif /* OPAL_ENABLE_FT_MPI */
}

#if OMPI_HAVE_MPI_EXT_CONTINUE
if (OMPI_REQUEST_CONT == req->req_type) {
ompi_continue_deregister_request_progress(req);
}
#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */
}
}

Expand Down
Loading

0 comments on commit e014717

Please sign in to comment.