Skip to content

Commit

Permalink
coll/accelerator: add bcast,allgather,alltoall
Browse files Browse the repository at this point in the history
add support for bcast, allgather and alltoall for device buffers using
a temporary buffer on the CPU. The maximum msg length for each operation
for which to use this approach can be controlled through an mca
parameter. Note, for allgather and alltoall, the parameter represents
the product of communicator size * msg length per proc.

Signed-off-by: Edgar Gabriel <[email protected]>
  • Loading branch information
edgargabriel committed Dec 30, 2024
1 parent 09044f4 commit ea008e1
Show file tree
Hide file tree
Showing 7 changed files with 348 additions and 2 deletions.
5 changes: 3 additions & 2 deletions ompi/mca/coll/accelerator/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@
# $HEADER$
#

sources = coll_accelerator_module.c coll_accelerator_reduce.c coll_accelerator_allreduce.c \
sources = coll_accelerator_reduce.c coll_accelerator_allreduce.c \
coll_accelerator_reduce_scatter_block.c coll_accelerator_reduce_scatter.c \
coll_accelerator_component.c \
coll_accelerator_allgather.c coll_accelerator_alltoall.c coll_accelerator_bcast.c \
coll_accelerator_component.c coll_accelerator_module.c \
coll_accelerator_scan.c coll_accelerator_exscan.c coll_accelerator.h

# Make the output library in this directory, and name it either
Expand Down
27 changes: 27 additions & 0 deletions ompi/mca/coll/accelerator/coll_accelerator.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ BEGIN_C_DECLS

/* API functions */


extern int mca_coll_accelerator_bcast_thresh;
extern int mca_coll_accelerator_allgather_thresh;
extern int mca_coll_accelerator_alltoall_thresh;

int mca_coll_accelerator_init_query(bool enable_progress_threads,
bool enable_mpi_threads);
mca_coll_base_module_t
Expand Down Expand Up @@ -85,6 +90,28 @@ mca_coll_accelerator_reduce_scatter(const void *sbuf, void *rbuf, ompi_count_arr
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);

int
mca_coll_accelerator_allgather(const void *sbuf, size_t scount,
struct ompi_datatype_t *sdtype,
void *rbuf, size_t rcount,
struct ompi_datatype_t *rdtype,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);

int
mca_coll_accelerator_alltoall(const void *sbuf, size_t scount,
struct ompi_datatype_t *sdtype,
void *rbuf, size_t rcount,
struct ompi_datatype_t *rdtype,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);

int
mca_coll_accelerator_bcast(void *buff, size_t count,
struct ompi_datatype_t *datatype,
int root,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);

/* Checks the type of pointer
*
Expand Down
100 changes: 100 additions & 0 deletions ompi/mca/coll/accelerator/coll_accelerator_allgather.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright (c) 2014-2017 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2014-2015 NVIDIA Corporation. All rights reserved.
* Copyright (c) 2022 Amazon.com, Inc. or its affiliates. All Rights reserved.
* Copyright (c) 2024 Triad National Security, LLC. All rights reserved.
* Copyright (c) 2024 Advanced Micro Devices, Inc. All Rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/

#include "ompi_config.h"
#include "coll_accelerator.h"

#include <stdio.h>

#include "ompi/op/op.h"
#include "opal/datatype/opal_convertor.h"

/*
* Function: - allgather for device buffers through temp CPU buffer
* Accepts: - same as MPI_Allgather()
* Returns: - MPI_SUCCESS or error code
*/
int
mca_coll_accelerator_allgather(const void *sbuf, size_t scount,
struct ompi_datatype_t *sdtype,
void *rbuf, size_t rcount,
struct ompi_datatype_t *rdtype,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module)
{
mca_coll_accelerator_module_t *s = (mca_coll_accelerator_module_t*) module;
ptrdiff_t sgap, rgap;
char *rbuf1 = NULL, *sbuf1 = NULL, *rbuf2 = NULL;
int sbuf_dev, rbuf_dev;
size_t sbufsize, rbufsize;
int rc;
int comm_size = ompi_comm_size(comm);

sbufsize = opal_datatype_span(&sdtype->super, scount, &sgap);
rc = mca_coll_accelerator_check_buf((void *)sbuf, &sbuf_dev);
if (rc < 0) {
return rc;
}
if ((MPI_IN_PLACE != sbuf) && (rc > 0) &&
((sbufsize * comm_size) <= (size_t)mca_coll_accelerator_allgather_thresh)) {
sbuf1 = (char*)malloc(sbufsize * comm_size);
if (NULL == sbuf1) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
mca_coll_accelerator_memcpy(sbuf1, MCA_ACCELERATOR_NO_DEVICE_ID, sbuf, sbuf_dev,
sbufsize, MCA_ACCELERATOR_TRANSFER_DTOH);
sbuf = sbuf1 - sgap;
}

rbufsize = opal_datatype_span(&rdtype->super, rcount, &rgap);
rc = mca_coll_accelerator_check_buf(rbuf, &rbuf_dev);
if (rc < 0) {
goto exit;
}
/* Using sbufsize here on purpose to ensure symmetric decision for handling of GPU vs
CPU buffers. The two buffer sizes are expected to be the same for pre-defined datatypes,
but could vary due to layout issues/gaps for derived datatypes */
if ((rc > 0) && ((sbufsize * comm_size) <= (size_t)mca_coll_accelerator_allgather_thresh)) {
rbuf1 = (char*)malloc(rbufsize * comm_size);
if (NULL == rbuf1) {
rc = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
mca_coll_accelerator_memcpy(rbuf1, MCA_ACCELERATOR_NO_DEVICE_ID, rbuf, rbuf_dev,
rbufsize, MCA_ACCELERATOR_TRANSFER_DTOH);
rbuf2 = rbuf; /* save original buffer */
rbuf = rbuf1 - rgap;
}
rc = s->c_coll.coll_allgather(sbuf, scount, sdtype, rbuf, rcount, rdtype,
comm, s->c_coll.coll_allgather_module);
if (rc < 0) {
goto exit;
}

if (NULL != rbuf1) {
mca_coll_accelerator_memcpy(rbuf2, rbuf_dev, rbuf1, MCA_ACCELERATOR_NO_DEVICE_ID, rbufsize,
MCA_ACCELERATOR_TRANSFER_HTOD);
}

exit:
if (NULL != sbuf1) {
free(sbuf1);
}
if (NULL != rbuf1) {
free(rbuf1);
}

return rc;
}
99 changes: 99 additions & 0 deletions ompi/mca/coll/accelerator/coll_accelerator_alltoall.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright (c) 2014-2017 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2014-2015 NVIDIA Corporation. All rights reserved.
* Copyright (c) 2022 Amazon.com, Inc. or its affiliates. All Rights reserved.
* Copyright (c) 2024 Triad National Security, LLC. All rights reserved.
* Copyright (c) 2024 Advanced Micro Devices, Inc. All Rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/

#include "ompi_config.h"
#include "coll_accelerator.h"

#include <stdio.h>

#include "ompi/op/op.h"
#include "opal/datatype/opal_convertor.h"

/*
* Function: - alltoall for device buffers using temp. CPU buffer
* Accepts: - same as MPI_Alltoall()
* Returns: - MPI_SUCCESS or error code
*/
int
mca_coll_accelerator_alltoall(const void *sbuf, size_t scount,
struct ompi_datatype_t *sdtype,
void *rbuf, size_t rcount,
struct ompi_datatype_t *rdtype,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module)
{
mca_coll_accelerator_module_t *s = (mca_coll_accelerator_module_t*) module;
ptrdiff_t sgap, rgap;
char *rbuf1 = NULL, *sbuf1 = NULL, *rbuf2 = NULL;
int sbuf_dev, rbuf_dev;
size_t sbufsize, rbufsize;
int rc;
int comm_size = ompi_comm_size(comm);

sbufsize = opal_datatype_span(&sdtype->super, scount, &sgap);
rc = mca_coll_accelerator_check_buf((void *)sbuf, &sbuf_dev);
if (rc < 0) {
return rc;
}
if ((MPI_IN_PLACE != sbuf) && (rc > 0) &&
((sbufsize * comm_size) <= (size_t)mca_coll_accelerator_alltoall_thresh)) {
sbuf1 = (char*)malloc(sbufsize * comm_size);
if (NULL == sbuf1) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
mca_coll_accelerator_memcpy(sbuf1, MCA_ACCELERATOR_NO_DEVICE_ID, sbuf, sbuf_dev,
sbufsize, MCA_ACCELERATOR_TRANSFER_DTOH);
sbuf = sbuf1 - sgap;
}

rbufsize = opal_datatype_span(&rdtype->super, rcount, &rgap);
rc = mca_coll_accelerator_check_buf(rbuf, &rbuf_dev);
if (rc < 0) {
goto exit;;
}
/* Using sbufsize here on purpose to ensure symmetric decision for handling of GPU vs
CPU buffers. The two buffer sizes are expected to be the same for pre-defined datatypes,
but could vary due to layout issues/gaps for derived datatypes */
if ((rc > 0) && ((sbufsize * comm_size) <= (size_t)mca_coll_accelerator_alltoall_thresh)) {
rbuf1 = (char*)malloc(rbufsize * comm_size);
if (NULL == rbuf1) {
rc = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
mca_coll_accelerator_memcpy(rbuf1, MCA_ACCELERATOR_NO_DEVICE_ID, rbuf, rbuf_dev,
rbufsize, MCA_ACCELERATOR_TRANSFER_DTOH);
rbuf2 = rbuf; /* save away original buffer */
rbuf = rbuf1 - rgap;
}
rc = s->c_coll.coll_alltoall(sbuf, scount, sdtype, rbuf, rcount, rdtype,
comm, s->c_coll.coll_alltoall_module);
if (rc < 0) {
goto exit;;
}
if (NULL != rbuf1) {
mca_coll_accelerator_memcpy(rbuf2, rbuf_dev, rbuf1, MCA_ACCELERATOR_NO_DEVICE_ID, rbufsize,
MCA_ACCELERATOR_TRANSFER_HTOD);
}

exit:
if (NULL != sbuf1) {
free(sbuf1);
}
if (NULL != rbuf1) {
free(rbuf1);
}

return rc;
}
78 changes: 78 additions & 0 deletions ompi/mca/coll/accelerator/coll_accelerator_bcast.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright (c) 2024 NVIDIA Corporation. All rights reserved.
* Copyright (c) 2004-2023 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2014-2015 NVIDIA Corporation. All rights reserved.
* Copyright (c) 2022 Amazon.com, Inc. or its affiliates. All Rights reserved.
* Copyright (c) 2024 Triad National Security, LLC. All rights reserved.
* Copyright (c) 2024 Advanced Micro Devices, Inc. All Rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/

#include "ompi_config.h"
#include "coll_accelerator.h"

#include <stdio.h>

#include "ompi/op/op.h"
#include "opal/datatype/opal_convertor.h"

/*
*
* Function: - Bcast for device buffers through temp CPU buffer.
* Accepts: - same as MPI_Bcast()
* Returns: - MPI_SUCCESS or error code
*/
int
mca_coll_accelerator_bcast(void *orig_buf, size_t count,
struct ompi_datatype_t *datatype,
int root,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module)
{
mca_coll_accelerator_module_t *s = (mca_coll_accelerator_module_t*) module;
ptrdiff_t gap;
char *buf1 = NULL;
char *sbuf = (char*) orig_buf;
int buf_dev;
size_t bufsize;
int rc;

bufsize = opal_datatype_span(&datatype->super, count, &gap);

rc = mca_coll_accelerator_check_buf((void *)orig_buf, &buf_dev);
if (rc < 0) {
return rc;
}
if ((rc > 0) && (count <= (size_t)mca_coll_accelerator_bcast_thresh)) {
buf1 = (char*)malloc(bufsize);
if (NULL == buf1) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
mca_coll_accelerator_memcpy(buf1, MCA_ACCELERATOR_NO_DEVICE_ID, orig_buf, buf_dev, bufsize,
MCA_ACCELERATOR_TRANSFER_DTOH);
sbuf = buf1 - gap;
}

rc = s->c_coll.coll_bcast((void *) sbuf, count, datatype, root, comm,
s->c_coll.coll_bcast_module);
if (rc < 0) {
goto exit;
}
if (NULL != buf1) {
mca_coll_accelerator_memcpy((void*)orig_buf, buf_dev, buf1, MCA_ACCELERATOR_NO_DEVICE_ID, bufsize,
MCA_ACCELERATOR_TRANSFER_HTOD);
}

exit:
if (NULL != buf1) {
free(buf1);
}

return rc;
}
32 changes: 32 additions & 0 deletions ompi/mca/coll/accelerator/coll_accelerator_component.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
#include "ompi/constants.h"
#include "coll_accelerator.h"


int mca_coll_accelerator_bcast_thresh = 256;
int mca_coll_accelerator_allgather_thresh = 65536;
int mca_coll_accelerator_alltoall_thresh = 65536;

/*
* Public string showing the coll ompi_accelerator component version number
*/
Expand Down Expand Up @@ -88,5 +93,32 @@ static int accelerator_register(void)
MCA_BASE_VAR_SCOPE_READONLY,
&mca_coll_accelerator_component.disable_accelerator_coll);

mca_coll_accelerator_bcast_thresh = 256;
(void) mca_base_component_var_register(&mca_coll_accelerator_component.super.collm_version,
"bcast_thresh",
"max. msg length for which to copy accelerator buffer to CPU for bcast operation",
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
OPAL_INFO_LVL_9,
MCA_BASE_VAR_SCOPE_READONLY,
&mca_coll_accelerator_bcast_thresh);

mca_coll_accelerator_allgather_thresh = 65536;
(void) mca_base_component_var_register(&mca_coll_accelerator_component.super.collm_version,
"allgather_thresh",
"max. overall msg length for which to copy accelerator buffer to CPU for allgather operation",
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
OPAL_INFO_LVL_9,
MCA_BASE_VAR_SCOPE_READONLY,
&mca_coll_accelerator_allgather_thresh);

mca_coll_accelerator_alltoall_thresh = 65536;
(void) mca_base_component_var_register(&mca_coll_accelerator_component.super.collm_version,
"alltoall_thresh",
"max. overall msg length for which to copy accelerator buffer to CPU for alltoall operation",
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
OPAL_INFO_LVL_9,
MCA_BASE_VAR_SCOPE_READONLY,
&mca_coll_accelerator_alltoall_thresh);

return OMPI_SUCCESS;
}
Loading

0 comments on commit ea008e1

Please sign in to comment.