Skip to content

Commit

Permalink
Fix multithread request wait
Browse files Browse the repository at this point in the history
Signed-off-by: Florent Germain <[email protected]>
  • Loading branch information
FlorentGermain-Bull committed Oct 4, 2023
1 parent 0922d15 commit 5f205ef
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 34 deletions.
4 changes: 2 additions & 2 deletions ompi/communicator/comm_cid.c
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ int ompi_comm_nextcid (ompi_communicator_t *newcomm, ompi_communicator_t *comm,
}

if (&ompi_request_empty != req) {
ompi_request_wait_completion (req);
ompi_request_wait_completion (&req);
rc = req->req_status.MPI_ERROR;
ompi_comm_request_return ((ompi_comm_request_t *) req);
}
Expand Down Expand Up @@ -909,7 +909,7 @@ int ompi_comm_activate (ompi_communicator_t **newcomm, ompi_communicator_t *comm
}

if (&ompi_request_empty != req) {
ompi_request_wait_completion (req);
ompi_request_wait_completion (&req);
rc = req->req_status.MPI_ERROR;
ompi_comm_request_return ((ompi_comm_request_t *) req);
}
Expand Down
2 changes: 1 addition & 1 deletion ompi/mca/coll/ftagree/coll_ftagree_earlyreturning.c
Original file line number Diff line number Diff line change
Expand Up @@ -3200,7 +3200,7 @@ int mca_coll_ftagree_era_intra(void *contrib,
rc = mca_coll_ftagree_iera_intra(contrib, dt_count, dt, op, group, grp_update, comm, &req, module);
if(OPAL_UNLIKELY( OMPI_SUCCESS != rc ))
return rc;
ompi_request_wait_completion(req);
ompi_request_wait_completion(&req);
rc = req->req_status.MPI_ERROR;
ompi_request_free(&req);
return rc;
Expand Down
6 changes: 4 additions & 2 deletions ompi/mca/pml/cm/pml_cm.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ mca_pml_cm_recv(void *addr,
return ret;
}

ompi_request_wait_completion(&req.req_ompi);
ompi_request_t *ompi_req = &req.req_ompi;
ompi_request_wait_completion(&ompi_req);

if (MPI_STATUS_IGNORE != status) {
OMPI_COPY_STATUS(status, req.req_ompi.req_status, false);
Expand Down Expand Up @@ -533,7 +534,8 @@ mca_pml_cm_mrecv(void *buf,
return ret;
}

ompi_request_wait_completion(&recvreq->req_base.req_ompi);
ompi_request_t *ompi_req = &recvreq->req_base.req_ompi;
ompi_request_wait_completion(&ompi_req);

if (MPI_STATUS_IGNORE != status) {
OMPI_COPY_STATUS(status, recvreq->req_base.req_ompi.req_status, false);
Expand Down
6 changes: 4 additions & 2 deletions ompi/mca/pml/ob1/pml_ob1_iprobe.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ int mca_pml_ob1_probe(int src,
MCA_PML_OB1_RECV_REQUEST_INIT(&recvreq, NULL, 0, &ompi_mpi_char.dt, src, tag, comm, false);
MCA_PML_OB1_RECV_REQUEST_START(&recvreq);

ompi_request_wait_completion(&recvreq.req_recv.req_base.req_ompi);
ompi_request_t *ompi_req = &recvreq.req_recv.req_base.req_ompi;
ompi_request_wait_completion(&ompi_req);
rc = recvreq.req_recv.req_base.req_ompi.req_status.MPI_ERROR;
if( MPI_STATUS_IGNORE != status ) {
OMPI_COPY_STATUS(status, recvreq.req_recv.req_base.req_ompi.req_status, false);
Expand Down Expand Up @@ -159,7 +160,8 @@ mca_pml_ob1_mprobe(int src,
src, tag, comm, false);
MCA_PML_OB1_RECV_REQUEST_START(recvreq);

ompi_request_wait_completion(&recvreq->req_recv.req_base.req_ompi);
ompi_request_t *ompi_req = &recvreq->req_recv.req_base.req_ompi;
ompi_request_wait_completion(&ompi_req);
rc = recvreq->req_recv.req_base.req_ompi.req_status.MPI_ERROR;
if( MPI_STATUS_IGNORE != status ) {
OMPI_COPY_STATUS(status, recvreq->req_recv.req_base.req_ompi.req_status, false);
Expand Down
12 changes: 8 additions & 4 deletions ompi/mca/pml/ob1/pml_ob1_irecv.c
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ int mca_pml_ob1_recv(void *addr,
PERUSE_RECV);

MCA_PML_OB1_RECV_REQUEST_START(recvreq);
ompi_request_wait_completion(&recvreq->req_recv.req_base.req_ompi);
ompi_request_t *ompi_req = &recvreq->req_recv.req_base.req_ompi;
ompi_request_wait_completion(&ompi_req);

if (recvreq->req_recv.req_base.req_pml_complete) {
/* make buffer defined when the request is completed */
Expand All @@ -153,7 +154,8 @@ int mca_pml_ob1_recv(void *addr,
#if OPAL_ENABLE_FT_MPI
if( OPAL_UNLIKELY( MPI_ERR_PROC_FAILED_PENDING == rc )) {
ompi_request_cancel(&recvreq->req_recv.req_base.req_ompi);
ompi_request_wait_completion(&recvreq->req_recv.req_base.req_ompi);
ompi_request_t *ft_req = &recvreq->req_recv.req_base.req_ompi;
ompi_request_wait_completion(&ft_req);
rc = MPI_ERR_PROC_FAILED;
}
#endif
Expand Down Expand Up @@ -358,7 +360,8 @@ mca_pml_ob1_mrecv( void *buf,

ompi_message_return(*message);
*message = MPI_MESSAGE_NULL;
ompi_request_wait_completion(&(recvreq->req_recv.req_base.req_ompi));
ompi_request_t *ompi_req = &recvreq->req_recv.req_base.req_ompi;
ompi_request_wait_completion(&ompi_req);

MCA_PML_OB1_RECV_FRAG_RETURN(frag);

Expand All @@ -369,7 +372,8 @@ mca_pml_ob1_mrecv( void *buf,
#if OPAL_ENABLE_FT_MPI
if( OPAL_UNLIKELY( MPI_ERR_PROC_FAILED_PENDING == rc )) {
ompi_request_cancel(&recvreq->req_recv.req_base.req_ompi);
ompi_request_wait_completion(&recvreq->req_recv.req_base.req_ompi);
ompi_request_t *ft_req = &recvreq->req_recv.req_base.req_ompi;
ompi_request_wait_completion(&ft_req);
rc = MPI_ERR_PROC_FAILED;
}
#endif
Expand Down
5 changes: 3 additions & 2 deletions ompi/mca/pml/ob1/pml_ob1_isend.c
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ int mca_pml_ob1_send(const void *buf,
return rc;
}

ompi_request_wait_completion (brequest);
ompi_request_wait_completion (&brequest);
ompi_request_free (&brequest);
return OMPI_SUCCESS;
}
Expand Down Expand Up @@ -324,7 +324,8 @@ int mca_pml_ob1_send(const void *buf,

MCA_PML_OB1_SEND_REQUEST_START_W_SEQ(sendreq, endpoint, seqn, rc);
if (OPAL_LIKELY(rc == OMPI_SUCCESS)) {
ompi_request_wait_completion(&sendreq->req_send.req_base.req_ompi);
ompi_request_t *ompi_req = &sendreq->req_send.req_base.req_ompi;
ompi_request_wait_completion(&ompi_req);

rc = sendreq->req_send.req_base.req_ompi.req_status.MPI_ERROR;
}
Expand Down
21 changes: 18 additions & 3 deletions ompi/request/req_wait.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ int ompi_request_default_wait(
ompi_request_t ** req_ptr,
ompi_status_public_t * status)
{
ompi_request_t *req = *req_ptr;
ompi_request_wait_completion(req_ptr);

ompi_request_wait_completion(req);
ompi_request_t *req = *req_ptr;

/* make sure we get the correct status */
opal_atomic_rmb();
Expand Down Expand Up @@ -78,10 +78,25 @@ int ompi_request_default_wait(
return req->req_status.MPI_ERROR;
}

/* Swap the request with MPI_REQUEST_NULL
* Make sure ompi_request_free is called only once
*/
ompi_request_t *old_req = *req_ptr;
ompi_request_t *new_req = MPI_REQUEST_NULL;

retry:
if (new_req == old_req) {
return OMPI_SUCCESS;
}

if (!OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(req_ptr, &old_req, new_req)) {
goto retry;
}

/* If there's an error while freeing the request, assume that the
request is still there. Otherwise, Bad Things will happen
later! */
return ompi_request_free(req_ptr);
return ompi_request_free(&old_req);
}


Expand Down
78 changes: 60 additions & 18 deletions ompi/request/request.h
Original file line number Diff line number Diff line change
Expand Up @@ -447,53 +447,84 @@ static inline bool ompi_request_tag_is_collective(int tag) {
* Wait a particular request for completion
*/

static inline void ompi_request_wait_completion(ompi_request_t *req)
static inline void ompi_request_wait_completion(ompi_request_t **req)
{
if (opal_using_threads ()) {
if(!REQUEST_COMPLETE(req)) {
if(!REQUEST_COMPLETE(*req)) {
void *_tmp_ptr;
ompi_wait_sync_t sync;


#if OPAL_ENABLE_FT_MPI
redo:
if(OPAL_UNLIKELY( ompi_request_is_failed(req) )) {
if(OPAL_UNLIKELY( ompi_request_is_failed(*req) )) {
return;
}
#endif /* OPAL_ENABLE_FT_MPI */
_tmp_ptr = REQUEST_PENDING;

WAIT_SYNC_INIT(&sync, 1);

if (OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&req->req_complete, &_tmp_ptr, &sync)) {
/* CAS sync on the req->req_complete field.
* - If the request is PENDING, this lets other waiting threads and
* ompi_request_complete know that we are waiting on it.
*
* - If the request is already completed, clean sync and exit
*
* - If another thread is already waiting on this request, stack on it
* We will signal its sync when the request is completed
*/
if (OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&(*req)->req_complete, &_tmp_ptr, &sync)) {
/* I'm the first one to wait on this request. */
SYNC_WAIT(&sync);
} else {
/* completed before we had a chance to swap in the sync object */
} else if (REQUEST_COMPLETE(*req)) {
/* Completed before we had a chance to swap in the sync object
* Clean sync and exit */
WAIT_SYNC_SIGNALLED(&sync);
} else {
/* Another thread is waiting on the request.
* Its sync is stored in _tmp_ptr */
stack_retry:
/* Try to stack our sync on the request */
if (OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&(*req)->req_complete, &_tmp_ptr, &sync)) {
/* Successfully stacked on the request */
SYNC_WAIT(&sync);

/* Request is completed, continue unstacking */
wait_sync_update((ompi_wait_sync_t*) _tmp_ptr, 1, sync.status);
} else if (REQUEST_COMPLETE(*req)) {
/* Completed before I could stack on the request.
* Clean sync and exit */
WAIT_SYNC_SIGNALLED(&sync);
} else {
/* Someone else has successfully stacked its sync.
* Retry */
goto stack_retry;
}
}

#if OPAL_ENABLE_FT_MPI
if (OPAL_UNLIKELY(OMPI_SUCCESS != sync.status)) {
OPAL_OUTPUT_VERBOSE((50, ompi_ftmpi_output_handle, "Status %d reported for sync %p rearming req %p", sync.status, (void*)&sync, (void*)req));
OPAL_OUTPUT_VERBOSE((50, ompi_ftmpi_output_handle, "Status %d reported for sync %p rearming req %p", sync.status, (void*)&sync, (void*)(*req)));
_tmp_ptr = &sync;
if (OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&req->req_complete, &_tmp_ptr, REQUEST_PENDING)) {
opal_output_verbose(10, ompi_ftmpi_output_handle, "Status %d reported for sync %p rearmed req %p", sync.status, (void*)&sync, (void*)req);
if (OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&(*req)->req_complete, &_tmp_ptr, REQUEST_PENDING)) {
opal_output_verbose(10, ompi_ftmpi_output_handle, "Status %d reported for sync %p rearmed req %p", sync.status, (void*)&sync, (void*)(*req));
WAIT_SYNC_RELEASE(&sync);
goto redo;
}
}
#endif /* OPAL_ENABLE_FT_MPI */
assert(REQUEST_COMPLETE(req));
assert(REQUEST_COMPLETE(*req));
WAIT_SYNC_RELEASE(&sync);
}
opal_atomic_rmb();
}
opal_atomic_rmb();
} else {
while(!REQUEST_COMPLETE(req)) {
while(!REQUEST_COMPLETE(*req)) {
opal_progress();
#if OPAL_ENABLE_FT_MPI
/* Check to make sure that process failure did not break the
* request. */
if(OPAL_UNLIKELY( ompi_request_is_failed(req) )) {
if(OPAL_UNLIKELY( ompi_request_is_failed(*req) )) {
break;
}
#endif /* OPAL_ENABLE_FT_MPI */
Expand Down Expand Up @@ -530,10 +561,21 @@ static inline int ompi_request_complete(ompi_request_t* request, bool with_signa
/* make sure everything in the request is visible before we mark it complete */
opal_atomic_wmb();

ompi_wait_sync_t *tmp_sync = (ompi_wait_sync_t *) OPAL_ATOMIC_SWAP_PTR(&request->req_complete,
REQUEST_COMPLETED);
if( REQUEST_PENDING != tmp_sync ) {
wait_sync_update(tmp_sync, 1, request->req_status.MPI_ERROR);
void *_tmp_ptr = REQUEST_PENDING;

if(!OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&request->req_complete, &_tmp_ptr, REQUEST_COMPLETED)) {
/* CAS failed: someone is waiting on this request
* Its sync structure is stored in _tmp_ptr
* Mark the request as completed and store the waiting thread sync.
*/
while (!OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&request->req_complete, &_tmp_ptr, REQUEST_COMPLETED)) {
}

/* In the case where another thread concurrently changed the request to REQUEST_PENDING or REQUEST_COMPLETED */
if( REQUEST_PENDING != _tmp_ptr
&& REQUEST_COMPLETED != _tmp_ptr) {
wait_sync_update((ompi_wait_sync_t*) _tmp_ptr, 1, request->req_status.MPI_ERROR);
}
}
} else {
request->req_complete = REQUEST_COMPLETED;
Expand Down

0 comments on commit 5f205ef

Please sign in to comment.