Skip to content

Commit

Permalink
added multiple_comms test
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicolas Ducarton committed Nov 21, 2024
1 parent 94e0333 commit 2b1f660
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 2 deletions.
6 changes: 4 additions & 2 deletions mpi/tests/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ starpu_mpi_TESTS += \

if STARPU_USE_MPI_MPI
starpu_mpi_TESTS += \
load_balancer
load_balancer \
multiple_comms
endif
endif

Expand Down Expand Up @@ -255,7 +256,8 @@ noinst_PROGRAMS += \
nothing \
display_bindings \
mpi_task_submit \
wait_for_all
wait_for_all \
multiple_comms

if STARPU_USE_MPI_FT
noinst_PROGRAMS += \
Expand Down
87 changes: 87 additions & 0 deletions mpi/tests/multiple_comms.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
#include <starpu_mpi.h>
#include "helper.h"

int main(int argc, char ** argv){
//we will only use three processes for now
int my_rank, size, rsize;
int ret = starpu_mpi_init_conf(&argc, &argv, 1, MPI_COMM_WORLD, NULL);
STARPU_CHECK_RETURN_VALUE(ret, "starpu_mpi_init_conf");

starpu_mpi_comm_rank(MPI_COMM_WORLD, &my_rank);
starpu_mpi_comm_size(MPI_COMM_WORLD, &size);

MPI_Comm new_world;
MPI_Comm_dup(MPI_COMM_WORLD, &new_world);
MPI_Comm_set_name(new_world, "application duplicated comm");
starpu_mpi_comm_register(new_world);
if(size < 2){
fprintf(stderr, "We need at least 2 processes.\n");
starpu_mpi_shutdown();
return STARPU_TEST_SKIPPED;
}
//we want an even number of processes
rsize = size%2 != 0 ? size-1 : size;
if(my_rank < rsize ){
starpu_data_handle_t send_handles[rsize];
starpu_data_handle_t recv_handles[rsize];

int * stabs[rsize];
int * rtabs[rsize];
for(int i = 0; i < rsize; i++){
stabs[i] = malloc(sizeof(int)*100);
rtabs[i] = malloc(sizeof(int)*100);
}
for(int i = 0; i < 100; i++)
stabs[my_rank][i] = my_rank * 100 + i;
for(int i = 0; i < rsize; i++){
starpu_variable_data_register( send_handles + i, STARPU_MAIN_RAM, (uintptr_t)stabs[i], 100* sizeof(int));
starpu_variable_data_register( recv_handles + i, STARPU_MAIN_RAM, (uintptr_t)rtabs[i], 100 *sizeof(int));
}
starpu_mpi_req reqs[4];

if(my_rank%2 == 0){
starpu_mpi_isend(send_handles[my_rank], &reqs[0], (my_rank+1)%rsize, 12, MPI_COMM_WORLD);
starpu_mpi_isend(send_handles[my_rank], &reqs[1], my_rank==0?rsize-1:my_rank-1, 12, new_world);

starpu_mpi_irecv(recv_handles[(my_rank+1)%rsize], &reqs[2], (my_rank+1)%rsize, 13,MPI_COMM_WORLD);
starpu_mpi_irecv(recv_handles[my_rank==0?rsize-1:my_rank-1], &reqs[3], my_rank==0?rsize-1:my_rank-1, 13,new_world);
}
else{
starpu_mpi_irecv(recv_handles[my_rank==0?rsize-1:my_rank-1], &reqs[0], my_rank==0?rsize-1:my_rank-1, 12, MPI_COMM_WORLD);
starpu_mpi_irecv(recv_handles[(my_rank+1)%rsize], &reqs[1], (my_rank+1)%rsize, 12, new_world);

starpu_mpi_isend(send_handles[my_rank], &reqs[2], my_rank==0?rsize-1:my_rank-1, 13, MPI_COMM_WORLD);
starpu_mpi_isend(send_handles[my_rank], &reqs[3], (my_rank+1)%rsize, 13, new_world);
}
int nb_req=4;
while (nb_req)
{
int r=0;
for(r=0 ; r<4 ; r++)
{
if (reqs[r])
{
int finished = 0;
MPI_Status status;
ret = starpu_mpi_test(&reqs[r], &finished, &status);
STARPU_CHECK_RETURN_VALUE(ret, "starpu_mpi_test");
STARPU_ASSERT(finished != -1);
if (finished)
{
reqs[r] = NULL;
nb_req--;
}
}
}
}
for(int i = 0; i < rsize; i++){
starpu_data_unregister(send_handles[i]);
starpu_data_unregister(recv_handles[i]);
free(stabs[i]);
free(rtabs[i]);
}
}
starpu_mpi_barrier(MPI_COMM_WORLD);
starpu_mpi_shutdown();
return 0;
}

0 comments on commit 2b1f660

Please sign in to comment.