From 6b7f4a162eab2f534117e348cd73f99d5ca7c36e Mon Sep 17 00:00:00 2001
From: Howard Pritchard
Date: Thu, 10 Feb 2022 08:17:45 -0700
Subject: [PATCH] sessions: add support for ucx

Greatly simplify support for MPI_Comm_create_from_group and
MPI_Intercomm_create_from_groups by removing the need to support the
128-bit excid notion. Instead, make use of a PMIx capability -
PMIX_GROUP_LOCAL_CID - together with the notion of PMIX_GROUP_INFO.
This capability was introduced in Open PMIx 4.1.3. It allows us to
piggy-back the local cid selected for the new communicator on the
PMIx_Group_construct operation; a peer's cid can later be recovered by
querying PMIx_Get for PMIX_GROUP_LOCAL_CID, qualified by the group's
PMIX_GROUP_CONTEXT_ID. Using this approach, a lot of the complex
active-message-style operations implemented in the OFI MTL to support
excids can be avoided, so this PR simplifies the OFI MTL to make use of
the PMIX_GROUP_LOCAL_CID feature instead.

Infrastructure for debugging communicator management routines was also
introduced, along with a new MCA parameter - mpi_comm_verbose.

Related to #12566

Signed-off-by: Howard Pritchard
---
(A standalone sketch of the PMIx exchange pattern described above is
appended after the patch.)

 ompi/communicator/comm.c             |  13 +-
 ompi/communicator/comm_cid.c         | 213 ++++++++++++---
 ompi/communicator/comm_init.c        |  23 +-
 ompi/communicator/communicator.h     |  36 +++
 ompi/mca/mtl/ofi/mtl_ofi.c           |  43 +--
 ompi/mca/mtl/ofi/mtl_ofi.h           | 379 +--------------------------
 ompi/mca/pml/ucx/pml_ucx.c           |  31 ++-
 ompi/mca/pml/ucx/pml_ucx_component.c |   4 +
 ompi/mca/pml/ucx/pml_ucx_request.c   |   2 +-
 ompi/mca/pml/ucx/pml_ucx_request.h   |   4 +-
 ompi/runtime/ompi_mpi_params.c       |   7 +-
 ompi/runtime/params.h                |   8 +-
 12 files changed, 293 insertions(+), 470 deletions(-)

diff --git a/ompi/communicator/comm.c b/ompi/communicator/comm.c
index d7fb321e3f8..2a9afd352be 100644
--- a/ompi/communicator/comm.c
+++ b/ompi/communicator/comm.c
@@ -24,7 +24,7 @@
  * Copyright (c) 2015 Mellanox Technologies. All rights reserved.
  * Copyright (c) 2017-2022 IBM Corporation. All rights reserved.
  * Copyright (c) 2021 Nanook Consulting. All rights reserved.
- * Copyright (c) 2018-2022 Triad National Security, LLC. All rights
+ * Copyright (c) 2018-2024 Triad National Security, LLC. All rights
  * reserved.
  * Copyright (c) 2023 Advanced Micro Devices, Inc. All rights reserved.
* $COPYRIGHT$ @@ -1738,7 +1738,7 @@ int ompi_intercomm_create_from_groups (ompi_group_t *local_group, int local_lead ompi_communicator_t **newintercomm) { ompi_communicator_t *newcomp = NULL, *local_comm, *leader_comm = MPI_COMM_NULL; - ompi_comm_extended_cid_block_t new_block; + ompi_comm_extended_cid_block_t new_block = {0}; bool i_am_leader = local_leader == local_group->grp_my_rank; ompi_proc_t **rprocs; uint64_t data[4]; @@ -1864,14 +1864,7 @@ int ompi_intercomm_create_from_groups (ompi_group_t *local_group, int local_lead return rc; } - /* will be using a communicator ID derived from the bridge communicator to save some time */ - new_block.block_cid.cid_base = data[1]; - new_block.block_cid.cid_sub.u64 = data[2]; - new_block.block_nextsub = 0; - new_block.block_nexttag = 0; - new_block.block_level = (int8_t) data[3]; - - rc = ompi_comm_nextcid (newcomp, NULL, NULL, (void *) tag, &new_block, false, OMPI_COMM_CID_GROUP_NEW); + rc = ompi_comm_nextcid (newcomp, NULL, NULL, (void *) tag, NULL, false, OMPI_COMM_CID_GROUP_NEW); if ( OMPI_SUCCESS != rc ) { OBJ_RELEASE(newcomp); return rc; diff --git a/ompi/communicator/comm_cid.c b/ompi/communicator/comm_cid.c index db97f7ea1b8..0475d63b6f4 100644 --- a/ompi/communicator/comm_cid.c +++ b/ompi/communicator/comm_cid.c @@ -310,21 +310,16 @@ static int ompi_comm_ext_cid_new_block (ompi_communicator_t *newcomm, ompi_commu const void *arg0, const void *arg1, bool send_first, int mode, ompi_request_t **req) { - pmix_info_t pinfo, *results = NULL; + pmix_info_t *pinfo, *results = NULL; size_t nresults; - opal_process_name_t *name_array = NULL; - char *tag = NULL; - size_t proc_count; - size_t cid_base = 0; + opal_process_name_t opal_proc_name; bool cid_base_set = false; + char *tag = NULL; + size_t proc_count = 0, rproc_count = 0, tproc_count = 0, cid_base = 0UL, ninfo; int rc, leader_rank; - int ret = OMPI_SUCCESS; - pmix_proc_t *procs = NULL; - - rc = ompi_group_to_proc_name_array (newcomm->c_local_group, &name_array, &proc_count); - if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { - return rc; - } + pmix_proc_t *procs = NULL; + void *grpinfo = NULL, *list = NULL; + pmix_data_array_t darray; switch (mode) { case OMPI_COMM_CID_GROUP_NEW: @@ -341,15 +336,75 @@ static int ompi_comm_ext_cid_new_block (ompi_communicator_t *newcomm, ompi_commu break; } - PMIX_INFO_LOAD(&pinfo, PMIX_GROUP_ASSIGN_CONTEXT_ID, NULL, PMIX_BOOL); + grpinfo = PMIx_Info_list_start(); + if (NULL == grpinfo) { + rc = OMPI_ERR_OUT_OF_RESOURCE; + goto fn_exit; + } + + rc = PMIx_Info_list_add(grpinfo, PMIX_GROUP_ASSIGN_CONTEXT_ID, NULL, PMIX_BOOL); + if (PMIX_SUCCESS != rc) { + OPAL_OUTPUT_VERBOSE((10, ompi_comm_output, "PMIx_Info_list_add failed %s %d", PMIx_Error_string(rc), __LINE__)); + rc = OMPI_ERR_OUT_OF_RESOURCE; + goto fn_exit; + } + + list = PMIx_Info_list_start(); + + size_t c_index = (size_t)newcomm->c_index; + rc = PMIx_Info_list_add(list, PMIX_GROUP_LOCAL_CID, &c_index, PMIX_SIZE); + if (PMIX_SUCCESS != rc) { + OPAL_OUTPUT_VERBOSE((10, ompi_comm_output, "PMIx_Info_list_add failed %s %d", PMIx_Error_string(rc), __LINE__)); + rc = OMPI_ERR_OUT_OF_RESOURCE; + goto fn_exit; + } + + rc = PMIx_Info_list_convert(list, &darray); + if (PMIX_SUCCESS != rc) { + OPAL_OUTPUT_VERBOSE((10, ompi_comm_output, "PMIx_Info_list_convert failed %s %d", PMIx_Error_string(rc), __LINE__)); + rc = OMPI_ERR_OUT_OF_RESOURCE; + goto fn_exit; + } + rc = PMIx_Info_list_add(grpinfo, PMIX_GROUP_INFO, &darray, PMIX_DATA_ARRAY); + PMIX_DATA_ARRAY_DESTRUCT(&darray); + if (PMIX_SUCCESS != rc) { + 
OPAL_OUTPUT_VERBOSE((10, ompi_comm_output, "PMIx_Info_list_add failed %s %d", PMIx_Error_string(rc), __LINE__)); + rc = OMPI_ERR_OUT_OF_RESOURCE; + goto fn_exit; + } + + rc = PMIx_Info_list_convert(grpinfo, &darray); + if (PMIX_SUCCESS != rc) { + OPAL_OUTPUT_VERBOSE((10, ompi_comm_output, "PMIx_Info_list_convert failed %s %d", PMIx_Error_string(rc), __LINE__)); + rc = OMPI_ERR_OUT_OF_RESOURCE; + goto fn_exit; + } + + pinfo = (pmix_info_t*)darray.array; + ninfo = darray.size; + + proc_count = newcomm->c_local_group->grp_proc_count; + if ( OMPI_COMM_IS_INTER (newcomm) ){ + rproc_count = newcomm->c_remote_group->grp_proc_count; + } + + PMIX_PROC_CREATE(procs, proc_count + rproc_count); - PMIX_PROC_CREATE(procs, proc_count); for (size_t i = 0 ; i < proc_count; ++i) { - OPAL_PMIX_CONVERT_NAME(&procs[i],&name_array[i]); + opal_proc_name = ompi_group_get_proc_name(newcomm->c_local_group, i); + OPAL_PMIX_CONVERT_NAME(&procs[i],&opal_proc_name); + } + for (size_t i = 0; i < rproc_count; ++i) { + opal_proc_name = ompi_group_get_proc_name(newcomm->c_remote_group, i); + OPAL_PMIX_CONVERT_NAME(&procs[proc_count+i],&opal_proc_name); } - rc = PMIx_Group_construct(tag, procs, proc_count, &pinfo, 1, &results, &nresults); - PMIX_INFO_DESTRUCT(&pinfo); + tproc_count = proc_count + rproc_count; + + OPAL_OUTPUT_VERBOSE((10, ompi_comm_output, "calling PMIx_Group_construct - tag %s size %ld ninfo %ld cid_base %ld\n", + tag, tproc_count, ninfo, cid_base)); + rc = PMIx_Group_construct(tag, procs, tproc_count, pinfo, ninfo, &results, &nresults); + PMIX_DATA_ARRAY_DESTRUCT(&darray); if(PMIX_SUCCESS != rc) { char msg_string[1024]; switch (rc) { @@ -361,7 +416,7 @@ static int ompi_comm_ext_cid_new_block (ompi_communicator_t *newcomm, ompi_commu "MPI_Comm_create_from_group/MPI_Intercomm_create_from_groups", msg_string); - ret = MPI_ERR_UNSUPPORTED_OPERATION; + rc = MPI_ERR_UNSUPPORTED_OPERATION; break; case PMIX_ERR_NOT_SUPPORTED: sprintf(msg_string,"PMIx server does not support PMIx Group operations"); @@ -370,10 +425,10 @@ static int ompi_comm_ext_cid_new_block (ompi_communicator_t *newcomm, ompi_commu true, "MPI_Comm_create_from_group/MPI_Intercomm_create_from_groups", msg_string); - ret = MPI_ERR_UNSUPPORTED_OPERATION; + rc = MPI_ERR_UNSUPPORTED_OPERATION; break; default: - ret = opal_pmix_convert_status(rc); + rc = opal_pmix_convert_status(rc); break; } goto fn_exit; @@ -383,7 +438,7 @@ static int ompi_comm_ext_cid_new_block (ompi_communicator_t *newcomm, ompi_commu if (PMIX_CHECK_KEY(&results[i], PMIX_GROUP_CONTEXT_ID)) { PMIX_VALUE_GET_NUMBER(rc, &results[i].value, cid_base, size_t); if(PMIX_SUCCESS != rc) { - ret = opal_pmix_convert_status(rc); + rc = opal_pmix_convert_status(rc); goto fn_exit; } cid_base_set = true; @@ -391,15 +446,20 @@ static int ompi_comm_ext_cid_new_block (ompi_communicator_t *newcomm, ompi_commu } } + OPAL_OUTPUT_VERBOSE((10, ompi_comm_output, "PMIx_Group_construct - tag %s size %ld ninfo %ld cid_base %ld\n", + tag, tproc_count, ninfo, cid_base)); + + /* destruct the group */ rc = PMIx_Group_destruct (tag, NULL, 0); if(PMIX_SUCCESS != rc) { - ret = opal_pmix_convert_status(rc); + OPAL_OUTPUT_VERBOSE((10, ompi_comm_output, "PMIx_Group_destruct failed %s", PMIx_Error_string(rc))); + rc = opal_pmix_convert_status(rc); goto fn_exit; } if (!cid_base_set) { opal_show_help("help-comm.txt", "cid-base-not-set", true); - ret = OMPI_ERROR; + rc = OMPI_ERROR; goto fn_exit; } @@ -412,16 +472,19 @@ static int ompi_comm_ext_cid_new_block (ompi_communicator_t *newcomm, ompi_commu } if(NULL != procs) { - 
PMIX_PROC_FREE(procs, proc_count); + PMIX_PROC_FREE(procs, tproc_count); procs = NULL; } - if(NULL != name_array) { - free (name_array); - name_array = NULL; + if (NULL != grpinfo) { + PMIx_Info_list_release(grpinfo); } - return ret; + if (NULL != list) { + PMIx_Info_list_release(list); + } + + return rc; } static int ompi_comm_nextcid_ext_nb (ompi_communicator_t *newcomm, ompi_communicator_t *comm, @@ -446,6 +509,15 @@ static int ompi_comm_nextcid_ext_nb (ompi_communicator_t *newcomm, ompi_communic block = &comm->c_contextidb; } + for (unsigned int i = ompi_mpi_communicators.lowest_free ; i < mca_pml.pml_max_contextid ; ++i) { + bool flag = opal_pointer_array_test_and_set_item (&ompi_mpi_communicators, i, newcomm); + if (true == flag) { + newcomm->c_index = i; + break; + } + } + assert(newcomm->c_index > 2); + if (NULL == arg1) { if (OMPI_COMM_CID_GROUP == mode || OMPI_COMM_CID_GROUP_NEW == mode || !ompi_comm_extended_cid_block_available (&comm->c_contextidb)) { @@ -468,14 +540,6 @@ static int ompi_comm_nextcid_ext_nb (ompi_communicator_t *newcomm, ompi_communic (void) ompi_comm_extended_cid_block_new (block, &newcomm->c_contextidb, is_new_block); } - for (unsigned int i = ompi_mpi_communicators.lowest_free ; i < mca_pml.pml_max_contextid ; ++i) { - bool flag = opal_pointer_array_test_and_set_item (&ompi_mpi_communicators, i, newcomm); - if (true == flag) { - newcomm->c_index = i; - break; - } - } - newcomm->c_contextid = newcomm->c_contextidb.block_cid; opal_hash_table_set_value_ptr (&ompi_comm_hash, &newcomm->c_contextid, @@ -502,7 +566,7 @@ int ompi_comm_nextcid_nb (ompi_communicator_t *newcomm, ompi_communicator_t *com functions but the pml does not support these functions so return not supported */ if (NULL == comm) { char msg_string[1024]; - sprintf(msg_string,"The PML being used - %s - does not support MPI sessions related features", + sprintf(msg_string,"The PML being used - %s - does not support MPI sessions related features", mca_pml_base_selected_component.pmlm_version.mca_component_name); opal_show_help("help-comm.txt", "MPI function not supported", @@ -886,6 +950,7 @@ int ompi_comm_activate_nb (ompi_communicator_t **newcomm, ompi_communicator_t *c ompi_comm_cid_context_t *context; ompi_comm_request_t *request; ompi_request_t *subreq; + uint32_t comm_size; int ret = 0; /* the caller should not pass NULL for comm (it may be the same as *newcomm) */ @@ -907,6 +972,25 @@ int ompi_comm_activate_nb (ompi_communicator_t **newcomm, ompi_communicator_t *c request->context = &context->super; + /* Prep communicator for handling remote cids if needed */ + + if (!OMPI_COMM_IS_GLOBAL_INDEX(*newcomm)) { + if (OMPI_COMM_IS_INTER(*newcomm)) { + comm_size = ompi_comm_remote_size(*newcomm); + } else { + comm_size = ompi_comm_size(*newcomm); + } + + (*newcomm)->c_index_vec = (uint32_t *)calloc(comm_size, sizeof(uint32_t)); + if (NULL == (*newcomm)->c_index_vec) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + + if (OMPI_COMM_IS_INTRA(*newcomm)) { + (*newcomm)->c_index_vec[(*newcomm)->c_my_rank] = (*newcomm)->c_index; + } + } + if (MPI_UNDEFINED != (*newcomm)->c_local_group->grp_my_rank) { /* Initialize the PML stuff in the newcomm */ if ( OMPI_SUCCESS != (ret = MCA_PML_CALL(add_comm(*newcomm))) ) { @@ -963,6 +1047,61 @@ int ompi_comm_activate (ompi_communicator_t **newcomm, ompi_communicator_t *comm return rc; } +int ompi_comm_get_remote_cid_from_pmix (ompi_communicator_t *comm, int dest, uint32_t *remote_cid) +{ + ompi_proc_t *ompi_proc; + pmix_proc_t pmix_proc; + pmix_info_t tinfo[2]; + pmix_value_t 
*val = NULL; + ompi_comm_extended_cid_t excid; + int rc = OMPI_SUCCESS; + size_t remote_cid64; + + assert(NULL != remote_cid); + + ompi_proc = ompi_comm_peer_lookup(comm, dest); + OPAL_PMIX_CONVERT_NAME(&pmix_proc, &ompi_proc->super.proc_name); + + PMIx_Info_construct(&tinfo[0]); + PMIX_INFO_LOAD(&tinfo[0], PMIX_TIMEOUT, &ompi_pmix_connect_timeout, PMIX_UINT32); + + excid = ompi_comm_get_extended_cid(comm); + + PMIX_INFO_CONSTRUCT(&tinfo[1]); + PMIX_INFO_LOAD(&tinfo[1], PMIX_GROUP_CONTEXT_ID, &excid.cid_base, PMIX_SIZE); + PMIX_INFO_SET_QUALIFIER(&tinfo[1]); + if (PMIX_SUCCESS != (rc = PMIx_Get(&pmix_proc, PMIX_GROUP_LOCAL_CID, tinfo, 2, &val))) { + OPAL_OUTPUT_VERBOSE((10, ompi_comm_output, "PMIx_Get failed for PMIX_GROUP_LOCAL_CID cid_base %ld %s", excid.cid_base, PMIx_Error_string(rc))); + rc = OMPI_ERR_NOT_FOUND; + goto done; + } + + if (NULL == val) { + OPAL_OUTPUT_VERBOSE((10, ompi_comm_output, "PMIx_Get failed for PMIX_GROUP_LOCAL_CID val returned NULL")); + rc = OMPI_ERR_NOT_FOUND; + goto done; + } + + if (val->type != PMIX_SIZE) { + OPAL_OUTPUT_VERBOSE((10, ompi_comm_output, "PMIx_Get failed for PMIX_GROUP_LOCAL_CID type mismatch")); + rc = OMPI_ERR_TYPE_MISMATCH; + goto done; + } + + PMIX_VALUE_GET_NUMBER(rc, val, remote_cid64, size_t); + rc = OMPI_SUCCESS; + *remote_cid = (uint32_t)remote_cid64; + comm->c_index_vec[dest] = (uint32_t)remote_cid64; + OPAL_OUTPUT_VERBOSE((10, ompi_comm_output, "PMIx_Get PMIX_GROUP_LOCAL_CID %d for cid_base %ld", *remote_cid, excid.cid_base)); + +done: + if (NULL != val) { + PMIX_VALUE_RELEASE(val); + } + + return rc; +} + static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request) { ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context; diff --git a/ompi/communicator/comm_init.c b/ompi/communicator/comm_init.c index a72a6661189..498bf4a1e70 100644 --- a/ompi/communicator/comm_init.c +++ b/ompi/communicator/comm_init.c @@ -23,7 +23,7 @@ * and Technology (RIST). All rights reserved. * Copyright (c) 2015-2019 Intel, Inc. All rights reserved. * Copyright (c) 2016-2017 IBM Corporation. All rights reserved. - * Copyright (c) 2018-2022 Triad National Security, LLC. All rights + * Copyright (c) 2018-2024 Triad National Security, LLC. All rights * reserved. * Copyright (c) 2023 Advanced Micro Devices, Inc. All rights reserved. * Copyright (c) 2023 NVIDIA Corporation. All rights reserved. 
@@ -69,6 +69,8 @@ ompi_predefined_communicator_t ompi_mpi_comm_self = {{{{0}}}}; ompi_predefined_communicator_t ompi_mpi_comm_null = {{{{0}}}}; ompi_communicator_t *ompi_mpi_comm_parent = NULL; +int ompi_comm_output = -1; + static bool ompi_comm_intrinsic_init; ompi_predefined_communicator_t *ompi_mpi_comm_world_addr = @@ -97,6 +99,14 @@ static int ompi_comm_finalize (void); */ int ompi_comm_init(void) { + + /* create output stream */ + + if (ompi_comm_output == -1) { + ompi_comm_output = opal_output_open(NULL); + opal_output_set_verbosity(ompi_comm_output, ompi_comm_verbose_level); + } + /* Setup communicator array */ OBJ_CONSTRUCT(&ompi_mpi_communicators, opal_pointer_array_t); if( OPAL_SUCCESS != opal_pointer_array_init(&ompi_mpi_communicators, 16, @@ -392,6 +402,11 @@ static int ompi_comm_finalize (void) /* finalize communicator requests */ ompi_comm_request_fini (); + /* close output stream */ + + opal_output_close(ompi_comm_output); + ompi_comm_output = -1; + /* release a reference to the attributes subsys */ return ompi_attr_put_ref(); } @@ -417,6 +432,7 @@ static void ompi_comm_construct(ompi_communicator_t* comm) comm->c_coll = NULL; comm->c_nbc_tag = MCA_COLL_BASE_TAG_NONBLOCKING_BASE; comm->instance = NULL; + comm->c_index_vec = NULL; /* * magic numerology - see TOPDIR/ompi/include/mpif-values.pl @@ -518,6 +534,11 @@ static void ompi_comm_destruct(ompi_communicator_t* comm) comm->c_name = NULL; } + if (NULL != comm->c_index_vec) { + free (comm->c_index_vec); + comm->c_index_vec = NULL; + } + #if OPAL_ENABLE_FT_MPI if( NULL != comm->agreement_specific ) { OBJ_RELEASE( comm->agreement_specific ); diff --git a/ompi/communicator/communicator.h b/ompi/communicator/communicator.h index 3a230b68025..1714a09befc 100644 --- a/ompi/communicator/communicator.h +++ b/ompi/communicator/communicator.h @@ -153,6 +153,8 @@ OMPI_DECLSPEC extern opal_hash_table_t ompi_comm_hash; OMPI_DECLSPEC extern opal_pointer_array_t ompi_mpi_communicators; OMPI_DECLSPEC extern opal_pointer_array_t ompi_comm_f_to_c_table; +OMPI_DECLSPEC extern int ompi_comm_output; + struct ompi_comm_extended_cid_t { uint64_t cid_base; union { @@ -284,6 +286,10 @@ struct ompi_communicator_t { uint32_t c_epoch; /* Identifier used to differentiate between two communicators using the same c_contextid (not at the same time, obviously) */ #endif + /* vector used to store remote cid values for communicators not using + * a global cid, i.e. when OMPI_COMM_IS_GLOBAL_INDEX(comm) returns 0. + */ + uint32_t *c_index_vec; /* Non-blocking collective tag. These tags might be shared between * all non-blocking collective modules (to avoid message collision * between them in the case where multiple outstanding non-blocking @@ -535,6 +541,30 @@ static inline uint32_t ompi_comm_get_local_cid (const ompi_communicator_t* comm) return comm->c_index; } +int ompi_comm_get_remote_cid_from_pmix (ompi_communicator_t *comm, int dest, uint32_t *remote_cid); + +/** + * Get remote cid for the communicator. In the case of communicators created + * using methods that don't supply an input communicator, i.e. + * MPI_Comm_create_from_group, the remote cid may be different from the local cid. 
+ */ +static inline int ompi_comm_get_remote_cid (ompi_communicator_t *comm, int dest, uint32_t *remote_cid) +{ + int rc = OMPI_SUCCESS; + + assert(NULL != remote_cid); + + if (OPAL_LIKELY(OMPI_COMM_IS_GLOBAL_INDEX(comm))) { + *remote_cid = comm->c_index; + } else if (0 != comm->c_index_vec[dest]) { + *remote_cid = comm->c_index_vec[dest]; + } else { + rc = ompi_comm_get_remote_cid_from_pmix(comm, dest, remote_cid); + } + + return rc; +} + /** * Get the extended context ID for the communicator, suitable for passing * to ompi_comm_lookup_cid for getting the communicator back @@ -614,6 +644,12 @@ static inline struct ompi_proc_t* ompi_comm_peer_lookup (const ompi_communicator return ompi_group_peer_lookup(comm->c_remote_group,peer_id); } +static inline bool ompi_comm_instances_same(const ompi_communicator_t *comm1, + const ompi_communicator_t *comm2) +{ + return comm1->instance == comm2->instance; +} + #if OPAL_ENABLE_FT_MPI /* * Support for MPI_ANY_SOURCE point-to-point operations diff --git a/ompi/mca/mtl/ofi/mtl_ofi.c b/ompi/mca/mtl/ofi/mtl_ofi.c index 35cb52443d9..fcabc55baa4 100644 --- a/ompi/mca/mtl/ofi/mtl_ofi.c +++ b/ompi/mca/mtl/ofi/mtl_ofi.c @@ -1,6 +1,6 @@ /* * Copyright (c) 2013-2020 Intel, Inc. All rights reserved. - * Copyright (c) 2021-2022 Triad National Security, LLC. All rights + * Copyright (c) 2021-2024 Triad National Security, LLC. All rights * reserved. * * $COPYRIGHT$ @@ -14,8 +14,6 @@ OMPI_DECLSPEC extern mca_mtl_ofi_component_t mca_mtl_ofi_component; -OBJ_CLASS_INSTANCE(mca_mtl_comm_t, opal_object_t, NULL, NULL); - mca_mtl_ofi_module_t ompi_mtl_ofi = { { (int)((1ULL << MTL_OFI_CID_BIT_COUNT_1) - 1), /* max cid */ @@ -346,43 +344,10 @@ int ompi_mtl_ofi_add_comm(struct mca_mtl_base_module_t *mtl, struct ompi_communicator_t *comm) { int ret = OMPI_SUCCESS; - uint32_t comm_size; - mca_mtl_comm_t* mtl_comm; mca_mtl_ofi_ep_type ep_type = (0 == ompi_mtl_ofi.enable_sep) ? OFI_REGULAR_EP : OFI_SCALABLE_EP; - if (!OMPI_COMM_IS_GLOBAL_INDEX(comm)) { - mtl_comm = OBJ_NEW(mca_mtl_comm_t); - - if (OMPI_COMM_IS_INTER(comm)) { - comm_size = ompi_comm_remote_size(comm); - } else { - comm_size = ompi_comm_size(comm); - } - mtl_comm->c_index_vec = (c_index_vec_t *)malloc(sizeof(c_index_vec_t) * comm_size); - if (NULL == mtl_comm->c_index_vec) { - ret = OMPI_ERR_OUT_OF_RESOURCE; - OBJ_RELEASE(mtl_comm); - goto error; - } else { - for (uint32_t i=0; i < comm_size; i++) { - mtl_comm->c_index_vec[i].c_index_state = MCA_MTL_OFI_CID_NOT_EXCHANGED; - } - } - if (OMPI_COMM_IS_INTRA(comm)) { - mtl_comm->c_index_vec[comm->c_my_rank].c_index = comm->c_index; - mtl_comm->c_index_vec[comm->c_my_rank].c_index_state = MCA_MTL_OFI_CID_EXCHANGED; - } - - comm->c_mtl_comm = mtl_comm; - - } else { - - comm->c_mtl_comm = NULL; - - } - /* * If thread grouping enabled, add new OFI context for each communicator * other than MPI_COMM_SELF. @@ -412,12 +377,6 @@ int ompi_mtl_ofi_del_comm(struct mca_mtl_base_module_t *mtl, mca_mtl_ofi_ep_type ep_type = (0 == ompi_mtl_ofi.enable_sep) ? OFI_REGULAR_EP : OFI_SCALABLE_EP; - if(NULL != comm->c_mtl_comm) { - free(comm->c_mtl_comm->c_index_vec); - OBJ_RELEASE(comm->c_mtl_comm); - comm->c_mtl_comm = NULL; - } - /* * Clean up OFI contexts information. 
*/ diff --git a/ompi/mca/mtl/ofi/mtl_ofi.h b/ompi/mca/mtl/ofi/mtl_ofi.h index aae756b0518..ee8a24fca69 100644 --- a/ompi/mca/mtl/ofi/mtl_ofi.h +++ b/ompi/mca/mtl/ofi/mtl_ofi.h @@ -77,32 +77,6 @@ int ompi_mtl_ofi_progress_no_inline(void); extern opal_thread_local int ompi_mtl_ofi_per_thread_ctx; #endif -#define MCA_MTL_OFI_CID_NOT_EXCHANGED 2 -#define MCA_MTL_OFI_CID_EXCHANGING 1 -#define MCA_MTL_OFI_CID_EXCHANGED 0 - -typedef struct { - uint32_t c_index:30; - uint32_t c_index_state:2; -} c_index_vec_t; - -typedef struct mca_mtl_comm_t { - opal_object_t super; - c_index_vec_t *c_index_vec; -} mca_mtl_comm_t; - -OBJ_CLASS_DECLARATION(mca_mtl_comm_t); - -struct mca_mtl_ofi_cid_hdr_t { - ompi_comm_extended_cid_t hdr_cid; - int16_t hdr_src_c_index; - int32_t hdr_src; - bool need_response; - bool ofi_cq_data; -}; - -typedef struct mca_mtl_ofi_cid_hdr_t mca_mtl_ofi_cid_hdr_t; - /* Set OFI context for operations which generate completion events */ __opal_attribute_always_inline__ static inline void set_thread_context(int ctxt) @@ -513,135 +487,6 @@ ompi_mtl_ofi_map_comm_to_ctxt(uint32_t comm_id) return ompi_mtl_ofi.comm_to_context[comm_id]; } -__opal_attribute_always_inline__ static inline int -ompi_mtl_ofi_post_recv_excid_buffer(bool blocking, struct ompi_communicator_t *comm, int src); - -__opal_attribute_always_inline__ static inline int -ompi_mtl_ofi_send_excid(struct mca_mtl_base_module_t *mtl, - struct ompi_communicator_t *comm, - int dest, - bool ofi_cq_data, - bool is_send); - -__opal_attribute_always_inline__ static inline int -ompi_mtl_ofi_recv_excid_error_callback(struct fi_cq_err_entry *error, - ompi_mtl_ofi_request_t *ofi_req) -{ - ompi_status_public_t *status; - assert(ofi_req->super.ompi_req); - status = &ofi_req->super.ompi_req->req_status; - status->MPI_TAG = MTL_OFI_GET_TAG(ofi_req->match_bits); - status->MPI_SOURCE = mtl_ofi_get_source((struct fi_cq_tagged_entry *) error); - - switch (error->err) { - case FI_ETRUNC: - status->MPI_ERROR = MPI_ERR_TRUNCATE; - break; - case FI_ECANCELED: - status->_cancelled = true; - break; - default: - status->MPI_ERROR = MPI_ERR_INTERN; - } - - ofi_req->super.completion_callback(&ofi_req->super); - return OMPI_SUCCESS; -} - -__opal_attribute_always_inline__ static inline int -ompi_mtl_ofi_post_recv_excid_buffer_callback(struct fi_cq_tagged_entry *wc, - ompi_mtl_ofi_request_t *ofi_req) -{ - ofi_req->completion_count--; - int ret; - mca_mtl_ofi_cid_hdr_t *buffer = (mca_mtl_ofi_cid_hdr_t *)ofi_req->buffer; - ompi_comm_extended_cid_t excid; - ompi_communicator_t *comm; - int src = buffer->hdr_src; - mca_mtl_comm_t *mtl_comm; - - excid.cid_base = buffer->hdr_cid.cid_base; - excid.cid_sub.u64 = buffer->hdr_cid.cid_sub.u64; - for (int i = 0; i < 8; i++) { - excid.cid_sub.u8[i] = buffer->hdr_cid.cid_sub.u8[i]; - } - - comm = ompi_comm_lookup_cid(excid); - if (comm == NULL) { - comm = ompi_comm_lookup(buffer->hdr_src_c_index); - } - - if (comm == NULL) { - return OMPI_SUCCESS; - } - - mtl_comm = comm->c_mtl_comm; - - if (mtl_comm->c_index_vec[src].c_index_state == MCA_MTL_OFI_CID_NOT_EXCHANGED - && buffer->need_response) { - mtl_comm->c_index_vec[src].c_index = buffer->hdr_src_c_index; - mtl_comm->c_index_vec[src].c_index_state = MCA_MTL_OFI_CID_EXCHANGED; - ret = ompi_mtl_ofi_send_excid(ofi_req->mtl, comm, src, buffer->ofi_cq_data, false); - } else { - mtl_comm->c_index_vec[src].c_index_state = MCA_MTL_OFI_CID_EXCHANGED; - mtl_comm->c_index_vec[src].c_index = buffer->hdr_src_c_index; - } - - ret = ompi_mtl_ofi_post_recv_excid_buffer(false, comm, -1); - 
return ret; -} - -__opal_attribute_always_inline__ static inline int -ompi_mtl_ofi_post_recv_excid_buffer(bool blocking, struct ompi_communicator_t *comm, int src) -{ - int ctxt_id = 0; - ssize_t ret; - ompi_mtl_ofi_request_t *ofi_req = malloc(sizeof(ompi_mtl_ofi_request_t)); - mca_mtl_ofi_cid_hdr_t *start = malloc(sizeof(mca_mtl_ofi_cid_hdr_t)); - size_t length = sizeof(mca_mtl_ofi_cid_hdr_t); - mca_mtl_comm_t *mtl_comm; - - mtl_comm = comm->c_mtl_comm; - - set_thread_context(ctxt_id); - - ofi_req->type = OMPI_MTL_OFI_RECV; - ofi_req->event_callback = ompi_mtl_ofi_post_recv_excid_buffer_callback; - ofi_req->error_callback = ompi_mtl_ofi_recv_excid_error_callback; - ofi_req->buffer = start; - ofi_req->length = length; - ofi_req->convertor = NULL; - ofi_req->req_started = false; - ofi_req->status.MPI_ERROR = OMPI_SUCCESS; - ofi_req->remote_addr = 0UL; - ofi_req->match_bits = 0UL; - ofi_req->completion_count = 1; - ofi_req->comm = comm; - - OFI_RETRY_UNTIL_DONE(fi_recv(ompi_mtl_ofi.ofi_ctxt[0].rx_ep, - start, - length, - NULL, - FI_ADDR_UNSPEC, - (void *)&ofi_req->ctx), ret); - if (OPAL_UNLIKELY(0 > ret)) { - if (NULL != ofi_req->buffer) { - free(ofi_req->buffer); - } - MTL_OFI_LOG_FI_ERR(ret, "fi_recv failed"); - return ompi_mtl_ofi_get_error(ret); - } - - if (blocking) { - assert(src != -1); - while (mtl_comm->c_index_vec[src].c_index_state > MCA_MTL_OFI_CID_EXCHANGED) { - ompi_mtl_ofi_progress(); - } - } - - return OMPI_SUCCESS; -} - __opal_attribute_always_inline__ static inline int ompi_mtl_ofi_ssend_recv(ompi_mtl_ofi_request_t *ack_req, struct ompi_communicator_t *comm, @@ -692,139 +537,6 @@ ompi_mtl_ofi_ssend_recv(ompi_mtl_ofi_request_t *ack_req, return OMPI_SUCCESS; } -/* - * this routine is invoked in the case of communicators which are not using a - * global cid, i.e. those created using MPI_Comm_create_from_group/ - * MPI_Intercomm_create_from_groups in order to exchange the local cid used - * by the sender for this supplied communicator. This function is only invoked - * for the first message sent to a given receiver. - */ -static int -ompi_mtl_ofi_send_excid(struct mca_mtl_base_module_t *mtl, - struct ompi_communicator_t *comm, - int dest, - bool ofi_cq_data, - bool is_send) -{ - ssize_t ret = OMPI_SUCCESS; - ompi_mtl_ofi_request_t *ofi_req = NULL; - int ctxt_id = 0; - mca_mtl_ofi_cid_hdr_t *start = NULL; - ompi_proc_t *ompi_proc = NULL; - mca_mtl_ofi_endpoint_t *endpoint = NULL; - fi_addr_t sep_peer_fiaddr = 0; - mca_mtl_comm_t *mtl_comm; - - ofi_req = (ompi_mtl_ofi_request_t *)malloc(sizeof(ompi_mtl_ofi_request_t)); - if (NULL == ofi_req) { - ret = OMPI_ERR_OUT_OF_RESOURCE; - goto fn_exit; - } - - start = (mca_mtl_ofi_cid_hdr_t *)malloc(sizeof(mca_mtl_ofi_cid_hdr_t)); - if (NULL == start) { - ret = OMPI_ERR_OUT_OF_RESOURCE; - goto fn_exit; - } - - mtl_comm = comm->c_mtl_comm; - - ctxt_id = 0; - set_thread_context(ctxt_id); - - /** - * Create a send request, start it and wait until it completes. 
- */ - ofi_req->type = OMPI_MTL_OFI_SEND; - ofi_req->event_callback = ompi_mtl_ofi_send_excid_callback; - ofi_req->error_callback = ompi_mtl_ofi_send_error_callback; - ofi_req->buffer = start; - - ompi_proc = ompi_comm_peer_lookup(comm, dest); - endpoint = ompi_mtl_ofi_get_endpoint(mtl, ompi_proc); - - /* For Scalable Endpoints, gather target receive context */ - sep_peer_fiaddr = fi_rx_addr(endpoint->peer_fiaddr, ctxt_id, ompi_mtl_ofi.rx_ctx_bits); - - start->hdr_cid = comm->c_contextid; - start->hdr_src = comm->c_my_rank; - start->hdr_src_c_index = comm->c_index; - start->ofi_cq_data = ofi_cq_data; - if (mtl_comm->c_index_vec[dest].c_index_state > MCA_MTL_OFI_CID_EXCHANGED) { - start->need_response = true; - } else { - start->need_response = false; - } - size_t length = sizeof(mca_mtl_ofi_cid_hdr_t); - - ofi_req->length = length; - ofi_req->status.MPI_ERROR = OMPI_SUCCESS; - ofi_req->completion_count = 0; - if (OPAL_UNLIKELY(length > endpoint->mtl_ofi_module->max_msg_size)) { - opal_show_help("help-mtl-ofi.txt", - "message too big", false, - length, endpoint->mtl_ofi_module->max_msg_size); - ret = OMPI_ERROR; - goto fn_exit; - } - - if (ompi_mtl_ofi.max_inject_size >= length) { - if (ofi_cq_data) { - OFI_RETRY_UNTIL_DONE(fi_injectdata(ompi_mtl_ofi.ofi_ctxt[0].tx_ep, - start, - length, - comm->c_my_rank, - sep_peer_fiaddr), ret); - } else { - OFI_RETRY_UNTIL_DONE(fi_inject(ompi_mtl_ofi.ofi_ctxt[0].tx_ep, - start, - length, - sep_peer_fiaddr), ret); - } - if (OPAL_UNLIKELY(0 > ret)) { - MTL_OFI_LOG_FI_ERR(ret, - ofi_cq_data ? "fi_injectdata failed" - : "fi_inject failed"); - - } - } else { - ofi_req->completion_count = 1; - if (ofi_cq_data) { - OFI_RETRY_UNTIL_DONE(fi_senddata(ompi_mtl_ofi.ofi_ctxt[0].tx_ep, - start, - length, - NULL, - comm->c_my_rank, - sep_peer_fiaddr, - (void *) &ofi_req->ctx), ret); - } else { - OFI_RETRY_UNTIL_DONE(fi_send(ompi_mtl_ofi.ofi_ctxt[0].tx_ep, - start, - length, - NULL, - sep_peer_fiaddr, - (void *) &ofi_req->ctx), ret); - } - if (OPAL_UNLIKELY(0 > ret)) { - MTL_OFI_LOG_FI_ERR(ret, - ofi_cq_data ? 
"fi_tsenddata failed" - : "fi_tsend failed"); - } - } - - ret = ompi_mtl_ofi_get_error(ret); - ofi_req->status.MPI_ERROR = ret; - -fn_exit: - - if ((OMPI_SUCCESS != ret) || (ofi_req->completion_count == 0)) { - if (NULL != ofi_req) free(ofi_req); - if (NULL != start) free(start); - } - - return ret; -} - __opal_attribute_always_inline__ static inline int ompi_mtl_ofi_send_generic(struct mca_mtl_base_module_t *mtl, struct ompi_communicator_t *comm, @@ -836,7 +548,8 @@ ompi_mtl_ofi_send_generic(struct mca_mtl_base_module_t *mtl, { ssize_t ret = OMPI_SUCCESS; ompi_mtl_ofi_request_t ofi_req; - int ompi_ret, ctxt_id = 0, c_index_for_tag; + int ompi_ret, ctxt_id = 0; + uint32_t c_index_for_tag; void *start; bool free_after; size_t length; @@ -846,29 +559,10 @@ ompi_mtl_ofi_send_generic(struct mca_mtl_base_module_t *mtl, ompi_mtl_ofi_request_t *ack_req = NULL; /* For synchronous send */ fi_addr_t src_addr = 0; fi_addr_t sep_peer_fiaddr = 0; - mca_mtl_comm_t *mtl_comm; - - if (OPAL_LIKELY(OMPI_COMM_IS_GLOBAL_INDEX(comm))) { - c_index_for_tag = comm->c_index; - } else { - mtl_comm = comm->c_mtl_comm; - if (mtl_comm->c_index_vec[dest].c_index_state == MCA_MTL_OFI_CID_NOT_EXCHANGED) { - mtl_comm->c_index_vec[dest].c_index_state = MCA_MTL_OFI_CID_EXCHANGING; - ompi_ret = ompi_mtl_ofi_send_excid(mtl, comm, dest, ofi_cq_data, true); - if (OPAL_UNLIKELY(OMPI_SUCCESS != ompi_ret)) { - return ompi_ret; - } - } - if (mtl_comm->c_index_vec[dest].c_index_state > MCA_MTL_OFI_CID_EXCHANGED) { - while (mtl_comm->c_index_vec[dest].c_index_state > MCA_MTL_OFI_CID_EXCHANGED) { - ompi_ret = ompi_mtl_ofi_post_recv_excid_buffer(true, comm, dest); - if (OPAL_UNLIKELY(OMPI_SUCCESS != ompi_ret)) { - return ompi_ret; - } - } - } - c_index_for_tag = mtl_comm->c_index_vec[dest].c_index; + ompi_ret = ompi_comm_get_remote_cid(comm, dest, &c_index_for_tag); + if (OPAL_UNLIKELY(OMPI_SUCCESS != ompi_ret)) { + return ompi_ret; } ompi_mtl_ofi_set_mr_null(&ofi_req); @@ -1096,7 +790,8 @@ ompi_mtl_ofi_isend_generic(struct mca_mtl_base_module_t *mtl, { ssize_t ret = OMPI_SUCCESS; ompi_mtl_ofi_request_t *ofi_req = (ompi_mtl_ofi_request_t *) mtl_request; - int ompi_ret, ctxt_id = 0, c_index_for_tag; + int ompi_ret, ctxt_id = 0; + uint32_t c_index_for_tag; void *start; size_t length; bool free_after; @@ -1105,24 +800,12 @@ ompi_mtl_ofi_isend_generic(struct mca_mtl_base_module_t *mtl, mca_mtl_ofi_endpoint_t *endpoint = NULL; ompi_mtl_ofi_request_t *ack_req = NULL; /* For synchronous send */ fi_addr_t sep_peer_fiaddr = 0; - mca_mtl_comm_t *mtl_comm; ompi_mtl_ofi_set_mr_null(ofi_req); - if (OMPI_COMM_IS_GLOBAL_INDEX(comm)) { - c_index_for_tag = comm->c_index; - } else { - mtl_comm = comm->c_mtl_comm; - if (mtl_comm->c_index_vec[dest].c_index_state == MCA_MTL_OFI_CID_NOT_EXCHANGED) { - mtl_comm->c_index_vec[dest].c_index_state = MCA_MTL_OFI_CID_EXCHANGING; - ompi_ret = ompi_mtl_ofi_send_excid(mtl, comm, dest, ofi_cq_data, true); - } - if (mtl_comm->c_index_vec[dest].c_index_state > MCA_MTL_OFI_CID_EXCHANGED) { - while (mtl_comm->c_index_vec[dest].c_index_state > MCA_MTL_OFI_CID_EXCHANGED) { - ompi_ret = ompi_mtl_ofi_post_recv_excid_buffer(true, comm, dest); - } - } - c_index_for_tag = mtl_comm->c_index_vec[dest].c_index; + ompi_ret = ompi_comm_get_remote_cid(comm, dest, &c_index_for_tag); + if (OPAL_UNLIKELY(OMPI_SUCCESS != ompi_ret)) { + return ompi_ret; } if (ompi_mtl_ofi.total_ctxts_used > 0) { @@ -1373,23 +1056,9 @@ ompi_mtl_ofi_irecv_generic(struct mca_mtl_base_module_t *mtl, void *start; size_t length; bool free_after; - mca_mtl_comm_t 
*mtl_comm; ompi_mtl_ofi_set_mr_null(ofi_req); - if (!OMPI_COMM_IS_GLOBAL_INDEX(comm)) { - mtl_comm = comm->c_mtl_comm; - if ((src == MPI_ANY_SOURCE || mtl_comm->c_index_vec[src].c_index_state > MCA_MTL_OFI_CID_EXCHANGED) && - !ompi_mtl_ofi.has_posted_initial_buffer) { - ompi_mtl_ofi.has_posted_initial_buffer = true; - ompi_ret = ompi_mtl_ofi_post_recv_excid_buffer(false, comm, -1); - } - if (src >= 0 && mtl_comm->c_index_vec[src].c_index_state == MCA_MTL_OFI_CID_NOT_EXCHANGED) { - mtl_comm->c_index_vec[src].c_index_state = MCA_MTL_OFI_CID_EXCHANGING; - ompi_ret = ompi_mtl_ofi_send_excid(mtl, comm, src, ofi_cq_data, false); - } - } - if (ompi_mtl_ofi.total_ctxts_used > 0) { ctxt_id = comm->c_contextid.cid_sub.u64 % ompi_mtl_ofi.total_ctxts_used; } else { @@ -1661,20 +1330,6 @@ ompi_mtl_ofi_iprobe_generic(struct mca_mtl_base_module_t *mtl, struct fi_msg_tagged msg; uint64_t msgflags = FI_PEEK | FI_COMPLETION; int ctxt_id = 0; - mca_mtl_comm_t *mtl_comm; - - if (!OMPI_COMM_IS_GLOBAL_INDEX(comm)) { - mtl_comm = comm->c_mtl_comm; - if ((src == MPI_ANY_SOURCE || mtl_comm->c_index_vec[src].c_index_state > MCA_MTL_OFI_CID_EXCHANGED) && - !ompi_mtl_ofi.has_posted_initial_buffer) { - ompi_mtl_ofi.has_posted_initial_buffer = true; - ret = ompi_mtl_ofi_post_recv_excid_buffer(false, comm, -1); - } - if (src >= 0 && mtl_comm->c_index_vec[src].c_index_state == MCA_MTL_OFI_CID_NOT_EXCHANGED) { - mtl_comm->c_index_vec[src].c_index_state = MCA_MTL_OFI_CID_EXCHANGING; - ret = ompi_mtl_ofi_send_excid(mtl, comm, src, ofi_cq_data, false); - } - } if (ompi_mtl_ofi.total_ctxts_used > 0) { ctxt_id = comm->c_contextid.cid_sub.u64 % ompi_mtl_ofi.total_ctxts_used; @@ -1761,20 +1416,6 @@ ompi_mtl_ofi_improbe_generic(struct mca_mtl_base_module_t *mtl, struct fi_msg_tagged msg; uint64_t msgflags = FI_PEEK | FI_CLAIM | FI_COMPLETION; int ctxt_id = 0; - mca_mtl_comm_t *mtl_comm; - - if (!OMPI_COMM_IS_GLOBAL_INDEX(comm)) { - mtl_comm = comm->c_mtl_comm; - if ((src == MPI_ANY_SOURCE || mtl_comm->c_index_vec[src].c_index_state > MCA_MTL_OFI_CID_EXCHANGED) - && !ompi_mtl_ofi.has_posted_initial_buffer) { - ompi_mtl_ofi.has_posted_initial_buffer = true; - ret = ompi_mtl_ofi_post_recv_excid_buffer(false, comm, -1); - } - if (src >= 0 && mtl_comm->c_index_vec[src].c_index_state == MCA_MTL_OFI_CID_NOT_EXCHANGED) { - mtl_comm->c_index_vec[src].c_index_state = MCA_MTL_OFI_CID_EXCHANGING; - ret = ompi_mtl_ofi_send_excid(mtl, comm, src, ofi_cq_data, false); - } - } if (ompi_mtl_ofi.total_ctxts_used > 0) { ctxt_id = comm->c_contextid.cid_sub.u64 % ompi_mtl_ofi.total_ctxts_used; diff --git a/ompi/mca/pml/ucx/pml_ucx.c b/ompi/mca/pml/ucx/pml_ucx.c index dd16a27b154..c748b02e12f 100644 --- a/ompi/mca/pml/ucx/pml_ucx.c +++ b/ompi/mca/pml/ucx/pml_ucx.c @@ -739,6 +739,8 @@ int mca_pml_ucx_isend_init(const void *buf, size_t count, ompi_datatype_t *datat struct ompi_communicator_t* comm, struct ompi_request_t **request) { + int rc; + uint32_t cid; mca_pml_ucx_persistent_request_t *req; ucp_ep_h ep; @@ -755,12 +757,17 @@ int mca_pml_ucx_isend_init(const void *buf, size_t count, ompi_datatype_t *datat return OMPI_ERROR; } + rc = ompi_comm_get_remote_cid(comm, dst, &cid); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + return rc; + } + req->ompi.req_state = OMPI_REQUEST_INACTIVE; req->ompi.req_mpi_object.comm = comm; req->flags = MCA_PML_UCX_REQUEST_FLAG_SEND; req->buffer = (void *)buf; req->count = count; - req->tag = PML_UCX_MAKE_SEND_TAG(tag, comm); + req->tag = PML_UCX_MAKE_SEND_TAG(tag, comm, cid); req->send.mode = mode; req->send.ep = ep; 
req->ompi_datatype = datatype; @@ -885,7 +892,9 @@ int mca_pml_ucx_isend(const void *buf, size_t count, ompi_datatype_t *datatype, struct ompi_communicator_t* comm, struct ompi_request_t **request) { + int rc; ompi_request_t *req; + uint32_t cid; ucp_ep_h ep; PML_UCX_TRACE_SEND("i%ssend request *%p", @@ -897,15 +906,18 @@ int mca_pml_ucx_isend(const void *buf, size_t count, ompi_datatype_t *datatype, if (OPAL_UNLIKELY(NULL == ep)) { return OMPI_ERROR; } - + rc = ompi_comm_get_remote_cid(comm, dst, &cid); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + return rc; + } #if HAVE_DECL_UCP_TAG_SEND_NBX req = (ompi_request_t*)mca_pml_ucx_common_send_nbx(ep, buf, count, datatype, - PML_UCX_MAKE_SEND_TAG(tag, comm), mode, + PML_UCX_MAKE_SEND_TAG(tag, comm, cid), mode, &mca_pml_ucx_get_op_data(datatype)->op_param.isend); #else req = (ompi_request_t*)mca_pml_ucx_common_send(ep, buf, count, datatype, mca_pml_ucx_get_datatype(datatype), - PML_UCX_MAKE_SEND_TAG(tag, comm), mode, + PML_UCX_MAKE_SEND_TAG(tag, comm, cid), mode, mca_pml_ucx_send_completion); #endif @@ -1002,7 +1014,9 @@ int mca_pml_ucx_send(const void *buf, size_t count, ompi_datatype_t *datatype, i int tag, mca_pml_base_send_mode_t mode, struct ompi_communicator_t* comm) { + int rc; ucp_ep_h ep; + uint32_t cid; PML_UCX_TRACE_SEND("%s", buf, count, datatype, dst, tag, mode, comm, mode == MCA_PML_BASE_SEND_BUFFERED ? "bsend" : "send"); @@ -1019,17 +1033,22 @@ int mca_pml_ucx_send(const void *buf, size_t count, ompi_datatype_t *datatype, i OMPI_SPC_BYTES_SENT_USER, OMPI_SPC_BYTES_SENT_MPI); #endif + rc = ompi_comm_get_remote_cid(comm, dst, &cid); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + return rc; + } + #if HAVE_DECL_UCP_TAG_SEND_NBR if (OPAL_LIKELY((MCA_PML_BASE_SEND_BUFFERED != mode) && (MCA_PML_BASE_SEND_SYNCHRONOUS != mode))) { return mca_pml_ucx_send_nbr(ep, buf, count, datatype, - PML_UCX_MAKE_SEND_TAG(tag, comm)); + PML_UCX_MAKE_SEND_TAG(tag, comm, cid)); } #endif return mca_pml_ucx_send_nb(ep, buf, count, datatype, mca_pml_ucx_get_datatype(datatype), - PML_UCX_MAKE_SEND_TAG(tag, comm), mode); + PML_UCX_MAKE_SEND_TAG(tag, comm, cid), mode); } int mca_pml_ucx_iprobe(int src, int tag, struct ompi_communicator_t* comm, diff --git a/ompi/mca/pml/ucx/pml_ucx_component.c b/ompi/mca/pml/ucx/pml_ucx_component.c index 5639e2b1f34..ec095e19fef 100644 --- a/ompi/mca/pml/ucx/pml_ucx_component.c +++ b/ompi/mca/pml/ucx/pml_ucx_component.c @@ -145,6 +145,10 @@ mca_pml_ucx_component_init(int* priority, bool enable_progress_threads, *priority = (support_level == OPAL_COMMON_UCX_SUPPORT_DEVICE) ? 
ompi_pml_ucx.priority : 19; PML_UCX_VERBOSE(2, "returning priority %d", *priority); + + /** this pml supports the extended CID space */ + ompi_pml_ucx.super.pml_flags |= MCA_PML_BASE_FLAG_SUPPORTS_EXT_CID; + return &ompi_pml_ucx.super; } diff --git a/ompi/mca/pml/ucx/pml_ucx_request.c b/ompi/mca/pml/ucx/pml_ucx_request.c index fccb9f6a6f6..1a8d0dbc043 100644 --- a/ompi/mca/pml/ucx/pml_ucx_request.c +++ b/ompi/mca/pml/ucx/pml_ucx_request.c @@ -282,7 +282,7 @@ void mca_pml_ucx_completed_request_init(ompi_request_t *ompi_req) mca_pml_ucx_request_init_common(ompi_req, false, OMPI_REQUEST_ACTIVE, mca_pml_completed_request_free, mca_pml_completed_request_cancel); - ompi_req->req_mpi_object.comm = &ompi_mpi_comm_world.comm; + ompi_req->req_mpi_object.comm = &ompi_mpi_comm_null.comm; ompi_request_complete(ompi_req, false); } diff --git a/ompi/mca/pml/ucx/pml_ucx_request.h b/ompi/mca/pml/ucx/pml_ucx_request.h index 8132f6b54ba..9e901794f8d 100644 --- a/ompi/mca/pml/ucx/pml_ucx_request.h +++ b/ompi/mca/pml/ucx/pml_ucx_request.h @@ -43,10 +43,10 @@ enum { #define PML_UCX_TAG_MASK 0x7fffff0000000000ul -#define PML_UCX_MAKE_SEND_TAG(_tag, _comm) \ +#define PML_UCX_MAKE_SEND_TAG(_tag, _comm, _c_index) \ ((((uint64_t) (_tag) ) << (PML_UCX_RANK_BITS + PML_UCX_CONTEXT_BITS)) | \ (((uint64_t)(_comm)->c_my_rank ) << PML_UCX_CONTEXT_BITS) | \ - ((uint64_t)(_comm)->c_index)) + ((uint64_t)(_c_index))) #define PML_UCX_MAKE_RECV_TAG(_ucp_tag, _ucp_tag_mask, _tag, _src, _comm) \ diff --git a/ompi/runtime/ompi_mpi_params.c b/ompi/runtime/ompi_mpi_params.c index c31e47e4af8..cbc82f42f23 100644 --- a/ompi/runtime/ompi_mpi_params.c +++ b/ompi/runtime/ompi_mpi_params.c @@ -20,7 +20,7 @@ * All rights reserved. * Copyright (c) 2016-2021 Research Organization for Information Science * and Technology (RIST). All rights reserved. - * Copyright (c) 2018-2021 Triad National Security, LLC. All rights + * Copyright (c) 2018-2024 Triad National Security, LLC. All rights * reserved. * Copyright (c) 2021 Nanook Consulting. All rights reserved. * Copyright (c) 2022 IBM Corporation. All rights reserved. @@ -104,6 +104,7 @@ bool ompi_ftmpi_enabled = false; #endif /* OPAL_ENABLE_FT_MPI */ static int ompi_stream_buffering_mode = -1; +int ompi_comm_verbose_level = 0; int ompi_mpi_register_params(void) { @@ -445,6 +446,10 @@ int ompi_mpi_register_params(void) } #endif /* OPAL_ENABLE_FT_MPI */ + (void) mca_base_var_register ("ompi", "mpi", "comm", "verbose", + "Verbosity level for communicator management subsystem", + MCA_BASE_VAR_TYPE_INT, NULL, 0, MCA_BASE_VAR_FLAG_SETTABLE, + OPAL_INFO_LVL_8, MCA_BASE_VAR_SCOPE_LOCAL, &ompi_comm_verbose_level); return OMPI_SUCCESS; } diff --git a/ompi/runtime/params.h b/ompi/runtime/params.h index d9f48f80b59..db4e9043d7b 100644 --- a/ompi/runtime/params.h +++ b/ompi/runtime/params.h @@ -16,7 +16,7 @@ * Copyright (c) 2010-2012 Oak Ridge National Labs. All rights reserved. * Copyright (c) 2013 NVIDIA Corporation. All rights reserved. * Copyright (c) 2013 Intel, Inc. All rights reserved - * Copyright (c) 2018-2021 Triad National Security, LLC. All rights + * Copyright (c) 2018-2024 Triad National Security, LLC. All rights * reserved. * Copyright (c) 2021 Nanook Consulting. All rights reserved. * $COPYRIGHT$ @@ -191,6 +191,12 @@ OMPI_DECLSPEC extern bool ompi_enable_timing; OMPI_DECLSPEC extern int ompi_mpi_event_tick_rate; OMPI_DECLSPEC extern bool ompi_mpi_yield_when_idle; + /** + * An integer value specifying verbosity level for communicator management + * subsystem. 
+ */ +OMPI_DECLSPEC extern int ompi_comm_verbose_level; + /** * Register MCA parameters used by the MPI layer. *
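
--
A minimal sketch (not part of the patch) of the PMIx exchange pattern the
comm_cid.c changes above rely on: each member piggy-backs its local
communicator index on PMIx_Group_construct as PMIX_GROUP_INFO, and later
recovers a peer's value with a qualified PMIx_Get. It uses only PMIx calls
that appear in the patch, assumes an initialized PMIx client, and elides
the error handling and PMIX_TIMEOUT qualifier used by the real code; tag,
procs, nprocs, my_local_cid, cid_base, and peer are placeholder inputs.

#include <stdint.h>
#include <pmix.h>

/* Construct side: ask the server to assign a context id to the group and
 * piggy-back this process' local communicator index on the construct as
 * PMIX_GROUP_INFO. */
static pmix_status_t publish_local_cid(const char *tag, pmix_proc_t *procs,
                                       size_t nprocs, size_t my_local_cid,
                                       size_t *cid_base)
{
    void *grpinfo = PMIx_Info_list_start();
    void *list = PMIx_Info_list_start();
    pmix_data_array_t darray;
    pmix_info_t *pinfo, *results = NULL;
    size_t ninfo, nresults = 0;
    pmix_status_t rc;

    (void) PMIx_Info_list_add(grpinfo, PMIX_GROUP_ASSIGN_CONTEXT_ID, NULL,
                              PMIX_BOOL);

    /* wrap the local cid in a pmix_data_array_t tagged PMIX_GROUP_INFO so
     * it is distributed to every group member during construct */
    (void) PMIx_Info_list_add(list, PMIX_GROUP_LOCAL_CID, &my_local_cid,
                              PMIX_SIZE);
    (void) PMIx_Info_list_convert(list, &darray);
    (void) PMIx_Info_list_add(grpinfo, PMIX_GROUP_INFO, &darray,
                              PMIX_DATA_ARRAY);
    PMIX_DATA_ARRAY_DESTRUCT(&darray);

    (void) PMIx_Info_list_convert(grpinfo, &darray);
    pinfo = (pmix_info_t *) darray.array;
    ninfo = darray.size;

    rc = PMIx_Group_construct(tag, procs, nprocs, pinfo, ninfo,
                              &results, &nresults);
    PMIX_DATA_ARRAY_DESTRUCT(&darray);
    PMIx_Info_list_release(grpinfo);
    PMIx_Info_list_release(list);
    if (PMIX_SUCCESS != rc) {
        return rc;
    }

    /* the server returns the context id it assigned to the group */
    for (size_t i = 0; i < nresults; ++i) {
        if (PMIX_CHECK_KEY(&results[i], PMIX_GROUP_CONTEXT_ID)) {
            PMIX_VALUE_GET_NUMBER(rc, &results[i].value, *cid_base, size_t);
        }
    }
    PMIX_INFO_FREE(results, nresults);

    /* the group exists only to perform the exchange */
    return PMIx_Group_destruct(tag, NULL, 0);
}

/* Lookup side: fetch the local cid a peer attached for this communicator,
 * using the group context id obtained above as a qualifier. */
static pmix_status_t lookup_peer_cid(const pmix_proc_t *peer, size_t cid_base,
                                     uint32_t *peer_cid)
{
    pmix_info_t qual;
    pmix_value_t *val = NULL;
    size_t cid64 = 0;
    pmix_status_t rc;

    PMIX_INFO_CONSTRUCT(&qual);
    PMIX_INFO_LOAD(&qual, PMIX_GROUP_CONTEXT_ID, &cid_base, PMIX_SIZE);
    PMIX_INFO_SET_QUALIFIER(&qual);

    rc = PMIx_Get(peer, PMIX_GROUP_LOCAL_CID, &qual, 1, &val);
    if (PMIX_SUCCESS == rc && NULL != val && PMIX_SIZE == val->type) {
        PMIX_VALUE_GET_NUMBER(rc, val, cid64, size_t);
        *peer_cid = (uint32_t) cid64;
        PMIX_VALUE_RELEASE(val);
    }
    PMIX_INFO_DESTRUCT(&qual);
    return rc;
}

With the new MCA parameter registered above, the OPAL_OUTPUT_VERBOSE
tracing in these paths can be enabled at run time, e.g.
"mpirun --mca mpi_comm_verbose 10 ./app" (the calls added in this patch
log at verbosity level 10).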