Skip to content

Commit

Permalink
prov/sharp: Dummy implementation of fi_barrier() and fi_allreduce()
Browse files Browse the repository at this point in the history
Both collective operation implemented ba colling peer collective operation and
transparently passing completion back to peer CQ

Signed-off-by: Tomasz Gromadzki <[email protected]>
  • Loading branch information
grom72 committed Dec 14, 2022
1 parent 51de7cb commit c9ba8f8
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 9 deletions.
6 changes: 6 additions & 0 deletions prov/sharp/src/sharp.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,4 +193,10 @@ ssize_t sharp_ep_allreduce(struct fid_ep *ep, const void *buf, size_t count,
fi_addr_t coll_addr, enum fi_datatype datatype,
enum fi_op op, uint64_t flags, void *context);

ssize_t sharp_peer_xfer_complete(struct fid_ep *ep,
struct fi_cq_tagged_entry *cqe,
fi_addr_t src_addr);

ssize_t sharp_peer_xfer_error(struct fid_ep *ep, struct fi_cq_err_entry *cqerr);

#endif
75 changes: 66 additions & 9 deletions prov/sharp/src/sharp_coll.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ static struct fi_ops sharp_mc_fid_ops = {


int sharp_join_collective(struct fid_ep *fid, const void *addr, uint64_t flags,
struct fid_mc **mc_fid, void *context)
struct fid_mc **mc_fid, void *context)
{
struct fi_peer_mc_context *peer_context;
struct sharp_mc *mc;
Expand All @@ -107,6 +107,7 @@ int sharp_join_collective(struct fid_ep *fid, const void *addr, uint64_t flags,
(*mc_fid)->fid.ops = &sharp_mc_fid_ops;
if ((flags & FI_PEER))
mc->peer_mc = context;
mc->mc_fid.fi_addr = (uintptr_t)mc;

/* XXX Dummy implementation */
struct fi_eq_entry entry;
Expand Down Expand Up @@ -138,21 +139,35 @@ int sharp_join_collective(struct fid_ep *fid, const void *addr, uint64_t flags,
ssize_t sharp_ep_barrier2(struct fid_ep *fid, fi_addr_t coll_addr,
uint64_t flags, void *context)
{
#if 1
/* XXX Dummy implementation based on peer:fi_barrier() */
struct sharp_ep *ep;
struct sharp_mc *sharp_mc;
ep = container_of(fid, struct sharp_ep, util_ep.ep_fid);
sharp_mc = (struct sharp_mc *) ((uintptr_t) coll_addr);

coll_addr = fi_mc_addr(sharp_mc->peer_mc);

flags |= FI_PEER_TRANSFER;
return fi_barrier2(ep->peer_ep, coll_addr, flags, context);
#else
/* XXX Dummy implementation */

struct sharp_ep *ep;
struct sharp_cq *cq;
ssize_t ret;

ep = container_of(fid, struct sharp_ep, util_ep.ep_fid);
cq = container_of(ep->util_ep.tx_cq, struct sharp_cq, util_cq);
ssize_t ret;
ret = cq->peer_cq->owner_ops->write(cq->peer_cq, context, FI_COLLECTIVE,
ret = cq->peer_cq->owner_ops->write(cq->peer_cq, context, FI_COLLECTIVE,
0, 0, 0, 0, 0);

if (ret)
FI_WARN(ep->util_ep.domain->fabric->prov, FI_LOG_CQ,
"barrier2 - cq write failed\n");

return ret;
#endif
}

ssize_t sharp_ep_barrier(struct fid_ep *ep, fi_addr_t coll_addr, void *context)
Expand All @@ -166,20 +181,62 @@ ssize_t sharp_ep_allreduce(struct fid_ep *fid, const void *buf, size_t count,
fi_addr_t coll_addr, enum fi_datatype datatype,
enum fi_op op, uint64_t flags, void *context)
{
#if 1
/* XXX Dummy implementation based on peer:fi_allreduce() */
struct sharp_ep *ep;
struct sharp_mc *sharp_mc;
ep = container_of(fid, struct sharp_ep, util_ep.ep_fid);
sharp_mc = (struct sharp_mc *) ((uintptr_t) coll_addr);

coll_addr = fi_mc_addr(sharp_mc->peer_mc);

flags |= FI_PEER_TRANSFER;
return fi_allreduce(ep->peer_ep, buf, count, desc, result,
result_desc, coll_addr, datatype, op, flags, context);
#else
/* XXX Dummy implementation */
struct sharp_cq *cq;
ssize_t ret;

memcpy(result,buf,count*ofi_datatype_size(datatype));
struct sharp_ep *ep;
struct sharp_cq *cq;
ep = container_of(fid, struct sharp_ep, util_ep.ep_fid);
cq = container_of(ep->util_ep.tx_cq, struct sharp_cq, util_cq);
ssize_t ret;
ret = cq->peer_cq->owner_ops->write(cq->peer_cq, context, FI_COLLECTIVE,
0, 0, 0, 0, 0);
ret = cq->peer_cq->owner_ops->write(cq->peer_cq, context, FI_COLLECTIVE,
0, 0, 0, 0, 0);
if (ret)
FI_WARN(ep->util_ep.domain->fabric->prov, FI_LOG_CQ,
"allreduce - cq write failed\n");
memcpy(result,buf,count*ofi_datatype_size(datatype));
return ret;
#endif
}

ssize_t sharp_peer_xfer_complete(struct fid_ep *ep_fid,
struct fi_cq_tagged_entry *cqe,
fi_addr_t src_addr)
{
struct sharp_ep *ep;
struct sharp_cq *cq;

ep = container_of(ep_fid, struct sharp_ep, util_ep.ep_fid);
cq = container_of(ep->util_ep.tx_cq, struct sharp_cq, util_cq);

if (cq->peer_cq->owner_ops->write(cq->peer_cq, cqe->op_context,
FI_COLLECTIVE, 0, 0, 0, 0, 0))
FI_WARN(ep->util_ep.domain->fabric->prov, FI_LOG_DOMAIN,
"collective - cq write failed\n");
return 0;
}

ssize_t sharp_peer_xfer_error(struct fid_ep *ep_fid, struct fi_cq_err_entry *cqerr)
{
struct sharp_ep *ep;
struct sharp_cq *cq;

ep = container_of(ep_fid, struct sharp_ep, util_ep.ep_fid);
cq = container_of(ep->util_ep.tx_cq, struct sharp_cq, util_cq);

if (cq->peer_cq->owner_ops->writeerr(cq->peer_cq, cqerr))
FI_WARN(ep->util_ep.domain->fabric->prov, FI_LOG_DOMAIN,
"collective - cq write failed\n");
return 0;
}
8 changes: 8 additions & 0 deletions prov/sharp/src/sharp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,12 @@ void sharp_ep_progress(struct util_ep *util_ep)
;
}

static struct fi_ops_transfer_peer sharp_ep_peer_xfer_ops = {
.size = sizeof(struct fi_ops_transfer_peer),
.complete = sharp_peer_xfer_complete,
.comperr = sharp_peer_xfer_error,
};

int sharp_endpoint(struct fid_domain *domain, struct fi_info *info,
struct fid_ep **ep_fid, void *context)
{
Expand All @@ -188,6 +194,8 @@ int sharp_endpoint(struct fid_domain *domain, struct fi_info *info,
return -EINVAL;
}

peer_context->peer_ops = &sharp_ep_peer_xfer_ops;

ep = calloc(1, sizeof(*ep));
if (!ep)
return -FI_ENOMEM;
Expand Down

0 comments on commit c9ba8f8

Please sign in to comment.