Skip to content

Commit

Permalink
prov/tcp: introduce sub-domains to support FI_THREAD_COMPLETION
Browse files Browse the repository at this point in the history
By default, the tcp provider is optimized for single threaded
applications that make use of FI_THREAD_DOMAIN but is inefficient for
multithreaded applications that intend to follow FI_THREAD_COMPLETION
semantics.  The primary limiting factoe  is that the progress engine is
tied to the domain, and all endpoint under a single domain are
synchronized by this lone progress engine

In an effort to increase efficiency and performance of these
multithreaded applications, multiplex the application's domain reference
into a subdomain per ep, each having their own progress engine

Co-authored-by: Alexia Ingerson <[email protected]>
Signed-off-by: Stephen Oost <[email protected]>
  • Loading branch information
2 people authored and j-xiong committed Aug 8, 2024
1 parent 3e8d734 commit 5b08723
Show file tree
Hide file tree
Showing 11 changed files with 742 additions and 30 deletions.
3 changes: 2 additions & 1 deletion include/ofi_enosys.h
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,8 @@ static struct fi_ops_av_set X = {
.straddr = X,
};
*/

int fi_no_av_set(struct fid_av *av_fid, struct fi_av_set_attr *attr,
struct fid_av_set **av_set_fid, void *context);
int fi_no_av_set_union(struct fid_av_set *dst, const struct fid_av_set *src);
int fi_no_av_set_intersect(struct fid_av_set *dst, const struct fid_av_set *src);
int fi_no_av_set_diff(struct fid_av_set *dst, const struct fid_av_set *src);
Expand Down
7 changes: 7 additions & 0 deletions include/ofi_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -1011,8 +1011,15 @@ int ofi_ip_av_insert(struct fid_av *av_fid, const void *addr, size_t count,
fi_addr_t *fi_addr, uint64_t flags, void *context);
int ofi_ip_av_remove(struct fid_av *av_fid, fi_addr_t *fi_addr,
size_t count, uint64_t flags);
bool ofi_ip_av_is_valid(struct fid_av *av_fid, fi_addr_t fi_addr);
int ofi_ip_av_lookup(struct fid_av *av_fid, fi_addr_t fi_addr,
void *addr, size_t *addrlen);
int ofi_ip_av_insertsym(struct fid_av *av_fid, const char *node,
size_t nodecnt, const char *service, size_t svccnt,
fi_addr_t *fi_addr, uint64_t flags, void *context);
int ofi_ip_av_insertsvc(struct fid_av *av, const char *node,
const char *service, fi_addr_t *fi_addr,
uint64_t flags, void *context);
const char *
ofi_ip_av_straddr(struct fid_av *av, const void *addr, char *buf, size_t *len);

Expand Down
1 change: 1 addition & 0 deletions libfabric.vcxproj
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,7 @@
<ClCompile Include="prov\tcp\src\xnet_cm.c" />
<ClCompile Include="prov\tcp\src\xnet_cq.c" />
<ClCompile Include="prov\tcp\src\xnet_domain.c" />
<ClCompile Include="prov\tcp\src\xnet_av.c" />
<ClCompile Include="prov\tcp\src\xnet_ep.c" />
<ClCompile Include="prov\tcp\src\xnet_eq.c" />
<ClCompile Include="prov\tcp\src\xnet_fabric.c" />
Expand Down
3 changes: 3 additions & 0 deletions libfabric.vcxproj.filters
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,9 @@
<ClCompile Include="prov\tcp\src\xnet_domain.c">
<Filter>Source Files\prov\tcp\src</Filter>
</ClCompile>
<ClCompile Include="prov\tcp\src\xnet_av.c">
<Filter>Source Files\prov\tcp\src</Filter>
</ClCompile>
<ClCompile Include="prov\tcp\src\xnet_rma.c">
<Filter>Source Files\prov\tcp\src</Filter>
</ClCompile>
Expand Down
1 change: 1 addition & 0 deletions prov/tcp/Makefile.include
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ _xnet_files = \
prov/tcp/src/xnet_cm.c \
prov/tcp/src/xnet_rdm_cm.c \
prov/tcp/src/xnet_domain.c \
prov/tcp/src/xnet_av.c \
prov/tcp/src/xnet_rma.c \
prov/tcp/src/xnet_msg.c \
prov/tcp/src/xnet_ep.c \
Expand Down
35 changes: 31 additions & 4 deletions prov/tcp/src/xnet.h
Original file line number Diff line number Diff line change
Expand Up @@ -467,10 +467,34 @@ struct xnet_xfer_entry {
char msg_data[];
};

struct xnet_mplex_av {
struct util_av util_av;
struct dlist_entry subav_list;
struct ofi_genlock lock;
};

struct xnet_domain {
struct util_domain util_domain;
struct xnet_progress progress;
enum fi_ep_type ep_type;

/* When an application requests FI_THREAD_COMPLETION
* the assumption is that the domain will be used
* across multiple threads.
*
* The xnet progress engine is optimized for single
* threaded performance, so instead of reworking the
* progress engine, likely losing single threaded
* performance, multiplex the domain into multiple
* subdomains for each ep. This way, each ep, with
* the assumption that the application wants to
* progress an ep per thread, can have it's own
* progress engine and avoid having a single
* synchronization point among all eps.
*/
struct fi_info *subdomain_info;
struct ofi_genlock subdomain_list_lock;
struct dlist_entry subdomain_list;
};

static inline struct xnet_progress *xnet_ep2_progress(struct xnet_ep *ep)
Expand Down Expand Up @@ -544,10 +568,6 @@ int xnet_passive_ep(struct fid_fabric *fabric, struct fi_info *info,

int xnet_set_port_range(void);

int xnet_domain_open(struct fid_fabric *fabric, struct fi_info *info,
struct fid_domain **domain, void *context);


int xnet_setup_socket(SOCKET sock, struct fi_info *info);
void xnet_set_zerocopy(SOCKET sock);

Expand All @@ -567,6 +587,13 @@ static inline struct xnet_cq *xnet_ep_tx_cq(struct xnet_ep *ep)
}


int xnet_mplex_av_open(struct fid_domain *domain_fid, struct fi_av_attr *attr,
struct fid_av **fid_av, void *context);
int xnet_domain_multiplexed(struct fid_domain *domain_fid);
int xnet_domain_open(struct fid_fabric *fabric, struct fi_info *info,
struct fid_domain **domain, void *context);
int xnet_av_open(struct fid_domain *domain_fid, struct fi_av_attr *attr,
struct fid_av **fid_av, void *context);
int xnet_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,
struct fid_cq **cq_fid, void *context);
void xnet_report_success(struct xnet_xfer_entry *xfer_entry);
Expand Down
234 changes: 234 additions & 0 deletions prov/tcp/src/xnet_av.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
/*
* Copyright (c) Intel Corporation. All rights reserved.
*
* This software is available to you under a choice of one of two
* licenses. You may choose to be licensed under the terms of the GNU
* General Public License (GPL) Version 2, available from the file
* COPYING in the main directory of this source tree, or the
* BSD license below:
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above
* copyright notice, this list of conditions and the following
* disclaimer.
*
* - Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials
* provided with the distribution.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

#include <stdlib.h>

#include "xnet.h"

int xnet_av_open(struct fid_domain *domain_fid, struct fi_av_attr *attr,
struct fid_av **fid_av, void *context)
{
return rxm_util_av_open(domain_fid, attr, fid_av, context,
sizeof(struct xnet_conn), NULL);
}

static int xnet_mplex_av_remove(struct fid_av *av_fid, fi_addr_t *fi_addr,
size_t count, uint64_t flags)
{
int ret;
struct fid_list_entry *item;
struct fid_av *subav_fid;
struct xnet_mplex_av *av = container_of(av_fid, struct xnet_mplex_av,
util_av.av_fid);

ofi_genlock_lock(&av->lock);
dlist_foreach_container(&av->subav_list, struct fid_list_entry, item, entry) {
subav_fid = container_of(item->fid, struct fid_av, fid);
ret = fi_av_remove(subav_fid, fi_addr, count, flags);
if (ret)
goto out;
}
ret = ofi_ip_av_remove(&av->util_av.av_fid, fi_addr, count, flags);
out:
ofi_genlock_unlock(&av->lock);
return ret;
}

static int xnet_mplex_av_insert(struct fid_av *av_fid, const void *addr, size_t count,
fi_addr_t *fi_addr, uint64_t flags, void *context)
{
int ret;
struct fid_list_entry *item;
struct fid_av *subav_fid;
fi_addr_t sub_fi_addr;
struct xnet_mplex_av *av = container_of(av_fid, struct xnet_mplex_av,
util_av.av_fid.fid);

ofi_genlock_lock(&av->lock);
ret = ofi_ip_av_insert(&av->util_av.av_fid, addr, count, fi_addr, flags, context);
if (ret < count)
goto out;
dlist_foreach_container(&av->subav_list, struct fid_list_entry, item, entry) {
subav_fid = container_of(item->fid, struct fid_av, fid);
ret = fi_av_insert(subav_fid, addr, count, &sub_fi_addr, flags, context);
if (ret < count)
break;
assert(*fi_addr == sub_fi_addr);
}
out:
ofi_genlock_unlock(&av->lock);
return ret;
}

static int xnet_mplex_av_insertsym(struct fid_av *av_fid, const char *node,
size_t nodecnt, const char *service,
size_t svccnt, fi_addr_t *fi_addr,
uint64_t flags, void *context)
{
int ret;
struct fid_list_entry *item;
struct fid_av *subav_fid;
fi_addr_t sub_fi_addr;
struct xnet_mplex_av *av = container_of(av_fid, struct xnet_mplex_av,
util_av.av_fid.fid);

ofi_genlock_lock(&av->lock);
ret = ofi_ip_av_insertsym(&av->util_av.av_fid, node, nodecnt,
service, svccnt, fi_addr, flags, context);
if (ret)
goto out;
dlist_foreach_container(&av->subav_list, struct fid_list_entry, item, entry) {
subav_fid = container_of(item->fid, struct fid_av, fid);
ret = fi_av_insertsym(subav_fid, node, nodecnt, service, svccnt,
&sub_fi_addr, flags, context);
if (ret)
break;
assert(*fi_addr == sub_fi_addr);
}
out:
ofi_genlock_unlock(&av->lock);

return ret;
}

static int xnet_mplex_av_insertsvc(struct fid_av *av_fid, const char *node,
const char *service, fi_addr_t *fi_addr,
uint64_t flags, void *context)
{
int ret;
struct fid_list_entry *item;
struct fid_av *subav_fid;
fi_addr_t sub_fi_addr;
struct xnet_mplex_av *av = container_of(av_fid, struct xnet_mplex_av,
util_av.av_fid.fid);

ofi_genlock_lock(&av->lock);
ret = ofi_ip_av_insertsvc(&av->util_av.av_fid, node, service,
fi_addr, flags, context);
if (ret)
goto out;
dlist_foreach_container(&av->subav_list, struct fid_list_entry, item, entry) {
subav_fid = container_of(item->fid, struct fid_av, fid);
ret = fi_av_insertsvc(subav_fid, node, service, &sub_fi_addr, flags,
context);
if (ret)
break;
assert(*fi_addr == sub_fi_addr);
}
out:
ofi_genlock_unlock(&av->lock);
return ret;
}

static int xnet_mplex_av_lookup(struct fid_av *av_fid, fi_addr_t fi_addr,
void *addr, size_t *addrlen)
{
struct xnet_mplex_av *av = container_of(av_fid, struct xnet_mplex_av,
util_av.av_fid.fid);
return ofi_ip_av_lookup(&av->util_av.av_fid, fi_addr, addr, addrlen);
}

static int xnet_mplex_av_close(struct fid *av_fid)
{
struct xnet_mplex_av *av;
struct fid_list_entry *item;
int ret = 0;

av = container_of(av_fid, struct xnet_mplex_av, util_av.av_fid.fid);
while (!dlist_empty(&av->subav_list)) {
dlist_pop_front(&av->subav_list, struct fid_list_entry, item, entry);
(void)fi_close(item->fid);
free(item);
}
ret = ofi_av_close(&av->util_av);
ofi_genlock_destroy(&av->lock);
free(av);
return ret;
}

static struct fi_ops xnet_mplex_av_fi_ops = {
.size = sizeof(struct fi_ops),
.close = xnet_mplex_av_close,
.bind = fi_no_bind,
.control = fi_no_control,
.ops_open = fi_no_ops_open,
};

static struct fi_ops_av xnet_mplex_av_ops = {
.size = sizeof(struct fi_ops_av),
.insert = xnet_mplex_av_insert,
.insertsvc = xnet_mplex_av_insertsvc,
.insertsym = xnet_mplex_av_insertsym,
.remove = xnet_mplex_av_remove,
.lookup = xnet_mplex_av_lookup,
.straddr = ofi_ip_av_straddr,
.av_set = fi_no_av_set,
};

int xnet_mplex_av_open(struct fid_domain *domain_fid, struct fi_av_attr *attr,
struct fid_av **fid_av, void *context)
{
struct xnet_mplex_av *av;
struct util_domain *domain;
struct util_av_attr util_attr = {0};
int ret;

av = calloc(1, sizeof(*av));
if (!av)
return -FI_ENOMEM;

ret = ofi_genlock_init(&av->lock, OFI_LOCK_MUTEX);
if (ret)
goto free;

domain = container_of(domain_fid, struct util_domain, domain_fid);

util_attr.context_len = sizeof(struct util_peer_addr *);
util_attr.addrlen = ofi_sizeof_addr_format(domain->addr_format);
if (attr->type == FI_AV_UNSPEC)
attr->type = FI_AV_TABLE;

ret = ofi_av_init(domain, attr, &util_attr, &av->util_av, context);
if (ret)
goto free_lock;
dlist_init(&av->subav_list);
av->util_av.av_fid.fid.ops = &xnet_mplex_av_fi_ops;
av->util_av.av_fid.ops = &xnet_mplex_av_ops;
*fid_av = &av->util_av.av_fid;
return FI_SUCCESS;

free_lock:
ofi_genlock_destroy(&av->lock);
free:
free(av);
return ret;
}
Loading

0 comments on commit 5b08723

Please sign in to comment.