From 5f205ef18b4b7e3f0f99646c091f4acd0e7845d0 Mon Sep 17 00:00:00 2001 From: "GERMAIN, FLORENT" Date: Wed, 4 Oct 2023 13:40:35 +0200 Subject: [PATCH] Fix multithread request wait Signed-off-by: Florent Germain --- ompi/communicator/comm_cid.c | 4 +- .../ftagree/coll_ftagree_earlyreturning.c | 2 +- ompi/mca/pml/cm/pml_cm.h | 6 +- ompi/mca/pml/ob1/pml_ob1_iprobe.c | 6 +- ompi/mca/pml/ob1/pml_ob1_irecv.c | 12 ++- ompi/mca/pml/ob1/pml_ob1_isend.c | 5 +- ompi/request/req_wait.c | 21 ++++- ompi/request/request.h | 78 ++++++++++++++----- 8 files changed, 100 insertions(+), 34 deletions(-) diff --git a/ompi/communicator/comm_cid.c b/ompi/communicator/comm_cid.c index 07970e8354f..20a592dd685 100644 --- a/ompi/communicator/comm_cid.c +++ b/ompi/communicator/comm_cid.c @@ -544,7 +544,7 @@ int ompi_comm_nextcid (ompi_communicator_t *newcomm, ompi_communicator_t *comm, } if (&ompi_request_empty != req) { - ompi_request_wait_completion (req); + ompi_request_wait_completion (&req); rc = req->req_status.MPI_ERROR; ompi_comm_request_return ((ompi_comm_request_t *) req); } @@ -909,7 +909,7 @@ int ompi_comm_activate (ompi_communicator_t **newcomm, ompi_communicator_t *comm } if (&ompi_request_empty != req) { - ompi_request_wait_completion (req); + ompi_request_wait_completion (&req); rc = req->req_status.MPI_ERROR; ompi_comm_request_return ((ompi_comm_request_t *) req); } diff --git a/ompi/mca/coll/ftagree/coll_ftagree_earlyreturning.c b/ompi/mca/coll/ftagree/coll_ftagree_earlyreturning.c index ea363b2ef53..a108e3658a8 100644 --- a/ompi/mca/coll/ftagree/coll_ftagree_earlyreturning.c +++ b/ompi/mca/coll/ftagree/coll_ftagree_earlyreturning.c @@ -3200,7 +3200,7 @@ int mca_coll_ftagree_era_intra(void *contrib, rc = mca_coll_ftagree_iera_intra(contrib, dt_count, dt, op, group, grp_update, comm, &req, module); if(OPAL_UNLIKELY( OMPI_SUCCESS != rc )) return rc; - ompi_request_wait_completion(req); + ompi_request_wait_completion(&req); rc = req->req_status.MPI_ERROR; 
ompi_request_free(&req); return rc; diff --git a/ompi/mca/pml/cm/pml_cm.h b/ompi/mca/pml/cm/pml_cm.h index cee5b05f764..93707a29b1f 100644 --- a/ompi/mca/pml/cm/pml_cm.h +++ b/ompi/mca/pml/cm/pml_cm.h @@ -204,7 +204,8 @@ mca_pml_cm_recv(void *addr, return ret; } - ompi_request_wait_completion(&req.req_ompi); + ompi_request_t *ompi_req = &req.req_ompi; + ompi_request_wait_completion(&ompi_req); if (MPI_STATUS_IGNORE != status) { OMPI_COPY_STATUS(status, req.req_ompi.req_status, false); @@ -533,7 +534,8 @@ mca_pml_cm_mrecv(void *buf, return ret; } - ompi_request_wait_completion(&recvreq->req_base.req_ompi); + ompi_request_t *ompi_req = &recvreq->req_base.req_ompi; + ompi_request_wait_completion(&ompi_req); if (MPI_STATUS_IGNORE != status) { OMPI_COPY_STATUS(status, recvreq->req_base.req_ompi.req_status, false); diff --git a/ompi/mca/pml/ob1/pml_ob1_iprobe.c b/ompi/mca/pml/ob1/pml_ob1_iprobe.c index 4d6a0eb8dfd..e0694739ed1 100644 --- a/ompi/mca/pml/ob1/pml_ob1_iprobe.c +++ b/ompi/mca/pml/ob1/pml_ob1_iprobe.c @@ -69,7 +69,8 @@ int mca_pml_ob1_probe(int src, MCA_PML_OB1_RECV_REQUEST_INIT(&recvreq, NULL, 0, &ompi_mpi_char.dt, src, tag, comm, false); MCA_PML_OB1_RECV_REQUEST_START(&recvreq); - ompi_request_wait_completion(&recvreq.req_recv.req_base.req_ompi); + ompi_request_t *ompi_req = &recvreq.req_recv.req_base.req_ompi; + ompi_request_wait_completion(&ompi_req); rc = recvreq.req_recv.req_base.req_ompi.req_status.MPI_ERROR; if( MPI_STATUS_IGNORE != status ) { OMPI_COPY_STATUS(status, recvreq.req_recv.req_base.req_ompi.req_status, false); @@ -159,7 +160,8 @@ mca_pml_ob1_mprobe(int src, src, tag, comm, false); MCA_PML_OB1_RECV_REQUEST_START(recvreq); - ompi_request_wait_completion(&recvreq->req_recv.req_base.req_ompi); + ompi_request_t *ompi_req = &recvreq->req_recv.req_base.req_ompi; + ompi_request_wait_completion(&ompi_req); rc = recvreq->req_recv.req_base.req_ompi.req_status.MPI_ERROR; if( MPI_STATUS_IGNORE != status ) { OMPI_COPY_STATUS(status, 
recvreq->req_recv.req_base.req_ompi.req_status, false); diff --git a/ompi/mca/pml/ob1/pml_ob1_irecv.c b/ompi/mca/pml/ob1/pml_ob1_irecv.c index 4ccb8ea00f2..f5393907e02 100644 --- a/ompi/mca/pml/ob1/pml_ob1_irecv.c +++ b/ompi/mca/pml/ob1/pml_ob1_irecv.c @@ -133,7 +133,8 @@ int mca_pml_ob1_recv(void *addr, PERUSE_RECV); MCA_PML_OB1_RECV_REQUEST_START(recvreq); - ompi_request_wait_completion(&recvreq->req_recv.req_base.req_ompi); + ompi_request_t *ompi_req = &recvreq->req_recv.req_base.req_ompi; + ompi_request_wait_completion(&ompi_req); if (recvreq->req_recv.req_base.req_pml_complete) { /* make buffer defined when the request is completed */ @@ -153,7 +154,8 @@ int mca_pml_ob1_recv(void *addr, #if OPAL_ENABLE_FT_MPI if( OPAL_UNLIKELY( MPI_ERR_PROC_FAILED_PENDING == rc )) { ompi_request_cancel(&recvreq->req_recv.req_base.req_ompi); - ompi_request_wait_completion(&recvreq->req_recv.req_base.req_ompi); + ompi_request_t *ft_req = &recvreq->req_recv.req_base.req_ompi; + ompi_request_wait_completion(&ft_req); rc = MPI_ERR_PROC_FAILED; } #endif @@ -358,7 +360,8 @@ mca_pml_ob1_mrecv( void *buf, ompi_message_return(*message); *message = MPI_MESSAGE_NULL; - ompi_request_wait_completion(&(recvreq->req_recv.req_base.req_ompi)); + ompi_request_t *ompi_req = &recvreq->req_recv.req_base.req_ompi; + ompi_request_wait_completion(&ompi_req); MCA_PML_OB1_RECV_FRAG_RETURN(frag); @@ -369,7 +372,8 @@ mca_pml_ob1_mrecv( void *buf, #if OPAL_ENABLE_FT_MPI if( OPAL_UNLIKELY( MPI_ERR_PROC_FAILED_PENDING == rc )) { ompi_request_cancel(&recvreq->req_recv.req_base.req_ompi); - ompi_request_wait_completion(&recvreq->req_recv.req_base.req_ompi); + ompi_request_t *ft_req = &recvreq->req_recv.req_base.req_ompi; + ompi_request_wait_completion(&ft_req); rc = MPI_ERR_PROC_FAILED; } #endif diff --git a/ompi/mca/pml/ob1/pml_ob1_isend.c b/ompi/mca/pml/ob1/pml_ob1_isend.c index 8c9334764d7..1d84f198464 100644 --- a/ompi/mca/pml/ob1/pml_ob1_isend.c +++ b/ompi/mca/pml/ob1/pml_ob1_isend.c @@ -279,7 +279,7 @@ 
int mca_pml_ob1_send(const void *buf, return rc; } - ompi_request_wait_completion (brequest); + ompi_request_wait_completion (&brequest); ompi_request_free (&brequest); return OMPI_SUCCESS; } @@ -324,7 +324,8 @@ int mca_pml_ob1_send(const void *buf, MCA_PML_OB1_SEND_REQUEST_START_W_SEQ(sendreq, endpoint, seqn, rc); if (OPAL_LIKELY(rc == OMPI_SUCCESS)) { - ompi_request_wait_completion(&sendreq->req_send.req_base.req_ompi); + ompi_request_t *ompi_req = &sendreq->req_send.req_base.req_ompi; + ompi_request_wait_completion(&ompi_req); rc = sendreq->req_send.req_base.req_ompi.req_status.MPI_ERROR; } diff --git a/ompi/request/req_wait.c b/ompi/request/req_wait.c index d66eccf9f79..8daee5d02ae 100644 --- a/ompi/request/req_wait.c +++ b/ompi/request/req_wait.c @@ -35,9 +35,9 @@ int ompi_request_default_wait( ompi_request_t ** req_ptr, ompi_status_public_t * status) { - ompi_request_t *req = *req_ptr; + ompi_request_wait_completion(req_ptr); - ompi_request_wait_completion(req); + ompi_request_t *req = *req_ptr; /* make sure we get the correct status */ opal_atomic_rmb(); @@ -78,10 +78,25 @@ int ompi_request_default_wait( return req->req_status.MPI_ERROR; } + /* Swap the request with MPI_REQUEST_NULL + * Make sure ompi_request_free is called only once + */ + ompi_request_t *old_req = *req_ptr; + ompi_request_t *new_req = MPI_REQUEST_NULL; + +retry: + if (new_req == old_req) { + return OMPI_SUCCESS; + } + + if (!OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(req_ptr, &old_req, new_req)) { + goto retry; + } + /* If there's an error while freeing the request, assume that the request is still there. Otherwise, Bad Things will happen later! 
*/ - return ompi_request_free(req_ptr); + return ompi_request_free(&old_req); } diff --git a/ompi/request/request.h b/ompi/request/request.h index e2e62a2d7ad..de8454c24d1 100644 --- a/ompi/request/request.h +++ b/ompi/request/request.h @@ -447,17 +447,17 @@ static inline bool ompi_request_tag_is_collective(int tag) { * Wait a particular request for completion */ -static inline void ompi_request_wait_completion(ompi_request_t *req) +static inline void ompi_request_wait_completion(ompi_request_t **req) { if (opal_using_threads ()) { - if(!REQUEST_COMPLETE(req)) { + if(!REQUEST_COMPLETE(*req)) { void *_tmp_ptr; ompi_wait_sync_t sync; #if OPAL_ENABLE_FT_MPI redo: - if(OPAL_UNLIKELY( ompi_request_is_failed(req) )) { + if(OPAL_UNLIKELY( ompi_request_is_failed(*req) )) { return; } #endif /* OPAL_ENABLE_FT_MPI */ @@ -465,35 +465,66 @@ static inline void ompi_request_wait_completion(ompi_request_t *req) WAIT_SYNC_INIT(&sync, 1); - if (OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&req->req_complete, &_tmp_ptr, &sync)) { + /* CAS sync on the req->req_complete field. + * - If the request is PENDING, this lets other waiting + * threads and ompi_request_complete know that we are waiting on it. + * + * - If the request is already completed, clean sync and exit + * + * - If another thread is already waiting on this request, stack on it + * We will signal its sync when the request is completed + */ + if (OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&(*req)->req_complete, &_tmp_ptr, &sync)) { + /* I'm the first one to wait on this request. */ SYNC_WAIT(&sync); - } else { - /* completed before we had a chance to swap in the sync object */ + } else if (REQUEST_COMPLETE(*req)) { + /* Completed before we had a chance to swap in the sync object + * Clean sync and exit */ WAIT_SYNC_SIGNALLED(&sync); + } else { + /* Another thread is waiting on the request.
+ * Its sync is stored in _tmp_ptr */ +stack_retry: + /* Try to stack our sync on the request */ + if (OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&(*req)->req_complete, &_tmp_ptr, &sync)) { + /* Successfully stacked on the request */ + SYNC_WAIT(&sync); + + /* Request is completed, continue unstacking */ + wait_sync_update((ompi_wait_sync_t*) _tmp_ptr, 1, sync.status); + } else if (REQUEST_COMPLETE(*req)) { + /* Completed before I could stack on the request. + * Clean sync and exit */ + WAIT_SYNC_SIGNALLED(&sync); + } else { + /* Someone else has successfully stacked its sync. + * Retry */ + goto stack_retry; + } } #if OPAL_ENABLE_FT_MPI if (OPAL_UNLIKELY(OMPI_SUCCESS != sync.status)) { - OPAL_OUTPUT_VERBOSE((50, ompi_ftmpi_output_handle, "Status %d reported for sync %p rearming req %p", sync.status, (void*)&sync, (void*)req)); + OPAL_OUTPUT_VERBOSE((50, ompi_ftmpi_output_handle, "Status %d reported for sync %p rearming req %p", sync.status, (void*)&sync, (void*)(*req))); _tmp_ptr = &sync; - if (OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&req->req_complete, &_tmp_ptr, REQUEST_PENDING)) { - opal_output_verbose(10, ompi_ftmpi_output_handle, "Status %d reported for sync %p rearmed req %p", sync.status, (void*)&sync, (void*)req); + if (OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&(*req)->req_complete, &_tmp_ptr, REQUEST_PENDING)) { + opal_output_verbose(10, ompi_ftmpi_output_handle, "Status %d reported for sync %p rearmed req %p", sync.status, (void*)&sync, (void*)(*req)); WAIT_SYNC_RELEASE(&sync); goto redo; } } #endif /* OPAL_ENABLE_FT_MPI */ - assert(REQUEST_COMPLETE(req)); + assert(REQUEST_COMPLETE(*req)); WAIT_SYNC_RELEASE(&sync); - } - opal_atomic_rmb(); + } + opal_atomic_rmb(); } else { - while(!REQUEST_COMPLETE(req)) { + while(!REQUEST_COMPLETE(*req)) { opal_progress(); #if OPAL_ENABLE_FT_MPI /* Check to make sure that process failure did not break the * request.
*/ - if(OPAL_UNLIKELY( ompi_request_is_failed(req) )) { + if(OPAL_UNLIKELY( ompi_request_is_failed(*req) )) { break; } #endif /* OPAL_ENABLE_FT_MPI */ @@ -530,10 +561,21 @@ static inline int ompi_request_complete(ompi_request_t* request, bool with_signa /* make sure everything in the request is visible before we mark it complete */ opal_atomic_wmb(); - ompi_wait_sync_t *tmp_sync = (ompi_wait_sync_t *) OPAL_ATOMIC_SWAP_PTR(&request->req_complete, - REQUEST_COMPLETED); - if( REQUEST_PENDING != tmp_sync ) { - wait_sync_update(tmp_sync, 1, request->req_status.MPI_ERROR); + void *_tmp_ptr = REQUEST_PENDING; + + if(!OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&request->req_complete, &_tmp_ptr, REQUEST_COMPLETED)) { + /* CAS failed: someone is waiting on this request + * Its sync structure is stored in _tmp_ptr + * Mark the request as completed and store the waiting thread sync. + */ + while (!OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&request->req_complete, &_tmp_ptr, REQUEST_COMPLETED)) { + } + + /* Do not signal if another thread concurrently changed the request to REQUEST_PENDING or REQUEST_COMPLETED */ + if( REQUEST_PENDING != _tmp_ptr + && REQUEST_COMPLETED != _tmp_ptr) { + wait_sync_update((ompi_wait_sync_t*) _tmp_ptr, 1, request->req_status.MPI_ERROR); + } } } else { request->req_complete = REQUEST_COMPLETED;