Skip to content

Commit 77ef2a4

Browse files
author
AaronCasas
authored
[DATA-1649]/[DATA-1647] Add GetPointCloudMap collector and use streaming sync rpc for large files. (viamrobotics#2703)
1 parent 8cd75d6 commit 77ef2a4

File tree

9 files changed

+326
-50
lines changed

9 files changed

+326
-50
lines changed

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ require (
8080
go.uber.org/atomic v1.10.0
8181
go.uber.org/multierr v1.11.0
8282
go.uber.org/zap v1.24.0
83-
go.viam.com/api v0.1.159
83+
go.viam.com/api v0.1.164
8484
go.viam.com/test v1.1.1-0.20220913152726-5da9916c08a2
8585
go.viam.com/utils v0.1.38
8686
goji.io v2.0.2+incompatible

go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -1567,8 +1567,8 @@ go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
15671567
go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY=
15681568
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
15691569
go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
1570-
go.viam.com/api v0.1.159 h1:yjqEV9rT4FFqWAH7KsRPjLVdCAuNy2bmaGRn2j/Sb4E=
1571-
go.viam.com/api v0.1.159/go.mod h1:CwLz82Ie4+Z2lSH2F0oQGViP4/TV9uxjJs+rqHvFWE8=
1570+
go.viam.com/api v0.1.164 h1:CYR35bAQAueU0DCXRCJMj/DxGZsHMjOxSIJ+4eMWc/Q=
1571+
go.viam.com/api v0.1.164/go.mod h1:CwLz82Ie4+Z2lSH2F0oQGViP4/TV9uxjJs+rqHvFWE8=
15721572
go.viam.com/test v1.1.1-0.20220913152726-5da9916c08a2 h1:oBiK580EnEIzgFLU4lHOXmGAE3MxnVbeR7s1wp/F3Ps=
15731573
go.viam.com/test v1.1.1-0.20220913152726-5da9916c08a2/go.mod h1:XM0tej6riszsiNLT16uoyq1YjuYPWlRBweTPRDanIts=
15741574
go.viam.com/utils v0.1.38 h1:Xc5UsEOYjX4WTcnku4vPD9JFKlu6NjdDmA3AY8qnySA=

services/datamanager/builtin/builtin.go

+9-9
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ type builtIn struct {
8888
logger golog.Logger
8989
captureDir string
9090
captureDisabled bool
91-
collectors map[componentMethodMetadata]*collectorAndConfig
91+
collectors map[resourceMethodMetadata]*collectorAndConfig
9292
lock sync.Mutex
9393
backgroundWorkers sync.WaitGroup
9494
waitAfterLastModifiedMillis int
@@ -118,7 +118,7 @@ func NewBuiltIn(
118118
Named: conf.ResourceName().AsNamed(),
119119
logger: logger,
120120
captureDir: viamCaptureDotDir,
121-
collectors: make(map[componentMethodMetadata]*collectorAndConfig),
121+
collectors: make(map[resourceMethodMetadata]*collectorAndConfig),
122122
syncIntervalMins: 0,
123123
additionalSyncPaths: []string{},
124124
tags: []string{},
@@ -179,8 +179,8 @@ type collectorAndConfig struct {
179179

180180
// Identifier for a particular collector: component name, component model, component type,
181181
// method parameters, and method name.
182-
type componentMethodMetadata struct {
183-
ComponentName string
182+
type resourceMethodMetadata struct {
183+
ResourceName string
184184
MethodParams string
185185
MethodMetadata data.MethodMetadata
186186
}
@@ -196,7 +196,7 @@ func getDurationFromHz(captureFrequencyHz float32) time.Duration {
196196
// Initialize a collector for the component/method or update it if it has previously been created.
197197
// Return the component/method metadata which is used as a key in the collectors map.
198198
func (svc *builtIn) initializeOrUpdateCollector(
199-
md componentMethodMetadata,
199+
md resourceMethodMetadata,
200200
config *datamanager.DataCaptureConfig,
201201
) (
202202
*collectorAndConfig, error,
@@ -366,11 +366,11 @@ func (svc *builtIn) Reconfigure(
366366
// Service is disabled, so close all collectors and clear the map so we can instantiate new ones if we enable this service.
367367
if svc.captureDisabled {
368368
svc.closeCollectors()
369-
svc.collectors = make(map[componentMethodMetadata]*collectorAndConfig)
369+
svc.collectors = make(map[resourceMethodMetadata]*collectorAndConfig)
370370
}
371371

372372
// Initialize or add collectors based on changes to the component configurations.
373-
newCollectors := make(map[componentMethodMetadata]*collectorAndConfig)
373+
newCollectors := make(map[resourceMethodMetadata]*collectorAndConfig)
374374
if !svc.captureDisabled {
375375
for _, resConf := range svcConfig.ResourceConfigs {
376376
if resConf.Resource == nil {
@@ -384,8 +384,8 @@ func (svc *builtIn) Reconfigure(
384384
MethodName: resConf.Method,
385385
}
386386

387-
componentMethodMetadata := componentMethodMetadata{
388-
ComponentName: resConf.Name.ShortName(),
387+
componentMethodMetadata := resourceMethodMetadata{
388+
ResourceName: resConf.Name.ShortName(),
389389
MethodMetadata: methodMetadata,
390390
MethodParams: fmt.Sprintf("%v", resConf.AdditionalParams),
391391
}

services/datamanager/builtin/sync_test.go

+190-15
Original file line numberDiff line numberDiff line change
@@ -338,25 +338,22 @@ func TestArbitraryFileUpload(t *testing.T) {
338338
name: "scheduled sync of arbitrary files should work",
339339
manualSync: false,
340340
scheduleSyncDisabled: false,
341-
serviceFail: false,
342341
},
343342
{
344343
name: "manual sync of arbitrary files should work",
345344
manualSync: true,
346345
scheduleSyncDisabled: true,
347-
serviceFail: false,
348346
},
349347
{
350348
name: "running manual and scheduled sync concurrently should work and not lead to duplicate uploads",
351349
manualSync: true,
352350
scheduleSyncDisabled: false,
353-
serviceFail: false,
354351
},
355352
{
356353
name: "if an error response is received from the backend, local files should not be deleted",
357354
manualSync: false,
358355
scheduleSyncDisabled: false,
359-
serviceFail: false,
356+
serviceFail: true,
360357
},
361358
}
362359

@@ -372,11 +369,13 @@ func TestArbitraryFileUpload(t *testing.T) {
372369
dmsvc, r := newTestDataManager(t)
373370
dmsvc.SetWaitAfterLastModifiedMillis(0)
374371
defer dmsvc.Close(context.Background())
372+
f := atomic.Bool{}
373+
f.Store(tc.serviceFail)
375374
mockClient := mockDataSyncServiceClient{
376375
succesfulDCRequests: make(chan *v1.DataCaptureUploadRequest, 100),
377376
failedDCRequests: make(chan *v1.DataCaptureUploadRequest, 100),
378377
fileUploads: make(chan *mockFileUploadClient, 100),
379-
fail: &atomic.Bool{},
378+
fail: &f,
380379
}
381380
dmsvc.SetSyncerConstructor(getTestSyncerConstructorMock(mockClient))
382381
cfg, deps := setupConfig(t, disabledTabularCollectorConfigPath)
@@ -416,10 +415,12 @@ func TestArbitraryFileUpload(t *testing.T) {
416415
var fileUploads []*mockFileUploadClient
417416
var urs []*v1.FileUploadRequest
418417
// Get the successful requests
419-
wait := time.After(time.Second * 5)
418+
wait := time.After(time.Second * 3)
420419
select {
421420
case <-wait:
422-
t.Fatalf("timed out waiting for sync request")
421+
if !tc.serviceFail {
422+
t.Fatalf("timed out waiting for sync request")
423+
}
423424
case r := <-mockClient.fileUploads:
424425
fileUploads = append(fileUploads, r)
425426
select {
@@ -430,12 +431,8 @@ func TestArbitraryFileUpload(t *testing.T) {
430431
}
431432
}
432433

433-
// Validate error and URs.
434-
remainingFiles := getAllFilePaths(additionalPathsDir)
435-
if tc.serviceFail {
436-
// Error case, file should not be deleted.
437-
test.That(t, len(remainingFiles), test.ShouldEqual, 1)
438-
} else {
434+
waitUntilNoFiles(additionalPathsDir)
435+
if !tc.serviceFail {
439436
// Validate first metadata message.
440437
test.That(t, len(fileUploads), test.ShouldEqual, 1)
441438
test.That(t, len(urs), test.ShouldBeGreaterThan, 0)
@@ -455,8 +452,144 @@ func TestArbitraryFileUpload(t *testing.T) {
455452
test.That(t, actData, test.ShouldResemble, fileContents)
456453

457454
// Validate file no longer exists.
458-
waitUntilNoFiles(additionalPathsDir)
459455
test.That(t, len(getAllFileInfos(additionalPathsDir)), test.ShouldEqual, 0)
456+
test.That(t, dmsvc.Close(context.Background()), test.ShouldBeNil)
457+
} else {
458+
// Validate no files were successfully uploaded.
459+
test.That(t, len(fileUploads), test.ShouldEqual, 0)
460+
// Validate file still exists.
461+
test.That(t, len(getAllFileInfos(additionalPathsDir)), test.ShouldEqual, 1)
462+
}
463+
})
464+
}
465+
}
466+
467+
func TestStreamingDCUpload(t *testing.T) {
468+
tests := []struct {
469+
name string
470+
serviceFail bool
471+
}{
472+
{
473+
name: "A data capture file greater than MaxUnaryFileSize should be successfully uploaded" +
474+
"via the streaming rpc.",
475+
serviceFail: false,
476+
},
477+
{
478+
name: "if an error response is received from the backend, local files should not be deleted",
479+
serviceFail: true,
480+
},
481+
}
482+
483+
for _, tc := range tests {
484+
t.Run(tc.name, func(t *testing.T) {
485+
// Set up server.
486+
mockClock := clk.NewMock()
487+
clock = mockClock
488+
tmpDir := t.TempDir()
489+
490+
// Set up data manager.
491+
dmsvc, r := newTestDataManager(t)
492+
defer dmsvc.Close(context.Background())
493+
var cfg *Config
494+
var deps []string
495+
captureInterval := time.Millisecond * 10
496+
cfg, deps = setupConfig(t, enabledBinaryCollectorConfigPath)
497+
498+
// Set up service config with just capture enabled.
499+
cfg.CaptureDisabled = false
500+
cfg.ScheduledSyncDisabled = true
501+
cfg.SyncIntervalMins = syncIntervalMins
502+
cfg.CaptureDir = tmpDir
503+
504+
resources := resourcesFromDeps(t, r, deps)
505+
err := dmsvc.Reconfigure(context.Background(), resources, resource.Config{
506+
ConvertedAttributes: cfg,
507+
})
508+
test.That(t, err, test.ShouldBeNil)
509+
510+
// Capture an image, then close.
511+
mockClock.Add(captureInterval)
512+
waitForCaptureFilesToExceedNFiles(tmpDir, 0)
513+
err = dmsvc.Close(context.Background())
514+
test.That(t, err, test.ShouldBeNil)
515+
516+
// Get all captured data.
517+
_, capturedData, err := getCapturedData(tmpDir)
518+
test.That(t, err, test.ShouldBeNil)
519+
520+
// Turn dmsvc back on with capture disabled.
521+
newDMSvc, r := newTestDataManager(t)
522+
defer newDMSvc.Close(context.Background())
523+
f := atomic.Bool{}
524+
f.Store(tc.serviceFail)
525+
mockClient := mockDataSyncServiceClient{
526+
streamingDCUploads: make(chan *mockStreamingDCClient, 10),
527+
fail: &f,
528+
}
529+
// Set max unary file size to 1 byte, so it uses the streaming rpc.
530+
datasync.MaxUnaryFileSize = 1
531+
newDMSvc.SetSyncerConstructor(getTestSyncerConstructorMock(mockClient))
532+
cfg.CaptureDisabled = true
533+
cfg.ScheduledSyncDisabled = true
534+
resources = resourcesFromDeps(t, r, deps)
535+
err = newDMSvc.Reconfigure(context.Background(), resources, resource.Config{
536+
ConvertedAttributes: cfg,
537+
})
538+
test.That(t, err, test.ShouldBeNil)
539+
540+
// Call sync.
541+
err = newDMSvc.Sync(context.Background(), nil)
542+
test.That(t, err, test.ShouldBeNil)
543+
544+
// Wait for upload requests.
545+
var uploads []*mockStreamingDCClient
546+
var urs []*v1.StreamingDataCaptureUploadRequest
547+
// Get the successful requests
548+
wait := time.After(time.Second * 3)
549+
select {
550+
case <-wait:
551+
if !tc.serviceFail {
552+
t.Fatalf("timed out waiting for sync request")
553+
}
554+
case r := <-mockClient.streamingDCUploads:
555+
uploads = append(uploads, r)
556+
select {
557+
case <-wait:
558+
t.Fatalf("timed out waiting for sync request")
559+
case <-r.closed:
560+
urs = append(urs, r.reqs...)
561+
}
562+
}
563+
waitUntilNoFiles(tmpDir)
564+
565+
// Validate error and URs.
566+
remainingFiles := getAllFilePaths(tmpDir)
567+
if tc.serviceFail {
568+
// Validate no files were successfully uploaded.
569+
test.That(t, len(uploads), test.ShouldEqual, 0)
570+
// Error case, file should not be deleted.
571+
test.That(t, len(remainingFiles), test.ShouldEqual, 1)
572+
} else {
573+
// Validate first metadata message.
574+
test.That(t, len(uploads), test.ShouldEqual, 1)
575+
test.That(t, len(urs), test.ShouldBeGreaterThan, 0)
576+
actMD := urs[0].GetMetadata()
577+
test.That(t, actMD, test.ShouldNotBeNil)
578+
test.That(t, actMD.GetUploadMetadata(), test.ShouldNotBeNil)
579+
test.That(t, actMD.GetSensorMetadata(), test.ShouldNotBeNil)
580+
test.That(t, actMD.GetUploadMetadata().Type, test.ShouldEqual, v1.DataType_DATA_TYPE_BINARY_SENSOR)
581+
test.That(t, actMD.GetUploadMetadata().PartId, test.ShouldNotBeBlank)
582+
583+
// Validate ensuing data messages.
584+
dataRequests := urs[1:]
585+
var actData []byte
586+
for _, d := range dataRequests {
587+
actData = append(actData, d.GetData()...)
588+
}
589+
test.That(t, actData, test.ShouldResemble, capturedData[0].GetBinary())
590+
591+
// Validate file no longer exists.
592+
test.That(t, len(getAllFileInfos(tmpDir)), test.ShouldEqual, 0)
460593
}
461594
test.That(t, dmsvc.Close(context.Background()), test.ShouldBeNil)
462595
})
@@ -661,6 +794,7 @@ type mockDataSyncServiceClient struct {
661794
succesfulDCRequests chan *v1.DataCaptureUploadRequest
662795
failedDCRequests chan *v1.DataCaptureUploadRequest
663796
fileUploads chan *mockFileUploadClient
797+
streamingDCUploads chan *mockStreamingDCClient
664798
fail *atomic.Bool
665799
}
666800

@@ -690,7 +824,26 @@ func (c mockDataSyncServiceClient) FileUpload(ctx context.Context, opts ...grpc.
690824
return nil, errors.New("oh no error")
691825
}
692826
ret := &mockFileUploadClient{closed: make(chan struct{})}
693-
c.fileUploads <- ret
827+
select {
828+
case <-ctx.Done():
829+
return nil, ctx.Err()
830+
case c.fileUploads <- ret:
831+
}
832+
return ret, nil
833+
}
834+
835+
func (c mockDataSyncServiceClient) StreamingDataCaptureUpload(ctx context.Context,
836+
opts ...grpc.CallOption,
837+
) (v1.DataSyncService_StreamingDataCaptureUploadClient, error) {
838+
if c.fail.Load() {
839+
return nil, errors.New("oh no error")
840+
}
841+
ret := &mockStreamingDCClient{closed: make(chan struct{})}
842+
select {
843+
case <-ctx.Done():
844+
return nil, ctx.Err()
845+
case c.streamingDCUploads <- ret:
846+
}
694847
return ret, nil
695848
}
696849

@@ -711,6 +864,28 @@ func (m *mockFileUploadClient) CloseAndRecv() (*v1.FileUploadResponse, error) {
711864
}
712865

713866
func (m *mockFileUploadClient) CloseSend() error {
867+
m.closed <- struct{}{}
868+
return nil
869+
}
870+
871+
type mockStreamingDCClient struct {
872+
reqs []*v1.StreamingDataCaptureUploadRequest
873+
closed chan struct{}
874+
grpc.ClientStream
875+
}
876+
877+
func (m *mockStreamingDCClient) Send(req *v1.StreamingDataCaptureUploadRequest) error {
878+
m.reqs = append(m.reqs, req)
879+
return nil
880+
}
881+
882+
func (m *mockStreamingDCClient) CloseAndRecv() (*v1.StreamingDataCaptureUploadResponse, error) {
883+
m.closed <- struct{}{}
884+
return &v1.StreamingDataCaptureUploadResponse{}, nil
885+
}
886+
887+
func (m *mockStreamingDCClient) CloseSend() error {
888+
m.closed <- struct{}{}
714889
return nil
715890
}
716891

services/datamanager/datacapture/data_capture_file.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,13 @@ import (
2222

2323
// TODO Data-343: Reorganize this into a more standard interface/package, and add tests.
2424

25-
// TODO: this is all way too complicated i think. Just keep track of read/write offsets
26-
2725
// FileExt defines the file extension for Viam data capture files.
2826
const (
2927
InProgressFileExt = ".prog"
3028
FileExt = ".capture"
3129
readImage = "ReadImage"
3230
nextPointCloud = "NextPointCloud"
31+
getPointCloudMap = "GetPointCloudMap"
3332
)
3433

3534
// File is the data structure containing data captured by collectors. It is backed by a file on disk containing
@@ -240,7 +239,7 @@ func getFileTimestampName() string {
240239
// TODO DATA-246: Implement this in some more robust, programmatic way.
241240
func getDataType(methodName string) v1.DataType {
242241
switch methodName {
243-
case nextPointCloud, readImage:
242+
case nextPointCloud, readImage, getPointCloudMap:
244243
return v1.DataType_DATA_TYPE_BINARY_SENSOR
245244
default:
246245
return v1.DataType_DATA_TYPE_TABULAR_SENSOR

0 commit comments

Comments
 (0)