Skip to content

Commit

Permalink
coll/accelerator: add bcast,allgather,alltoall
Browse files Browse the repository at this point in the history
add support for bcast, allgather and alltoall for device buffers using
a temporary buffer on the CPU. The maximum msg length for each operation
for which to use this approach can be controlled through an mca
parameter.

Signed-off-by: Edgar Gabriel <[email protected]>
  • Loading branch information
edgargabriel committed Jan 2, 2025
1 parent 09044f4 commit d24466d
Show file tree
Hide file tree
Showing 7 changed files with 348 additions and 2 deletions.
5 changes: 3 additions & 2 deletions ompi/mca/coll/accelerator/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@
# $HEADER$
#

sources = coll_accelerator_module.c coll_accelerator_reduce.c coll_accelerator_allreduce.c \
sources = coll_accelerator_reduce.c coll_accelerator_allreduce.c \
coll_accelerator_reduce_scatter_block.c coll_accelerator_reduce_scatter.c \
coll_accelerator_component.c \
coll_accelerator_allgather.c coll_accelerator_alltoall.c coll_accelerator_bcast.c \
coll_accelerator_component.c coll_accelerator_module.c \
coll_accelerator_scan.c coll_accelerator_exscan.c coll_accelerator.h

# Make the output library in this directory, and name it either
Expand Down
27 changes: 27 additions & 0 deletions ompi/mca/coll/accelerator/coll_accelerator.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ BEGIN_C_DECLS

/* API functions */


extern int mca_coll_accelerator_bcast_thresh;
extern int mca_coll_accelerator_allgather_thresh;
extern int mca_coll_accelerator_alltoall_thresh;

int mca_coll_accelerator_init_query(bool enable_progress_threads,
bool enable_mpi_threads);
mca_coll_base_module_t
Expand Down Expand Up @@ -85,6 +90,28 @@ mca_coll_accelerator_reduce_scatter(const void *sbuf, void *rbuf, ompi_count_arr
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);

int
mca_coll_accelerator_allgather(const void *sbuf, size_t scount,
struct ompi_datatype_t *sdtype,
void *rbuf, size_t rcount,
struct ompi_datatype_t *rdtype,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);

int
mca_coll_accelerator_alltoall(const void *sbuf, size_t scount,
struct ompi_datatype_t *sdtype,
void *rbuf, size_t rcount,
struct ompi_datatype_t *rdtype,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);

int
mca_coll_accelerator_bcast(void *buff, size_t count,
struct ompi_datatype_t *datatype,
int root,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);

/* Checks the type of pointer
*
Expand Down
100 changes: 100 additions & 0 deletions ompi/mca/coll/accelerator/coll_accelerator_allgather.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright (c) 2014-2017 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2014-2015 NVIDIA Corporation. All rights reserved.
* Copyright (c) 2022 Amazon.com, Inc. or its affiliates. All Rights reserved.
* Copyright (c) 2024 Triad National Security, LLC. All rights reserved.
* Copyright (c) 2024 Advanced Micro Devices, Inc. All Rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/

#include "ompi_config.h"
#include "coll_accelerator.h"

#include <stdio.h>

#include "ompi/op/op.h"
#include "opal/datatype/opal_convertor.h"

/*
* Function: - allgather for device buffers through temp CPU buffer
* Accepts: - same as MPI_Allgather()
* Returns: - MPI_SUCCESS or error code
*/
int
mca_coll_accelerator_allgather(const void *sbuf, size_t scount,
struct ompi_datatype_t *sdtype,
void *rbuf, size_t rcount,
struct ompi_datatype_t *rdtype,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module)
{
mca_coll_accelerator_module_t *s = (mca_coll_accelerator_module_t*) module;
ptrdiff_t sgap, rgap;
char *rbuf1 = NULL, *sbuf1 = NULL, *rbuf2 = NULL;
int sbuf_dev, rbuf_dev;
size_t sbufsize, rbufsize;
int rc;
int comm_size = ompi_comm_size(comm);

sbufsize = opal_datatype_span(&sdtype->super, scount, &sgap);
rc = mca_coll_accelerator_check_buf((void *)sbuf, &sbuf_dev);
if (rc < 0) {
return rc;
}
if ((MPI_IN_PLACE != sbuf) && (rc > 0) &&
(sbufsize <= (size_t)mca_coll_accelerator_allgather_thresh)) {
sbuf1 = (char*)malloc(sbufsize * comm_size);
if (NULL == sbuf1) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
mca_coll_accelerator_memcpy(sbuf1, MCA_ACCELERATOR_NO_DEVICE_ID, sbuf, sbuf_dev,
sbufsize, MCA_ACCELERATOR_TRANSFER_DTOH);
sbuf = sbuf1 - sgap;
}

rbufsize = opal_datatype_span(&rdtype->super, rcount, &rgap);
rc = mca_coll_accelerator_check_buf(rbuf, &rbuf_dev);
if (rc < 0) {
goto exit;
}
/* Using sbufsize here on purpose to ensure symmetric decision for handling of GPU vs
CPU buffers. The two buffer sizes are expected to be the same for pre-defined datatypes,
but could vary due to layout issues/gaps for derived datatypes */
if ((rc > 0) && (sbufsize <= (size_t)mca_coll_accelerator_allgather_thresh)) {
rbuf1 = (char*)malloc(rbufsize * comm_size);
if (NULL == rbuf1) {
rc = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
mca_coll_accelerator_memcpy(rbuf1, MCA_ACCELERATOR_NO_DEVICE_ID, rbuf, rbuf_dev,
rbufsize, MCA_ACCELERATOR_TRANSFER_DTOH);
rbuf2 = rbuf; /* save original buffer */
rbuf = rbuf1 - rgap;
}
rc = s->c_coll.coll_allgather(sbuf, scount, sdtype, rbuf, rcount, rdtype,
comm, s->c_coll.coll_allgather_module);
if (rc < 0) {
goto exit;
}

if (NULL != rbuf1) {
mca_coll_accelerator_memcpy(rbuf2, rbuf_dev, rbuf1, MCA_ACCELERATOR_NO_DEVICE_ID, rbufsize,
MCA_ACCELERATOR_TRANSFER_HTOD);
}

exit:
if (NULL != sbuf1) {
free(sbuf1);
}
if (NULL != rbuf1) {
free(rbuf1);
}

return rc;
}
99 changes: 99 additions & 0 deletions ompi/mca/coll/accelerator/coll_accelerator_alltoall.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright (c) 2014-2017 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2014-2015 NVIDIA Corporation. All rights reserved.
* Copyright (c) 2022 Amazon.com, Inc. or its affiliates. All Rights reserved.
* Copyright (c) 2024 Triad National Security, LLC. All rights reserved.
* Copyright (c) 2024 Advanced Micro Devices, Inc. All Rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/

#include "ompi_config.h"
#include "coll_accelerator.h"

#include <stdio.h>

#include "ompi/op/op.h"
#include "opal/datatype/opal_convertor.h"

/*
* Function: - alltoall for device buffers using temp. CPU buffer
* Accepts: - same as MPI_Alltoall()
* Returns: - MPI_SUCCESS or error code
*/
int
mca_coll_accelerator_alltoall(const void *sbuf, size_t scount,
struct ompi_datatype_t *sdtype,
void *rbuf, size_t rcount,
struct ompi_datatype_t *rdtype,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module)
{
mca_coll_accelerator_module_t *s = (mca_coll_accelerator_module_t*) module;
ptrdiff_t sgap, rgap;
char *rbuf1 = NULL, *sbuf1 = NULL, *rbuf2 = NULL;
int sbuf_dev, rbuf_dev;
size_t sbufsize, rbufsize;
int rc;
int comm_size = ompi_comm_size(comm);

sbufsize = opal_datatype_span(&sdtype->super, scount, &sgap);
rc = mca_coll_accelerator_check_buf((void *)sbuf, &sbuf_dev);
if (rc < 0) {
return rc;
}
if ((MPI_IN_PLACE != sbuf) && (rc > 0) &&
(sbufsize <= (size_t)mca_coll_accelerator_alltoall_thresh)) {
sbuf1 = (char*)malloc(sbufsize * comm_size);
if (NULL == sbuf1) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
mca_coll_accelerator_memcpy(sbuf1, MCA_ACCELERATOR_NO_DEVICE_ID, sbuf, sbuf_dev,
sbufsize, MCA_ACCELERATOR_TRANSFER_DTOH);
sbuf = sbuf1 - sgap;
}

rbufsize = opal_datatype_span(&rdtype->super, rcount, &rgap);
rc = mca_coll_accelerator_check_buf(rbuf, &rbuf_dev);
if (rc < 0) {
goto exit;
}
/* Using sbufsize here on purpose to ensure symmetric decision for handling of GPU vs
CPU buffers. The two buffer sizes are expected to be the same for pre-defined datatypes,
but could vary due to layout issues/gaps for derived datatypes */
if ((rc > 0) && (sbufsize <= (size_t)mca_coll_accelerator_alltoall_thresh)) {
rbuf1 = (char*)malloc(rbufsize * comm_size);
if (NULL == rbuf1) {
rc = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
mca_coll_accelerator_memcpy(rbuf1, MCA_ACCELERATOR_NO_DEVICE_ID, rbuf, rbuf_dev,
rbufsize, MCA_ACCELERATOR_TRANSFER_DTOH);
rbuf2 = rbuf; /* save away original buffer */
rbuf = rbuf1 - rgap;
}
rc = s->c_coll.coll_alltoall(sbuf, scount, sdtype, rbuf, rcount, rdtype,
comm, s->c_coll.coll_alltoall_module);
if (rc < 0) {
goto exit;;
}
if (NULL != rbuf1) {
mca_coll_accelerator_memcpy(rbuf2, rbuf_dev, rbuf1, MCA_ACCELERATOR_NO_DEVICE_ID, rbufsize,
MCA_ACCELERATOR_TRANSFER_HTOD);
}

exit:
if (NULL != sbuf1) {
free(sbuf1);
}
if (NULL != rbuf1) {
free(rbuf1);
}

return rc;
}
78 changes: 78 additions & 0 deletions ompi/mca/coll/accelerator/coll_accelerator_bcast.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright (c) 2024 NVIDIA Corporation. All rights reserved.
* Copyright (c) 2004-2023 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2014-2015 NVIDIA Corporation. All rights reserved.
* Copyright (c) 2022 Amazon.com, Inc. or its affiliates. All Rights reserved.
* Copyright (c) 2024 Triad National Security, LLC. All rights reserved.
* Copyright (c) 2024 Advanced Micro Devices, Inc. All Rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/

#include "ompi_config.h"
#include "coll_accelerator.h"

#include <stdio.h>

#include "ompi/op/op.h"
#include "opal/datatype/opal_convertor.h"

/*
*
* Function: - Bcast for device buffers through temp CPU buffer.
* Accepts: - same as MPI_Bcast()
* Returns: - MPI_SUCCESS or error code
*/
int
mca_coll_accelerator_bcast(void *orig_buf, size_t count,
struct ompi_datatype_t *datatype,
int root,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module)
{
mca_coll_accelerator_module_t *s = (mca_coll_accelerator_module_t*) module;
ptrdiff_t gap;
char *buf1 = NULL;
char *sbuf = (char*) orig_buf;
int buf_dev;
size_t bufsize;
int rc;

bufsize = opal_datatype_span(&datatype->super, count, &gap);

rc = mca_coll_accelerator_check_buf((void *)orig_buf, &buf_dev);
if (rc < 0) {
return rc;
}
if ((rc > 0) && (bufsize <= (size_t)mca_coll_accelerator_bcast_thresh)) {
buf1 = (char*)malloc(bufsize);
if (NULL == buf1) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
mca_coll_accelerator_memcpy(buf1, MCA_ACCELERATOR_NO_DEVICE_ID, orig_buf, buf_dev, bufsize,
MCA_ACCELERATOR_TRANSFER_DTOH);
sbuf = buf1 - gap;
}

rc = s->c_coll.coll_bcast((void *) sbuf, count, datatype, root, comm,
s->c_coll.coll_bcast_module);
if (rc < 0) {
goto exit;
}
if (NULL != buf1) {
mca_coll_accelerator_memcpy((void*)orig_buf, buf_dev, buf1, MCA_ACCELERATOR_NO_DEVICE_ID, bufsize,
MCA_ACCELERATOR_TRANSFER_HTOD);
}

exit:
if (NULL != buf1) {
free(buf1);
}

return rc;
}
32 changes: 32 additions & 0 deletions ompi/mca/coll/accelerator/coll_accelerator_component.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
#include "ompi/constants.h"
#include "coll_accelerator.h"


int mca_coll_accelerator_bcast_thresh = 256;
int mca_coll_accelerator_allgather_thresh = 65536;
int mca_coll_accelerator_alltoall_thresh = 65536;

/*
* Public string showing the coll ompi_accelerator component version number
*/
Expand Down Expand Up @@ -88,5 +93,32 @@ static int accelerator_register(void)
MCA_BASE_VAR_SCOPE_READONLY,
&mca_coll_accelerator_component.disable_accelerator_coll);

mca_coll_accelerator_bcast_thresh = 256;
(void) mca_base_component_var_register(&mca_coll_accelerator_component.super.collm_version,
"bcast_thresh",
"max. msg length for which to copy accelerator buffer to CPU for bcast operation",
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
OPAL_INFO_LVL_9,
MCA_BASE_VAR_SCOPE_READONLY,
&mca_coll_accelerator_bcast_thresh);

mca_coll_accelerator_allgather_thresh = 65536;
(void) mca_base_component_var_register(&mca_coll_accelerator_component.super.collm_version,
"allgather_thresh",
"max. msg length for which to copy accelerator buffer to CPU for allgather operation",
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
OPAL_INFO_LVL_9,
MCA_BASE_VAR_SCOPE_READONLY,
&mca_coll_accelerator_allgather_thresh);

mca_coll_accelerator_alltoall_thresh = 65536;
(void) mca_base_component_var_register(&mca_coll_accelerator_component.super.collm_version,
"alltoall_thresh",
"max. msg length for which to copy accelerator buffer to CPU for alltoall operation",
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
OPAL_INFO_LVL_9,
MCA_BASE_VAR_SCOPE_READONLY,
&mca_coll_accelerator_alltoall_thresh);

return OMPI_SUCCESS;
}
Loading

0 comments on commit d24466d

Please sign in to comment.