Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DataStreamDownload API #198

Merged
merged 46 commits into from
Mar 1, 2024
Merged
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
e0b1bac
Add DataStreamDownload API to proto files
corbin-phipps Feb 27, 2024
3513304
Add basic API trace
corbin-phipps Feb 27, 2024
c61a7ce
Merge pull request #182 from microsoft/user/corbinphipps/add-basic-tr…
corbin-phipps Feb 27, 2024
4f61279
Move away from new/delete
corbin-phipps Feb 28, 2024
edf8e5f
Merge pull request #184 from microsoft/user/corbinphipps/avoid-manual…
corbin-phipps Feb 28, 2024
d28a641
Merge branch 'feature/dataStreaming' into user/corbinphipps/add-downl…
corbin-phipps Feb 28, 2024
fd36f99
Sample implementation from prototyping
corbin-phipps Feb 28, 2024
3c48ff9
Improve enum value names; add unknown enum values
corbin-phipps Feb 28, 2024
e3b31bd
Improve API impl
corbin-phipps Feb 28, 2024
3f7398c
Fill in OnCancel() callback
corbin-phipps Feb 28, 2024
68e4080
Switch back to new/delete
corbin-phipps Feb 28, 2024
5a1f8c2
Add ClientReadReactor impl
corbin-phipps Feb 28, 2024
97e3b12
Add basic unit test; delete this in OnDone
corbin-phipps Feb 28, 2024
e809317
Add missing break statements
corbin-phipps Feb 28, 2024
ac74b9b
clang-format
corbin-phipps Feb 28, 2024
dfd38c3
Update comment
corbin-phipps Feb 28, 2024
c8ad32c
Merge pull request #185 from microsoft/user/corbinphipps/add-download…
corbin-phipps Feb 28, 2024
2279f25
Simplify Properties oneof field names
corbin-phipps Feb 29, 2024
27b20d8
const ref for string
corbin-phipps Feb 29, 2024
a3f2bec
Add client reactor class member for timeout value
corbin-phipps Feb 29, 2024
992a635
Re-order message fields so Status is first
corbin-phipps Feb 29, 2024
4eec9e6
Allocate DataStreamReader on stack
corbin-phipps Feb 29, 2024
b9918e0
Add chrono header
corbin-phipps Feb 29, 2024
da6bc6b
Merge pull request #187 from microsoft/user/corbinphipps/download-api…
corbin-phipps Feb 29, 2024
2f95f76
Add unit test for continuous streaming (not passing yet, need to hand…
corbin-phipps Feb 29, 2024
94c4145
Add atomic bool for cancellation; add some logs
corbin-phipps Feb 29, 2024
2ec99a0
Address review comments: Update canceled spelling; extra logs
corbin-phipps Feb 29, 2024
4aa65a6
Use compare_exchange_strong
corbin-phipps Feb 29, 2024
53912d6
Merge pull request #191 from microsoft/user/corbinphipps/continuous-d…
corbin-phipps Mar 1, 2024
7a6603d
Merge branch 'develop' into user/corbinphipps/merge-develop
corbin-phipps Mar 1, 2024
533dd9c
Merge pull request #194 from microsoft/user/corbinphipps/merge-develop
corbin-phipps Mar 1, 2024
6609de5
Add basic random data generator
corbin-phipps Mar 1, 2024
f7adbb4
Check data stream pattern
corbin-phipps Mar 1, 2024
93c0b67
Use 32-bit version; use std::numeric_limits<uint8_t>::max; add const
corbin-phipps Mar 1, 2024
c5e4c2b
Merge pull request #195 from microsoft/user/corbinphipps/random-data-…
corbin-phipps Mar 1, 2024
adc8261
Use function tracer with debug severity
corbin-phipps Mar 1, 2024
c00680f
Merge pull request #197 from microsoft/user/corbinphipps/use-function…
corbin-phipps Mar 1, 2024
ffd925f
Include headers explicitly.
abeltrano Mar 1, 2024
9d3fc40
Drop args from FunctionTracer.
abeltrano Mar 1, 2024
e697182
Style/formatting.
abeltrano Mar 1, 2024
ebb1799
const
abeltrano Mar 1, 2024
92be9be
More clang-tidy.
abeltrano Mar 1, 2024
1927023
Use uint32_t for distribution then cast to uint8_t
corbin-phipps Mar 1, 2024
d712312
Merge pull request #200 from microsoft/user/corbinphipps/fix-build-break
corbin-phipps Mar 1, 2024
c64e9d5
Merge pull request #199 from microsoft/clangtidy
abeltrano Mar 1, 2024
96b5d5e
Merge branch 'develop' into feature/dataStreaming
abeltrano Mar 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 49 additions & 3 deletions protocol/protos/NetRemoteDataStream.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ enum DataStreamOperationStatusCode
DataStreamOperationStatusCodeUnknown = 0;
DataStreamOperationStatusCodeSucceeded = 1;
DataStreamOperationStatusCodeFailed = 2;
DataStreamOperationStatusCodeCancelled = 3;
DataStreamOperationStatusCodeCanceled = 3;
DataStreamOperationStatusCodeTimedOut = 4;
}

Expand All @@ -23,8 +23,54 @@ message DataStreamUploadData
bytes Data = 1;
}

message DataStreamDownloadData
{
DataStreamOperationStatus Status = 1;
bytes Data = 2;
uint32 SequenceNumber = 3;
}

message DataStreamUploadResult
{
uint32 NumberOfDataBlocksReceived = 1;
DataStreamOperationStatus Status = 2;
DataStreamOperationStatus Status = 1;
uint32 NumberOfDataBlocksReceived = 2;
}

enum DataStreamType
{
DataStreamTypeUnknown = 0;
DataStreamTypeFixed = 1;
DataStreamTypeContinuous = 2;
}

enum DataStreamPattern
{
DataStreamPatternUnknown = 0;
DataStreamPatternConstant = 1;
}

message DataStreamFixedTypeProperties
{
uint32 NumberOfDataBlocksToStream = 1;
}

message DataStreamContinuousTypeProperties
{

}

message DataStreamProperties
{
DataStreamType Type = 1;
DataStreamPattern Pattern = 2;
oneof Properties
{
DataStreamFixedTypeProperties Fixed = 3;
DataStreamContinuousTypeProperties Continuous = 4;
}
}

message DataStreamDownloadRequest
{
DataStreamProperties Properties = 1;
}
1 change: 1 addition & 0 deletions protocol/protos/NetRemoteDataStreamingService.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ import "NetRemoteDataStream.proto";
service NetRemoteDataStreaming
{
rpc DataStreamUpload (stream Microsoft.Net.Remote.DataStream.DataStreamUploadData) returns (Microsoft.Net.Remote.DataStream.DataStreamUploadResult);
rpc DataStreamDownload (Microsoft.Net.Remote.DataStream.DataStreamDownloadRequest) returns (stream Microsoft.Net.Remote.DataStream.DataStreamDownloadData);
}
186 changes: 184 additions & 2 deletions src/common/service/NetRemoteDataStreamingReactors.cxx
Original file line number Diff line number Diff line change
@@ -1,18 +1,55 @@

#include <format>

#include "NetRemoteDataStreamingReactors.hxx"
#include <logging/FunctionTracer.hxx>
#include <magic_enum.hpp>
#include <plog/Log.h>
#include <plog/Severity.h>

namespace Microsoft::Net::Remote::Service::Reactors::Helpers
{
DataGenerator::DataGenerator()
{
m_generator.seed(std::random_device{}());
}

std::string
DataGenerator::GenerateRandomData(const std::size_t length)
{
std::string result;
result.reserve(length);

for (std::size_t i = 0; i < length; i++) {
result.push_back(static_cast<char>(GetRandomByte()));
}

return result;
}

uint8_t
DataGenerator::GetRandomByte()
{
std::uniform_int_distribution<uint8_t> distribution(0, std::numeric_limits<uint8_t>::max());
return distribution(m_generator);
}
} // namespace Microsoft::Net::Remote::Service::Reactors::Helpers

using namespace Microsoft::Net::Remote::DataStream;
using namespace Microsoft::Net::Remote::Service::Reactors;

DataStreamReader::DataStreamReader(DataStreamUploadResult* result) :
m_result(result)
{
logging::FunctionTracer traceMe(plog::Severity::debug);
StartRead(&m_data);
}

void
DataStreamReader::OnReadDone(bool isOk)
{
logging::FunctionTracer traceMe(plog::Severity::debug);

if (isOk) {
m_numberOfDataBlocksReceived++;
m_readStatus.set_code(DataStreamOperationStatusCode::DataStreamOperationStatusCodeSucceeded);
Expand All @@ -30,15 +67,160 @@ DataStreamReader::OnReadDone(bool isOk)
void
DataStreamReader::OnCancel()
{
logging::FunctionTracer traceMe(plog::Severity::debug);

m_result->set_numberofdatablocksreceived(m_numberOfDataBlocksReceived);
m_readStatus.set_code(DataStreamOperationStatusCode::DataStreamOperationStatusCodeCancelled);
m_readStatus.set_message("RPC cancelled");
m_readStatus.set_code(DataStreamOperationStatusCode::DataStreamOperationStatusCodeCanceled);
m_readStatus.set_message("RPC canceled");
*m_result->mutable_status() = std::move(m_readStatus);
Finish(grpc::Status::CANCELLED);
}

void
DataStreamReader::OnDone()
{
logging::FunctionTracer traceMe(plog::Severity::debug);
delete this;
}

DataStreamWriter::DataStreamWriter(const DataStreamDownloadRequest* request)
{
logging::FunctionTracer traceMe(plog::Severity::debug);
m_dataStreamProperties = request->properties();

switch (m_dataStreamProperties.type()) {
case DataStreamType::DataStreamTypeFixed: {
if (m_dataStreamProperties.Properties_case() == DataStreamProperties::kFixed) {
m_numberOfDataBlocksToStream = m_dataStreamProperties.fixed().numberofdatablockstostream();
} else {
HandleFailure("Invalid properties for this streaming type. Expected Fixed for DataStreamTypeFixed");
return;
}

break;
}
case DataStreamType::DataStreamTypeContinuous: {
if (m_dataStreamProperties.Properties_case() == DataStreamProperties::kContinuous) {
m_numberOfDataBlocksToStream = 0;
} else {
HandleFailure("Invalid properties for this streaming type. Expected Continuous for DataStreamTypeContinuous");
return;
}

break;
}
default: {
HandleFailure(std::format("Invalid streaming type: {}", magic_enum::enum_name(m_dataStreamProperties.type())));
return;
}
};

m_writeStatus.set_code(DataStreamOperationStatusCode::DataStreamOperationStatusCodeUnknown);
m_writeStatus.set_message("No data sent yet");
NextWrite();
}

void
DataStreamWriter::OnWriteDone(bool isOk)
{
logging::FunctionTracer traceMe(plog::Severity::debug);

// Client may have canceled the RPC, so check for cancelation to prevent writing more data
// when we shouldn't.
if (m_isCanceled.load(std::memory_order_relaxed)) {
LOGD << "RPC canceled, returning early";
return;
}

// Check for a failed status code from HandleWriteFailure since that invoked a final write, thus causing this callback to be invoked.
if (m_writeStatus.code() == DataStreamOperationStatusCode::DataStreamOperationStatusCodeFailed) {
Finish(::grpc::Status::OK);
return;
}

// Continue writing if previous write was successful, otherwise handle the failure.
if (isOk) {
if (m_dataStreamProperties.type() == DataStreamType::DataStreamTypeFixed) {
m_numberOfDataBlocksToStream--;
}
m_writeStatus.set_code(DataStreamOperationStatusCode::DataStreamOperationStatusCodeSucceeded);
m_writeStatus.set_message("Data write successful");
NextWrite();
} else {
HandleFailure("Data write failed");
}
}

void
DataStreamWriter::OnCancel()
{
logging::FunctionTracer traceMe(plog::Severity::debug);

// The RPC is canceled by the client, so call Finish to complete it from the server perspective.
bool isCanceledExpected{ false };
if (m_isCanceled.compare_exchange_strong(isCanceledExpected, true, std::memory_order_relaxed, std::memory_order_relaxed)) {
Finish(grpc::Status::CANCELLED);
}
}

void
DataStreamWriter::OnDone()
{
logging::FunctionTracer traceMe(plog::Severity::debug);
delete this;
}

void
DataStreamWriter::NextWrite()
{
logging::FunctionTracer traceMe(plog::Severity::debug);

// Client may have canceled the RPC, so check for cancelation to prevent writing more data
// when we shouldn't.
if (m_isCanceled.load(std::memory_order_relaxed)) {
LOGD << "RPC canceled, aborting write";
return;
}

if (m_dataStreamProperties.type() == DataStreamType::DataStreamTypeContinuous ||
(m_dataStreamProperties.type() == DataStreamType::DataStreamTypeFixed && m_numberOfDataBlocksToStream > 0)) {
// Check the requested data streaming pattern.
const auto pattern = m_dataStreamProperties.pattern();

switch (pattern) {
// Generate data with the constant default size and write to the client at a constant speed.
case DataStreamPattern::DataStreamPatternConstant: {
// Create data to write to the client.
const auto data = m_dataGenerator.GenerateRandomData();
m_numberOfDataBlocksWritten++;

// Write data to the client.
m_data.set_data(data);
m_data.set_sequencenumber(m_numberOfDataBlocksWritten);
*m_data.mutable_status() = m_writeStatus;
StartWrite(&m_data);

break;
}
default:
HandleFailure(std::format("Unexpected data stream pattern {}", magic_enum::enum_name(pattern)));
};
} else {
// No more data to write.
Finish(::grpc::Status::OK);
}
}

void
DataStreamWriter::HandleFailure(const std::string& errorMessage)
{
logging::FunctionTracer traceMe(plog::Severity::debug);

m_writeStatus.set_code(DataStreamOperationStatusCode::DataStreamOperationStatusCodeFailed);
m_writeStatus.set_message(errorMessage);
*m_data.mutable_status() = m_writeStatus;

// Write a final message to the client. The OnWriteDone() callback will check for the
// DataStreamOperationStatusCodeFailed status code set here to know to complete the RPC.
StartWrite(&m_data);
}
Loading
Loading