Skip to content

Commit

Permalink
jsonrpc: Wait for the socket to be ready to read. (#11815)
Browse files Browse the repository at this point in the history
jsonrpc: Add poll to wait for the socket to be ready to read.
We found that we sometimes hit a busy loop here, so this adds a timeout
to make sure we do not hang forever.
  • Loading branch information
brbzull0 authored Nov 6, 2024
1 parent 191ba94 commit 5ef1036
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 24 deletions.
9 changes: 5 additions & 4 deletions include/shared/rpc/IPCSocketClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ using namespace std::chrono_literals;
///
/// Error handling: Enclose this inside a try/catch because if any error is detected functions will throw.
struct IPCSocketClient {
enum class ReadStatus { NO_ERROR = 0, BUFFER_FULL, STREAM_ERROR, UNKNOWN };
enum class ReadStatus { NO_ERROR = 0, BUFFER_FULL, READ_ERROR, TIMEOUT, UNKNOWN };
using self_reference = IPCSocketClient &;

IPCSocketClient(std::string path = "/tmp/jsonrpc20.sock") : _path{std::move(path)} { memset(&_server, 0, sizeof(_server)); }
Expand All @@ -52,13 +52,14 @@ struct IPCSocketClient {

/// Connect to the configured socket path.
/// Connection will retry every @c wait_ms for @c attempts times if errno is EAGAIN or EINPROGRESS.
self_reference connect(std::chrono::milliseconds ms = 40ms, int attempts = 5);
self_reference connect(std::chrono::milliseconds wait_ms = 40ms, int attempts = 5);

/// Send all the passed string to the socket.
self_reference send(std::string_view data);

/// Read all the content from the socket till the message is complete.
ReadStatus read_all(std::string &content);
/// Read all the content until the fd closes or the timeout (@c timeout_ms * @c attempts) has passed.
/// @return A @c ReadStatus set according to the result of the operation.
ReadStatus read_all(std::string &content, std::chrono::milliseconds timeout_ms = 1000ms, int attempts = 10);

/// Closes the socket.
void
Expand Down
7 changes: 5 additions & 2 deletions include/shared/rpc/RPCClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,11 @@ class RPCClient
// responses.
ink_assert(!"Buffer full, not enough space to read the response.");
break;
case IPCSocketClient::ReadStatus::STREAM_ERROR:
err_text = swoc::bwprint(err_text, "STREAM_ERROR: Error while reading response. {}({})", std::strerror(errno), errno);
case IPCSocketClient::ReadStatus::READ_ERROR:
err_text = swoc::bwprint(err_text, "READ_ERROR: Error while reading response. {}({})", std::strerror(errno), errno);
break;
case IPCSocketClient::ReadStatus::TIMEOUT:
err_text = swoc::bwprint(err_text, "TIMEOUT: Couldn't get the response. {}({})", std::strerror(errno), errno);
break;
default:
err_text = "Something happened, we can't read the response. Unknown error.";
Expand Down
53 changes: 35 additions & 18 deletions src/shared/rpc/IPCSocketClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ namespace shared::rpc
{

IPCSocketClient::self_reference
IPCSocketClient::connect(std::chrono::milliseconds ms, int attempts)
IPCSocketClient::connect(std::chrono::milliseconds wait_ms, int attempts)
{
std::string text;
int err, tries{attempts};
Expand Down Expand Up @@ -67,7 +67,7 @@ IPCSocketClient::connect(std::chrono::milliseconds ms, int attempts)
if (errno == EAGAIN || errno == EINPROGRESS) {
// Connection cannot be completed immediately
// EAGAIN for UDS should suffice, but just in case.
std::this_thread::sleep_for(ms);
std::this_thread::sleep_for(wait_ms);
err = errno;
continue;
} else {
Expand Down Expand Up @@ -117,35 +117,52 @@ IPCSocketClient ::send(std::string_view data)
}

IPCSocketClient::ReadStatus
IPCSocketClient::read_all(std::string &content)
IPCSocketClient::read_all(std::string &content, std::chrono::milliseconds timeout_ms, int attempts)
{
if (this->is_closed()) {
// we had a failure.
return {};
return ReadStatus::UNKNOWN;
}

MessageStorage<356000> bs;

ReadStatus readStatus{ReadStatus::UNKNOWN};
while (true) {
int attempts_left{attempts};
ReadStatus readStatus{ReadStatus::NO_ERROR};
// Try to read all the data from the socket. If a timeout happens we retry
// 'attempts' times. On error we just stop.
while (attempts_left > 0 || readStatus == ReadStatus::NO_ERROR) {
auto buf = bs.writable_data();
const auto to_read = bs.available(); // Available in the current memory chunk.
ssize_t ret{-1};
do {
ret = ::read(_sock, buf, to_read);
} while (ret < 0 && (errno == EAGAIN || errno == EINTR));
ssize_t nread{-1};

if (ret > 0) {
bs.save(ret);
// Try again if timed out.
if (auto const r = read_ready(_sock, timeout_ms.count()); r == 0) {
readStatus = ReadStatus::TIMEOUT;
--attempts_left;
continue;
} else {
if (bs.stored() > 0) {
readStatus = ReadStatus::NO_ERROR;
break;
} else if (r < 0) {
// No more tries.
readStatus = ReadStatus::READ_ERROR;
break;
}

nread = ::read(_sock, buf, to_read);
if (nread > 0) {
bs.save(nread);
continue;
} else if (nread == -1) {
if (errno == EAGAIN || errno == EINTR) {
continue;
}
readStatus = ReadStatus::STREAM_ERROR;
readStatus = ReadStatus::READ_ERROR;
break;
}
// EOF
if (bs.stored() > 0) {
readStatus = ReadStatus::NO_ERROR;
break;
}
readStatus = ReadStatus::READ_ERROR;
break;
}
content = bs.str();
return readStatus;
Expand Down

0 comments on commit 5ef1036

Please sign in to comment.