From 9cb237043f86be7002007d7189d812dd35ec7b93 Mon Sep 17 00:00:00 2001 From: Corbin Phipps Date: Wed, 20 Mar 2024 21:45:54 +0000 Subject: [PATCH 1/2] Fix bug --- .../service/NetRemoteDataStreamingReactors.cxx | 11 +++++++++-- .../service/NetRemoteDataStreamingReactors.hxx | 1 + .../net/remote/NetRemoteDataStreamingService.hxx | 2 ++ .../TestNetRemoteDataStreamingServiceClient.cxx | 13 ++++++++++--- 4 files changed, 22 insertions(+), 5 deletions(-) diff --git a/src/common/service/NetRemoteDataStreamingReactors.cxx b/src/common/service/NetRemoteDataStreamingReactors.cxx index a044294a..06617eeb 100644 --- a/src/common/service/NetRemoteDataStreamingReactors.cxx +++ b/src/common/service/NetRemoteDataStreamingReactors.cxx @@ -145,7 +145,11 @@ DataStreamWriter::OnWriteDone(bool isOk) // Check for a failed status code from HandleWriteFailure since that invoked a final write, thus causing this callback to be invoked. if (m_writeStatus.code() == DataStreamOperationStatusCode::DataStreamOperationStatusCodeFailed) { - Finish(::grpc::Status::OK); + bool isCompletedExpected{ false }; + if (m_isCompleted.compare_exchange_strong(isCompletedExpected, true, std::memory_order_relaxed, std::memory_order_relaxed)) { + Finish(grpc::Status::OK); + } + return; } @@ -170,7 +174,10 @@ DataStreamWriter::OnCancel() // The RPC is canceled by the client, so call Finish to complete it from the server perspective. bool isCanceledExpected{ false }; if (m_isCanceled.compare_exchange_strong(isCanceledExpected, true, std::memory_order_relaxed, std::memory_order_relaxed)) { - Finish(grpc::Status::CANCELLED); + // It's possible that Finish was already called due to a write failure, so don't call Finish again in that case. + if (!m_isCompleted.load(std::memory_order_relaxed)) { + Finish(grpc::Status::CANCELLED); + } } } diff --git a/src/common/service/NetRemoteDataStreamingReactors.hxx b/src/common/service/NetRemoteDataStreamingReactors.hxx index beebd79d..fa7335e6 100644 --- a/src/common/service/NetRemoteDataStreamingReactors.hxx +++ b/src/common/service/NetRemoteDataStreamingReactors.hxx @@ -148,6 +148,7 @@ private: uint32_t m_numberOfDataBlocksWritten{}; Microsoft::Net::Remote::DataStream::DataStreamOperationStatus m_writeStatus{}; std::atomic m_isCanceled{}; + std::atomic m_isCompleted{}; Microsoft::Net::Remote::Service::Reactors::Helpers::DataGenerator m_dataGenerator{}; }; diff --git a/src/common/service/include/microsoft/net/remote/NetRemoteDataStreamingService.hxx b/src/common/service/include/microsoft/net/remote/NetRemoteDataStreamingService.hxx index 191625d6..88340222 100644 --- a/src/common/service/include/microsoft/net/remote/NetRemoteDataStreamingService.hxx +++ b/src/common/service/include/microsoft/net/remote/NetRemoteDataStreamingService.hxx @@ -55,6 +55,8 @@ private: * @brief Allows the client to ping the server for availability. * * @param context + * @param request + * @param response * @return grpc::ServerUnaryReactor* */ grpc::ServerUnaryReactor* diff --git a/tests/unit/TestNetRemoteDataStreamingServiceClient.cxx b/tests/unit/TestNetRemoteDataStreamingServiceClient.cxx index b8ab418c..e5938feb 100644 --- a/tests/unit/TestNetRemoteDataStreamingServiceClient.cxx +++ b/tests/unit/TestNetRemoteDataStreamingServiceClient.cxx @@ -149,9 +149,16 @@ TEST_CASE("DataStreamDownload API", "[basic][rpc][client][remote][stream]") DataStreamOperationStatus operationStatus{}; std::span lostDataBlockSequenceNumbers{}; const grpc::Status status = dataStreamReader.Await(&numberOfDataBlocksReceived, &operationStatus, lostDataBlockSequenceNumbers); - REQUIRE(status.error_code() == grpc::StatusCode::CANCELLED); - REQUIRE(operationStatus.code() == DataStreamOperationStatusCodeSucceeded); - REQUIRE(lostDataBlockSequenceNumbers.empty()); + + REQUIRE((status.error_code() == grpc::StatusCode::CANCELLED || status.error_code() == grpc::StatusCode::OK)); + + if (status.error_code() == grpc::StatusCode::CANCELLED) { + REQUIRE(operationStatus.code() == DataStreamOperationStatusCodeSucceeded); + REQUIRE(lostDataBlockSequenceNumbers.empty()); + // The cancelation may cause a write failure before the server's OnCancel callback is invoked + } else if (status.error_code() == grpc::StatusCode::OK) { + REQUIRE(operationStatus.code() == DataStreamOperationStatusCodeFailed); + } } } From 078c32ecc150f51e2faf9626fb5f51b5b4b595da Mon Sep 17 00:00:00 2001 From: Corbin Phipps Date: Wed, 20 Mar 2024 21:49:14 +0000 Subject: [PATCH 2/2] clang-format --- tests/unit/TestNetRemoteDataStreamingServiceClient.cxx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/TestNetRemoteDataStreamingServiceClient.cxx b/tests/unit/TestNetRemoteDataStreamingServiceClient.cxx index e5938feb..134d22a4 100644 --- a/tests/unit/TestNetRemoteDataStreamingServiceClient.cxx +++ b/tests/unit/TestNetRemoteDataStreamingServiceClient.cxx @@ -155,7 +155,7 @@ TEST_CASE("DataStreamDownload API", "[basic][rpc][client][remote][stream]") if (status.error_code() == grpc::StatusCode::CANCELLED) { REQUIRE(operationStatus.code() == DataStreamOperationStatusCodeSucceeded); REQUIRE(lostDataBlockSequenceNumbers.empty()); - // The cancelation may cause a write failure before the server's OnCancel callback is invoked + // The cancelation may cause a write failure before the server's OnCancel callback is invoked } else if (status.error_code() == grpc::StatusCode::OK) { REQUIRE(operationStatus.code() == DataStreamOperationStatusCodeFailed); }