Skip to content

Commit

Permalink
Merge pull request #10403 from shasson5/gga
Browse files Browse the repository at this point in the history
UCT/GGA: Support is_connected API
  • Loading branch information
yosefe authored Jan 16, 2025
2 parents c78164d + 15b4462 commit 92cb1dd
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 17 deletions.
40 changes: 32 additions & 8 deletions src/uct/ib/mlx5/gga/gga_mlx5.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ enum {
};

typedef struct {
uint8_t flags;
uct_ib_uint24_t qp_num;
uct_ib_uint24_t flush_rkey;
uct_rc_mlx5_base_ep_address_t super;
uint8_t flags;
uct_ib_uint24_t flush_rkey;
} UCS_S_PACKED uct_gga_mlx5_ep_address_t;

typedef struct {
Expand Down Expand Up @@ -465,7 +465,7 @@ uct_gga_mlx5_ep_get_address(uct_ep_h tl_ep, uct_ep_addr_t *addr)
uct_gga_mlx5_ep_address_t *gga_addr = (uct_gga_mlx5_ep_address_t*)addr;
uct_ib_md_t *md = uct_ib_iface_md(&iface->super.super);

uct_ib_pack_uint24(gga_addr->qp_num, ep->super.tx.wq.super.qp_num);
uct_ib_pack_uint24(gga_addr->super.qp_num, ep->super.tx.wq.super.qp_num);
if (uct_rc_iface_flush_rkey_enabled(&iface->super)) {
gga_addr->flags = UCT_GGA_MLX5_EP_ADDRESS_FLAG_FLUSH_RKEY;
uct_ib_pack_uint24(gga_addr->flush_rkey, md->flush_rkey >> 8);
Expand Down Expand Up @@ -499,7 +499,7 @@ uct_gga_mlx5_ep_connect_to_ep_v2(uct_ep_h tl_ep,
&path_mtu);
ucs_assert(path_mtu != UCT_IB_ADDRESS_INVALID_PATH_MTU);

qp_num = uct_ib_unpack_uint24(gga_ep_addr->qp_num);
qp_num = uct_ib_unpack_uint24(gga_ep_addr->super.qp_num);
status = uct_rc_mlx5_iface_common_devx_connect_qp(
iface, &ep->super.tx.wq.super, qp_num, &ah_attr, path_mtu,
ep->super.super.path_index, iface->super.config.max_rd_atomic);
Expand Down Expand Up @@ -670,6 +670,17 @@ static uct_iface_ops_t uct_gga_mlx5_iface_tl_ops = {
.iface_is_reachable = uct_base_iface_is_reachable
};

/**
 * Check whether a remote GGA interface address matches the local device.
 *
 * @param [in] tl_iface    Local interface, viewed as a uct_ib_iface_t.
 * @param [in] iface_addr  Remote interface address to compare against.
 *
 * @return 1 if iface_addr->be_sys_image_guid equals the sys_image_guid found
 *         in the local device attributes, 0 otherwise.
 */
static int
uct_gga_mlx5_iface_is_same_device(const uct_iface_h tl_iface,
                                  const uct_gga_mlx5_iface_addr_t *iface_addr)
{
    uct_ib_iface_t *ib_iface = ucs_derived_of(tl_iface, uct_ib_iface_t);
    uct_ib_device_t *ib_dev  = uct_ib_iface_device(ib_iface);

    if (iface_addr->be_sys_image_guid !=
        ib_dev->dev_attr.orig_attr.sys_image_guid) {
        return 0;
    }

    return 1;
}

static int
uct_gga_mlx5_iface_is_reachable_v2(const uct_iface_h tl_iface,
const uct_iface_is_reachable_params_t *params)
Expand All @@ -686,8 +697,7 @@ uct_gga_mlx5_iface_is_reachable_v2(const uct_iface_h tl_iface,
return 0;
}

if (iface_addr->be_sys_image_guid !=
device->dev_attr.orig_attr.sys_image_guid) {
if (!uct_gga_mlx5_iface_is_same_device(tl_iface, iface_addr)) {
uct_iface_fill_info_str_buf(
params,
"different GUID 0x%"PRIx64" (local) vs 0x%"PRIx64" (remote)",
Expand All @@ -699,6 +709,20 @@ uct_gga_mlx5_iface_is_reachable_v2(const uct_iface_h tl_iface,
return uct_ib_iface_is_reachable_v2(tl_iface, params);
}

/**
 * GGA implementation of the ep_is_connected operation.
 *
 * The endpoint is reported as connected only if all of the following hold,
 * checked in this order:
 *  - an interface address can be obtained from @a params (otherwise the
 *    lookup falls back to NULL and the result is 0);
 *  - that address passes uct_gga_mlx5_iface_is_same_device() for the
 *    endpoint's interface;
 *  - uct_rc_mlx5_base_ep_is_connected() reports the endpoint as connected.
 *
 * @param [in] tl_ep   Endpoint to check.
 * @param [in] params  Connection-check parameters.
 *
 * @return 1 if all checks pass, 0 otherwise.
 */
static int
uct_gga_mlx5_ep_is_connected(uct_ep_h tl_ep,
                             const uct_ep_is_connected_params_t *params)
{
    const uct_gga_mlx5_iface_addr_t *remote_iface_addr;

    remote_iface_addr = (const uct_gga_mlx5_iface_addr_t*)
            UCS_PARAM_VALUE(UCT_EP_IS_CONNECTED_FIELD, params, iface_addr,
                            IFACE_ADDR, NULL);
    if (remote_iface_addr == NULL) {
        return 0;
    }

    if (!uct_gga_mlx5_iface_is_same_device(tl_ep->iface, remote_iface_addr)) {
        return 0;
    }

    /* Normalize to 0/1, as a logical-AND chain would */
    return uct_rc_mlx5_base_ep_is_connected(tl_ep, params) != 0;
}

static uct_rc_iface_ops_t uct_gga_mlx5_iface_ops = {
.super = {
.super = {
Expand All @@ -708,7 +732,7 @@ static uct_rc_iface_ops_t uct_gga_mlx5_iface_ops = {
.ep_invalidate = uct_rc_mlx5_base_ep_invalidate,
.ep_connect_to_ep_v2 = uct_gga_mlx5_ep_connect_to_ep_v2,
.iface_is_reachable_v2 = uct_gga_mlx5_iface_is_reachable_v2,
.ep_is_connected = ucs_empty_function_do_assert
.ep_is_connected = uct_gga_mlx5_ep_is_connected
},
.create_cq = uct_rc_mlx5_iface_common_create_cq,
.destroy_cq = uct_rc_mlx5_iface_common_destroy_cq,
Expand Down
9 changes: 6 additions & 3 deletions src/uct/ib/mlx5/rc/rc_mlx5.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,17 @@ typedef struct {
uct_ib_mlx5_mmio_reg_t *reg; /* Doorbell register */
} uct_rc_mlx5_iface_qp_cleanup_ctx_t;

/* Common leading part of an mlx5 endpoint address. It is meant to be embedded
 * as the first ("super") member of larger packed endpoint address structures,
 * so code that only needs the QP number can read it through this type. */
typedef struct uct_rc_mlx5_base_ep_address {
/* QP number, stored in packed 24-bit form (see uct_ib_pack_uint24 /
 * uct_ib_unpack_uint24) */
uct_ib_uint24_t qp_num;
} UCS_S_PACKED uct_rc_mlx5_base_ep_address_t;

typedef struct uct_rc_mlx5_ep_address {
uct_ib_uint24_t qp_num;
uct_rc_mlx5_base_ep_address_t super;
/* For RNDV TM enabling 2 QPs should be created, one is for sending WRs and
* another one for HW (device will use it for RDMA reads and sending RNDV
* Complete messages). */
uct_ib_uint24_t tm_qp_num;
uint8_t atomic_mr_id;
uct_ib_uint24_t tm_qp_num;
uint8_t atomic_mr_id;
} UCS_S_PACKED uct_rc_mlx5_ep_address_t;


Expand Down
10 changes: 5 additions & 5 deletions src/uct/ib/mlx5/rc/rc_mlx5_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ ucs_status_t uct_rc_mlx5_ep_get_address(uct_ep_h tl_ep, uct_ep_addr_t *addr)
uct_rc_mlx5_ep_ext_address_t *ext_addr;
void *ptr;

uct_ib_pack_uint24(rc_addr->qp_num, ep->super.tx.wq.super.qp_num);
uct_ib_pack_uint24(rc_addr->super.qp_num, ep->super.tx.wq.super.qp_num);
if (uct_rc_iface_flush_rkey_enabled(&iface->super) ||
md->config.enable_indirect_atomic) {
rc_addr->atomic_mr_id = uct_ib_md_get_atomic_mr_id(md);
Expand Down Expand Up @@ -748,7 +748,7 @@ int uct_rc_mlx5_base_ep_is_connected(const uct_ep_h tl_ep,
{
UCT_RC_MLX5_BASE_EP_DECL(tl_ep, iface, ep);
uint32_t addr_qp = 0;
uct_rc_mlx5_ep_address_t *rc_addr;
uct_rc_mlx5_base_ep_address_t *rc_addr;
ucs_status_t status;
struct ibv_ah_attr ah_attr;
uint32_t qp_num;
Expand All @@ -761,7 +761,7 @@ int uct_rc_mlx5_base_ep_is_connected(const uct_ep_h tl_ep,
}

if (params->field_mask & UCT_EP_IS_CONNECTED_FIELD_EP_ADDR) {
rc_addr = (uct_rc_mlx5_ep_address_t*)params->ep_addr;
rc_addr = (uct_rc_mlx5_base_ep_address_t*)params->ep_addr;
addr_qp = uct_ib_unpack_uint24(rc_addr->qp_num);
}

Expand Down Expand Up @@ -797,7 +797,7 @@ uct_rc_mlx5_ep_connect_to_ep_v2(uct_ep_h tl_ep,
* RNDV offload (for issuing RDMA reads and sending RNDV ACK). No WQEs
* should be posted to the send side of the QP which is owned by device. */
status = uct_rc_mlx5_ep_connect_qp(
iface, &ep->tm_qp, uct_ib_unpack_uint24(rc_addr->qp_num),
iface, &ep->tm_qp, uct_ib_unpack_uint24(rc_addr->super.qp_num),
&ah_attr, path_mtu, ep->super.super.path_index);
if (status != UCS_OK) {
return status;
Expand All @@ -807,7 +807,7 @@ uct_rc_mlx5_ep_connect_to_ep_v2(uct_ep_h tl_ep,
* (and bound to XRQ) on the peer. */
qp_num = uct_ib_unpack_uint24(rc_addr->tm_qp_num);
} else {
qp_num = uct_ib_unpack_uint24(rc_addr->qp_num);
qp_num = uct_ib_unpack_uint24(rc_addr->super.qp_num);
}

status = uct_rc_mlx5_ep_connect_qp(iface, &ep->super.tx.wq.super, qp_num,
Expand Down
2 changes: 1 addition & 1 deletion test/gtest/uct/test_uct_ep.cc
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ UCS_TEST_SKIP_COND_P(test_uct_ep, disconnect_after_send,
}
}

UCS_TEST_SKIP_COND_P(test_uct_ep, is_connected, has_transport("gga_mlx5"))
UCS_TEST_P(test_uct_ep, is_connected)
{
create_sender();

Expand Down

0 comments on commit 92cb1dd

Please sign in to comment.