Skip to content

Commit

Permalink
Initial ReadMultiplePackets plumbing.
Browse files Browse the repository at this point in the history
This adds support for ReadMultiplePackets() in the QuicChromiumPacketReader and
in net::Socket.

The structure attempts to mirror the QuicPacketReader that is not used with
chromium, but does have an implementation for ReadMultiplePackets().

ReadMultiplePackets() will initially only be implemented for UDPSocketStarboard,
and only for connected sockets.

There is an automatic fallback to single packet reading when
ReadMultiplePackets() is not implemented for the socket or an unexpected error
occurs when attempting to read multiple packets in one call.
  • Loading branch information
jellefoks committed Oct 29, 2024
1 parent 38e817e commit 51135a8
Show file tree
Hide file tree
Showing 8 changed files with 264 additions and 18 deletions.
118 changes: 118 additions & 0 deletions net/quic/quic_chromium_packet_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ const size_t kReadBufferSize =
static_cast<size_t>(quic::kMaxIncomingPacketSize + 1);
} // namespace

#if defined(STARBOARD)
bool QuicChromiumPacketReader::try_reading_multiple_packets_{true};
#endif

QuicChromiumPacketReader::QuicChromiumPacketReader(
DatagramClientSocket* socket,
const quic::QuicClock* clock,
Expand All @@ -39,7 +43,121 @@ QuicChromiumPacketReader::QuicChromiumPacketReader(

QuicChromiumPacketReader::~QuicChromiumPacketReader() = default;

#if defined(STARBOARD)
int QuicChromiumPacketReader::StartReadingMultiplePackets() {
for (;;) {
if (read_pending_)
return OK;

if (num_packets_read_ == 0)
yield_after_ = clock_->Now() + yield_after_duration_;

CHECK(socket_);
read_pending_ = true;
int rv = socket_->ReadMultiplePackets(
&read_results_, kReadBufferSize,
base::BindOnce(&QuicChromiumPacketReader::OnReadMultiplePacketComplete,
weak_factory_.GetWeakPtr()));
if (rv == ERR_NOT_IMPLEMENTED)
// The platform reports that ReadMultiplePackets is not implemented.
return rv;

UMA_HISTOGRAM_BOOLEAN("Net.QuicSession.AsyncRead", rv == ERR_IO_PENDING);
if (rv == ERR_IO_PENDING) {
num_packets_read_ = 0;
return rv;
}

if (++num_packets_read_ > yield_after_packets_ ||
clock_->Now() > yield_after_) {
num_packets_read_ = 0;
// Data was read, process it.
// Schedule the work through the message loop to 1) prevent infinite
// recursion and 2) avoid blocking the thread for too long.
base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
FROM_HERE,
base::BindOnce(
&QuicChromiumPacketReader::OnReadMultiplePacketComplete,
weak_factory_.GetWeakPtr(), rv));
} else {
if (!ProcessMultiplePacketReadResult(rv)) {
return rv < 0 ? rv : OK;
}
}
}

return OK;
}

bool QuicChromiumPacketReader::ProcessMultiplePacketReadResult(int result) {
read_pending_ = false;
if (result <= 0 && net_log_.IsCapturing()) {
net_log_.AddEventWithIntParams(NetLogEventType::QUIC_READ_ERROR,
"net_error", result);
}
if (result == 0) {
// 0-length UDP packets are legal but useless, ignore them.
return true;
}
if (result == ERR_MSG_TOO_BIG) {
// This indicates that we received a UDP packet larger than our receive
// buffer, ignore it.
return true;
}
if (result < 0) {
// Report all other errors to the visitor.
return visitor_->OnReadError(result, socket_);
}

IPEndPoint local_address;
IPEndPoint peer_address;
socket_->GetLocalAddress(&local_address);
socket_->GetPeerAddress(&peer_address);
quic::QuicSocketAddress quick_local_address =
ToQuicSocketAddress(local_address);
quic::QuicSocketAddress quick_peer_address =
ToQuicSocketAddress(peer_address);

auto self = weak_factory_.GetWeakPtr();
for (size_t i = 0; i < result; ++i) {
auto& result = read_results_[i];
quic::QuicReceivedPacket packet(read_buffer_->data(), result,
clock_->Now());
if (!(visitor_->OnPacket(packet, quick_local_address, quick_peer_address) &&
self)) {
return false;
}
}

return true;
}

void QuicChromiumPacketReader::OnReadMultiplePacketComplete(int result) {
if (ProcessMultiplePacketReadResult(result))
StartReadingMultiplePackets();
}

#endif

void QuicChromiumPacketReader::StartReading() {
#if defined(STARBOARD)
if (try_reading_multiple_packets_) {
int rv = StartReadingMultiplePackets();
if (rv == OK || rv == ERR_IO_PENDING) {
// If there was no error, or a callback was scheduled, there is no need
// to attempt single packet reading.
return;
} else {
if (rv == ERR_NOT_IMPLEMENTED) {
// Remember that the platform reported that ReadMultiplePackets is not
// implemented.
try_reading_multiple_packets_ = false;
read_results_.clear();
}
}
}
#endif

for (;;) {
if (read_pending_)
return;
Expand Down
18 changes: 18 additions & 0 deletions net/quic/quic_chromium_packet_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ class NET_EXPORT_PRIVATE QuicChromiumPacketReader {
// Return true if reading should continue.
bool ProcessReadResult(int result);

#if defined(STARBOARD)
// Version of StartReading that reads multiple packets per read call.
int StartReadingMultiplePackets();
// A completion callback invoked when a multiple packet read completes.
void OnReadMultiplePacketComplete(int result);
// Return true if reading should continue.
bool ProcessMultiplePacketReadResult(int result);
#endif

raw_ptr<DatagramClientSocket, DanglingUntriaged> socket_;

raw_ptr<Visitor> visitor_;
Expand All @@ -75,6 +84,15 @@ class NET_EXPORT_PRIVATE QuicChromiumPacketReader {
NetLogWithSource net_log_;

base::WeakPtrFactory<QuicChromiumPacketReader> weak_factory_{this};

#if defined(STARBOARD)
// Static flag to remember when ReadMultiplePackets has ever returned
// ERR_NOT_IMPLEMENTED
static bool try_reading_multiple_packets_;

// Results from ReadMultiplePackets.
Socket::ReadPacketResults read_results_;
#endif
};

} // namespace net
Expand Down
9 changes: 9 additions & 0 deletions net/socket/socket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,15 @@ Socket::Socket() = default;

Socket::~Socket() = default;

#if defined(STARBOARD)
int Socket::ReadMultiplePackets(ReadPacketResults* results,
int read_buffer_size,
CompletionOnceCallback callback) {
// Default to not implemented
return ERR_NOT_IMPLEMENTED;
}
#endif

int Socket::ReadIfReady(IOBuffer* buf,
int buf_len,
CompletionOnceCallback callback) {
Expand Down
7 changes: 7 additions & 0 deletions net/socket/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ class NET_EXPORT Socket {
virtual int Read(IOBuffer* buf,
int buf_len,
CompletionOnceCallback callback) = 0;
#if defined(STARBOARD)
using ReadPacketResult = int;
using ReadPacketResults = std::vector<ReadPacketResult>;
virtual int ReadMultiplePackets(ReadPacketResults* results,
int read_buffer_size,
CompletionOnceCallback callback);
#endif

// Reads data, up to |buf_len| bytes, into |buf| without blocking. Default
// implementation returns ERR_READ_IF_READY_NOT_IMPLEMENTED. Caller should
Expand Down
9 changes: 9 additions & 0 deletions net/socket/udp_client_socket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,15 @@ void UDPClientSocket::ApplySocketTag(const SocketTag& tag) {
socket_.ApplySocketTag(tag);
}

#if defined(STARBOARD)
int UDPClientSocket::ReadMultiplePackets(ReadPacketResults* results,
int read_buffer_size,
CompletionOnceCallback callback) {
return socket_.ReadMultiplePackets(results, read_buffer_size,
std::move(callback));
}
#endif

int UDPClientSocket::Read(IOBuffer* buf,
int buf_len,
CompletionOnceCallback callback) {
Expand Down
5 changes: 5 additions & 0 deletions net/socket/udp_client_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ class NET_EXPORT_PRIVATE UDPClientSocket : public DatagramClientSocket {
int Read(IOBuffer* buf,
int buf_len,
CompletionOnceCallback callback) override;
#if defined(STARBOARD)
int ReadMultiplePackets(ReadPacketResults* results,
int read_buffer_size,
CompletionOnceCallback callback) override;
#endif
int Write(IOBuffer* buf,
int buf_len,
CompletionOnceCallback callback,
Expand Down
94 changes: 77 additions & 17 deletions net/socket/udp_socket_starboard.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@

namespace net {

namespace {
// Read in larger batches to minimize recvmmsg overhead.
const int kNumPacketsPerReadMmsgCall = 16;
} // namespace

UDPSocketStarboard::UDPSocketStarboard(DatagramSocket::BindType bind_type,
net::NetLog* net_log,
const net::NetLogSource& source)
Expand All @@ -44,8 +49,9 @@ UDPSocketStarboard::UDPSocketStarboard(DatagramSocket::BindType bind_type,
socket_options_(0),
bind_type_(bind_type),
socket_watcher_(FROM_HERE),
read_buf_(nullptr),
read_buf_len_(0),
recv_from_address_(NULL),
recv_from_address_(nullptr),
write_buf_len_(0),
net_log_(NetLogWithSource::Make(net_log, NetLogSourceType::UDP_SOCKET)),
weak_factory_(this) {
Expand Down Expand Up @@ -159,6 +165,46 @@ int UDPSocketStarboard::GetLocalAddress(IPEndPoint* address) const {
return OK;
}

int UDPSocketStarboard::StartWatchingSocketForReading() {
if (!base::CurrentIOThread::Get()->Watch(
socket_, true, base::MessagePumpIOStarboard::WATCH_READ,
&socket_watcher_, this)) {
PLOG(ERROR) << "WatchSocket failed on read";
Error result = MapLastSocketError(socket_);
if (result == ERR_IO_PENDING) {
// Watch(...) might call SbSocketWaiterAdd() which does not guarantee
// setting system error on failure, but we need to treat this as an
// error since watching the socket failed.
result = ERR_FAILED;
}
LogRead(result, NULL, NULL);
return result < 0 ? result : ERR_FAILED;
}
return OK;
}

int UDPSocketStarboard::ReadMultiplePackets(Socket::ReadPacketResults* results,
int read_buffer_size,
CompletionOnceCallback callback) {
LOG(WARNING) << __FUNCTION__ << " not implemented";
if (!is_connected_) {
// This is only implemented correctly for connected sockets.
return ERR_SOCKET_NOT_CONNECTED;
}
int nread = InternalReadMultiplePackets(results, read_buffer_size);
if (nread != ERR_IO_PENDING)
return nread;

int rv = StartWatchingSocketForReading();
if (rv < 0)
return rv;

results_ = results;
read_buf_len_ = read_buffer_size;
read_callback_ = std::move(callback);
return ERR_IO_PENDING;
}

int UDPSocketStarboard::Read(IOBuffer* buf,
int buf_len,
CompletionOnceCallback callback) {
Expand All @@ -180,20 +226,9 @@ int UDPSocketStarboard::RecvFrom(IOBuffer* buf,
if (nread != ERR_IO_PENDING)
return nread;

if (!base::CurrentIOThread::Get()->Watch(
socket_, true, base::MessagePumpIOStarboard::WATCH_READ,
&socket_watcher_, this)) {
PLOG(ERROR) << "WatchSocket failed on read";
Error result = MapLastSocketError(socket_);
if (result == ERR_IO_PENDING) {
// Watch(...) might call SbSocketWaiterAdd() which does not guarantee
// setting system error on failure, but we need to treat this as an
// error since watching the socket failed.
result = ERR_FAILED;
}
LogRead(result, NULL, NULL);
return result;
}
int rv = StartWatchingSocketForReading();
if (rv < 0)
return rv;

read_buf_ = buf;
read_buf_len_ = buf_len;
Expand Down Expand Up @@ -368,8 +403,13 @@ int UDPSocketStarboard::AllowAddressSharingForMulticast() {
}

void UDPSocketStarboard::OnSocketReadyToRead(SbSocket /*socket*/) {
if (!read_callback_.is_null())
DidCompleteRead();
if (!read_callback_.is_null()) {
if (results_) {
DidCompleteMultiplePacketRead();
} else {
DidCompleteRead();
}
}
}

void UDPSocketStarboard::OnSocketReadyToWrite(SbSocket socket) {
Expand Down Expand Up @@ -419,6 +459,16 @@ void UDPSocketStarboard::DidCompleteRead() {
}
}

void UDPSocketStarboard::DidCompleteMultiplePacketRead() {
int result = InternalReadMultiplePackets(results_, read_buf_len_);
if (result != ERR_IO_PENDING) {
results_ = NULL;
read_buf_len_ = 0;
InternalStopWatchingSocket();
DoReadCallback(result);
}
}

void UDPSocketStarboard::LogRead(int result,
const char* bytes,
const IPEndPoint* address) const {
Expand Down Expand Up @@ -498,6 +548,16 @@ int UDPSocketStarboard::InternalRecvFrom(IOBuffer* buf,
return result;
}

int UDPSocketStarboard::InternalReadMultiplePackets(
Socket::ReadPacketResults* results,
int read_buffer_size) {
if (!results) {
NOTREACHED() << __FUNCTION__ << " No results";
return;
}
return ERR_NOT_IMPLEMENTED;
}

int UDPSocketStarboard::InternalSendTo(IOBuffer* buf,
int buf_len,
const IPEndPoint* address) {
Expand Down
Loading

0 comments on commit 51135a8

Please sign in to comment.