From ea008e1587abf52c00b6146a5146119da0afd091 Mon Sep 17 00:00:00 2001 From: Edgar Gabriel Date: Sun, 29 Dec 2024 19:03:03 -0600 Subject: [PATCH] coll/accelerator: add bcast,allgather,alltoall add support for bcast, allgather and alltoall for device buffers using a temporary buffer on the CPU. The maximum msg length for each operation for which to use this approach can be controlled through an mca parameter. Note, for allgather and alltoall, the parameter represents the product of communicator size * msg length per proc. Signed-off-by: Edgar Gabriel --- ompi/mca/coll/accelerator/Makefile.am | 5 +- ompi/mca/coll/accelerator/coll_accelerator.h | 27 +++++ .../accelerator/coll_accelerator_allgather.c | 100 ++++++++++++++++++ .../accelerator/coll_accelerator_alltoall.c | 99 +++++++++++++++++ .../coll/accelerator/coll_accelerator_bcast.c | 78 ++++++++++++++ .../accelerator/coll_accelerator_component.c | 32 ++++++ .../accelerator/coll_accelerator_module.c | 9 ++ 7 files changed, 348 insertions(+), 2 deletions(-) create mode 100644 ompi/mca/coll/accelerator/coll_accelerator_allgather.c create mode 100644 ompi/mca/coll/accelerator/coll_accelerator_alltoall.c create mode 100644 ompi/mca/coll/accelerator/coll_accelerator_bcast.c diff --git a/ompi/mca/coll/accelerator/Makefile.am b/ompi/mca/coll/accelerator/Makefile.am index d9b41006530..635d4cb9886 100644 --- a/ompi/mca/coll/accelerator/Makefile.am +++ b/ompi/mca/coll/accelerator/Makefile.am @@ -11,9 +11,10 @@ # $HEADER$ # -sources = coll_accelerator_module.c coll_accelerator_reduce.c coll_accelerator_allreduce.c \ +sources = coll_accelerator_reduce.c coll_accelerator_allreduce.c \ coll_accelerator_reduce_scatter_block.c coll_accelerator_reduce_scatter.c \ - coll_accelerator_component.c \ + coll_accelerator_allgather.c coll_accelerator_alltoall.c coll_accelerator_bcast.c \ + coll_accelerator_component.c coll_accelerator_module.c \ coll_accelerator_scan.c coll_accelerator_exscan.c coll_accelerator.h # Make the output library 
in this directory, and name it either diff --git a/ompi/mca/coll/accelerator/coll_accelerator.h b/ompi/mca/coll/accelerator/coll_accelerator.h index a719746d8b6..ad3163b937a 100644 --- a/ompi/mca/coll/accelerator/coll_accelerator.h +++ b/ompi/mca/coll/accelerator/coll_accelerator.h @@ -34,6 +34,11 @@ BEGIN_C_DECLS /* API functions */ + +extern int mca_coll_accelerator_bcast_thresh; +extern int mca_coll_accelerator_allgather_thresh; +extern int mca_coll_accelerator_alltoall_thresh; + int mca_coll_accelerator_init_query(bool enable_progress_threads, bool enable_mpi_threads); mca_coll_base_module_t @@ -85,6 +90,28 @@ mca_coll_accelerator_reduce_scatter(const void *sbuf, void *rbuf, ompi_count_arr struct ompi_communicator_t *comm, mca_coll_base_module_t *module); +int +mca_coll_accelerator_allgather(const void *sbuf, size_t scount, + struct ompi_datatype_t *sdtype, + void *rbuf, size_t rcount, + struct ompi_datatype_t *rdtype, + struct ompi_communicator_t *comm, + mca_coll_base_module_t *module); + +int +mca_coll_accelerator_alltoall(const void *sbuf, size_t scount, + struct ompi_datatype_t *sdtype, + void *rbuf, size_t rcount, + struct ompi_datatype_t *rdtype, + struct ompi_communicator_t *comm, + mca_coll_base_module_t *module); + +int +mca_coll_accelerator_bcast(void *buff, size_t count, + struct ompi_datatype_t *datatype, + int root, + struct ompi_communicator_t *comm, + mca_coll_base_module_t *module); /* Checks the type of pointer * diff --git a/ompi/mca/coll/accelerator/coll_accelerator_allgather.c b/ompi/mca/coll/accelerator/coll_accelerator_allgather.c new file mode 100644 index 00000000000..71d8ee46465 --- /dev/null +++ b/ompi/mca/coll/accelerator/coll_accelerator_allgather.c @@ -0,0 +1,100 @@ +/* + * Copyright (c) 2014-2017 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2014-2015 NVIDIA Corporation. All rights reserved. + * Copyright (c) 2022 Amazon.com, Inc. or its affiliates. 
All Rights reserved.
+ * Copyright (c) 2024      Triad National Security, LLC. All rights reserved.
+ * Copyright (c) 2024      Advanced Micro Devices, Inc. All Rights reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+#include "ompi_config.h"
+#include "coll_accelerator.h"
+
+#include <string.h>
+
+#include "ompi/op/op.h"
+#include "opal/datatype/opal_convertor.h"
+
+/*
+ * Function: - allgather for device buffers through temp CPU buffer
+ * Accepts: - same as MPI_Allgather()
+ * Returns: - MPI_SUCCESS or error code
+ */
+int
+mca_coll_accelerator_allgather(const void *sbuf, size_t scount,
+                               struct ompi_datatype_t *sdtype,
+                               void *rbuf, size_t rcount,
+                               struct ompi_datatype_t *rdtype,
+                               struct ompi_communicator_t *comm,
+                               mca_coll_base_module_t *module)
+{
+    mca_coll_accelerator_module_t *s = (mca_coll_accelerator_module_t*) module;
+    ptrdiff_t sgap, rgap;
+    char *rbuf1 = NULL, *sbuf1 = NULL, *rbuf2 = NULL;
+    int sbuf_dev, rbuf_dev;
+    size_t sbufsize, rbufsize;
+    int rc;
+    int comm_size = ompi_comm_size(comm);
+
+    sbufsize = opal_datatype_span(&sdtype->super, scount, &sgap);
+    rc = mca_coll_accelerator_check_buf((void *)sbuf, &sbuf_dev);
+    if (rc < 0) {
+        return rc;
+    }
+    if ((MPI_IN_PLACE != sbuf) && (rc > 0) &&
+        ((sbufsize * comm_size) <= (size_t)mca_coll_accelerator_allgather_thresh)) {
+        /* The allgather send buffer holds only this rank's contribution, so the
+           staging buffer needs sbufsize bytes, not sbufsize * comm_size. */
+        sbuf1 = (char*)malloc(sbufsize);
+        if (NULL == sbuf1) {
+            return OMPI_ERR_OUT_OF_RESOURCE;
+        }
+        mca_coll_accelerator_memcpy(sbuf1, MCA_ACCELERATOR_NO_DEVICE_ID, sbuf, sbuf_dev,
+                                    sbufsize, MCA_ACCELERATOR_TRANSFER_DTOH);
+        sbuf = sbuf1 - sgap;
+    }
+
+    rbufsize = opal_datatype_span(&rdtype->super, rcount, &rgap);
+    rc = mca_coll_accelerator_check_buf(rbuf, &rbuf_dev);
+    if (rc < 0) {
+        goto exit;
+    }
+    /* Using sbufsize here on purpose to ensure symmetric decision for handling of GPU vs
+       CPU buffers. The two buffer sizes are expected to be the same for pre-defined datatypes,
+       but could vary due to layout issues/gaps for derived datatypes */
+    if ((rc > 0) && ((sbufsize * comm_size) <= (size_t)mca_coll_accelerator_allgather_thresh)) {
+        rbuf1 = (char*)malloc(rbufsize * comm_size);
+        if (NULL == rbuf1) {
+            rc = OMPI_ERR_OUT_OF_RESOURCE;
+            goto exit;
+        }
+        /* The receive buffer spans comm_size segments: stage all of it DTOH so
+           that MPI_IN_PLACE send data already residing in rbuf is preserved. */
+        mca_coll_accelerator_memcpy(rbuf1, MCA_ACCELERATOR_NO_DEVICE_ID, rbuf, rbuf_dev,
+                                    rbufsize * comm_size, MCA_ACCELERATOR_TRANSFER_DTOH);
+        rbuf2 = rbuf; /* save original buffer */
+        rbuf = rbuf1 - rgap;
+    }
+    rc = s->c_coll.coll_allgather(sbuf, scount, sdtype, rbuf, rcount, rdtype,
+                                  comm, s->c_coll.coll_allgather_module);
+    if (rc < 0) {
+        goto exit;
+    }
+
+    if (NULL != rbuf1) {
+        /* Copy the complete gathered result (comm_size segments) back to the
+           device; copying only rbufsize bytes would drop every segment but
+           the first one. */
+        mca_coll_accelerator_memcpy(rbuf2, rbuf_dev, rbuf1, MCA_ACCELERATOR_NO_DEVICE_ID,
+                                    rbufsize * comm_size, MCA_ACCELERATOR_TRANSFER_HTOD);
+    }
+
+ exit:
+    free(sbuf1);  /* free(NULL) is a no-op, no guard needed */
+    free(rbuf1);
+
+    return rc;
+}
diff --git a/ompi/mca/coll/accelerator/coll_accelerator_alltoall.c b/ompi/mca/coll/accelerator/coll_accelerator_alltoall.c
new file mode 100644
index 00000000000..5d9938cd16e
--- /dev/null
+++ b/ompi/mca/coll/accelerator/coll_accelerator_alltoall.c
@@ -0,0 +1,99 @@
+/*
+ * Copyright (c) 2014-2017 The University of Tennessee and The University
+ *                         of Tennessee Research Foundation. All rights
+ *                         reserved.
+ * Copyright (c) 2014-2015 NVIDIA Corporation. All rights reserved.
+ * Copyright (c) 2022      Amazon.com, Inc. or its affiliates. All Rights reserved.
+ * Copyright (c) 2024      Triad National Security, LLC. All rights reserved.
+ * Copyright (c) 2024      Advanced Micro Devices, Inc. All Rights reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+#include "ompi_config.h"
+#include "coll_accelerator.h"
+
+#include <string.h>
+
+#include "ompi/op/op.h"
+#include "opal/datatype/opal_convertor.h"
+
+/*
+ * Function: - alltoall for device buffers using temp.
CPU buffer
+ * Accepts: - same as MPI_Alltoall()
+ * Returns: - MPI_SUCCESS or error code
+ */
+int
+mca_coll_accelerator_alltoall(const void *sbuf, size_t scount,
+                              struct ompi_datatype_t *sdtype,
+                              void *rbuf, size_t rcount,
+                              struct ompi_datatype_t *rdtype,
+                              struct ompi_communicator_t *comm,
+                              mca_coll_base_module_t *module)
+{
+    mca_coll_accelerator_module_t *s = (mca_coll_accelerator_module_t*) module;
+    ptrdiff_t sgap, rgap;
+    char *rbuf1 = NULL, *sbuf1 = NULL, *rbuf2 = NULL;
+    int sbuf_dev, rbuf_dev;
+    size_t sbufsize, rbufsize;
+    int rc;
+    int comm_size = ompi_comm_size(comm);
+
+    sbufsize = opal_datatype_span(&sdtype->super, scount, &sgap);
+    rc = mca_coll_accelerator_check_buf((void *)sbuf, &sbuf_dev);
+    if (rc < 0) {
+        return rc;
+    }
+    if ((MPI_IN_PLACE != sbuf) && (rc > 0) &&
+        ((sbufsize * comm_size) <= (size_t)mca_coll_accelerator_alltoall_thresh)) {
+        sbuf1 = (char*)malloc(sbufsize * comm_size);
+        if (NULL == sbuf1) {
+            return OMPI_ERR_OUT_OF_RESOURCE;
+        }
+        /* The alltoall send buffer holds one sbufsize segment per peer:
+           stage all comm_size segments, not only the first one. */
+        mca_coll_accelerator_memcpy(sbuf1, MCA_ACCELERATOR_NO_DEVICE_ID, sbuf, sbuf_dev,
+                                    sbufsize * comm_size, MCA_ACCELERATOR_TRANSFER_DTOH);
+        sbuf = sbuf1 - sgap;
+    }
+
+    rbufsize = opal_datatype_span(&rdtype->super, rcount, &rgap);
+    rc = mca_coll_accelerator_check_buf(rbuf, &rbuf_dev);
+    if (rc < 0) {
+        goto exit;
+    }
+    /* Using sbufsize here on purpose to ensure symmetric decision for handling of GPU vs
+       CPU buffers. The two buffer sizes are expected to be the same for pre-defined datatypes,
+       but could vary due to layout issues/gaps for derived datatypes */
+    if ((rc > 0) && ((sbufsize * comm_size) <= (size_t)mca_coll_accelerator_alltoall_thresh)) {
+        rbuf1 = (char*)malloc(rbufsize * comm_size);
+        if (NULL == rbuf1) {
+            rc = OMPI_ERR_OUT_OF_RESOURCE;
+            goto exit;
+        }
+        /* Stage the whole receive buffer (comm_size segments) so that
+           MPI_IN_PLACE data already residing in rbuf is preserved. */
+        mca_coll_accelerator_memcpy(rbuf1, MCA_ACCELERATOR_NO_DEVICE_ID, rbuf, rbuf_dev,
+                                    rbufsize * comm_size, MCA_ACCELERATOR_TRANSFER_DTOH);
+        rbuf2 = rbuf; /* save away original buffer */
+        rbuf = rbuf1 - rgap;
+    }
+    rc = s->c_coll.coll_alltoall(sbuf, scount, sdtype, rbuf, rcount, rdtype,
+                                 comm, s->c_coll.coll_alltoall_module);
+    if (rc < 0) {
+        goto exit;
+    }
+    if (NULL != rbuf1) {
+        /* Copy the complete result (comm_size segments) back to the device. */
+        mca_coll_accelerator_memcpy(rbuf2, rbuf_dev, rbuf1, MCA_ACCELERATOR_NO_DEVICE_ID,
+                                    rbufsize * comm_size, MCA_ACCELERATOR_TRANSFER_HTOD);
+    }
+
+ exit:
+    free(sbuf1);  /* free(NULL) is a no-op, no guard needed */
+    free(rbuf1);
+
+    return rc;
+}
diff --git a/ompi/mca/coll/accelerator/coll_accelerator_bcast.c b/ompi/mca/coll/accelerator/coll_accelerator_bcast.c
new file mode 100644
index 00000000000..7bec2ddc876
--- /dev/null
+++ b/ompi/mca/coll/accelerator/coll_accelerator_bcast.c
@@ -0,0 +1,78 @@
+/*
+ * Copyright (c) 2024      NVIDIA Corporation. All rights reserved.
+ * Copyright (c) 2004-2023 The University of Tennessee and The University
+ *                         of Tennessee Research Foundation. All rights
+ *                         reserved.
+ * Copyright (c) 2014-2015 NVIDIA Corporation. All rights reserved.
+ * Copyright (c) 2022      Amazon.com, Inc. or its affiliates. All Rights reserved.
+ * Copyright (c) 2024      Triad National Security, LLC. All rights reserved.
+ * Copyright (c) 2024      Advanced Micro Devices, Inc. All Rights reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+#include "ompi_config.h"
+#include "coll_accelerator.h"
+
+#include <string.h>
+
+#include "ompi/op/op.h"
+#include "opal/datatype/opal_convertor.h"
+
+/*
+ *
+ * Function: - Bcast for device buffers through temp CPU buffer.
+ * Accepts: - same as MPI_Bcast()
+ * Returns: - MPI_SUCCESS or error code
+ */
+int
+mca_coll_accelerator_bcast(void *orig_buf, size_t count,
+                           struct ompi_datatype_t *datatype,
+                           int root,
+                           struct ompi_communicator_t *comm,
+                           mca_coll_base_module_t *module)
+{
+    mca_coll_accelerator_module_t *s = (mca_coll_accelerator_module_t*) module;
+    ptrdiff_t gap;
+    char *buf1 = NULL;
+    char *sbuf = (char*) orig_buf;
+    int buf_dev;
+    size_t bufsize;
+    int rc;
+
+    bufsize = opal_datatype_span(&datatype->super, count, &gap);
+
+    rc = mca_coll_accelerator_check_buf((void *)orig_buf, &buf_dev);
+    if (rc < 0) {
+        return rc;
+    }
+    /* Compare the message size in bytes against the threshold: the mca
+       parameter is documented as a "msg length" and allgather/alltoall
+       compare bytes as well; comparing the element count would make the
+       cutoff datatype dependent. */
+    if ((rc > 0) && (bufsize <= (size_t)mca_coll_accelerator_bcast_thresh)) {
+        buf1 = (char*)malloc(bufsize);
+        if (NULL == buf1) {
+            return OMPI_ERR_OUT_OF_RESOURCE;
+        }
+        mca_coll_accelerator_memcpy(buf1, MCA_ACCELERATOR_NO_DEVICE_ID, orig_buf, buf_dev, bufsize,
+                                    MCA_ACCELERATOR_TRANSFER_DTOH);
+        sbuf = buf1 - gap;
+    }
+
+    rc = s->c_coll.coll_bcast((void *) sbuf, count, datatype, root, comm,
+                              s->c_coll.coll_bcast_module);
+    if (rc < 0) {
+        goto exit;
+    }
+    if (NULL != buf1) {
+        /* Move the broadcast result back to the device buffer. NOTE(review):
+           this copy is redundant on the root rank but harmless. */
+        mca_coll_accelerator_memcpy((void*)orig_buf, buf_dev, buf1, MCA_ACCELERATOR_NO_DEVICE_ID, bufsize,
+                                    MCA_ACCELERATOR_TRANSFER_HTOD);
+    }
+
+ exit:
+    free(buf1);  /* free(NULL) is a no-op, no guard needed */
+
+    return rc;
+}
diff --git a/ompi/mca/coll/accelerator/coll_accelerator_component.c b/ompi/mca/coll/accelerator/coll_accelerator_component.c
index c4ba508026b..54162566671 100644
--- a/ompi/mca/coll/accelerator/coll_accelerator_component.c
+++ b/ompi/mca/coll/accelerator/coll_accelerator_component.c
@@ -22,6 +22,11 @@
 #include "ompi/constants.h"
 #include "coll_accelerator.h"
+
+int
mca_coll_accelerator_bcast_thresh = 256; +int mca_coll_accelerator_allgather_thresh = 65536; +int mca_coll_accelerator_alltoall_thresh = 65536; + /* * Public string showing the coll ompi_accelerator component version number */ @@ -88,5 +93,32 @@ static int accelerator_register(void) MCA_BASE_VAR_SCOPE_READONLY, &mca_coll_accelerator_component.disable_accelerator_coll); + mca_coll_accelerator_bcast_thresh = 256; + (void) mca_base_component_var_register(&mca_coll_accelerator_component.super.collm_version, + "bcast_thresh", + "max. msg length for which to copy accelerator buffer to CPU for bcast operation", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, + &mca_coll_accelerator_bcast_thresh); + + mca_coll_accelerator_allgather_thresh = 65536; + (void) mca_base_component_var_register(&mca_coll_accelerator_component.super.collm_version, + "allgather_thresh", + "max. overall msg length for which to copy accelerator buffer to CPU for allgather operation", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, + &mca_coll_accelerator_allgather_thresh); + + mca_coll_accelerator_alltoall_thresh = 65536; + (void) mca_base_component_var_register(&mca_coll_accelerator_component.super.collm_version, + "alltoall_thresh", + "max. 
overall msg length for which to copy accelerator buffer to CPU for alltoall operation", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, + &mca_coll_accelerator_alltoall_thresh); + return OMPI_SUCCESS; } diff --git a/ompi/mca/coll/accelerator/coll_accelerator_module.c b/ompi/mca/coll/accelerator/coll_accelerator_module.c index 862eaed8ad7..c4a08c2e4a6 100644 --- a/ompi/mca/coll/accelerator/coll_accelerator_module.c +++ b/ompi/mca/coll/accelerator/coll_accelerator_module.c @@ -93,8 +93,11 @@ mca_coll_accelerator_comm_query(struct ompi_communicator_t *comm, accelerator_module->super.coll_module_enable = mca_coll_accelerator_module_enable; accelerator_module->super.coll_module_disable = mca_coll_accelerator_module_disable; + accelerator_module->super.coll_allgather = mca_coll_accelerator_allgather; accelerator_module->super.coll_allreduce = mca_coll_accelerator_allreduce; + accelerator_module->super.coll_alltoall = mca_coll_accelerator_alltoall; accelerator_module->super.coll_reduce = mca_coll_accelerator_reduce; + accelerator_module->super.coll_bcast = mca_coll_accelerator_bcast; accelerator_module->super.coll_reduce_local = mca_coll_accelerator_reduce_local; accelerator_module->super.coll_reduce_scatter = mca_coll_accelerator_reduce_scatter; accelerator_module->super.coll_reduce_scatter_block = mca_coll_accelerator_reduce_scatter_block; @@ -142,7 +145,10 @@ mca_coll_accelerator_module_enable(mca_coll_base_module_t *module, { mca_coll_accelerator_module_t *s = (mca_coll_accelerator_module_t*) module; + ACCELERATOR_INSTALL_COLL_API(comm, s, allgather); ACCELERATOR_INSTALL_COLL_API(comm, s, allreduce); + ACCELERATOR_INSTALL_COLL_API(comm, s, alltoall); + ACCELERATOR_INSTALL_COLL_API(comm, s, bcast); ACCELERATOR_INSTALL_COLL_API(comm, s, reduce); ACCELERATOR_INSTALL_COLL_API(comm, s, reduce_local); ACCELERATOR_INSTALL_COLL_API(comm, s, reduce_scatter); @@ -162,8 +168,11 @@ mca_coll_accelerator_module_disable(mca_coll_base_module_t 
*module, { mca_coll_accelerator_module_t *s = (mca_coll_accelerator_module_t*) module; + ACCELERATOR_UNINSTALL_COLL_API(comm, s, allgather); ACCELERATOR_UNINSTALL_COLL_API(comm, s, allreduce); + ACCELERATOR_UNINSTALL_COLL_API(comm, s, alltoall); ACCELERATOR_UNINSTALL_COLL_API(comm, s, reduce); + ACCELERATOR_UNINSTALL_COLL_API(comm, s, bcast); ACCELERATOR_UNINSTALL_COLL_API(comm, s, reduce_local); ACCELERATOR_UNINSTALL_COLL_API(comm, s, reduce_scatter); ACCELERATOR_UNINSTALL_COLL_API(comm, s, reduce_scatter_block);