Skip to content

Commit

Permalink
Make SessionCloseErrorCode mandatory in MoQSession::close
Browse files Browse the repository at this point in the history
Summary: This may not be forever, setting the value to NO_ERROR by default seems reasonable, but for now, this forces us to think about the appropriate error code.

Reviewed By: sharmafb

Differential Revision: D67878869

fbshipit-source-id: 7abcca17ef04f22d3bdbafc34ae398beb0accec9
  • Loading branch information
afrind authored and facebook-github-bot committed Jan 7, 2025
1 parent 77802ca commit 70ffd57
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 34 deletions.
4 changes: 3 additions & 1 deletion moxygen/MoQFramer.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ enum class SubscribeDoneStatusCode : uint32_t {
TRACK_ENDED = 0x3,
SUBSCRIPTION_ENDED = 0x4,
GOING_AWAY = 0x5,
EXPIRED = 0x6
EXPIRED = 0x6,
//
SESSION_CLOSED = std::numeric_limits<uint32_t>::max()
};

enum class TrackStatusCode : uint32_t {
Expand Down
28 changes: 14 additions & 14 deletions moxygen/MoQSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,7 @@ class MoQSession::SubscribeTrackReceiveState
} else {
subscribeDone(
{subscribeID_,
SubscribeDoneStatusCode::INTERNAL_ERROR,
SubscribeDoneStatusCode::SESSION_CLOSED,
"closed locally",
folly::none});
}
Expand Down Expand Up @@ -936,11 +936,11 @@ void MoQSession::drain() {

void MoQSession::checkForCloseOnDrain() {
if (draining_ && fetches_.empty() && subTracks_.empty()) {
close();
close(SessionCloseErrorCode::NO_ERROR);
}
}

void MoQSession::close(folly::Optional<SessionCloseErrorCode> error) {
void MoQSession::close(SessionCloseErrorCode error) {
XLOG(DBG1) << __func__ << " sess=" << this;
if (wt_) {
// TODO: The error code should be propagated to
Expand All @@ -951,10 +951,7 @@ void MoQSession::close(folly::Optional<SessionCloseErrorCode> error) {

cleanup();

wt->closeSession(
error.has_value()
? folly::make_optional(folly::to_underlying(error.value()))
: folly::none);
wt->closeSession(folly::to_underlying(error));
XLOG(DBG1) << "requestCancellation from close sess=" << this;
cancellationSource_.requestCancellation();
}
Expand Down Expand Up @@ -1022,7 +1019,7 @@ folly::coro::Task<ServerSetup> MoQSession::setup(ClientSetup setup) {
co_yield folly::coro::co_error(folly::OperationCancelled());
}
if (serverSetup.hasException()) {
close();
close(SessionCloseErrorCode::INTERNAL_ERROR);
XLOG(ERR) << "Setup Failed: "
<< folly::exceptionStr(serverSetup.exception());
co_yield folly::coro::co_error(serverSetup.exception());
Expand All @@ -1037,7 +1034,7 @@ void MoQSession::onServerSetup(ServerSetup serverSetup) {
if (serverSetup.selectedVersion != kVersionDraftCurrent) {
XLOG(ERR) << "Invalid version = " << serverSetup.selectedVersion
<< " sess=" << this;
close();
close(SessionCloseErrorCode::PROTOCOL_VIOLATION);
setupPromise_.setException(std::runtime_error("Invalid version"));
return;
}
Expand All @@ -1056,15 +1053,15 @@ void MoQSession::onClientSetup(ClientSetup clientSetup) {
for (auto v : clientSetup.supportedVersions) {
XLOG(ERR) << "client sent=" << v << " sess=" << this;
}
close();
close(SessionCloseErrorCode::PROTOCOL_VIOLATION);
return;
}
peerMaxSubscribeID_ = getMaxSubscribeIdIfPresent(clientSetup.params);
auto serverSetup =
serverSetupCallback_->onClientSetup(std::move(clientSetup));
if (!serverSetup.hasValue()) {
XLOG(ERR) << "Server setup callback failed sess=" << this;
close();
close(SessionCloseErrorCode::INTERNAL_ERROR);
return;
}

Expand Down Expand Up @@ -1810,8 +1807,11 @@ void MoQSession::onGoaway(Goaway goaway) {

void MoQSession::onConnectionError(ErrorCode error) {
XLOG(DBG1) << __func__ << " sess=" << this;
XLOG(ERR) << "err=" << folly::to_underlying(error);
// TODO
XLOG(ERR) << "MoQCodec control stream parse error err="
<< folly::to_underlying(error);
// TODO: This error is coming from MoQCodec -- do we need a better
// error code?
close(SessionCloseErrorCode::PROTOCOL_VIOLATION);
}

folly::coro::Task<folly::Expected<AnnounceOk, AnnounceError>>
Expand Down Expand Up @@ -2225,7 +2225,7 @@ void MoQSession::onNewUniStream(proxygen::WebTransport::StreamReadHandle* rh) {
XLOG(DBG1) << __func__ << " sess=" << this;
if (!setupComplete_) {
XLOG(ERR) << "Uni stream before setup complete sess=" << this;
close();
close(SessionCloseErrorCode::PROTOCOL_VIOLATION);
return;
}
// maybe not STREAM_HEADER_SUBGROUP, but at least not control
Expand Down
11 changes: 7 additions & 4 deletions moxygen/MoQSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class MoQSession : public MoQControlCodec::ControlCallback,

void start();
void drain();
void close(folly::Optional<SessionCloseErrorCode> error = folly::none);
void close(SessionCloseErrorCode error);

folly::coro::Task<ServerSetup> setup(ClientSetup setup);
folly::coro::Task<void> clientSetupComplete() {
Expand Down Expand Up @@ -250,9 +250,12 @@ class MoQSession : public MoQControlCodec::ControlCallback,
void onNewUniStream(proxygen::WebTransport::StreamReadHandle* rh) override;
void onNewBidiStream(proxygen::WebTransport::BidiStreamHandle bh) override;
void onDatagram(std::unique_ptr<folly::IOBuf> datagram) override;
void onSessionEnd(folly::Optional<uint32_t>) override {
XLOG(DBG1) << __func__ << " sess=" << this;
close();
void onSessionEnd(folly::Optional<uint32_t> err) override {
XLOG(DBG1) << __func__ << "err="
<< (err ? folly::to<std::string>(*err) : std::string("none"))
<< " sess=" << this;
// The peer closed us, but we can close with NO_ERROR
close(SessionCloseErrorCode::NO_ERROR);
}

class TrackReceiveStateBase;
Expand Down
2 changes: 1 addition & 1 deletion moxygen/samples/chat/MoQChatClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ void MoQChatClient::publishLoop() {
moqClient_.getEventBase()->runInEventBaseThread([this, input] {
if (input == "/leave") {
XLOG(INFO) << "Leaving chat";
moqClient_.moqSession_->close();
moqClient_.moqSession_->close(SessionCloseErrorCode::NO_ERROR);
moqClient_.moqSession_.reset();
} else if (chatSubscribeID_) {
if (publisher_) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class MoQFlvStreamerClient {
XLOG(INFO) << __func__;
if (moqClient_.moqSession_) {
moqClient_.moqSession_->unannounce({trackNamespace_});
moqClient_.moqSession_->close();
moqClient_.moqSession_->close(SessionCloseErrorCode::NO_ERROR);
}
}

Expand Down
2 changes: 1 addition & 1 deletion moxygen/samples/text-client/MoQTextClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ class MoQTextClient {
textHandler_.baton.post();
// TODO: maybe need fetchCancel + fetchTextHandler_.baton.post()
moqClient_.moqSession_->unsubscribe({subscribeID_});
moqClient_.moqSession_->close();
moqClient_.moqSession_->close(SessionCloseErrorCode::NO_ERROR);
}

folly::coro::Task<void> controlReadLoop() {
Expand Down
24 changes: 12 additions & 12 deletions moxygen/test/MoQSessionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ void MoQSessionTest::setupMoQSession() {

TEST_F(MoQSessionTest, Setup) {
setupMoQSession();
clientSession_->close();
clientSession_->close(SessionCloseErrorCode::NO_ERROR);
}

MATCHER_P(HasChainDataLengthOf, n, "") {
Expand Down Expand Up @@ -244,7 +244,7 @@ TEST_F(MoQSessionTest, Fetch) {
auto res = co_await session->fetch(getFetch({0, 0}, {0, 1}), fetchCallback);
EXPECT_FALSE(res.hasError());
co_await baton;
session->close();
session->close(SessionCloseErrorCode::NO_ERROR);
};
EXPECT_CALL(serverControl, onFetch(testing::_))
.WillOnce(testing::Invoke([this](Fetch fetch) {
Expand Down Expand Up @@ -295,7 +295,7 @@ TEST_F(MoQSessionTest, FetchCleanupFromStreamFin) {
return folly::unit;
}));
co_await baton;
session->close();
session->close(SessionCloseErrorCode::NO_ERROR);
};
EXPECT_CALL(serverControl, onFetch(testing::_))
.WillOnce(testing::Invoke([this, &fetchPub](Fetch fetch) {
Expand Down Expand Up @@ -324,7 +324,7 @@ TEST_F(MoQSessionTest, FetchError) {
EXPECT_EQ(
res.error().errorCode,
folly::to_underlying(FetchErrorCode::INVALID_RANGE));
session->close();
session->close(SessionCloseErrorCode::NO_ERROR);
};
f(clientSession_).scheduleOn(&eventBase_).start();
eventBase_.loop();
Expand Down Expand Up @@ -362,7 +362,7 @@ TEST_F(MoQSessionTest, FetchCancel) {
/*finFetch=*/true);
// publish after fetchCancel fails
EXPECT_TRUE(res2.hasError());
clientSession->close();
clientSession->close(SessionCloseErrorCode::NO_ERROR);
};
EXPECT_CALL(serverControl, onFetch(testing::_))
.WillOnce(testing::Invoke([this, &fetchPub](Fetch fetch) {
Expand Down Expand Up @@ -399,7 +399,7 @@ TEST_F(MoQSessionTest, FetchEarlyCancel) {
EXPECT_FALSE(res.hasError());
// TODO: this no-ops right now so there's nothing to verify
clientSession->fetchCancel({subscribeID});
clientSession->close();
clientSession->close(SessionCloseErrorCode::NO_ERROR);
};
EXPECT_CALL(serverControl, onFetch(testing::_))
.WillOnce(testing::Invoke([this](Fetch fetch) {
Expand Down Expand Up @@ -440,7 +440,7 @@ TEST_F(MoQSessionTest, FetchBadLength) {
co_await folly::coro::timeout(
std::move(contract.second), std::chrono::milliseconds(100), &tk),
folly::FutureTimeout);
session->close();
session->close(SessionCloseErrorCode::NO_ERROR);
};
EXPECT_CALL(serverControl, onFetch(testing::_))
.WillOnce(testing::Invoke([this](Fetch fetch) {
Expand Down Expand Up @@ -520,7 +520,7 @@ TEST_F(MoQSessionTest, FetchBadID) {
eventBase_.loopOnce();
serverSession_->fetchError({SubscribeID(2000), 500, "local write failed"});
eventBase_.loopOnce();
serverSession_->close();
serverSession_->close(SessionCloseErrorCode::NO_ERROR);
// These are no-ops
}

Expand Down Expand Up @@ -560,7 +560,7 @@ TEST_F(MoQSessionTest, SetupTimeout) {
setup.supportedVersions.push_back(kVersionDraftCurrent);
auto serverSetup = co_await co_awaitTry(clientSession->setup(setup));
EXPECT_TRUE(serverSetup.hasException());
clientSession->close();
clientSession->close(SessionCloseErrorCode::NO_ERROR);
}(clientSession_)
.scheduleOn(&eventBase_)
.start();
Expand All @@ -575,7 +575,7 @@ TEST_F(MoQSessionTest, InvalidVersion) {
setup.supportedVersions.push_back(0xfaceb001);
auto serverSetup = co_await co_awaitTry(clientSession->setup(setup));
EXPECT_TRUE(serverSetup.hasException());
clientSession->close();
clientSession->close(SessionCloseErrorCode::NO_ERROR);
}(clientSession_)
.scheduleOn(&eventBase_)
.start();
Expand All @@ -591,7 +591,7 @@ TEST_F(MoQSessionTest, InvalidServerVersion) {
setup.supportedVersions.push_back(kVersionDraftCurrent);
auto serverSetup = co_await co_awaitTry(clientSession->setup(setup));
EXPECT_TRUE(serverSetup.hasException());
clientSession->close();
clientSession->close(SessionCloseErrorCode::NO_ERROR);
}(clientSession_)
.scheduleOn(&eventBase_)
.start();
Expand All @@ -607,7 +607,7 @@ TEST_F(MoQSessionTest, ServerSetupFail) {
setup.supportedVersions.push_back(kVersionDraftCurrent);
auto serverSetup = co_await co_awaitTry(clientSession->setup(setup));
EXPECT_TRUE(serverSetup.hasException());
clientSession->close();
clientSession->close(SessionCloseErrorCode::NO_ERROR);
}(clientSession_)
.scheduleOn(&eventBase_)
.start();
Expand Down

0 comments on commit 70ffd57

Please sign in to comment.