Skip to content

Commit

Permalink
Merge pull request #236 from microsoft/user/corbinphipps/fix-download…
Browse files Browse the repository at this point in the history
…-bug

Fix DataStreamDownload bug
  • Loading branch information
corbin-phipps authored Mar 20, 2024
2 parents 2b816f2 + 078c32e commit 70db3c2
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 5 deletions.
11 changes: 9 additions & 2 deletions src/common/service/NetRemoteDataStreamingReactors.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,11 @@ DataStreamWriter::OnWriteDone(bool isOk)

// Check for a failed status code from HandleWriteFailure since that invoked a final write, thus causing this callback to be invoked.
if (m_writeStatus.code() == DataStreamOperationStatusCode::DataStreamOperationStatusCodeFailed) {
Finish(::grpc::Status::OK);
bool isCompletedExpected{ false };
if (m_isCompleted.compare_exchange_strong(isCompletedExpected, true, std::memory_order_relaxed, std::memory_order_relaxed)) {
Finish(grpc::Status::OK);
}

return;
}

Expand All @@ -170,7 +174,10 @@ DataStreamWriter::OnCancel()
// The RPC is canceled by the client, so call Finish to complete it from the server perspective.
bool isCanceledExpected{ false };
if (m_isCanceled.compare_exchange_strong(isCanceledExpected, true, std::memory_order_relaxed, std::memory_order_relaxed)) {
Finish(grpc::Status::CANCELLED);
// It's possible that Finish was already called due to a write failure, so don't call Finish again in that case.
if (!m_isCompleted.load(std::memory_order_relaxed)) {
Finish(grpc::Status::CANCELLED);
}
}
}

Expand Down
1 change: 1 addition & 0 deletions src/common/service/NetRemoteDataStreamingReactors.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ private:
uint32_t m_numberOfDataBlocksWritten{};
Microsoft::Net::Remote::DataStream::DataStreamOperationStatus m_writeStatus{};
std::atomic<bool> m_isCanceled{};
std::atomic<bool> m_isCompleted{};
Microsoft::Net::Remote::Service::Reactors::Helpers::DataGenerator m_dataGenerator{};
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ private:
* @brief Allows the client to ping the server for availability.
*
* @param context
* @param request
* @param response
* @return grpc::ServerUnaryReactor*
*/
grpc::ServerUnaryReactor*
Expand Down
13 changes: 10 additions & 3 deletions tests/unit/TestNetRemoteDataStreamingServiceClient.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,16 @@ TEST_CASE("DataStreamDownload API", "[basic][rpc][client][remote][stream]")
DataStreamOperationStatus operationStatus{};
std::span<uint32_t> lostDataBlockSequenceNumbers{};
const grpc::Status status = dataStreamReader.Await(&numberOfDataBlocksReceived, &operationStatus, lostDataBlockSequenceNumbers);
REQUIRE(status.error_code() == grpc::StatusCode::CANCELLED);
REQUIRE(operationStatus.code() == DataStreamOperationStatusCodeSucceeded);
REQUIRE(lostDataBlockSequenceNumbers.empty());

REQUIRE((status.error_code() == grpc::StatusCode::CANCELLED || status.error_code() == grpc::StatusCode::OK));

if (status.error_code() == grpc::StatusCode::CANCELLED) {
REQUIRE(operationStatus.code() == DataStreamOperationStatusCodeSucceeded);
REQUIRE(lostDataBlockSequenceNumbers.empty());
// The cancelation may cause a write failure before the server's OnCancel callback is invoked
} else if (status.error_code() == grpc::StatusCode::OK) {
REQUIRE(operationStatus.code() == DataStreamOperationStatusCodeFailed);
}
}
}

Expand Down

0 comments on commit 70db3c2

Please sign in to comment.