diff --git a/include/shared/rpc/IPCSocketClient.h b/include/shared/rpc/IPCSocketClient.h index a90ea6be44d..33fbdb62e8d 100644 --- a/include/shared/rpc/IPCSocketClient.h +++ b/include/shared/rpc/IPCSocketClient.h @@ -43,7 +43,7 @@ using namespace std::chrono_literals; /// /// Error handling: Enclose this inside a try/catch because if any error is detected functions will throw. struct IPCSocketClient { - enum class ReadStatus { NO_ERROR = 0, BUFFER_FULL, STREAM_ERROR, UNKNOWN }; + enum class ReadStatus { NO_ERROR = 0, BUFFER_FULL, READ_ERROR, TIMEOUT, UNKNOWN }; using self_reference = IPCSocketClient &; IPCSocketClient(std::string path = "/tmp/jsonrpc20.sock") : _path{std::move(path)} { memset(&_server, 0, sizeof(_server)); } @@ -52,13 +52,14 @@ struct IPCSocketClient { /// Connect to the configured socket path. /// Connection will retry every @c ms for @c attempts times if errno is EAGAIN - self_reference connect(std::chrono::milliseconds ms = 40ms, int attempts = 5); + self_reference connect(std::chrono::milliseconds wait_ms = 40ms, int attempts = 5); /// Send all the passed string to the socket. self_reference send(std::string_view data); - /// Read all the content from the socket till the message is complete. - ReadStatus read_all(std::string &content); + /// Read all the content until the fd closes or timeout( @c timeout_ms * @c attempts) has passed. + /// @return @c ReadStatus will be set accordingly with the operation result. + ReadStatus read_all(std::string &content, std::chrono::milliseconds timeout_ms = 1000ms, int attempts = 10); /// Closes the socket. void diff --git a/include/shared/rpc/RPCClient.h b/include/shared/rpc/RPCClient.h index e4492f156c9..57c629f1aa0 100644 --- a/include/shared/rpc/RPCClient.h +++ b/include/shared/rpc/RPCClient.h @@ -67,8 +67,11 @@ class RPCClient // responses. ink_assert(!"Buffer full, not enough space to read the response."); break; - case IPCSocketClient::ReadStatus::STREAM_ERROR: - err_text = swoc::bwprint(err_text, "STREAM_ERROR: Error while reading response. {}({})", std::strerror(errno), errno); + case IPCSocketClient::ReadStatus::READ_ERROR: + err_text = swoc::bwprint(err_text, "READ_ERROR: Error while reading response. {}({})", std::strerror(errno), errno); + break; + case IPCSocketClient::ReadStatus::TIMEOUT: + err_text = swoc::bwprint(err_text, "TIMEOUT: Couldn't get the response. {}({})", std::strerror(errno), errno); break; default: err_text = "Something happened, we can't read the response. Unknown error."; diff --git a/src/shared/rpc/IPCSocketClient.cc b/src/shared/rpc/IPCSocketClient.cc index ab04f681a72..45a67061b57 100644 --- a/src/shared/rpc/IPCSocketClient.cc +++ b/src/shared/rpc/IPCSocketClient.cc @@ -35,7 +35,7 @@ namespace shared::rpc { IPCSocketClient::self_reference -IPCSocketClient::connect(std::chrono::milliseconds ms, int attempts) +IPCSocketClient::connect(std::chrono::milliseconds wait_ms, int attempts) { std::string text; int err, tries{attempts}; @@ -67,7 +67,7 @@ IPCSocketClient::connect(std::chrono::milliseconds ms, int attempts) if (errno == EAGAIN || errno == EINPROGRESS) { // Connection cannot be completed immediately // EAGAIN for UDS should suffice, but just in case. - std::this_thread::sleep_for(ms); + std::this_thread::sleep_for(wait_ms); err = errno; continue; } else { @@ -117,35 +117,52 @@ IPCSocketClient ::send(std::string_view data) } IPCSocketClient::ReadStatus -IPCSocketClient::read_all(std::string &content) +IPCSocketClient::read_all(std::string &content, std::chrono::milliseconds timeout_ms, int attempts) { if (this->is_closed()) { // we had a failure. - return {}; + return ReadStatus::UNKNOWN; } MessageStorage<356000> bs; - - ReadStatus readStatus{ReadStatus::UNKNOWN}; - while (true) { + int attempts_left{attempts}; + ReadStatus readStatus{ReadStatus::NO_ERROR}; + // Try to read all the data from the socket. If a timeout happens we retry + // 'attemps' times. On error we just stop. + while (attempts_left > 0 || readStatus == ReadStatus::NO_ERROR) { auto buf = bs.writable_data(); const auto to_read = bs.available(); // Available in the current memory chunk. - ssize_t ret{-1}; - do { - ret = ::read(_sock, buf, to_read); - } while (ret < 0 && (errno == EAGAIN || errno == EINTR)); + ssize_t nread{-1}; - if (ret > 0) { - bs.save(ret); + // Try again if timed out. + if (auto const r = read_ready(_sock, timeout_ms.count()); r == 0) { + readStatus = ReadStatus::TIMEOUT; + --attempts_left; continue; - } else { - if (bs.stored() > 0) { - readStatus = ReadStatus::NO_ERROR; - break; + } else if (r < 0) { + // No more tries. + readStatus = ReadStatus::READ_ERROR; + break; + } + + nread = ::read(_sock, buf, to_read); + if (nread > 0) { + bs.save(nread); + continue; + } else if (nread == -1) { + if (errno == EAGAIN || errno == EINTR) { + continue; } - readStatus = ReadStatus::STREAM_ERROR; + readStatus = ReadStatus::READ_ERROR; + break; + } + // EOF + if (bs.stored() > 0) { + readStatus = ReadStatus::NO_ERROR; break; } + readStatus = ReadStatus::READ_ERROR; + break; } content = bs.str(); return readStatus;