Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clear read callback on EOF #550

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions proxygen/lib/http/session/HQSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3940,12 +3940,14 @@ HQSession::HQStreamTransport::resumeWebTransportIngress(

folly::Expected<folly::Unit, WebTransport::ErrorCode>
HQSession::HQStreamTransport::stopReadingWebTransportIngress(
HTTPCodec::StreamID id, uint32_t errorCode) {
HTTPCodec::StreamID id, folly::Optional<uint32_t> errorCode) {
if (session_.sock_) {
auto res = session_.sock_->setReadCallback(
id,
nullptr,
quic::ApplicationErrorCode(WebTransport::toHTTPErrorCode(errorCode)));
quic::Optional<quic::ApplicationErrorCode> quicErrorCode;
if (errorCode) {
quicErrorCode =
quic::ApplicationErrorCode(WebTransport::toHTTPErrorCode(*errorCode));
}
auto res = session_.sock_->setReadCallback(id, nullptr, quicErrorCode);
if (res.hasError()) {
return folly::makeUnexpected(WebTransport::ErrorCode::GENERIC_ERROR);
}
Expand Down
5 changes: 3 additions & 2 deletions proxygen/lib/http/session/HQSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -1878,8 +1878,9 @@ class HQSession
resumeWebTransportIngress(HTTPCodec::StreamID /*id*/) override;

folly::Expected<folly::Unit, WebTransport::ErrorCode>
stopReadingWebTransportIngress(HTTPCodec::StreamID /*id*/,
uint32_t /*errorCode*/) override;
stopReadingWebTransportIngress(
HTTPCodec::StreamID /*id*/,
folly::Optional<uint32_t> /*errorCode*/) override;

}; // HQStreamTransport

Expand Down
5 changes: 3 additions & 2 deletions proxygen/lib/http/session/HTTPTransaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -739,8 +739,9 @@ class HTTPTransaction
}

folly::Expected<folly::Unit, WebTransport::ErrorCode>
stopReadingWebTransportIngress(HTTPCodec::StreamID /*id*/,
uint32_t /*errorCode*/) override {
stopReadingWebTransportIngress(
HTTPCodec::StreamID /*id*/,
folly::Optional<uint32_t> /*errorCode*/) override {
LOG(FATAL) << __func__ << " not supported";
folly::assume_unreachable();
}
Expand Down
2 changes: 1 addition & 1 deletion proxygen/lib/http/session/test/HTTPTransactionMocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ class MockHTTPTransactionTransport : public HTTPTransaction::Transport {
(HTTPCodec::StreamID));
MOCK_METHOD((folly::Expected<folly::Unit, WebTransport::ErrorCode>),
stopReadingWebTransportIngress,
(HTTPCodec::StreamID, uint32_t));
(HTTPCodec::StreamID, folly::Optional<uint32_t>));

MOCK_METHOD(void, trackEgressBodyOffset, (uint64_t, ByteEvent::EventFlags));

Expand Down
42 changes: 30 additions & 12 deletions proxygen/lib/http/session/test/HTTPTransactionWebTransportTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ using WTFCState = proxygen::WebTransport::FCState;
namespace {
constexpr uint32_t WT_APP_ERROR_1 = 19;
constexpr uint32_t WT_APP_ERROR_2 = 77;
folly::Optional<uint32_t> makeOpt(uint32_t n) {
return n;
}
} // namespace

namespace proxygen::test {
Expand Down Expand Up @@ -124,7 +127,8 @@ TEST_F(HTTPTransactionWebTransportTest, CreateStreams) {
.WillOnce(Return(folly::unit));
EXPECT_EQ(res->writeHandle->getID(), 0);
res->writeHandle->resetStream(WT_APP_ERROR_1);
EXPECT_CALL(transport_, stopReadingWebTransportIngress(0, WT_APP_ERROR_2))
EXPECT_CALL(transport_,
stopReadingWebTransportIngress(0, makeOpt(WT_APP_ERROR_2)))
.WillOnce(Return(folly::unit));
res->readHandle->stopSending(WT_APP_ERROR_2);

Expand Down Expand Up @@ -198,6 +202,10 @@ TEST_F(HTTPTransactionWebTransportTest, ReadStream) {
});
EXPECT_FALSE(fut.isReady());

// it gets stopReadingWebTransportIngress when the EOF is read out
EXPECT_CALL(transport_,
stopReadingWebTransportIngress(0, folly::Optional<uint32_t>()));

implHandle->dataAvailable(nullptr, true);
eventBase_.loopOnce();
EXPECT_TRUE(fut.isReady());
Expand Down Expand Up @@ -264,8 +272,9 @@ TEST_F(HTTPTransactionWebTransportTest, ReadStreamCancel) {
auto fut = readHandle->readStreamData();

// Cancel the future, the transport will get a STOP_SENDING
EXPECT_CALL(transport_,
stopReadingWebTransportIngress(0, WebTransport::kInternalError))
EXPECT_CALL(
transport_,
stopReadingWebTransportIngress(0, makeOpt(WebTransport::kInternalError)))
.WillOnce(Return(folly::unit));
fut.cancel();
EXPECT_TRUE(fut.isReady());
Expand Down Expand Up @@ -391,6 +400,10 @@ TEST_F(HTTPTransactionWebTransportTest, BidiStreamEdgeCases) {
// deliver EOF before read
bidiHandle.readHandle->dataAvailable(nullptr, true);

// it gets stopReadingWebTransportIngress when the EOF is read out
EXPECT_CALL(transport_,
stopReadingWebTransportIngress(0, folly::Optional<uint32_t>()));

auto fut = streamHandle.readHandle->readStreamData()
.via(&eventBase_)
.thenTry([](auto streamData) {
Expand Down Expand Up @@ -448,8 +461,9 @@ TEST_F(HTTPTransactionWebTransportTest, StreamDetachWithOpenStreams) {
});
HTTPException ex(HTTPException::Direction::INGRESS_AND_EGRESS, "aborted");
handler_.expectError();
EXPECT_CALL(transport_,
stopReadingWebTransportIngress(0, WebTransport::kInternalError));
EXPECT_CALL(
transport_,
stopReadingWebTransportIngress(0, makeOpt(WebTransport::kInternalError)));
txn_->onError(ex);
EXPECT_TRUE(readCancelled);
EXPECT_TRUE(writeCancelled);
Expand All @@ -458,14 +472,16 @@ TEST_F(HTTPTransactionWebTransportTest, StreamDetachWithOpenStreams) {
TEST_F(HTTPTransactionWebTransportTest, NoHandler) {
TearDown();
setup(/*withHandler=*/false);
EXPECT_CALL(transport_,
stopReadingWebTransportIngress(0, WebTransport::kInternalError))
EXPECT_CALL(
transport_,
stopReadingWebTransportIngress(0, makeOpt(WebTransport::kInternalError)))
.RetiresOnSaturation();
EXPECT_CALL(transport_,
resetWebTransportEgress(0, WebTransport::kInternalError));
txn_->onWebTransportBidiStream(0);
EXPECT_CALL(transport_,
stopReadingWebTransportIngress(1, WebTransport::kInternalError));
EXPECT_CALL(
transport_,
stopReadingWebTransportIngress(1, makeOpt(WebTransport::kInternalError)));
txn_->onWebTransportUniStream(1);
}

Expand Down Expand Up @@ -493,7 +509,8 @@ TEST_F(HTTPTransactionWebTransportTest, StreamIDAPIs) {
EXPECT_TRUE(fut.isReady());

// stopSending by ID
EXPECT_CALL(transport_, stopReadingWebTransportIngress(0, WT_APP_ERROR_1))
EXPECT_CALL(transport_,
stopReadingWebTransportIngress(0, makeOpt(WT_APP_ERROR_1)))
.WillOnce(Return(folly::unit));
wt_->stopSending(id, WT_APP_ERROR_1);

Expand Down Expand Up @@ -542,7 +559,7 @@ TEST_F(HTTPTransactionWebTransportTest, RefreshTimeout) {
[this] {
EXPECT_CALL(transport_,
stopReadingWebTransportIngress(
0, std::numeric_limits<uint32_t>::max()))
0, makeOpt(std::numeric_limits<uint32_t>::max())))
.WillOnce(Return(folly::unit));
handler_.expectEOM();
txn_->onIngressEOM();
Expand All @@ -560,7 +577,8 @@ TEST_F(HTTPTransactionWebTransportTest, StopSendingThenAbort) {

txn_->onWebTransportUniStream(0);
EXPECT_NE(readHandle, nullptr);
EXPECT_CALL(transport_, stopReadingWebTransportIngress(0, WT_APP_ERROR_2))
EXPECT_CALL(transport_,
stopReadingWebTransportIngress(0, makeOpt(WT_APP_ERROR_2)))
.WillOnce(Return(folly::unit));
readHandle->stopSending(WT_APP_ERROR_2);
// there's no way to abort this stream anymore. stopSending removes the
Expand Down
11 changes: 7 additions & 4 deletions proxygen/lib/http/webtransport/QuicWebTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,14 @@ QuicWebTransport::resumeWebTransportIngress(HTTPCodec::StreamID id) {
}

folly::Expected<folly::Unit, WebTransport::ErrorCode>
QuicWebTransport::stopReadingWebTransportIngress(HTTPCodec::StreamID id,
uint32_t errorCode) {
QuicWebTransport::stopReadingWebTransportIngress(
HTTPCodec::StreamID id, folly::Optional<uint32_t> errorCode) {
XCHECK(quicSocket_);
auto res = quicSocket_->setReadCallback(
id, nullptr, quic::ApplicationErrorCode(errorCode));
quic::Optional<quic::ApplicationErrorCode> quicErrorCode;
if (errorCode) {
quicErrorCode = quic::ApplicationErrorCode(*errorCode);
}
auto res = quicSocket_->setReadCallback(id, nullptr, quicErrorCode);
if (res.hasError()) {
return folly::makeUnexpected(WebTransport::ErrorCode::GENERIC_ERROR);
}
Expand Down
5 changes: 3 additions & 2 deletions proxygen/lib/http/webtransport/QuicWebTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,9 @@ class QuicWebTransport
resumeWebTransportIngress(HTTPCodec::StreamID /*id*/) override;

folly::Expected<folly::Unit, WebTransport::ErrorCode>
stopReadingWebTransportIngress(HTTPCodec::StreamID /*id*/,
uint32_t /*errorCode*/) override;
stopReadingWebTransportIngress(
HTTPCodec::StreamID /*id*/,
folly::Optional<uint32_t> /*errorCode*/) override;

folly::Expected<folly::Unit, WebTransport::ErrorCode> sendDatagram(
std::unique_ptr<folly::IOBuf> /*datagram*/) override;
Expand Down
10 changes: 6 additions & 4 deletions proxygen/lib/http/webtransport/WebTransportImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ WebTransportImpl::resetWebTransportEgress(HTTPCodec::StreamID id,
}

folly::Expected<folly::Unit, WebTransport::ErrorCode>
WebTransportImpl::stopReadingWebTransportIngress(HTTPCodec::StreamID id,
uint32_t errorCode) {
WebTransportImpl::stopReadingWebTransportIngress(
HTTPCodec::StreamID id, folly::Optional<uint32_t> errorCode) {
auto res = tp_.stopReadingWebTransportIngress(id, errorCode);
wtIngressStreams_.erase(id);
sp_.refreshTimeout();
Expand Down Expand Up @@ -229,7 +229,8 @@ WebTransportImpl::StreamReadHandle::readStreamData() {
auto bufLen = buf_.chainLength();
WebTransport::StreamData streamData({buf_.move(), eof_});
if (eof_) {
impl_.wtIngressStreams_.erase(getID());
// unregister the read callback, but don't send STOP_SENDING
impl_.stopReadingWebTransportIngress(id_, folly::none);
} else if (bufLen >= kMaxWTIngressBuf) {
VLOG(4) << __func__ << " resuming reads";
impl_.tp_.resumeWebTransportIngress(getID());
Expand Down Expand Up @@ -269,7 +270,8 @@ WebTransport::FCState WebTransportImpl::StreamReadHandle::dataAvailable(
readPromise_->setValue(WebTransport::StreamData({std::move(data), eof}));
readPromise_.reset();
if (eof) {
impl_.wtIngressStreams_.erase(getID());
// unregister the read callback, but don't send STOP_SENDING
impl_.stopReadingWebTransportIngress(getID(), folly::none);
return WebTransport::FCState::UNBLOCKED;
}
} else {
Expand Down
8 changes: 5 additions & 3 deletions proxygen/lib/http/webtransport/WebTransportImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ class WebTransportImpl : public WebTransport {
resumeWebTransportIngress(HTTPCodec::StreamID /*id*/) = 0;

virtual folly::Expected<folly::Unit, WebTransport::ErrorCode>
stopReadingWebTransportIngress(HTTPCodec::StreamID /*id*/,
uint32_t /*errorCode*/) = 0;
stopReadingWebTransportIngress(
HTTPCodec::StreamID /*id*/,
folly::Optional<uint32_t> /*errorCode*/) = 0;
virtual folly::Expected<folly::Unit, WebTransport::ErrorCode> sendDatagram(
std::unique_ptr<folly::IOBuf> /*datagram*/) = 0;

Expand Down Expand Up @@ -273,7 +274,8 @@ class WebTransportImpl : public WebTransport {
HTTPCodec::StreamID id, uint32_t errorCode);

folly::Expected<folly::Unit, WebTransport::ErrorCode>
stopReadingWebTransportIngress(HTTPCodec::StreamID id, uint32_t errorCode);
stopReadingWebTransportIngress(HTTPCodec::StreamID id,
folly::Optional<uint32_t> errorCode);

folly::Expected<WebTransport::BidiStreamHandle, WebTransport::ErrorCode>
newWebTransportBidiStream();
Expand Down
Loading