Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update shm naming scheme #434

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion 3rdparty/ps-lite
5 changes: 3 additions & 2 deletions byteps/common/communicator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,9 @@ void BytePSCommSocket::init(int* rank, int* size, int* local_rank,
_recv_path = std::string(getenv("BYTEPS_SOCKET_PATH")) +
std::string("/socket_recv_");
} else {
_send_path = std::string(DEFAULT_BASE_SOCKET_PATH_SEND);
_recv_path = std::string(DEFAULT_BASE_SOCKET_PATH_RECV);
auto job_id = BytePSGlobal::GetJobId();
_send_path = std::string(DEFAULT_BASE_SOCKET_PATH_SEND) + job_id + "_";
_recv_path = std::string(DEFAULT_BASE_SOCKET_PATH_RECV) + job_id + "_";
}

_send_fd = initSocket(_local_rank, _send_path);
Expand Down
2 changes: 2 additions & 0 deletions byteps/common/global.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ unsigned int next_key_ = 0;
cudaStream_t* BytePSGlobal::_copy_device2host_stream = NULL;
cudaStream_t* BytePSGlobal::_copy_host2device_stream = NULL;
std::shared_ptr<NcclManager> BytePSGlobal::_nccl_manager;
std::string BytePSGlobal::_job_id = "0";
std::shared_ptr<CpuReducer> BytePSGlobal::_cpu_reducer;
std::shared_ptr<ThreadPool> BytePSGlobal::_thread_pool;

Expand Down Expand Up @@ -123,6 +124,7 @@ void BytePSGlobal::Init() {
? std::string(getenv("BYTEPS_TRACE_DIR"))
: "./trace";

_job_id = getenv("BYTEPS_JOB_ID") ? std::string(getenv("BYTEPS_JOB_ID")) : "0";
_basic_comm = std::make_shared<BytePSCommSocket>();

_basic_comm->init(&_rank, &_size, &_local_rank, &_local_size, &_worker_id,
Expand Down
4 changes: 4 additions & 0 deletions byteps/common/global.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class BytePSGlobal {
}
// Returns the current value of the _is_root_device flag.
static bool IsRootDevice() {
  return _is_root_device;
}
// Returns the current value of the _is_distributed_job flag.
static bool IsDistributed() {
  return _is_distributed_job;
}
// Returns a copy of the job identifier held in _job_id. Callers append it to
// names of shared resources so that separate jobs do not collide.
static std::string GetJobId() {
  return _job_id;
}
// Returns the current value of the _is_cross_pcie_switch flag.
static bool IsCrossPcieSwitch() {
  return _is_cross_pcie_switch;
}
// Returns the role stored in _my_role.
static BytePSRole GetMyRole() {
  return _my_role;
}
// Returns a shared_ptr copy of _basic_comm (the caller shares ownership).
static std::shared_ptr<BytePSComm> GetBasicComm() {
  return _basic_comm;
}
Expand Down Expand Up @@ -209,6 +210,9 @@ class BytePSGlobal {
}

static int _pagesize;
// Unique identifier for the current application, used to avoid resource name
// conflicts between jobs (e.g. shared memory names, socket names, etc.).
static std::string _job_id;
// Returns ceil(x / y) for unsigned operands.
//
// Computed as the truncated quotient plus one when there is a remainder,
// rather than (x + y - 1) / y, because the latter wraps around when
// x + y - 1 exceeds the range of size_t and then yields a too-small result.
// y must be non-zero (division by zero is undefined, as before).
static size_t DivUp(size_t x, size_t y) {
  const size_t quotient = x / y;
  const size_t remainder = x % y;
  return remainder == 0 ? quotient : quotient + 1;
}
// Rounds x up to a multiple of y: takes the number of y-sized chunks needed
// to cover x (via DivUp) and multiplies it back by y.
static size_t RoundUp(size_t x, size_t y) {
  const size_t chunks = DivUp(x, y);
  return chunks * y;
}

Expand Down
6 changes: 4 additions & 2 deletions byteps/common/operations.cc
Original file line number Diff line number Diff line change
Expand Up @@ -344,11 +344,13 @@ void InitTensor(BPSContext &context, size_t size, int dtype, void *cpubuff) {

size_t aligned_size = Align(size, dtype);
if (BytePSGlobal::IsCrossPcieSwitch()) {
auto shm_prefix = std::string("BytePS_Pcie_") + BytePSGlobal::GetJobId();
context.pcie_cpubuff =
shm_obj->openPcieSharedMemory(key_list[0], aligned_size);
shm_obj->openPcieSharedMemory(shm_prefix, key_list[0], aligned_size);
context.cpubuff = context.pcie_cpubuff.back();
} else {
context.cpubuff = shm_obj->openSharedMemory(std::string("BytePS_ShM_"),
auto shm_prefix = std::string("BytePS_ShM_") + BytePSGlobal::GetJobId() + "_";
context.cpubuff = shm_obj->openSharedMemory(shm_prefix,
key_list[0], aligned_size);
}
BPS_LOG(TRACE) << name << ": open shared memory size " << aligned_size;
Expand Down
23 changes: 14 additions & 9 deletions byteps/common/shared_memory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ void* BytePSSharedMemory::openSharedMemory(const std::string& prefix,
uint64_t key, size_t size) {
size = BytePSGlobal::RoundUpToPageSize(size);
std::string shm_name(prefix);
shm_name += std::to_string(key);
std::stringstream stream;
stream << std::hex << key;

shm_name += stream.str();
int shm_fd = shm_open(shm_name.c_str(), O_CREAT | O_RDWR, 0666);
BPS_CHECK_GE(shm_fd, 0) << "shm_open failed for " << shm_name << " " << strerror(errno);

Expand All @@ -41,40 +44,42 @@ void* BytePSSharedMemory::openSharedMemory(const std::string& prefix,

BPS_CHECK_NE(ptr, (void*)-1) << strerror(errno);

BPS_LOG(TRACE) << "initialized share memory size " << size;
BPS_LOG(DEBUG) << "initialized share memory size " << size << ", name=" << shm_name
<< ", key = " << key << "(0x" << stream.str() << ")";

std::lock_guard<std::mutex> lock(_shm_mu);
_key_shm_addr[shm_name] = ptr;
_key_shm_size[shm_name] = size;
return ptr;
}

std::vector<void*> BytePSSharedMemory::openPcieSharedMemory(uint64_t key,
std::vector<void*> BytePSSharedMemory::openPcieSharedMemory(const std::string& prefix,
uint64_t key,
size_t size) {
std::vector<void*> r;
for (int i = 0; i < BytePSGlobal::GetPcieSwitchNum(); i++) {
auto prefix = std::string("BytePS_Pcie") + std::to_string(i) + "_Shm_";
auto prefix_i = prefix + std::to_string(i) + "_Shm_";
if (BytePSGlobal::IsDistributed()) {
if (BytePSGlobal::IsCrossPcieSwitch()) {
if (i <= numa_max_node()) {
numa_set_preferred(i);
r.push_back(openSharedMemory(prefix, key, size));
r.push_back(openSharedMemory(prefix_i, key, size));
numa_set_preferred(-1);
} else {
numa_set_preferred(numa_max_node());
r.push_back(openSharedMemory(prefix, key, size));
r.push_back(openSharedMemory(prefix_i, key, size));
numa_set_preferred(-1);
}
} else {
r.push_back(openSharedMemory(prefix, key, size));
r.push_back(openSharedMemory(prefix_i, key, size));
}
} else {
if (BytePSGlobal::IsCrossPcieSwitch()) {
numa_set_interleave_mask(numa_all_nodes_ptr);
r.push_back(openSharedMemory(prefix, key, size));
r.push_back(openSharedMemory(prefix_i, key, size));
numa_set_interleave_mask(numa_no_nodes_ptr);
} else {
r.push_back(openSharedMemory(prefix, key, size));
r.push_back(openSharedMemory(prefix_i, key, size));
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion byteps/common/shared_memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class BytePSSharedMemory {
}

void *openSharedMemory(const std::string &prefix, uint64_t key, size_t size);
std::vector<void *> openPcieSharedMemory(uint64_t key, size_t size);
std::vector<void *> openPcieSharedMemory(const std::string &prefix, uint64_t key, size_t size);

private:
std::unordered_map<std::string, void *> _key_shm_addr;
Expand Down