[DATA-1649]/[DATA-1647] Add GetPointCloudMap collector and use streaming sync rpc for large files. #2703
Conversation
This looks fantastic, super excited for this addition! Just a few comments/nits and then looks ready to go. 🔥
Thank you for adding the testing to the PR description — for the image testing, to ensure that the streamed files were well-formed, did you reduce the chunk size and verify that it was using the streaming option?
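(For reference, one way to force the multi-chunk path in a test is to temporarily shrink the chunk size, since `UploadChunkSize` is an exported package var per the diff below; the wrapping package name `datasync` is assumed here.)

	origChunkSize := datasync.UploadChunkSize
	datasync.UploadChunkSize = 32 // tiny chunks so even small files span many stream messages
	defer func() { datasync.UploadChunkSize = origChunkSize }()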
@@ -332,31 +332,26 @@ func TestArbitraryFileUpload(t *testing.T) {
		name                 string
		manualSync           bool
		scheduleSyncDisabled bool
		serviceFail          bool
I see you removed this because it wasn't actually being used, but do we want a test that checks this in arbitrary file upload? "scheduled sync of arbitrary files should work" and "if an error response is received from the backend, local files should not be deleted" are actually the same test, where `manualSync` and `scheduleSyncDisabled` are both false (and were previously as well, so this was an existing test bug).
Yup, added
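(For illustration, with `serviceFail` gone the two named cases would differ only in the mock's failure flag; a hypothetical pair of table entries, exact field shape assumed from the diff above:)

	{
		name:                 "scheduled sync of arbitrary files should work",
		manualSync:           false,
		scheduleSyncDisabled: false,
	},
	{
		// Same scheduled-sync flags; the backend mock is set to fail so we
		// can assert that local files are not deleted.
		name:                 "if an error response is received from the backend, local files should not be deleted",
		manualSync:           false,
		scheduleSyncDisabled: false,
	},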
@@ -694,6 +813,15 @@ func (c mockDataSyncServiceClient) FileUpload(ctx context.Context, opts ...grpc.
	return ret, nil
}

func (c mockDataSyncServiceClient) StreamingDataCaptureUpload(ctx context.Context, opts ...grpc.CallOption) (v1.DataSyncService_StreamingDataCaptureUploadClient, error) {
	if c.fail.Load() {
[question] Why are you checking for a context cancellation in the `fail.Load()` branch in DataCaptureUpload but not in these streaming requests?
Good call, we should be. Added here and in FileUpload.
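(A minimal sketch of the shape that check might take in the mock; the error text and surrounding scaffolding are placeholders, not the actual code.)

	func (c mockDataSyncServiceClient) StreamingDataCaptureUpload(
		ctx context.Context,
		opts ...grpc.CallOption,
	) (v1.DataSyncService_StreamingDataCaptureUploadClient, error) {
		if c.fail.Load() {
			// Mirror DataCaptureUpload: a canceled context surfaces as a
			// context error instead of the injected failure.
			if err := ctx.Err(); err != nil {
				return nil, err
			}
			return nil, errors.New("injected failure") // placeholder message
		}
		// ... return the happy-path mock stream as before ...
	}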
}
-	test.That(t, actData, test.ShouldResemble, fileContents)
+	test.That(t, actData, test.ShouldResemble, capturedData[0].GetBinary())
Awesome, elegant check.
// Validate file no longer exists.
waitUntilNoFiles(additionalPathsDir)
test.That(t, len(getAllFileInfos(additionalPathsDir)), test.ShouldEqual, 0)
waitUntilNoFiles(tmpDir)
+1 to you adding this helper function before, makes tests extremely readable.
// If it's a large binary file, we need to upload it in chunks.
if md.GetType() == v1.DataType_DATA_TYPE_BINARY_SENSOR && f.Size() > MaxUnaryFileSize {
	if len(sensorData) > 1 {
		return errors.New("binary sensor data file with more than one sensor reading is not supported")
[nit/question] I agree that you should throw an error here, since we're passing along bytes rather than `sensorData`, so we need to ensure there's only a single datapoint. For DataCaptureUpload, we only fail on multiple SensorData for binary once we hit the app logic. I'm considering whether we should error from the RDK side there as well - but I also like that the DataCaptureUploadRequest below is generic to tabular/binary and simply passes the sensorData along.
I think for now I'd rather not change that logic as part of this PR. I don't have a strong opinion on doing it on the RDK side too.
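(For readers following along, the resulting control flow is roughly as below; the chunking helper name `uploadChunkedBinary` is invented for illustration.)

	if md.GetType() == v1.DataType_DATA_TYPE_BINARY_SENSOR && f.Size() > MaxUnaryFileSize {
		if len(sensorData) > 1 {
			return errors.New("binary sensor data file with more than one sensor reading is not supported")
		}
		// Streaming path: chunk the single binary reading into
		// StreamingDataCaptureUpload messages.
		return uploadChunkedBinary(ctx, client, md, sensorData[0].GetBinary())
	}
	// Otherwise fall through to the existing unary DataCaptureUploadRequest,
	// which stays generic over tabular and binary sensorData.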
if err != nil {
	return err
}

// If it's a large binary file, we need to upload it in chunks.
Really great work making the streaming option readable and fitting it into the existing flow by just checking the binary file size.
return nil, data.FailedToReadErr(params.ComponentName, getPointCloudMap.String(), err)
}

pcd, err := HelperConcatenateChunksToFull(f)
Nice, glad SLAM already had a helper method to convert into bytes!
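(The concatenation pattern in question, sketched with an assumed callback signature of func() ([]byte, error) and the standard io.EOF sentinel; names here are illustrative, not the exact SLAM API.)

	func concatenateChunks(next func() ([]byte, error)) ([]byte, error) {
		var full []byte
		for {
			chunk, err := next()
			if errors.Is(err, io.EOF) {
				// Stream exhausted: return everything accumulated so far.
				return full, nil
			}
			if err != nil {
				return nil, err
			}
			full = append(full, chunk...)
		}
	}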
@@ -240,7 +239,7 @@ func getFileTimestampName() string {
// TODO DATA-246: Implement this in some more robust, programmatic way.
func getDataType(methodName string) v1.DataType {
	switch methodName {
-	case nextPointCloud, readImage:
+	case nextPointCloud, readImage, getPointCloudMap:
Whew good catch!
@@ -11,7 +11,7 @@ import (
)

// UploadChunkSize defines the size of the data included in each message of a FileUpload stream.
-var UploadChunkSize = 64 * 1024
+var UploadChunkSize = 256 * 1024
Just double-checking: is this bump intentional?
Reverted
Just two more minor questions.
return errors.Wrapf(err, "received error response while syncing %s", f.Name())
}

return nil
}

func sendFileUploadRequests(ctx context.Context, stream v1.DataSyncService_FileUploadClient, f *os.File) error {
-	//nolint:errcheck
-	defer stream.CloseSend()
Why is this removed? Is this so that `uploadArbitraryFile` and `uploadDataCaptureFile` are in charge of closing the channel and getting the client response (via `CloseAndRecv`), rather than splitting that between the helper and the main loop?
`CloseAndRecv` is the recommended/intended method for ending a client-side stream (per the gRPC docs). I believe this is because the stream ending and a response being received are supposed to always happen in this fixed order, since there is only one response message. In bidi streams, where these can be interleaved, `CloseSend` is used.
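(Roughly, the caller-side shape being described; the error wrapping is borrowed from the surrounding snippets, so treat this as a sketch rather than the exact implementation.)

	stream, err := client.FileUpload(ctx)
	if err != nil {
		return err
	}
	// The helper only sends chunks; it no longer closes the stream itself.
	if err := sendFileUploadRequests(ctx, stream, f); err != nil {
		return errors.Wrapf(err, "error syncing %s", f.Name())
	}
	// CloseAndRecv half-closes the send side and blocks for the single
	// response, keeping "stream ended" and "response received" in order.
	if _, err := stream.CloseAndRecv(); err != nil {
		return errors.Wrapf(err, "received error response while syncing %s", f.Name())
	}
	return nil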
test.That(t, len(getAllFileInfos(additionalPathsDir)), test.ShouldEqual, 0)
test.That(t, dmsvc.Close(context.Background()), test.ShouldBeNil)
} else {
	// Validate file still exists.
For here and in the other tests: in this failure case, we should check that `len(fileUploads)` (or the streaming uploads) == 0. The file could technically still exist, since we didn't call `waitUntilNoFiles`.
Done, and moved waitUntilNoFiles up so it gets called in all cases.
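(A sketch of the failure-case assertions being requested, with helper names taken from the test snippets above.)

	// No uploads should have reached the mock server...
	test.That(t, len(fileUploads), test.ShouldEqual, 0)
	// ...and the local file should still be on disk.
	test.That(t, len(getAllFileInfos(additionalPathsDir)), test.ShouldBeGreaterThan, 0)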
Fantastic work, really excited for SLAM to start using this! 🚀🚀🚀
[DATA-1649]/[DATA-1647] Add GetPointCloudMap collector and use streaming sync rpc for large files. (viamrobotics#2703)
Use new streaming sync rpc for uploading binary capture files > 1MB.
Testing
Confirmed that capture of point cloud maps works locally.
Confirmed that images are successfully uploaded to the backend using the streaming rpc.