-
Notifications
You must be signed in to change notification settings - Fork 114
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[DATA-1649]/[DATA-1647] Add GetPointCloudMap collector and use streaming sync rpc for large files. #2703
[DATA-1649]/[DATA-1647] Add GetPointCloudMap collector and use streaming sync rpc for large files. #2703
Changes from 8 commits
715c172
348589c
1ef4211
40aca2f
7e6c7b3
a371814
e0a7b8f
8b6f57e
1ff65ca
8663b3a
79da59c
00998a9
1ff8035
406cb1d
54ffaae
f516ccf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -332,31 +332,26 @@ func TestArbitraryFileUpload(t *testing.T) { | |
name string | ||
manualSync bool | ||
scheduleSyncDisabled bool | ||
serviceFail bool | ||
}{ | ||
{ | ||
name: "scheduled sync of arbitrary files should work", | ||
manualSync: false, | ||
scheduleSyncDisabled: false, | ||
serviceFail: false, | ||
}, | ||
{ | ||
name: "manual sync of arbitrary files should work", | ||
manualSync: true, | ||
scheduleSyncDisabled: true, | ||
serviceFail: false, | ||
}, | ||
{ | ||
name: "running manual and scheduled sync concurrently should work and not lead to duplicate uploads", | ||
manualSync: true, | ||
scheduleSyncDisabled: false, | ||
serviceFail: false, | ||
}, | ||
{ | ||
name: "if an error response is received from the backend, local files should not be deleted", | ||
manualSync: false, | ||
scheduleSyncDisabled: false, | ||
serviceFail: false, | ||
}, | ||
} | ||
|
||
|
@@ -431,32 +426,155 @@ func TestArbitraryFileUpload(t *testing.T) { | |
} | ||
|
||
// Validate error and URs. | ||
remainingFiles := getAllFilePaths(additionalPathsDir) | ||
// Validate first metadata message. | ||
test.That(t, len(fileUploads), test.ShouldEqual, 1) | ||
test.That(t, len(urs), test.ShouldBeGreaterThan, 0) | ||
actMD := urs[0].GetMetadata() | ||
test.That(t, actMD, test.ShouldNotBeNil) | ||
test.That(t, actMD.Type, test.ShouldEqual, v1.DataType_DATA_TYPE_FILE) | ||
test.That(t, actMD.FileName, test.ShouldEqual, fileName) | ||
test.That(t, actMD.FileExtension, test.ShouldEqual, fileExt) | ||
test.That(t, actMD.PartId, test.ShouldNotBeBlank) | ||
|
||
// Validate ensuing data messages. | ||
dataRequests := urs[1:] | ||
var actData []byte | ||
for _, d := range dataRequests { | ||
actData = append(actData, d.GetFileContents().GetData()...) | ||
} | ||
test.That(t, actData, test.ShouldResemble, fileContents) | ||
|
||
// Validate file no longer exists. | ||
waitUntilNoFiles(additionalPathsDir) | ||
test.That(t, len(getAllFileInfos(additionalPathsDir)), test.ShouldEqual, 0) | ||
test.That(t, dmsvc.Close(context.Background()), test.ShouldBeNil) | ||
}) | ||
} | ||
} | ||
|
||
func TestStreamingDCUpload(t *testing.T) { | ||
tests := []struct { | ||
name string | ||
serviceFail bool | ||
}{ | ||
{ | ||
name: "A data capture file greater than MaxUnaryFileSize should be successfully uploaded" + | ||
"via the streaming rpc.", | ||
serviceFail: false, | ||
}, | ||
{ | ||
name: "if an error response is received from the backend, local files should not be deleted", | ||
serviceFail: true, | ||
}, | ||
} | ||
|
||
for _, tc := range tests { | ||
t.Run(tc.name, func(t *testing.T) { | ||
// Set up server. | ||
mockClock := clk.NewMock() | ||
clock = mockClock | ||
tmpDir := t.TempDir() | ||
|
||
// Set up data manager. | ||
dmsvc, r := newTestDataManager(t) | ||
defer dmsvc.Close(context.Background()) | ||
var cfg *Config | ||
var deps []string | ||
captureInterval := time.Millisecond * 10 | ||
cfg, deps = setupConfig(t, enabledBinaryCollectorConfigPath) | ||
|
||
// Set up service config with just capture enabled. | ||
cfg.CaptureDisabled = false | ||
cfg.ScheduledSyncDisabled = true | ||
cfg.SyncIntervalMins = syncIntervalMins | ||
cfg.CaptureDir = tmpDir | ||
|
||
resources := resourcesFromDeps(t, r, deps) | ||
err := dmsvc.Reconfigure(context.Background(), resources, resource.Config{ | ||
ConvertedAttributes: cfg, | ||
}) | ||
test.That(t, err, test.ShouldBeNil) | ||
|
||
// Capture an image, then close. | ||
mockClock.Add(captureInterval) | ||
err = dmsvc.Close(context.Background()) | ||
test.That(t, err, test.ShouldBeNil) | ||
|
||
// Get all captured data. | ||
_, capturedData, err := getCapturedData(tmpDir) | ||
test.That(t, err, test.ShouldBeNil) | ||
|
||
// Turn dmsvc back on with capture disabled. | ||
newDMSvc, r := newTestDataManager(t) | ||
defer newDMSvc.Close(context.Background()) | ||
f := atomic.Bool{} | ||
f.Store(tc.serviceFail) | ||
mockClient := mockDataSyncServiceClient{ | ||
streamingDCUploads: make(chan *mockStreamingDCClient, 10), | ||
fail: &f, | ||
} | ||
// Set max unary file size to 1 byte, so it uses the streaming rpc. | ||
datasync.MaxUnaryFileSize = 1 | ||
newDMSvc.SetSyncerConstructor(getTestSyncerConstructorMock(mockClient)) | ||
cfg.CaptureDisabled = true | ||
cfg.ScheduledSyncDisabled = true | ||
resources = resourcesFromDeps(t, r, deps) | ||
err = newDMSvc.Reconfigure(context.Background(), resources, resource.Config{ | ||
ConvertedAttributes: cfg, | ||
}) | ||
test.That(t, err, test.ShouldBeNil) | ||
|
||
// Call sync. | ||
err = newDMSvc.Sync(context.Background(), nil) | ||
test.That(t, err, test.ShouldBeNil) | ||
|
||
// Wait for upload requests. | ||
var uploads []*mockStreamingDCClient | ||
var urs []*v1.StreamingDataCaptureUploadRequest | ||
// Get the successful requests | ||
wait := time.After(time.Second * 3) | ||
select { | ||
case <-wait: | ||
if !tc.serviceFail { | ||
t.Fatalf("timed out waiting for sync request") | ||
} | ||
case r := <-mockClient.streamingDCUploads: | ||
uploads = append(uploads, r) | ||
select { | ||
case <-wait: | ||
t.Fatalf("timed out waiting for sync request") | ||
case <-r.closed: | ||
urs = append(urs, r.reqs...) | ||
} | ||
} | ||
|
||
// Validate error and URs. | ||
remainingFiles := getAllFilePaths(tmpDir) | ||
if tc.serviceFail { | ||
// Error case, file should not be deleted. | ||
test.That(t, len(remainingFiles), test.ShouldEqual, 1) | ||
} else { | ||
// Validate first metadata message. | ||
test.That(t, len(fileUploads), test.ShouldEqual, 1) | ||
test.That(t, len(uploads), test.ShouldEqual, 1) | ||
test.That(t, len(urs), test.ShouldBeGreaterThan, 0) | ||
actMD := urs[0].GetMetadata() | ||
test.That(t, actMD, test.ShouldNotBeNil) | ||
test.That(t, actMD.Type, test.ShouldEqual, v1.DataType_DATA_TYPE_FILE) | ||
test.That(t, actMD.FileName, test.ShouldEqual, fileName) | ||
test.That(t, actMD.FileExtension, test.ShouldEqual, fileExt) | ||
test.That(t, actMD.PartId, test.ShouldNotBeBlank) | ||
test.That(t, actMD.GetUploadMetadata(), test.ShouldNotBeNil) | ||
test.That(t, actMD.GetSensorMetadata(), test.ShouldNotBeNil) | ||
test.That(t, actMD.GetUploadMetadata().Type, test.ShouldEqual, v1.DataType_DATA_TYPE_BINARY_SENSOR) | ||
test.That(t, actMD.GetUploadMetadata().PartId, test.ShouldNotBeBlank) | ||
|
||
// Validate ensuing data messages. | ||
dataRequests := urs[1:] | ||
var actData []byte | ||
for _, d := range dataRequests { | ||
actData = append(actData, d.GetFileContents().GetData()...) | ||
actData = append(actData, d.GetData()...) | ||
} | ||
test.That(t, actData, test.ShouldResemble, fileContents) | ||
test.That(t, actData, test.ShouldResemble, capturedData[0].GetBinary()) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Awesome, elegant check. |
||
|
||
// Validate file no longer exists. | ||
waitUntilNoFiles(additionalPathsDir) | ||
test.That(t, len(getAllFileInfos(additionalPathsDir)), test.ShouldEqual, 0) | ||
waitUntilNoFiles(tmpDir) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. +1 to you adding this helper function before, makes tests extremely readable. |
||
test.That(t, len(getAllFileInfos(tmpDir)), test.ShouldEqual, 0) | ||
} | ||
test.That(t, dmsvc.Close(context.Background()), test.ShouldBeNil) | ||
}) | ||
|
@@ -661,6 +779,7 @@ type mockDataSyncServiceClient struct { | |
succesfulDCRequests chan *v1.DataCaptureUploadRequest | ||
failedDCRequests chan *v1.DataCaptureUploadRequest | ||
fileUploads chan *mockFileUploadClient | ||
streamingDCUploads chan *mockStreamingDCClient | ||
fail *atomic.Bool | ||
} | ||
|
||
|
@@ -694,6 +813,17 @@ func (c mockDataSyncServiceClient) FileUpload(ctx context.Context, opts ...grpc. | |
return ret, nil | ||
} | ||
|
||
func (c mockDataSyncServiceClient) StreamingDataCaptureUpload(ctx context.Context, | ||
opts ...grpc.CallOption, | ||
) (v1.DataSyncService_StreamingDataCaptureUploadClient, error) { | ||
if c.fail.Load() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [question] Why are you checking for a context cancelation in the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good call, we should be. Added here and in FileUpload |
||
return nil, errors.New("oh no error") | ||
} | ||
ret := &mockStreamingDCClient{closed: make(chan struct{})} | ||
c.streamingDCUploads <- ret | ||
return ret, nil | ||
} | ||
|
||
type mockFileUploadClient struct { | ||
urs []*v1.FileUploadRequest | ||
closed chan struct{} | ||
|
@@ -714,6 +844,26 @@ func (m *mockFileUploadClient) CloseSend() error { | |
return nil | ||
} | ||
|
||
type mockStreamingDCClient struct { | ||
reqs []*v1.StreamingDataCaptureUploadRequest | ||
closed chan struct{} | ||
grpc.ClientStream | ||
} | ||
|
||
func (m *mockStreamingDCClient) Send(req *v1.StreamingDataCaptureUploadRequest) error { | ||
m.reqs = append(m.reqs, req) | ||
return nil | ||
} | ||
|
||
func (m *mockStreamingDCClient) CloseAndRecv() (*v1.StreamingDataCaptureUploadResponse, error) { | ||
m.closed <- struct{}{} | ||
return &v1.StreamingDataCaptureUploadResponse{}, nil | ||
} | ||
|
||
func (m *mockStreamingDCClient) CloseSend() error { | ||
return nil | ||
} | ||
|
||
func getTestSyncerConstructorMock(client mockDataSyncServiceClient) datasync.ManagerConstructor { | ||
return func(identity string, _ v1.DataSyncServiceClient, logger golog.Logger) (datasync.Manager, error) { | ||
return datasync.NewManager(identity, client, logger) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,14 +22,13 @@ import ( | |
|
||
// TODO Data-343: Reorganize this into a more standard interface/package, and add tests. | ||
|
||
// TODO: this is all way too complicated i think. Just keep track of read/write offsets | ||
|
||
// FileExt defines the file extension for Viam data capture files. | ||
const ( | ||
InProgressFileExt = ".prog" | ||
FileExt = ".capture" | ||
readImage = "ReadImage" | ||
nextPointCloud = "NextPointCloud" | ||
getPointCloudMap = "GetPointCloudMap" | ||
) | ||
|
||
// File is the data structure containing data captured by collectors. It is backed by a file on disk containing | ||
|
@@ -240,7 +239,7 @@ func getFileTimestampName() string { | |
// TODO DATA-246: Implement this in some more robust, programmatic way. | ||
func getDataType(methodName string) v1.DataType { | ||
switch methodName { | ||
case nextPointCloud, readImage: | ||
case nextPointCloud, readImage, getPointCloudMap: | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Whew, good catch! |
||
return v1.DataType_DATA_TYPE_BINARY_SENSOR | ||
default: | ||
return v1.DataType_DATA_TYPE_TABULAR_SENSOR | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see you removed this because it wasn't actually being used, but do we want a test that checks this in arbitrary file upload? "scheduled sync of arbitrary files should work" and "if an error response is received from the backend, local files should not be deleted" are actually the same test where
manualSync
and scheduleSyncDisabled
are both false (and were previously as well, so was an existing test bug)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, added