Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug that rdma server rx complete queue will be blocked if client crashed. #1978

Merged
merged 2 commits into from
Aug 9, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Fix bug that rdma server complete queue will be blocked if client cra…
…shed.

Signed-off-by: vegetableysm <[email protected]>
  • Loading branch information
vegetableysm committed Aug 8, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit 35031369602060df610857961b2b566e1f5415dc
7 changes: 4 additions & 3 deletions src/common/rdma/rdma.cc
Original file line number Diff line number Diff line change
@@ -155,21 +155,22 @@ int IRDMA::GetCompletion(fid_cq* cq, int timeout, void** context) {
if (ret > 0) {
break;
} else if (ret < 0 && ret != -FI_EAGAIN) {
return ret;
break;
} else if (timeout > 0) {
clock_gettime(CLOCK_REALTIME, &end);
if ((end.tv_sec - start.tv_sec) * 1000 +
(end.tv_nsec - start.tv_nsec) / 1000000 >
timeout) {
return -FI_ETIMEDOUT;
ret = -FI_ETIMEDOUT;
break;
}
}
}
if (context) {
*context = err.op_context;
}

return 0;
return ret < 0 ? ret : 0;
}

void IRDMA::FreeInfo(fi_info* info) {
13 changes: 12 additions & 1 deletion src/common/rdma/rdma_server.cc
Original file line number Diff line number Diff line change
@@ -429,7 +429,18 @@ Status RDMAServer::GetRXCompletion(int timeout, void** context) {
continue;
}
} else if (ret < 0) {
return Status::Invalid("GetRXCompletion failed");
if (ret == -FI_EAVAIL) {
fi_cq_err_entry err;
fi_cq_readerr(rxcq, &err, 0);
if (err.err == FI_ECANCELED) {
// client crashed
return Status::ConnectionError("Client crashed.");
} else {
return Status::Invalid(fi_strerror(err.err));
}
}
return Status::Invalid("GetRXCompletion failed, ret:" +
std::to_string(ret));
} else {
return Status::OK();
}
19 changes: 11 additions & 8 deletions src/server/async/rpc_server.cc
Original file line number Diff line number Diff line change
@@ -364,6 +364,12 @@ void RPCServer::doRDMARecv() {
VLOG(100) << "RDMA server stopped!";
return;
}
if (status.IsConnectionError()) {
LOG(ERROR) << "Connection error!" << status.message();
VineyardRecvContext* recv_context =
reinterpret_cast<VineyardRecvContext*>(context);
doVineyardClose(recv_context);
}
VLOG(100) << "Get RX completion failed! Error:" << status.message();
VLOG(100) << "Retry...";
} else {
@@ -378,6 +384,11 @@ void RPCServer::doRDMARecv() {
VineyardMsg* recv_msg =
reinterpret_cast<VineyardMsg*>(recv_context->attr.msg_buffer);

if (recv_msg->type == VINEYARD_MSG_CLOSE) {
doVineyardClose(recv_context);
delete recv_context;
}

VineyardRecvContext* recv_context_tmp = new VineyardRecvContext();
VineyardMsg* recv_msg_tmp = new VineyardMsg();
if (recv_msg_tmp == nullptr || recv_context_tmp == nullptr) {
@@ -407,14 +418,6 @@ void RPCServer::doRDMARecv() {
rdma_server_->Recv(
recv_context->rdma_conn_id, reinterpret_cast<void*>(recv_msg),
sizeof(VineyardMsg), reinterpret_cast<void*>(recv_context));
} else if (recv_msg->type == VINEYARD_MSG_CLOSE) {
boost::asio::post(vs_ptr_->GetIOContext(),
[this, recv_context_tmp, recv_msg_tmp] {
doVineyardClose(recv_context_tmp);
delete recv_context_tmp;
delete recv_msg_tmp;
});
delete recv_context;
} else {
LOG(ERROR) << "Unknown message type: " << recv_msg->type;
rdma_server_->Recv(
Loading