diff --git a/CMakeLists.txt b/CMakeLists.txt index 77b1cdf8..db8262fc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -233,6 +233,9 @@ endif() if ( gfortran_compiler AND ( NOT CMAKE_Fortran_COMPILER_VERSION VERSION_LESS 8.0.0 ) ) add_definitions(-DGCC_GE_8) # Tell library to build against GFortran 8.x bindings w/ descriptor change endif() + if ( gfortran_compiler AND ( NOT CMAKE_Fortran_COMPILER_VERSION VERSION_LESS 14.0.0 ) ) + add_definitions(-DGCC_GE_15) # Tell library to build against GFortran 15.x bindings + endif() if(gfortran_compiler) set(OLD_REQUIRED_FLAGS ${CMAKE_REQUIRED_FLAGS}) @@ -857,6 +860,7 @@ if(opencoarrays_aware_compiler) # Pure sendget tests add_caf_test(strided_sendget 3 strided_sendget) + add_caf_test(get_with_1d_vector_index 3 get_with_1d_vector_index) add_caf_test(get_with_vector_index 4 get_with_vector_index) # Collective subroutine tests diff --git a/src/application-binary-interface/libcaf.h b/src/application-binary-interface/libcaf.h index d88f1537..5635fb1a 100644 --- a/src/application-binary-interface/libcaf.h +++ b/src/application-binary-interface/libcaf.h @@ -261,6 +261,23 @@ void PREFIX(caf_sendget)(caf_token_t, size_t, int, gfc_descriptor_t *, gfc_descriptor_t *, caf_vector_t *, int, int, bool, int *); +#ifdef GCC_GE_15 +void PREFIX(register_accessor)(const int hash, + void (*accessor)(void **, int32_t *, void *, + void *, size_t *, size_t *)); + +void PREFIX(register_accessors_finish)(); + +int PREFIX(get_remote_function_index)(const int hash); + +void PREFIX(get_by_ct)(caf_token_t token, const gfc_descriptor_t *opt_src_desc, + const size_t *opt_src_charlen, const int image_index, + const size_t dst_size, void **dst_data, + size_t *opt_dst_charlen, gfc_descriptor_t *opt_dst_desc, + const bool may_realloc_dst, const int getter_index, + void *get_data, const size_t get_data_size, int *stat, + caf_team_t *team, int *team_number); +#endif #ifdef GCC_GE_8 void PREFIX(get_by_ref)(caf_token_t, int, gfc_descriptor_t *dst, caf_reference_t *refs, int dst_kind, int src_kind, diff --git a/src/runtime-libraries/mpi/mpi_caf.c b/src/runtime-libraries/mpi/mpi_caf.c index 9c283569..7a608101 100644 --- a/src/runtime-libraries/mpi/mpi_caf.c +++ b/src/runtime-libraries/mpi/mpi_caf.c @@ -35,6 +35,7 @@ #include /* Assume functionality provided elsewhere if missing */ #endif #include +#define __USE_GNU #include #include /* For raise */ #include /* For int32_t. */ @@ -234,6 +235,56 @@ char *msgbody; pthread_mutex_t lock_am; int done_am = 0; +/* Communication thread variables, constants and structures. */ +static const int CAF_CT_TAG = 13; +pthread_t commthread; +MPI_Comm ct_COMM; +bool commthread_running = true; +enum CT_MSG_FLAGS +{ + /* Use the inter communication thread communicator. */ + CT_INTER_CT = 1, + CT_DST_HAS_DESC = 1 << 1, + CT_SRC_HAS_DESC = 1 << 2, + CT_CHAR_ARRAY = 1 << 3, + CT_INCLUDE_DESCRIPTOR = 1 << 4, + /* Use 1 << 5 for next flag. */ +}; + +typedef void (*accessor_t)(void **, int32_t *, void *, void *, size_t *, + size_t *); +struct accessor_hash_t +{ + int hash; + int pad; + accessor_t accessor; +}; + +static struct accessor_hash_t *accessor_hash_table = NULL; +static int aht_cap = 0; +static int aht_size = 0; +static enum +{ + AHT_UNINITIALIZED, + AHT_OPEN, + AHT_PREPARED +} accessor_hash_table_state = AHT_UNINITIALIZED; + +/* The structure to communicate with the communication thread. Make sure, that + * data[] starts on pointer aligned address to not loss any performance. */ +typedef struct +{ + MPI_Win win; + int flags; + size_t transfer_size; + size_t opt_charlen; + int dest_image; + int dest_tag; + size_t dest_opt_charlen; + int accessor_index; + char data[]; +} ct_msg_t; + char err_buffer[MPI_MAX_ERROR_STRING]; /* All CAF runtime calls should use this comm instead of MPI_COMM_WORLD for @@ -322,6 +373,12 @@ double (*double_by_value)(double, double); // #define CAF_Win_unlock_all(win) MPI_Win_unlock_all (win) // #endif // CAF_MPI_LOCK_UNLOCK +/* Convenience macro to get the extent of a descriptor in a certain dimension + * + * Copied from gcc:libgfortran/libgfortran.h. */ +#define GFC_DESCRIPTOR_EXTENT(desc, i) \ + ((desc)->dim[i]._ubound + 1 - (desc)->dim[i].lower_bound) + #define MIN(X, Y) (((X) < (Y)) ? (X) : (Y)) #if defined(NONBLOCKING_PUT) && !defined(CAF_MPI_LOCK_UNLOCK) @@ -406,6 +463,250 @@ helperFunction() } #endif +void +dump_mem(const char *pre, void *m, const size_t s) +{ + const size_t str_len = s && m ? s * 3 + 1 : 8; + char *str = (char *)alloca(str_len), *p, *pend = str + str_len; + + if (m && s) + { + p = str; + for (size_t i = 0; i < s; ++i, p += 3) + sprintf(p, "%02x ", ((unsigned char *)m)[i]); + } + else + memcpy(str, "*EMPTY*", 8); + if (p >= pend) + dprint("dump_mem: output buffer exhausted.\n"); + dprint("%s: %p: (len = %d) %s\n", pre, m, s, str); +} + +void +handle_incoming_message(MPI_Status *status_in, MPI_Message *msg_han, + const int cnt) +{ + int ierr = 0; + MPI_Comm comm; + void *src_ptr, *baseptr, *buffer, *dst_ptr, *get_data; + int flag; + size_t charlen, send_size; + int free_buffer, i; + ct_msg_t *msg = alloca(cnt); + + ierr = MPI_Mrecv(msg, cnt, MPI_BYTE, msg_han, status_in); + chk_err(ierr); + dprint("ct: Received request of size %ld.\n", cnt); + + ierr = MPI_Win_get_attr(msg->win, MPI_WIN_BASE, &baseptr, &flag); + chk_err(ierr); + dprint("ct: Local base for win %ld is %p (set: %b) Executing accessor at " + "index %d address %p.\n", + msg->win, baseptr, flag, msg->accessor_index, + accessor_hash_table[msg->accessor_index].accessor); + if (!flag) + { + dprint("ct: Error: Window %p memory is not allocated.\n", msg->win); + } + if (msg->flags & CT_DST_HAS_DESC) + { + buffer = msg->data; + ((gfc_descriptor_t *)buffer)->base_addr = NULL; + get_data = msg->data + sizeof(gfc_descriptor_t) + + GFC_DESCRIPTOR_RANK((gfc_descriptor_t *)buffer) + * sizeof(descriptor_dimension); + // dump_mem(buffer, sizeof(gfc_descriptor_t) + // + GFC_DESCRIPTOR_RANK((gfc_descriptor_t + // *)buffer) + // * sizeof(descriptor_dimension)); + /* The destination is a descriptor which address is not mutable. */ + dst_ptr = buffer; + } + else + { + get_data = msg->data; + /* The destination is raw memory block, which adress is mutable. */ + buffer = NULL; + dst_ptr = &buffer; + dprint("ct: dst_ptr: %p, buffer: %p.\n", dst_ptr, buffer); + } + if (msg->flags & CT_SRC_HAS_DESC) + { + ((gfc_descriptor_t *)get_data)->base_addr = baseptr; + src_ptr = get_data; + get_data += sizeof(gfc_descriptor_t) + + GFC_DESCRIPTOR_RANK((gfc_descriptor_t *)src_ptr) + * sizeof(descriptor_dimension); + dprint("ct: src_desc base: %p, rank: %d, offset: %d.\n", + ((gfc_descriptor_t *)src_ptr)->base_addr, + GFC_DESCRIPTOR_RANK((gfc_descriptor_t *)src_ptr), + ((gfc_descriptor_t *)src_ptr)->offset); + for (int i = 0; i < GFC_DESCRIPTOR_RANK((gfc_descriptor_t *)src_ptr); ++i) + dprint("ct: src_desc (dim: %d) lb: %d, ub: %d, stride: %d\n", i, + ((gfc_descriptor_t *)src_ptr)->dim[i].lower_bound, + ((gfc_descriptor_t *)src_ptr)->dim[i]._ubound, + ((gfc_descriptor_t *)src_ptr)->dim[i]._stride); + // dump_mem("ct", ((gfc_descriptor_t *)baseptr)->base_addr, + // (((gfc_descriptor_t *)baseptr)->dim[0]._ubound + 1 + // - ((gfc_descriptor_t *)baseptr)->dim[0].lower_bound + // + ((gfc_descriptor_t *)baseptr)->offset) + // * GFC_DESCRIPTOR_SIZE((gfc_descriptor_t *)baseptr)); + } + else + src_ptr = baseptr; + + charlen = msg->dest_opt_charlen; + accessor_hash_table[msg->accessor_index].accessor( + dst_ptr, &free_buffer, src_ptr, get_data, &charlen, &msg->opt_charlen); + dprint("ct: getter executed.\n"); + comm = (msg->flags & CT_INTER_CT) ? ct_COMM : CAF_COMM_WORLD; + if (msg->flags & CT_DST_HAS_DESC) + { + size_t dsize = ((gfc_descriptor_t *)dst_ptr)->span; + dprint("ct: dst_desc base: %p, rank: %d, offset: %d.\n", + ((gfc_descriptor_t *)dst_ptr)->base_addr, + GFC_DESCRIPTOR_RANK((gfc_descriptor_t *)dst_ptr), + ((gfc_descriptor_t *)dst_ptr)->offset); + for (int i = 0; i < GFC_DESCRIPTOR_RANK((gfc_descriptor_t *)dst_ptr); ++i) + { + dprint("ct: dst_desc (dim: %d) lb: %d, ub: %d, stride: %d, extend: " + "%d\n", + i, ((gfc_descriptor_t *)dst_ptr)->dim[i].lower_bound, + ((gfc_descriptor_t *)dst_ptr)->dim[i]._ubound, + ((gfc_descriptor_t *)dst_ptr)->dim[i]._stride, + GFC_DESCRIPTOR_EXTENT((gfc_descriptor_t *)dst_ptr, i)); + dsize *= GFC_DESCRIPTOR_EXTENT((gfc_descriptor_t *)dst_ptr, i); + } + dump_mem("ct", ((gfc_descriptor_t *)dst_ptr)->base_addr, dsize); + buffer = ((gfc_descriptor_t *)dst_ptr)->base_addr; + if ((msg->flags & (CT_CHAR_ARRAY | CT_INCLUDE_DESCRIPTOR)) == 0) + send_size = msg->transfer_size; + else + { + if (msg->flags & CT_INCLUDE_DESCRIPTOR) + send_size = ((gfc_descriptor_t *)dst_ptr)->span; + else + send_size = charlen * msg->transfer_size; + for (i = 0; i < GFC_DESCRIPTOR_RANK((gfc_descriptor_t *)dst_ptr); ++i) + { + const ptrdiff_t ext + = GFC_DESCRIPTOR_EXTENT((gfc_descriptor_t *)dst_ptr, i); + if (ext < 0) + dprint("ct: dst extend in dim %d is < 0: %ld.\n", i, ext); + send_size *= ext; + } + } + if (msg->flags & CT_INCLUDE_DESCRIPTOR) + { + const size_t desc_size + = sizeof(gfc_descriptor_t) + + GFC_DESCRIPTOR_RANK((gfc_descriptor_t *)dst_ptr) + * sizeof(descriptor_dimension); + void *tbuff = malloc(desc_size + send_size); + dprint("ct: Including dst descriptor: %p, sizeof(desc): %d, rank: " + "%d, sizeof(buffer): %d, incoming free_buffer: %b.\n", + tbuff, desc_size, GFC_DESCRIPTOR_RANK((gfc_descriptor_t *)dst_ptr), + send_size, free_buffer); + /* Copy the descriptor contents. */ + memcpy(tbuff, dst_ptr, desc_size); + /* Copy the data to the end of buffer (i.e. behind the descriptor). + * Does not copy anything, when send_size is 0. */ + memcpy(tbuff + desc_size, buffer, send_size); + if (free_buffer) + { + dprint("ct: Freeing buffer: %p.\n", buffer); + free(buffer); + } + /* For debugging only: */ + ((gfc_descriptor_t *)tbuff)->base_addr = tbuff + desc_size; + free_buffer = true; + buffer = tbuff; + send_size += desc_size; + } + } + else + { + buffer = *(void **)dst_ptr; + dprint("ct: dst_ptr: %p, buffer: %p.\n", dst_ptr, buffer); + send_size = charlen * msg->transfer_size; + dprint("ct: buffer %p, send_size: %d.\n", buffer, send_size); + } + dump_mem("ct", buffer, send_size); + dprint("ct: Sending %ld bytes (kind: %d, cl: %d) to image %d, tag %d " + "on comm %x " + "(%s).\n", + send_size, msg->transfer_size, charlen, msg->dest_image, msg->dest_tag, + comm, comm == CAF_COMM_WORLD ? "CAF_COMM_WORLD" : "ct_COMM"); + ierr = MPI_Send(buffer, send_size, MPI_BYTE, msg->dest_image, msg->dest_tag, + comm); + chk_err(ierr); + if (free_buffer) + { + dprint("ct: going to free buffer: %p (&buffer: %p).\n", buffer, &buffer); + free(buffer); + } +} + +void * +communication_thread(void *) +{ + int ierr = 0, cnt; + MPI_Status status; + MPI_Message msg_han; + void *baseptr; + + pthread_t self; + pthread_attr_t pattr; + size_t stacksize; + self = pthread_self(); + pthread_getattr_np(self, &pattr); + pthread_attr_getstacksize(&pattr, &stacksize); + dprint("ct: Started witch stacksize: %ld.\n", stacksize); + + do + { + dprint("ct: Probing for incoming message.\n"); + ierr = MPI_Mprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, ct_COMM, &msg_han, &status); + chk_err(ierr); + dprint("ct: Message received from %d, tag %d, mpi-status: %d, processing " + "...\n", + status.MPI_SOURCE, status.MPI_TAG, status.MPI_ERROR); + if (status.MPI_TAG == CAF_CT_TAG && status.MPI_ERROR == MPI_SUCCESS) + { + MPI_Get_count(&status, MPI_BYTE, &cnt); + + if (cnt >= sizeof(ct_msg_t)) + { + handle_incoming_message(&status, &msg_han, cnt); + } + else if (!commthread_running) + { + /* Pickup empty message. */ + dprint("ct: Got termination message. Terminating.\n"); + baseptr = NULL; + ierr = MPI_Mrecv(baseptr, cnt, MPI_BYTE, &msg_han, &status); + chk_err(ierr); + } + else + { + dprint("ct: Error: message to small, ignoring (got: %ld, exp: %ld).\n", + cnt, sizeof(ct_msg_t)); + } + } + else if (ierr == MPI_SUCCESS) + { + /* There is a message, but not for us. */ + dprint("ct: Message not for us received. Setting it free again.\n"); + // ierr = MPI_Request_free(&msg_han); + chk_err(ierr); + } + else + chk_err(ierr); + } while (commthread_running); + dprint("ct: Ended.\n"); + return NULL; +} + /* Keep in sync with single.c. */ static void @@ -842,7 +1143,7 @@ PREFIX(init)(int *argc, char ***argv) if (caf_num_images == 0) { int ierr = 0, i = 0, j = 0, rc, prov_lev = 0; - int is_init = 0, prior_thread_level = MPI_THREAD_FUNNELED; + int is_init = 0, prior_thread_level = MPI_THREAD_MULTIPLE; ierr = MPI_Initialized(&is_init); chk_err(ierr); @@ -851,6 +1152,7 @@ PREFIX(init)(int *argc, char ***argv) ierr = MPI_Query_thread(&prior_thread_level); chk_err(ierr); } + dprint("Main thread: thread level: %d\n", prior_thread_level); #ifdef HELPER if (is_init) { @@ -991,6 +1293,11 @@ PREFIX(init)(int *argc, char ***argv) *win_model, flag); } #endif + + ierr = MPI_Comm_dup(CAF_COMM_WORLD, &ct_COMM); + chk_err(ierr); + ierr = pthread_create(&commthread, NULL, &communication_thread, NULL); + chk_err(ierr); } } @@ -1060,9 +1367,12 @@ finalize_internal(int status_code) /* Add a conventional barrier to prevent images from quitting too early. */ if (status_code == 0) { - dprint("In barrier for finalize..."); - ierr = MPI_Barrier(CAF_COMM_WORLD); - chk_err(ierr); + if (caf_num_images > 1) + { + dprint("In barrier for finalize..."); + ierr = MPI_Barrier(CAF_COMM_WORLD); + chk_err(ierr); + } } else /* Without failed images support, but a given status_code, we need to @@ -1127,6 +1437,17 @@ finalize_internal(int status_code) chk_err(ierr); #endif // MPI_VERSION + dprint("Sending termination signal to communication thread.\n"); + commthread_running = false; + ierr = MPI_Send(NULL, 0, MPI_BYTE, mpi_this_image, CAF_CT_TAG, ct_COMM); + chk_err(ierr); + dprint("Termination signal send, waiting for thread join.\n"); + ierr = pthread_join(commthread, NULL); + dprint("Communication thread terminated with rc = %d.\n", ierr); + dprint("Freeing ct_COMM.\n"); + MPI_Comm_free(&ct_COMM); + dprint("Freeed ct_COMM.\n"); + /* Free the global dynamic window. */ ierr = MPI_Win_free(&global_dynamic_win); chk_err(ierr); @@ -1201,6 +1522,7 @@ finalize_internal(int status_code) caf_is_finalized = 1; #endif free(sync_handles); + dprint("Finalisation done!!!\n"); } @@ -1263,8 +1585,8 @@ void PREFIX(register)(size_t size, caf_register_t type, caf_token_t *token, else actual_size = size; - dprint("size = %zd, type = %d, token = %p, desc = %p\n", size, type, token, - desc); + dprint("size = %zd, type = %d, token = %p, desc = %p, rank = %d\n", size, + type, token, desc, GFC_DESCRIPTOR_RANK(desc)); switch (type) { case CAF_REGTYPE_COARRAY_ALLOC_REGISTER_ONLY: @@ -3550,6 +3872,13 @@ PREFIX(send)(caf_token_t token, size_t offset, int image_index, } } +void +get_access(void **dst, bool *dst_is_tmp, void *base, void *) +{ + *dst = base; + *dst_is_tmp = false; +} + /* Get array data from a remote src to a local dest. */ void @@ -3691,11 +4020,26 @@ PREFIX(get)(caf_token_t token, size_t offset, int image_index, { const size_t trans_size = ((dst_size > src_size) ? src_size : dst_size) * size; - CAF_Win_lock(MPI_LOCK_SHARED, remote_image, *p); - ierr = MPI_Get(dest->base_addr, trans_size, MPI_BYTE, remote_image, - offset, trans_size, MPI_BYTE, *p); + ct_msg_t *msg = alloca(sizeof(ct_msg_t)); + msg->win = *p; + msg->transfer_size = trans_size; + msg->dest_image = mpi_this_image; + msg->dest_tag = CAF_CT_TAG + 1; + msg->flags = 0; + // msg->accessor_index = get_index; + ierr = MPI_Send(msg, sizeof(ct_msg_t), MPI_BYTE, remote_image, + CAF_CT_TAG, ct_COMM); chk_err(ierr); - CAF_Win_unlock(remote_image, *p); + ierr + = MPI_Recv(dest->base_addr, trans_size, MPI_BYTE, image_index - 1, + msg->dest_tag, CAF_COMM_WORLD, MPI_STATUS_IGNORE); + chk_err(ierr); + + // CAF_Win_lock(MPI_LOCK_SHARED, remote_image, *p); + // ierr = MPI_Get(dest->base_addr, trans_size, MPI_BYTE, remote_image, + // offset, trans_size, MPI_BYTE, *p); + // chk_err(ierr); + // CAF_Win_unlock(remote_image, *p); } else { @@ -4187,12 +4531,6 @@ get_data(void *ds, mpi_caf_token_t *token, MPI_Aint offset, int dst_type, num = (abs_stride > 1) ? (1 + (num - 1) / abs_stride) : num; \ } while (0) -/* Convenience macro to get the extent of a descriptor in a certain dimension - * - * Copied from gcc:libgfortran/libgfortran.h. */ -#define GFC_DESCRIPTOR_EXTENT(desc, i) \ - ((desc)->dim[i]._ubound + 1 - (desc)->dim[i].lower_bound) - #define sizeof_desc_for_rank(rank) \ (sizeof(gfc_descriptor_t) + (rank) * sizeof(descriptor_dimension)) @@ -4757,6 +5095,255 @@ get_for_ref(caf_reference_t *ref, size_t *i, size_t dst_index, } } +#ifdef GCC_GE_15 +void +PREFIX(register_accessor)(const int hash, accessor_t accessor) +{ + if (accessor_hash_table_state == AHT_UNINITIALIZED) + { + aht_cap = 16; + accessor_hash_table = calloc(aht_cap, sizeof(struct accessor_hash_t)); + accessor_hash_table_state = AHT_OPEN; + } + if (aht_size == aht_cap) + { + aht_cap += 16; + accessor_hash_table = realloc(accessor_hash_table, + aht_cap * sizeof(struct accessor_hash_t)); + } + if (accessor_hash_table_state == AHT_PREPARED) + { + accessor_hash_table_state = AHT_OPEN; + } + dprint("adding function %p with hash %x.\n", accessor, hash); + accessor_hash_table[aht_size].hash = hash; + accessor_hash_table[aht_size].accessor = accessor; + ++aht_size; +} + +static int +hash_compare(const struct accessor_hash_t *lhs, + const struct accessor_hash_t *rhs) +{ + return lhs->hash < rhs->hash ? -1 : (lhs->hash > rhs->hash ? 1 : 0); +} + +void +PREFIX(register_accessors_finish)() +{ + if (accessor_hash_table_state == AHT_PREPARED + || accessor_hash_table_state == AHT_UNINITIALIZED) + return; + + qsort(accessor_hash_table, aht_size, sizeof(struct accessor_hash_t), + (int (*)(const void *, const void *))hash_compare); + accessor_hash_table_state = AHT_PREPARED; + dprint("finished accessor hash table.\n"); +} + +int +PREFIX(get_remote_function_index)(const int hash) +{ + if (accessor_hash_table_state != AHT_PREPARED) + { + caf_runtime_error("the accessor hash table is not prepared."); + } + + struct accessor_hash_t cand; + cand.hash = hash; + struct accessor_hash_t *f = bsearch( + &cand, accessor_hash_table, aht_size, sizeof(struct accessor_hash_t), + (int (*)(const void *, const void *))hash_compare); + + int index = f ? f - accessor_hash_table : -1; + dprint("the index for accessor hash %x is %d.\n", hash, index); + return index; +} + +/* Get data from a remote image's memory pointed to by `token`. The image is + * given by `image_index`. `opt_src_charlen` gives the length of the source + * string on the remote image when that is a character array. `dst_size` then + * gives the number of bytes of each character. `opt_src_charlen` is null, when + * this is no character array. + * `*dst_size` gives the expected number of bytes to be stored in `*dst_data`. + * `*dst_data` gives the memory where the data is stored. This address may be + * changed, when reallocation is necessary. + * `opt_dst_charlen` is NULL when dst is not a character array, or stores the + * number a characters in `*dst_data`. + * 'opt_dst_desc' is an optional descriptor. Its address in memory is fixed, + * but its data may be changed. `getter_index` is the index in the hashtable as + * returned by `get_remote_function_index()`. `get_data` is optional data to be + * passed to the getter function. `get_data_size` is the size of the former + * data. + * + */ +void +PREFIX(get_by_ct)(caf_token_t token, const gfc_descriptor_t *opt_src_desc, + const size_t *opt_src_charlen, const int image_index, + const size_t dst_size, void **dst_data, + size_t *opt_dst_charlen, gfc_descriptor_t *opt_dst_desc, + const bool may_realloc_dst, const int getter_index, + void *get_data, const size_t get_data_size, int *stat, + caf_team_t *team __attribute__((unused)), + int *team_number __attribute__((unused))) +{ + MPI_Group current_team_group, win_group; + int ierr, this_image, remote_image; + int trans_ranks[2]; + bool free_t_buff, free_msg; + void *t_buff; + ct_msg_t *msg; + const bool dst_incl_desc = opt_dst_desc && may_realloc_dst, + has_src_desc = opt_src_desc; + const size_t dst_desc_size + = opt_dst_desc ? sizeof(gfc_descriptor_t) + + GFC_DESCRIPTOR_RANK(opt_dst_desc) + * sizeof(descriptor_dimension) + : 0, + src_desc_size = has_src_desc ? sizeof(gfc_descriptor_t) + + GFC_DESCRIPTOR_RANK(opt_src_desc) + * sizeof(descriptor_dimension) + : 0, + msg_size + = sizeof(ct_msg_t) + dst_desc_size + src_desc_size + get_data_size; + + if (stat) + *stat = 0; + + // Get mapped remote image + ierr = MPI_Comm_group(CAF_COMM_WORLD, ¤t_team_group); + chk_err(ierr); + ierr = MPI_Win_get_group(*TOKEN(token), &win_group); + chk_err(ierr); + ierr = MPI_Group_translate_ranks(current_team_group, 2, + (int[]){image_index - 1, mpi_this_image}, + win_group, trans_ranks); + chk_err(ierr); + remote_image = trans_ranks[0]; + this_image = trans_ranks[1]; + ierr = MPI_Group_free(¤t_team_group); + chk_err(ierr); + ierr = MPI_Group_free(&win_group); + chk_err(ierr); + + check_image_health(remote_image, stat); + + dprint("Entering get_by_ct(), token = %p, win_rank = %d, this_rank = %d, " + "getter index = " + "%d, sizeof(src_desc) = %d, sizeof(dst_desc) = %d.\n", + token, remote_image, this_image, getter_index, src_desc_size, + dst_desc_size); + + // create get msg + if ((free_msg = (((msg = alloca(msg_size))) == NULL))) + { + msg = malloc(msg_size); + if (msg == NULL) + caf_runtime_error("Unable to allocate memory " + "for internal message in get_by_ct()."); + } + msg->win = *TOKEN(token); + msg->transfer_size = dst_size; + msg->opt_charlen = opt_src_charlen ? *opt_src_charlen : 0; + msg->dest_image = mpi_this_image; + msg->dest_tag = CAF_CT_TAG + 1; + msg->dest_opt_charlen = opt_dst_charlen ? *opt_dst_charlen : 1; + msg->flags = (opt_dst_desc ? CT_DST_HAS_DESC : 0) + | (has_src_desc ? CT_SRC_HAS_DESC : 0) + | (opt_src_charlen ? CT_CHAR_ARRAY : 0) + | (dst_incl_desc ? CT_INCLUDE_DESCRIPTOR : 0); + dprint("message flags: %x.\n", msg->flags); + msg->accessor_index = getter_index; + if (opt_dst_desc) + memcpy(msg->data, opt_dst_desc, dst_desc_size); + if (has_src_desc) + memcpy(msg->data + dst_desc_size, opt_src_desc, src_desc_size); + + memcpy(msg->data + dst_desc_size + src_desc_size, get_data, get_data_size); + + // call get on remote + ierr = MPI_Send(msg, msg_size, MPI_BYTE, remote_image, CAF_CT_TAG, ct_COMM); + chk_err(ierr); + + if (!opt_dst_charlen && !dst_incl_desc) + { + // allocate local buffer + if ((free_t_buff = (((t_buff = alloca(dst_size))) == NULL))) + { + t_buff = malloc(dst_size); + if (t_buff == NULL) + caf_runtime_error("Unable to allocate memory " + "for internal buffer in get_by_ct()."); + } + dprint("waiting to receive %d bytes from %d.\n", dst_size, image_index - 1); + ierr = MPI_Recv(t_buff, dst_size, MPI_BYTE, image_index - 1, msg->dest_tag, + CAF_COMM_WORLD, MPI_STATUS_IGNORE); + chk_err(ierr); + dprint("received %d bytes as requested from %d.\n", dst_size, + image_index - 1); + // dump_mem("get_by_ct", t_buff, dst_size); + memcpy(*dst_data, t_buff, dst_size); + + if (free_t_buff) + free(t_buff); + } + else + { + MPI_Status status; + MPI_Message msg_han; + int cnt; + + dprint("probing for incoming message from %d, tag %d.\n", image_index - 1, + msg->dest_tag); + ierr = MPI_Mprobe(image_index - 1, msg->dest_tag, CAF_COMM_WORLD, &msg_han, + &status); + chk_err(ierr); + if (ierr == MPI_SUCCESS) + { + MPI_Get_count(&status, MPI_BYTE, &cnt); + dprint("get message of %d bytes from image %d, tag %d, dest_addr %p.\n", + cnt, image_index - 1, msg->dest_tag, *dst_data); + if (may_realloc_dst) + *dst_data = realloc(*dst_data, cnt); + // else // max cnt + ierr = MPI_Mrecv(*dst_data, cnt, MPI_BYTE, &msg_han, &status); + chk_err(ierr); + if (opt_dst_charlen) + *opt_dst_charlen = cnt / dst_size; + if (dst_incl_desc) + { + const size_t desc_size + = sizeof(gfc_descriptor_t) + + GFC_DESCRIPTOR_RANK((gfc_descriptor_t *)(*dst_data)) + * sizeof(descriptor_dimension); + dprint("refitting dst descriptor of size %d at %p with data %d at %p " + "from %d bytes " + "transfered.\n", + desc_size, opt_dst_desc, cnt - desc_size, *dst_data, cnt); + memcpy(opt_dst_desc, *dst_data, desc_size); + memmove(*dst_data, (*dst_data) + desc_size, cnt - desc_size); + opt_dst_desc->base_addr = *dst_data + = realloc(*dst_data, cnt - desc_size); + dump_mem("ret data", opt_dst_desc->base_addr, cnt - desc_size); + } + } + else + { + int err_len; + char err_str[MPI_MAX_ERROR_STRING]; + MPI_Error_string(status.MPI_ERROR, err_str, &err_len); + caf_runtime_error("Got MPI error %d retrieving result: %s", + status.MPI_ERROR, err_str); + } + } + + if (free_msg) + free(msg); + + dprint("done with get_by_ct.\n"); +} +#endif + void PREFIX(get_by_ref)(caf_token_t token, int image_index, gfc_descriptor_t *dst, caf_reference_t *refs, int dst_kind, int src_kind, diff --git a/src/tests/unit/send-get/CMakeLists.txt b/src/tests/unit/send-get/CMakeLists.txt index 7dff2b4d..e2143719 100644 --- a/src/tests/unit/send-get/CMakeLists.txt +++ b/src/tests/unit/send-get/CMakeLists.txt @@ -10,6 +10,7 @@ caf_compile_executable(get_convert_char_array get_convert_char_array.f90) caf_compile_executable(get_with_offset_1d get_with_offset_1d.f90) caf_compile_executable(whole_get_array whole_get_array.f90) caf_compile_executable(strided_get strided_get.f90) +caf_compile_executable(get_with_1d_vector_index get_with_1d_vector_index.f90) caf_compile_executable(get_with_vector_index get_with_vector_index.f90) ## Inquiry functions (these are gets that could be optimized in the future to communicate only the descriptors) caf_compile_executable(alloc_comp_multidim_shape alloc_comp_multidim_shape.F90) diff --git a/src/tests/unit/send-get/get_with_1d_vector_index.f90 b/src/tests/unit/send-get/get_with_1d_vector_index.f90 new file mode 100644 index 00000000..35e26c15 --- /dev/null +++ b/src/tests/unit/send-get/get_with_1d_vector_index.f90 @@ -0,0 +1,56 @@ +program get_with_1d_vector_index + use iso_fortran_env + implicit none + integer, parameter :: nloc=8, nhl=2, ivsize=nloc+2*nhl + real :: xv(ivsize)[*] + real, allocatable :: expected(:) + integer :: rmt_idx(2), loc_idx(2) + integer, allocatable :: xchg(:) + integer :: nrcv, me, np, nxch, i, iv + character(len=120) :: fmt + + me = this_image() + np = num_images() + + if (np==1) then + xchg = [ integer :: ] + + else if (me == 1) then + xchg = [me+1] + else if (me == np) then + xchg = [me-1] + else + xchg = [me-1, me+1] + end if + nxch = size(xchg) + nrcv = nxch * nhl + + allocate(expected(nxch)) + xv(1:nloc) = [(i,i=(me-1)*nloc+1,me*nloc)] + iv = nloc + 1 + loc_idx(1:nhl) = [ (i,i=iv,iv+nhl-1) ] + rmt_idx(1:nhl) = [ (i,i=1,nhl) ] + + sync images(xchg) + iv = nloc + 1 + + xv(iv:iv+nhl-1) = xv(rmt_idx(1:nhl))[xchg(1)] + print *, me, ":", xv + iv = iv + nhl + if (me == 1) then + expected(:) = nloc + rmt_idx(1:nhl) + else + expected(:) = ((me - 2) * nloc) + rmt_idx(1:nhl) + end if + + sync all + if (any(xv(loc_idx(1:nhl)) /= expected(:))) then + write(fmt,*) '( i0,a,',nhl,'(f5.0,1x),a,',nhl,'(f5.0,1x) )' + write(*,fmt) me,': is:',xv(loc_idx(1:nhl)),', exp:',expected(1:nhl) + + error stop 'Test failed.' + end if + + sync all + if (me == 1) print *, 'Test passed.' +end program get_with_1d_vector_index diff --git a/src/tests/unit/send-get/get_with_offset_1d.f90 b/src/tests/unit/send-get/get_with_offset_1d.f90 index 417d20d8..54d291ca 100644 --- a/src/tests/unit/send-get/get_with_offset_1d.f90 +++ b/src/tests/unit/send-get/get_with_offset_1d.f90 @@ -9,11 +9,7 @@ program get_offset_1d allocate(a(100)[*],b(10)) - a = (/ (i, i=1,100) /) - - do i=1,100 - a(i) = a(i) + me - enddo + a = (/ (i + me, i=1,100) /) sync all