diff --git a/src/ucp/core/ucp_request.h b/src/ucp/core/ucp_request.h index 141c3c33478..2525b91b6a1 100644 --- a/src/ucp/core/ucp_request.h +++ b/src/ucp/core/ucp_request.h @@ -136,6 +136,7 @@ struct ucp_request { ucp_tag_t tag; uintptr_t rreq_ptr; /* receive request ptr on the recv side (used in AM rndv) */ + uint64_t rts_send_seq; /* sequence number of actual rts send */ } tag; struct { @@ -270,6 +271,7 @@ struct ucp_request { ucp_worker_t *worker; uct_tag_context_t uct_ctx; /* Transport offload context */ unsigned prev_flags; + uint64_t req_id; union { struct { @@ -344,6 +346,7 @@ struct ucp_recv_desc { uint32_t length; /* Received length */ uint32_t payload_offset; /* Offset from end of the descriptor * to AM data */ + uint64_t rndv_rts_seq; /* RNDV: rts sequence number */ uint16_t flags; /* Flags */ int16_t priv_length; /* Number of bytes consumed from headroom private space, except the diff --git a/src/ucp/core/ucp_worker.c b/src/ucp/core/ucp_worker.c index 57c09f32d3c..e194ba9be94 100644 --- a/src/ucp/core/ucp_worker.c +++ b/src/ucp/core/ucp_worker.c @@ -1706,6 +1706,8 @@ ucs_status_t ucp_worker_create(ucp_context_h context, worker->am_message_id = ucs_generate_uuid(0); worker->rkey_ptr_cb_id = UCS_CALLBACKQ_ID_NULL; worker->rndv_req_id = 1; + worker->rndv_rts_send_seq = 0; + worker->rndv_rts_recv_seq = 0; ucs_queue_head_init(&worker->rkey_ptr_reqs); ucs_list_head_init(&worker->arm_ifaces); ucs_list_head_init(&worker->stream_ready_eps); diff --git a/src/ucp/core/ucp_worker.h b/src/ucp/core/ucp_worker.h index 8c0f128ae7d..7ea6a5f4632 100644 --- a/src/ucp/core/ucp_worker.h +++ b/src/ucp/core/ucp_worker.h @@ -234,6 +234,9 @@ typedef struct ucp_worker { khash_t(ucp_worker_rndv_req_ptrs) rndv_req_ptrs; uint64_t rndv_req_id; + uint64_t rndv_rts_send_seq; + uint64_t rndv_rts_recv_seq; + ucp_ep_match_ctx_t ep_match_ctx; /* Endpoint-to-endpoint matching context */ ucp_worker_iface_t **ifaces; /* Array of pointers to interfaces, one for each resource */ @@ -375,4 +378,11 @@ ucp_worker_sockaddr_is_cm_proto(const ucp_worker_h worker) return !!ucp_worker_num_cm_cmpts(worker); } +static inline ucp_tag_rndv_debug_entry_t* +ucp_worker_rndv_debug_entry(ucp_worker_h worker, uint64_t req_id) +{ + size_t elem_index = req_id % worker->tm.rndv_debug.queue_length; + return &worker->tm.rndv_debug.queue[elem_index]; +} + #endif diff --git a/src/ucp/tag/offload.c b/src/ucp/tag/offload.c index c7dbbe70b9b..80fec50ada7 100644 --- a/src/ucp/tag/offload.c +++ b/src/ucp/tag/offload.c @@ -152,7 +152,7 @@ UCS_PROFILE_FUNC_VOID(ucp_tag_offload_rndv_cb, ucs_assert(header_length >= sizeof(ucp_rndv_rts_hdr_t)); if (UCP_MEM_IS_ACCESSIBLE_FROM_CPU(req->recv.mem_type)) { - ucp_rndv_matched(req->recv.worker, req, header); + ucp_rndv_matched(req->recv.worker, req, header, 0); } else { /* SW rendezvous request is stored in the user buffer (temporarily) when matched. If user buffer allocated on GPU memory, need to "pack" @@ -160,7 +160,7 @@ UCS_PROFILE_FUNC_VOID(ucp_tag_offload_rndv_cb, header_host_copy = ucs_alloca(header_length); ucp_mem_type_pack(req->recv.worker, header_host_copy, header, header_length, req->recv.mem_type); - ucp_rndv_matched(req->recv.worker, req, header_host_copy); + ucp_rndv_matched(req->recv.worker, req, header_host_copy, 0); } ucp_tag_offload_release_buf(req, 0); diff --git a/src/ucp/tag/rndv.c b/src/ucp/tag/rndv.c index 6beb84f3603..99fee376b9c 100644 --- a/src/ucp/tag/rndv.c +++ b/src/ucp/tag/rndv.c @@ -65,6 +65,114 @@ static void ucp_rndv_complete_send(ucp_request_t *sreq, ucs_status_t status) ucp_request_complete_send(sreq, status); } +/* add debug entry for rndv send flow */ +static ucp_tag_rndv_debug_entry_t* +ucp_rndv_add_debug_entry_common(ucp_request_t *req) +{ + ucp_worker_h worker = req->send.ep->worker; + ucp_tag_rndv_debug_entry_t *entry = ucp_worker_rndv_debug_entry(worker, + req->send.rndv_req_id); + entry->id = req->send.rndv_req_id; + entry->ep = req->send.ep; + entry->local_address = req->send.buffer; + entry->size = req->send.length; + + return entry; +} + +/* add debug entry for rndv_get flow */ +static void +ucp_rndv_get_req_add_debug_entry(ucp_request_t *rndv_req, + const ucp_rndv_rts_hdr_t *rndv_rts_hdr, + uint64_t rts_seq) +{ + ucp_worker_h worker = rndv_req->send.ep->worker; + ucp_request_t *rreq = rndv_req->send.rndv_get.rreq; + ucp_tag_rndv_debug_entry_t *entry; + + /* set request id */ + rndv_req->send.rndv_req_id = worker->rndv_req_id; + worker->rndv_req_id++; + + /* add entry for rndv_get request */ + entry = ucp_rndv_add_debug_entry_common(rndv_req); + entry->type = "rndv_get"; + entry->rts_seq = rts_seq; + entry->send_tag = rndv_rts_hdr->super.tag; + entry->recv_tag = rreq->recv.tag.tag; + entry->remote_address = rndv_req->send.rndv_get.remote_address; + entry->remote_reqptr = rndv_req->send.rndv_get.remote_request; + entry->rndv_get_req = rndv_req; + entry->recv_req = rreq; + entry->send_req = NULL; + + /* add more data in existing receive request entry */ + entry = ucp_worker_rndv_debug_entry(worker, rreq->recv.req_id); + entry->ep = rndv_req->send.ep; + entry->rts_seq = rts_seq; + entry->send_tag = rndv_rts_hdr->super.tag; + entry->remote_address = rndv_req->send.rndv_get.remote_address; + entry->remote_reqptr = rndv_req->send.rndv_get.remote_request; + entry->rndv_get_req = rndv_req; + entry->send_req = NULL; +} + +/* add debug entry for rndv send flow */ +static void +ucp_rndv_send_add_debug_entry(ucp_request_t *req) +{ + ucp_tag_rndv_debug_entry_t *entry; + + entry = ucp_rndv_add_debug_entry_common(req); + entry->type = "rndv_send"; + entry->rts_seq = 0; + entry->send_tag = req->send.msg_proto.tag.tag; + entry->recv_tag = 0; + entry->remote_address = 0; + entry->remote_reqptr = 0; + entry->rndv_get_req = NULL; + entry->recv_req = NULL; + entry->send_req = req; +} + +/* to be used from debugger */ +void ucp_rndv_print_debug_data(ucp_worker_h worker, const char *filename, + ucp_tag_t send_tag) +{ + ucp_tag_rndv_debug_entry_t *entry; + size_t i, count; + FILE *file; + + if (filename == NULL) { + file = stdout; + } else { + file = fopen(filename, "w"); + if (file == NULL) { + fprintf(stderr, "cannot open %s: %m\n", filename); + return; + } + } + + count = ucs_min(worker->tm.rndv_debug.queue_length, worker->rndv_req_id); + for (i = 0; i < count; ++i) { + entry = &worker->tm.rndv_debug.queue[i]; + if ((send_tag != 0) && (send_tag != entry->send_tag)) { + continue; + } + fprintf(file, + "%s id %lu rts_seq %lu stag 0x%lx rtag 0x%lx rva 0x%lx rmreq 0x%lx " + "lva %p sz %zu greq %p rreq %p sreq %p\n", + entry->type, entry->id, entry->rts_seq, entry->send_tag, + entry->recv_tag, entry->remote_address, entry->remote_reqptr, + entry->local_address, entry->size, entry->rndv_get_req, + entry->recv_req, entry->send_req); + } + + if (filename != NULL) { + fclose(file); + } +} + size_t ucp_tag_rndv_rts_pack(void *dest, void *arg) { ucp_request_t *sreq = arg; /* send request */ @@ -108,14 +216,21 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_proto_progress_rndv_rts, (self), uct_pending_req_t *self) { ucp_request_t *sreq = ucs_container_of(self, ucp_request_t, send.uct); + ucp_ep_h ep = sreq->send.ep; + ucp_worker_h worker = ep->worker; size_t packed_rkey_size; ucs_status_t status; /* send the RTS. the pack_cb will pack all the necessary fields in the RTS */ - packed_rkey_size = ucp_ep_config(sreq->send.ep)->tag.rndv.rkey_size; + packed_rkey_size = ucp_ep_config(ep)->tag.rndv.rkey_size; status = ucp_do_am_single(self, UCP_AM_ID_RNDV_RTS, ucp_tag_rndv_rts_pack, sizeof(ucp_rndv_rts_hdr_t) + packed_rkey_size); if (status == UCS_OK) { + sreq->send.msg_proto.tag.rts_send_seq = worker->rndv_rts_send_seq++; + if (ucs_unlikely(worker->tm.rndv_debug.queue_length > 0)) { + ucp_worker_rndv_debug_entry(worker, sreq->send.rndv_req_id)->rts_seq = + sreq->send.msg_proto.tag.rts_send_seq; + } sreq->flags |= UCP_REQUEST_FLAG_RNDV_RTS_SENT; return status; } else if (status == UCS_ERR_NO_RESOURCE) { @@ -256,6 +371,11 @@ ucs_status_t ucp_tag_send_start_rndv(ucp_request_t *sreq) sreq->flags |= UCP_REQUEST_FLAG_SEND_RNDV; + sreq->send.rndv_req_id = worker->rndv_req_id++; + if (ucs_unlikely(worker->tm.rndv_debug.queue_length > 0)) { + ucp_rndv_send_add_debug_entry(sreq); + } + status = ucp_ep_resolve_dest_ep_ptr(ep, sreq->send.lane); if (status != UCS_OK) { return status; @@ -272,19 +392,17 @@ ucs_status_t ucp_tag_send_start_rndv(ucp_request_t *sreq) /* add the rndv send request to a hash on the worker. the key is a unique * value on the worker */ khiter = kh_put(ucp_worker_rndv_req_ptrs, &worker->rndv_req_ptrs, - worker->rndv_req_id, &ret); + sreq->send.rndv_req_id, &ret); if (ret < 1 ) { ucs_warn("failed to add rndv req id (%zu) to worker %p rndv req ptrs hash", - worker->rndv_req_id, worker); + sreq->send.rndv_req_id, worker); } - sreq->send.rndv_req_id = worker->rndv_req_id; kh_value(&worker->rndv_req_ptrs, khiter) = (uintptr_t)sreq; ucs_debug("added sreq %p to hash with key %zu. worker %p", sreq, sreq->send.rndv_req_id, worker); - worker->rndv_req_id++; return status; } @@ -713,32 +831,9 @@ ucp_rndv_req_init_zcopy_lane_map(ucp_request_t *rndv_req) rndv_req->send.rndv_get.lanes_count = ucs_popcount(lane_map); } -static void ucp_rndv_req_add_debug_entry(ucp_request_t *rndv_req, - const ucp_rndv_rts_hdr_t *rndv_rts_hdr) -{ - ucp_worker_h worker = rndv_req->send.ep->worker; - ucp_tag_rndv_debug_entry_t *entry; - size_t elem_index; - - /* set request id */ - rndv_req->send.rndv_req_id = worker->rndv_req_id; - worker->rndv_req_id++; - - elem_index = rndv_req->send.rndv_req_id % - worker->tm.rndv_debug.queue_length; - entry = &worker->tm.rndv_debug.queue[elem_index]; - entry->id = rndv_req->send.rndv_req_id; - entry->ep = rndv_req->send.ep; - entry->send_tag = rndv_rts_hdr->super.tag; - entry->recv_tag = rndv_req->send.rndv_get.rreq->recv.tag.tag; - entry->remote_address = rndv_req->send.rndv_get.remote_address; - entry->local_address = rndv_req->send.buffer; - entry->size = rndv_req->send.length; - entry->req = rndv_req; -} - static void ucp_rndv_req_send_rma_get(ucp_request_t *rndv_req, ucp_request_t *rreq, - const ucp_rndv_rts_hdr_t *rndv_rts_hdr) + const ucp_rndv_rts_hdr_t *rndv_rts_hdr, + uint64_t rts_seq) { ucp_worker_h worker = rndv_req->send.ep->worker; ucs_status_t status; @@ -769,7 +864,7 @@ static void ucp_rndv_req_send_rma_get(ucp_request_t *rndv_req, ucp_request_t *rr ucp_rndv_req_init_zcopy_lane_map(rndv_req); if (ucs_unlikely(worker->tm.rndv_debug.queue_length > 0)) { - ucp_rndv_req_add_debug_entry(rndv_req, rndv_rts_hdr); + ucp_rndv_get_req_add_debug_entry(rndv_req, rndv_rts_hdr, rts_seq); } if (worker->context->config.ext.rdnv_defer_sched) { @@ -993,9 +1088,10 @@ ucp_rndv_test_zcopy_scheme_support(size_t length, size_t min_zcopy, /* or can the message be split? */ split); } -UCS_PROFILE_FUNC_VOID(ucp_rndv_matched, (worker, rreq, rndv_rts_hdr), +UCS_PROFILE_FUNC_VOID(ucp_rndv_matched, (worker, rreq, rndv_rts_hdr, rts_seq), ucp_worker_h worker, ucp_request_t *rreq, - const ucp_rndv_rts_hdr_t *rndv_rts_hdr) + const ucp_rndv_rts_hdr_t *rndv_rts_hdr, + uint64_t rts_seq) { ucp_rndv_mode_t rndv_mode; ucp_request_t *rndv_req; @@ -1064,7 +1160,7 @@ UCS_PROFILE_FUNC_VOID(ucp_rndv_matched, (worker, rreq, rndv_rts_hdr), ep_config->tag.rndv.max_get_zcopy, ep_config->tag.rndv.get_zcopy_split)) { /* try to fetch the data with a get_zcopy operation */ - ucp_rndv_req_send_rma_get(rndv_req, rreq, rndv_rts_hdr); + ucp_rndv_req_send_rma_get(rndv_req, rreq, rndv_rts_hdr, rts_seq); goto out; } else if (rndv_mode == UCP_RNDV_MODE_AUTO) { /* check if we need pipelined memtype staging */ @@ -1148,6 +1244,9 @@ ucs_status_t ucp_rndv_process_rts(void *arg, void *data, size_t length, ucp_recv_desc_t *rdesc; ucp_request_t *rreq; ucs_status_t status; + uint64_t seq; + + seq = worker->rndv_rts_recv_seq++; if (rndv_rts_hdr->status == UCS_ERR_CANCELED) { ucp_rndv_unexp_cancel(worker, rndv_rts_hdr); @@ -1156,7 +1255,7 @@ ucs_status_t ucp_rndv_process_rts(void *arg, void *data, size_t length, rreq = ucp_tag_exp_search(&worker->tm, rndv_rts_hdr->super.tag); if (rreq != NULL) { - ucp_rndv_matched(worker, rreq, rndv_rts_hdr); + ucp_rndv_matched(worker, rreq, rndv_rts_hdr, seq); /* Cancel req in transport if it was offloaded, because it arrived as unexpected */ @@ -1169,6 +1268,7 @@ ucs_status_t ucp_rndv_process_rts(void *arg, void *data, size_t length, sizeof(*rndv_rts_hdr), UCP_RECV_DESC_FLAG_RNDV, 0, &rdesc); if (!UCS_STATUS_IS_ERR(status)) { + rdesc->rndv_rts_seq = seq; ucp_tag_unexp_recv(&worker->tm, rdesc, rndv_rts_hdr->super.tag); } } diff --git a/src/ucp/tag/rndv.h b/src/ucp/tag/rndv.h index 9ef0310f181..4aab0d626c1 100644 --- a/src/ucp/tag/rndv.h +++ b/src/ucp/tag/rndv.h @@ -52,7 +52,8 @@ ucs_status_t ucp_tag_send_start_rndv(ucp_request_t *req); void ucp_tag_rndv_cancel(ucp_request_t *sreq); void ucp_rndv_matched(ucp_worker_h worker, ucp_request_t *req, - const ucp_rndv_rts_hdr_t *rndv_rts_hdr); + const ucp_rndv_rts_hdr_t *rndv_rts_hdr, + uint64_t rts_seq); ucs_status_t ucp_rndv_progress_rma_get_zcopy(uct_pending_req_t *self); diff --git a/src/ucp/tag/tag_match.h b/src/ucp/tag/tag_match.h index 7955d179972..abe24fc3d34 100644 --- a/src/ucp/tag/tag_match.h +++ b/src/ucp/tag/tag_match.h @@ -56,14 +56,19 @@ KHASH_INIT(ucp_tag_frag_hash, uint64_t, ucp_tag_frag_match_t, 1, typedef struct ucp_tag_rndv_debug_entry { + const char *type; uint64_t id; + uint64_t rts_seq; ucp_ep_h ep; ucp_tag_t send_tag; ucp_tag_t recv_tag; uintptr_t remote_address; + uintptr_t remote_reqptr; void *local_address; size_t size; - ucp_request_t *req; + ucp_request_t *rndv_get_req; + ucp_request_t *send_req; + ucp_request_t *recv_req; } ucp_tag_rndv_debug_entry_t; diff --git a/src/ucp/tag/tag_recv.c b/src/ucp/tag/tag_recv.c index 1da95eb2e77..7f26fff3747 100644 --- a/src/ucp/tag/tag_recv.c +++ b/src/ucp/tag/tag_recv.c @@ -34,6 +34,27 @@ ucp_tag_recv_request_completed(ucp_request_t *req, ucs_status_t status, UCS_PROFILE_REQUEST_EVENT(req, "complete_recv", 0); } +static void +ucp_tag_recv_add_debug_entry(ucp_worker_h worker, void *buffer, size_t length, + ucp_tag_t tag, ucp_request_t *req) +{ + ucp_tag_rndv_debug_entry_t *entry = ucp_worker_rndv_debug_entry(worker, + req->recv.req_id); + entry->id = req->recv.req_id; + entry->type = "tag_recv"; + entry->ep = NULL; + entry->local_address = buffer; + entry->size = length; + entry->rts_seq = 0; + entry->send_tag = 0; + entry->recv_tag = tag; + entry->remote_address = 0; + entry->remote_reqptr = 0; + entry->rndv_get_req = NULL; + entry->recv_req = req; + entry->send_req = NULL; +} + static UCS_F_ALWAYS_INLINE void ucp_tag_recv_common(ucp_worker_h worker, void *buffer, size_t count, uintptr_t datatype, ucp_tag_t tag, ucp_tag_t tag_mask, @@ -51,6 +72,16 @@ ucp_tag_recv_common(ucp_worker_h worker, void *buffer, size_t count, ucp_trace_req(req, "%s buffer %p dt 0x%lx count %zu tag %"PRIx64"/%"PRIx64, debug_name, buffer, datatype, count, tag, tag_mask); + /* set request id */ + req->recv.req_id = worker->rndv_req_id; + worker->rndv_req_id++; + + if (ucs_unlikely(worker->tm.rndv_debug.queue_length > 0)) { + ucp_tag_recv_add_debug_entry(worker, buffer, + ucp_contig_dt_length(datatype, count), + tag, req); + } + /* First, check the fast path case - single fragment * in this case avoid initializing most of request fields * */ @@ -128,7 +159,7 @@ ucp_tag_recv_common(ucp_worker_h worker, void *buffer, size_t count, /* Check rendezvous case */ if (ucs_unlikely(rdesc->flags & UCP_RECV_DESC_FLAG_RNDV)) { - ucp_rndv_matched(worker, req, (void*)(rdesc + 1)); + ucp_rndv_matched(worker, req, (void*)(rdesc + 1), rdesc->rndv_rts_seq); UCP_WORKER_STAT_RNDV(worker, UNEXP); ucp_recv_desc_release(rdesc); return;