From 70ffd57d28bcee362d9b66d305ef4b1b2cb55a08 Mon Sep 17 00:00:00 2001 From: Alan Frindell Date: Mon, 6 Jan 2025 16:24:52 -0800 Subject: [PATCH] Make SessionCloseErrorCode mandatory in MoQSession::close Summary: This may not be forever, setting the value to NO_ERROR by default seems reasonable, but for now, this forces us to think about the appropriate error code. Reviewed By: sharmafb Differential Revision: D67878869 fbshipit-source-id: 7abcca17ef04f22d3bdbafc34ae398beb0accec9 --- moxygen/MoQFramer.h | 4 ++- moxygen/MoQSession.cpp | 28 +++++++++---------- moxygen/MoQSession.h | 11 +++++--- moxygen/samples/chat/MoQChatClient.cpp | 2 +- .../MoQFlvStreamerClient.cpp | 2 +- moxygen/samples/text-client/MoQTextClient.cpp | 2 +- moxygen/test/MoQSessionTest.cpp | 24 ++++++++-------- 7 files changed, 39 insertions(+), 34 deletions(-) diff --git a/moxygen/MoQFramer.h b/moxygen/MoQFramer.h index 772cc345..32e20742 100644 --- a/moxygen/MoQFramer.h +++ b/moxygen/MoQFramer.h @@ -62,7 +62,9 @@ enum class SubscribeDoneStatusCode : uint32_t { TRACK_ENDED = 0x3, SUBSCRIPTION_ENDED = 0x4, GOING_AWAY = 0x5, - EXPIRED = 0x6 + EXPIRED = 0x6, + // + SESSION_CLOSED = std::numeric_limits::max() }; enum class TrackStatusCode : uint32_t { diff --git a/moxygen/MoQSession.cpp b/moxygen/MoQSession.cpp index 6bfe3282..663be97f 100644 --- a/moxygen/MoQSession.cpp +++ b/moxygen/MoQSession.cpp @@ -783,7 +783,7 @@ class MoQSession::SubscribeTrackReceiveState } else { subscribeDone( {subscribeID_, - SubscribeDoneStatusCode::INTERNAL_ERROR, + SubscribeDoneStatusCode::SESSION_CLOSED, "closed locally", folly::none}); } @@ -936,11 +936,11 @@ void MoQSession::drain() { void MoQSession::checkForCloseOnDrain() { if (draining_ && fetches_.empty() && subTracks_.empty()) { - close(); + close(SessionCloseErrorCode::NO_ERROR); } } -void MoQSession::close(folly::Optional error) { +void MoQSession::close(SessionCloseErrorCode error) { XLOG(DBG1) << __func__ << " sess=" << this; if (wt_) { // TODO: The error code should be propagated to @@ -951,10 +951,7 @@ void MoQSession::close(folly::Optional error) { cleanup(); - wt->closeSession( - error.has_value() - ? folly::make_optional(folly::to_underlying(error.value())) - : folly::none); + wt->closeSession(folly::to_underlying(error)); XLOG(DBG1) << "requestCancellation from close sess=" << this; cancellationSource_.requestCancellation(); } @@ -1022,7 +1019,7 @@ folly::coro::Task MoQSession::setup(ClientSetup setup) { co_yield folly::coro::co_error(folly::OperationCancelled()); } if (serverSetup.hasException()) { - close(); + close(SessionCloseErrorCode::INTERNAL_ERROR); XLOG(ERR) << "Setup Failed: " << folly::exceptionStr(serverSetup.exception()); co_yield folly::coro::co_error(serverSetup.exception()); @@ -1037,7 +1034,7 @@ void MoQSession::onServerSetup(ServerSetup serverSetup) { if (serverSetup.selectedVersion != kVersionDraftCurrent) { XLOG(ERR) << "Invalid version = " << serverSetup.selectedVersion << " sess=" << this; - close(); + close(SessionCloseErrorCode::PROTOCOL_VIOLATION); setupPromise_.setException(std::runtime_error("Invalid version")); return; } @@ -1056,7 +1053,7 @@ void MoQSession::onClientSetup(ClientSetup clientSetup) { for (auto v : clientSetup.supportedVersions) { XLOG(ERR) << "client sent=" << v << " sess=" << this; } - close(); + close(SessionCloseErrorCode::PROTOCOL_VIOLATION); return; } peerMaxSubscribeID_ = getMaxSubscribeIdIfPresent(clientSetup.params); @@ -1064,7 +1061,7 @@ void MoQSession::onClientSetup(ClientSetup clientSetup) { serverSetupCallback_->onClientSetup(std::move(clientSetup)); if (!serverSetup.hasValue()) { XLOG(ERR) << "Server setup callback failed sess=" << this; - close(); + close(SessionCloseErrorCode::INTERNAL_ERROR); return; } @@ -1810,8 +1807,11 @@ void MoQSession::onGoaway(Goaway goaway) { void MoQSession::onConnectionError(ErrorCode error) { XLOG(DBG1) << __func__ << " sess=" << this; - XLOG(ERR) << "err=" << folly::to_underlying(error); - // TODO + XLOG(ERR) << "MoQCodec control stream parse error err=" + << folly::to_underlying(error); + // TODO: This error is coming from MoQCodec -- do we need a better + // error code? + close(SessionCloseErrorCode::PROTOCOL_VIOLATION); } folly::coro::Task> @@ -2225,7 +2225,7 @@ void MoQSession::onNewUniStream(proxygen::WebTransport::StreamReadHandle* rh) { XLOG(DBG1) << __func__ << " sess=" << this; if (!setupComplete_) { XLOG(ERR) << "Uni stream before setup complete sess=" << this; - close(); + close(SessionCloseErrorCode::PROTOCOL_VIOLATION); return; } // maybe not STREAM_HEADER_SUBGROUP, but at least not control diff --git a/moxygen/MoQSession.h b/moxygen/MoQSession.h index b6a5ee19..a61cab51 100644 --- a/moxygen/MoQSession.h +++ b/moxygen/MoQSession.h @@ -59,7 +59,7 @@ class MoQSession : public MoQControlCodec::ControlCallback, void start(); void drain(); - void close(folly::Optional error = folly::none); + void close(SessionCloseErrorCode error); folly::coro::Task setup(ClientSetup setup); folly::coro::Task clientSetupComplete() { @@ -250,9 +250,12 @@ class MoQSession : public MoQControlCodec::ControlCallback, void onNewUniStream(proxygen::WebTransport::StreamReadHandle* rh) override; void onNewBidiStream(proxygen::WebTransport::BidiStreamHandle bh) override; void onDatagram(std::unique_ptr datagram) override; - void onSessionEnd(folly::Optional) override { - XLOG(DBG1) << __func__ << " sess=" << this; - close(); + void onSessionEnd(folly::Optional err) override { + XLOG(DBG1) << __func__ << "err=" + << (err ? folly::to(*err) : std::string("none")) + << " sess=" << this; + // The peer closed us, but we can close with NO_ERROR + close(SessionCloseErrorCode::NO_ERROR); } class TrackReceiveStateBase; diff --git a/moxygen/samples/chat/MoQChatClient.cpp b/moxygen/samples/chat/MoQChatClient.cpp index ae7f591b..a4afb240 100644 --- a/moxygen/samples/chat/MoQChatClient.cpp +++ b/moxygen/samples/chat/MoQChatClient.cpp @@ -157,7 +157,7 @@ void MoQChatClient::publishLoop() { moqClient_.getEventBase()->runInEventBaseThread([this, input] { if (input == "/leave") { XLOG(INFO) << "Leaving chat"; - moqClient_.moqSession_->close(); + moqClient_.moqSession_->close(SessionCloseErrorCode::NO_ERROR); moqClient_.moqSession_.reset(); } else if (chatSubscribeID_) { if (publisher_) { diff --git a/moxygen/samples/flv_streamer_client/MoQFlvStreamerClient.cpp b/moxygen/samples/flv_streamer_client/MoQFlvStreamerClient.cpp index 2be643ff..9e9ada02 100644 --- a/moxygen/samples/flv_streamer_client/MoQFlvStreamerClient.cpp +++ b/moxygen/samples/flv_streamer_client/MoQFlvStreamerClient.cpp @@ -85,7 +85,7 @@ class MoQFlvStreamerClient { XLOG(INFO) << __func__; if (moqClient_.moqSession_) { moqClient_.moqSession_->unannounce({trackNamespace_}); - moqClient_.moqSession_->close(); + moqClient_.moqSession_->close(SessionCloseErrorCode::NO_ERROR); } } diff --git a/moxygen/samples/text-client/MoQTextClient.cpp b/moxygen/samples/text-client/MoQTextClient.cpp index 169ac4a1..69494c8d 100644 --- a/moxygen/samples/text-client/MoQTextClient.cpp +++ b/moxygen/samples/text-client/MoQTextClient.cpp @@ -201,7 +201,7 @@ class MoQTextClient { textHandler_.baton.post(); // TODO: maybe need fetchCancel + fetchTextHandler_.baton.post() moqClient_.moqSession_->unsubscribe({subscribeID_}); - moqClient_.moqSession_->close(); + moqClient_.moqSession_->close(SessionCloseErrorCode::NO_ERROR); } folly::coro::Task controlReadLoop() { diff --git a/moxygen/test/MoQSessionTest.cpp b/moxygen/test/MoQSessionTest.cpp index 96cf14b9..ae8c1c8a 100644 --- a/moxygen/test/MoQSessionTest.cpp +++ b/moxygen/test/MoQSessionTest.cpp @@ -207,7 +207,7 @@ void MoQSessionTest::setupMoQSession() { TEST_F(MoQSessionTest, Setup) { setupMoQSession(); - clientSession_->close(); + clientSession_->close(SessionCloseErrorCode::NO_ERROR); } MATCHER_P(HasChainDataLengthOf, n, "") { @@ -244,7 +244,7 @@ TEST_F(MoQSessionTest, Fetch) { auto res = co_await session->fetch(getFetch({0, 0}, {0, 1}), fetchCallback); EXPECT_FALSE(res.hasError()); co_await baton; - session->close(); + session->close(SessionCloseErrorCode::NO_ERROR); }; EXPECT_CALL(serverControl, onFetch(testing::_)) .WillOnce(testing::Invoke([this](Fetch fetch) { @@ -295,7 +295,7 @@ TEST_F(MoQSessionTest, FetchCleanupFromStreamFin) { return folly::unit; })); co_await baton; - session->close(); + session->close(SessionCloseErrorCode::NO_ERROR); }; EXPECT_CALL(serverControl, onFetch(testing::_)) .WillOnce(testing::Invoke([this, &fetchPub](Fetch fetch) { @@ -324,7 +324,7 @@ TEST_F(MoQSessionTest, FetchError) { EXPECT_EQ( res.error().errorCode, folly::to_underlying(FetchErrorCode::INVALID_RANGE)); - session->close(); + session->close(SessionCloseErrorCode::NO_ERROR); }; f(clientSession_).scheduleOn(&eventBase_).start(); eventBase_.loop(); @@ -362,7 +362,7 @@ TEST_F(MoQSessionTest, FetchCancel) { /*finFetch=*/true); // publish after fetchCancel fails EXPECT_TRUE(res2.hasError()); - clientSession->close(); + clientSession->close(SessionCloseErrorCode::NO_ERROR); }; EXPECT_CALL(serverControl, onFetch(testing::_)) .WillOnce(testing::Invoke([this, &fetchPub](Fetch fetch) { @@ -399,7 +399,7 @@ TEST_F(MoQSessionTest, FetchEarlyCancel) { EXPECT_FALSE(res.hasError()); // TODO: this no-ops right now so there's nothing to verify clientSession->fetchCancel({subscribeID}); - clientSession->close(); + clientSession->close(SessionCloseErrorCode::NO_ERROR); }; EXPECT_CALL(serverControl, onFetch(testing::_)) .WillOnce(testing::Invoke([this](Fetch fetch) { @@ -440,7 +440,7 @@ TEST_F(MoQSessionTest, FetchBadLength) { co_await folly::coro::timeout( std::move(contract.second), std::chrono::milliseconds(100), &tk), folly::FutureTimeout); - session->close(); + session->close(SessionCloseErrorCode::NO_ERROR); }; EXPECT_CALL(serverControl, onFetch(testing::_)) .WillOnce(testing::Invoke([this](Fetch fetch) { @@ -520,7 +520,7 @@ TEST_F(MoQSessionTest, FetchBadID) { eventBase_.loopOnce(); serverSession_->fetchError({SubscribeID(2000), 500, "local write failed"}); eventBase_.loopOnce(); - serverSession_->close(); + serverSession_->close(SessionCloseErrorCode::NO_ERROR); // These are no-ops } @@ -560,7 +560,7 @@ TEST_F(MoQSessionTest, SetupTimeout) { setup.supportedVersions.push_back(kVersionDraftCurrent); auto serverSetup = co_await co_awaitTry(clientSession->setup(setup)); EXPECT_TRUE(serverSetup.hasException()); - clientSession->close(); + clientSession->close(SessionCloseErrorCode::NO_ERROR); }(clientSession_) .scheduleOn(&eventBase_) .start(); @@ -575,7 +575,7 @@ TEST_F(MoQSessionTest, InvalidVersion) { setup.supportedVersions.push_back(0xfaceb001); auto serverSetup = co_await co_awaitTry(clientSession->setup(setup)); EXPECT_TRUE(serverSetup.hasException()); - clientSession->close(); + clientSession->close(SessionCloseErrorCode::NO_ERROR); }(clientSession_) .scheduleOn(&eventBase_) .start(); @@ -591,7 +591,7 @@ TEST_F(MoQSessionTest, InvalidServerVersion) { setup.supportedVersions.push_back(kVersionDraftCurrent); auto serverSetup = co_await co_awaitTry(clientSession->setup(setup)); EXPECT_TRUE(serverSetup.hasException()); - clientSession->close(); + clientSession->close(SessionCloseErrorCode::NO_ERROR); }(clientSession_) .scheduleOn(&eventBase_) .start(); @@ -607,7 +607,7 @@ TEST_F(MoQSessionTest, ServerSetupFail) { setup.supportedVersions.push_back(kVersionDraftCurrent); auto serverSetup = co_await co_awaitTry(clientSession->setup(setup)); EXPECT_TRUE(serverSetup.hasException()); - clientSession->close(); + clientSession->close(SessionCloseErrorCode::NO_ERROR); }(clientSession_) .scheduleOn(&eventBase_) .start();