Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DATA-1649]/[DATA-1647] Add GetPointCloudMap collector and use streaming sync rpc for large files. #2703

Merged
merged 16 commits into from
Jul 28, 2023
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ require (
go.uber.org/atomic v1.10.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.24.0
go.viam.com/api v0.1.159
go.viam.com/api v0.1.164
go.viam.com/test v1.1.1-0.20220913152726-5da9916c08a2
go.viam.com/utils v0.1.38
goji.io v2.0.2+incompatible
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1567,8 +1567,8 @@ go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY=
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
go.viam.com/api v0.1.159 h1:yjqEV9rT4FFqWAH7KsRPjLVdCAuNy2bmaGRn2j/Sb4E=
go.viam.com/api v0.1.159/go.mod h1:CwLz82Ie4+Z2lSH2F0oQGViP4/TV9uxjJs+rqHvFWE8=
go.viam.com/api v0.1.164 h1:CYR35bAQAueU0DCXRCJMj/DxGZsHMjOxSIJ+4eMWc/Q=
go.viam.com/api v0.1.164/go.mod h1:CwLz82Ie4+Z2lSH2F0oQGViP4/TV9uxjJs+rqHvFWE8=
go.viam.com/test v1.1.1-0.20220913152726-5da9916c08a2 h1:oBiK580EnEIzgFLU4lHOXmGAE3MxnVbeR7s1wp/F3Ps=
go.viam.com/test v1.1.1-0.20220913152726-5da9916c08a2/go.mod h1:XM0tej6riszsiNLT16uoyq1YjuYPWlRBweTPRDanIts=
go.viam.com/utils v0.1.38 h1:Xc5UsEOYjX4WTcnku4vPD9JFKlu6NjdDmA3AY8qnySA=
Expand Down
18 changes: 9 additions & 9 deletions services/datamanager/builtin/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ type builtIn struct {
logger golog.Logger
captureDir string
captureDisabled bool
collectors map[componentMethodMetadata]*collectorAndConfig
collectors map[resourceMethodMetadata]*collectorAndConfig
lock sync.Mutex
backgroundWorkers sync.WaitGroup
waitAfterLastModifiedMillis int
Expand Down Expand Up @@ -118,7 +118,7 @@ func NewBuiltIn(
Named: conf.ResourceName().AsNamed(),
logger: logger,
captureDir: viamCaptureDotDir,
collectors: make(map[componentMethodMetadata]*collectorAndConfig),
collectors: make(map[resourceMethodMetadata]*collectorAndConfig),
syncIntervalMins: 0,
additionalSyncPaths: []string{},
tags: []string{},
Expand Down Expand Up @@ -179,8 +179,8 @@ type collectorAndConfig struct {

// Identifier for a particular collector: component name, component model, component type,
// method parameters, and method name.
type componentMethodMetadata struct {
ComponentName string
type resourceMethodMetadata struct {
ResourceName string
MethodParams string
MethodMetadata data.MethodMetadata
}
Expand All @@ -196,7 +196,7 @@ func getDurationFromHz(captureFrequencyHz float32) time.Duration {
// Initialize a collector for the component/method or update it if it has previously been created.
// Return the component/method metadata which is used as a key in the collectors map.
func (svc *builtIn) initializeOrUpdateCollector(
md componentMethodMetadata,
md resourceMethodMetadata,
config *datamanager.DataCaptureConfig,
) (
*collectorAndConfig, error,
Expand Down Expand Up @@ -366,11 +366,11 @@ func (svc *builtIn) Reconfigure(
// Service is disabled, so close all collectors and clear the map so we can instantiate new ones if we enable this service.
if svc.captureDisabled {
svc.closeCollectors()
svc.collectors = make(map[componentMethodMetadata]*collectorAndConfig)
svc.collectors = make(map[resourceMethodMetadata]*collectorAndConfig)
}

// Initialize or add collectors based on changes to the component configurations.
newCollectors := make(map[componentMethodMetadata]*collectorAndConfig)
newCollectors := make(map[resourceMethodMetadata]*collectorAndConfig)
if !svc.captureDisabled {
for _, resConf := range svcConfig.ResourceConfigs {
if resConf.Resource == nil {
Expand All @@ -384,8 +384,8 @@ func (svc *builtIn) Reconfigure(
MethodName: resConf.Method,
}

componentMethodMetadata := componentMethodMetadata{
ComponentName: resConf.Name.ShortName(),
componentMethodMetadata := resourceMethodMetadata{
ResourceName: resConf.Name.ShortName(),
MethodMetadata: methodMetadata,
MethodParams: fmt.Sprintf("%v", resConf.AdditionalParams),
}
Expand Down
180 changes: 165 additions & 15 deletions services/datamanager/builtin/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,31 +332,26 @@ func TestArbitraryFileUpload(t *testing.T) {
name string
manualSync bool
scheduleSyncDisabled bool
serviceFail bool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see you removed this because it wasn't actually being used, but do we want a test that checks this in arbitrary file upload? "scheduled sync of arbitrary files should work" and "if an error response is received from the backend, local files should not be deleted" are actually the same test, where manualSync and scheduleSyncDisabled are both false (and they were previously as well, so this was a pre-existing test bug).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, added

}{
{
name: "scheduled sync of arbitrary files should work",
manualSync: false,
scheduleSyncDisabled: false,
serviceFail: false,
},
{
name: "manual sync of arbitrary files should work",
manualSync: true,
scheduleSyncDisabled: true,
serviceFail: false,
},
{
name: "running manual and scheduled sync concurrently should work and not lead to duplicate uploads",
manualSync: true,
scheduleSyncDisabled: false,
serviceFail: false,
},
{
name: "if an error response is received from the backend, local files should not be deleted",
manualSync: false,
scheduleSyncDisabled: false,
serviceFail: false,
},
}

Expand Down Expand Up @@ -431,32 +426,155 @@ func TestArbitraryFileUpload(t *testing.T) {
}

// Validate error and URs.
remainingFiles := getAllFilePaths(additionalPathsDir)
// Validate first metadata message.
test.That(t, len(fileUploads), test.ShouldEqual, 1)
test.That(t, len(urs), test.ShouldBeGreaterThan, 0)
actMD := urs[0].GetMetadata()
test.That(t, actMD, test.ShouldNotBeNil)
test.That(t, actMD.Type, test.ShouldEqual, v1.DataType_DATA_TYPE_FILE)
test.That(t, actMD.FileName, test.ShouldEqual, fileName)
test.That(t, actMD.FileExtension, test.ShouldEqual, fileExt)
test.That(t, actMD.PartId, test.ShouldNotBeBlank)

// Validate ensuing data messages.
dataRequests := urs[1:]
var actData []byte
for _, d := range dataRequests {
actData = append(actData, d.GetFileContents().GetData()...)
}
test.That(t, actData, test.ShouldResemble, fileContents)

// Validate file no longer exists.
waitUntilNoFiles(additionalPathsDir)
test.That(t, len(getAllFileInfos(additionalPathsDir)), test.ShouldEqual, 0)
test.That(t, dmsvc.Close(context.Background()), test.ShouldBeNil)
})
}
}

func TestStreamingDCUpload(t *testing.T) {
tests := []struct {
name string
serviceFail bool
}{
{
name: "A data capture file greater than MaxUnaryFileSize should be successfully uploaded" +
"via the streaming rpc.",
serviceFail: false,
},
{
name: "if an error response is received from the backend, local files should not be deleted",
serviceFail: true,
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
// Set up server.
mockClock := clk.NewMock()
clock = mockClock
tmpDir := t.TempDir()

// Set up data manager.
dmsvc, r := newTestDataManager(t)
defer dmsvc.Close(context.Background())
var cfg *Config
var deps []string
captureInterval := time.Millisecond * 10
cfg, deps = setupConfig(t, enabledBinaryCollectorConfigPath)

// Set up service config with just capture enabled.
cfg.CaptureDisabled = false
cfg.ScheduledSyncDisabled = true
cfg.SyncIntervalMins = syncIntervalMins
cfg.CaptureDir = tmpDir

resources := resourcesFromDeps(t, r, deps)
err := dmsvc.Reconfigure(context.Background(), resources, resource.Config{
ConvertedAttributes: cfg,
})
test.That(t, err, test.ShouldBeNil)

// Capture an image, then close.
mockClock.Add(captureInterval)
err = dmsvc.Close(context.Background())
test.That(t, err, test.ShouldBeNil)

// Get all captured data.
_, capturedData, err := getCapturedData(tmpDir)
test.That(t, err, test.ShouldBeNil)

// Turn dmsvc back on with capture disabled.
newDMSvc, r := newTestDataManager(t)
defer newDMSvc.Close(context.Background())
f := atomic.Bool{}
f.Store(tc.serviceFail)
mockClient := mockDataSyncServiceClient{
streamingDCUploads: make(chan *mockStreamingDCClient, 10),
fail: &f,
}
// Set max unary file size to 1 byte, so it uses the streaming rpc.
datasync.MaxUnaryFileSize = 1
newDMSvc.SetSyncerConstructor(getTestSyncerConstructorMock(mockClient))
cfg.CaptureDisabled = true
cfg.ScheduledSyncDisabled = true
resources = resourcesFromDeps(t, r, deps)
err = newDMSvc.Reconfigure(context.Background(), resources, resource.Config{
ConvertedAttributes: cfg,
})
test.That(t, err, test.ShouldBeNil)

// Call sync.
err = newDMSvc.Sync(context.Background(), nil)
test.That(t, err, test.ShouldBeNil)

// Wait for upload requests.
var uploads []*mockStreamingDCClient
var urs []*v1.StreamingDataCaptureUploadRequest
// Get the successful requests
wait := time.After(time.Second * 3)
select {
case <-wait:
if !tc.serviceFail {
t.Fatalf("timed out waiting for sync request")
}
case r := <-mockClient.streamingDCUploads:
uploads = append(uploads, r)
select {
case <-wait:
t.Fatalf("timed out waiting for sync request")
case <-r.closed:
urs = append(urs, r.reqs...)
}
}

// Validate error and URs.
remainingFiles := getAllFilePaths(tmpDir)
if tc.serviceFail {
// Error case, file should not be deleted.
test.That(t, len(remainingFiles), test.ShouldEqual, 1)
} else {
// Validate first metadata message.
test.That(t, len(fileUploads), test.ShouldEqual, 1)
test.That(t, len(uploads), test.ShouldEqual, 1)
test.That(t, len(urs), test.ShouldBeGreaterThan, 0)
actMD := urs[0].GetMetadata()
test.That(t, actMD, test.ShouldNotBeNil)
test.That(t, actMD.Type, test.ShouldEqual, v1.DataType_DATA_TYPE_FILE)
test.That(t, actMD.FileName, test.ShouldEqual, fileName)
test.That(t, actMD.FileExtension, test.ShouldEqual, fileExt)
test.That(t, actMD.PartId, test.ShouldNotBeBlank)
test.That(t, actMD.GetUploadMetadata(), test.ShouldNotBeNil)
test.That(t, actMD.GetSensorMetadata(), test.ShouldNotBeNil)
test.That(t, actMD.GetUploadMetadata().Type, test.ShouldEqual, v1.DataType_DATA_TYPE_BINARY_SENSOR)
test.That(t, actMD.GetUploadMetadata().PartId, test.ShouldNotBeBlank)

// Validate ensuing data messages.
dataRequests := urs[1:]
var actData []byte
for _, d := range dataRequests {
actData = append(actData, d.GetFileContents().GetData()...)
actData = append(actData, d.GetData()...)
}
test.That(t, actData, test.ShouldResemble, fileContents)
test.That(t, actData, test.ShouldResemble, capturedData[0].GetBinary())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome, elegant check.


// Validate file no longer exists.
waitUntilNoFiles(additionalPathsDir)
test.That(t, len(getAllFileInfos(additionalPathsDir)), test.ShouldEqual, 0)
waitUntilNoFiles(tmpDir)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to you adding this helper function before, makes tests extremely readable.

test.That(t, len(getAllFileInfos(tmpDir)), test.ShouldEqual, 0)
}
test.That(t, dmsvc.Close(context.Background()), test.ShouldBeNil)
})
Expand Down Expand Up @@ -661,6 +779,7 @@ type mockDataSyncServiceClient struct {
succesfulDCRequests chan *v1.DataCaptureUploadRequest
failedDCRequests chan *v1.DataCaptureUploadRequest
fileUploads chan *mockFileUploadClient
streamingDCUploads chan *mockStreamingDCClient
fail *atomic.Bool
}

Expand Down Expand Up @@ -694,6 +813,17 @@ func (c mockDataSyncServiceClient) FileUpload(ctx context.Context, opts ...grpc.
return ret, nil
}

func (c mockDataSyncServiceClient) StreamingDataCaptureUpload(ctx context.Context,
opts ...grpc.CallOption,
) (v1.DataSyncService_StreamingDataCaptureUploadClient, error) {
if c.fail.Load() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[question] Why are you checking for a context cancelation in the fail.Load() in DataCaptureUpload but not these streaming requests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call, we should be. Added here and in FileUpload

return nil, errors.New("oh no error")
}
ret := &mockStreamingDCClient{closed: make(chan struct{})}
c.streamingDCUploads <- ret
return ret, nil
}

type mockFileUploadClient struct {
urs []*v1.FileUploadRequest
closed chan struct{}
Expand All @@ -714,6 +844,26 @@ func (m *mockFileUploadClient) CloseSend() error {
return nil
}

// mockStreamingDCClient is a test double for the
// DataSyncService_StreamingDataCaptureUploadClient stream. It records every
// request sent on the stream and signals on closed when the caller finishes
// the stream via CloseAndRecv.
type mockStreamingDCClient struct {
	// reqs holds the requests captured by Send, in send order.
	reqs []*v1.StreamingDataCaptureUploadRequest
	// closed receives a signal from CloseAndRecv once the upload completes.
	closed chan struct{}
	grpc.ClientStream
}

// Send records the request so the test can later assert on the exact
// sequence of messages written to the stream. It never returns an error.
func (m *mockStreamingDCClient) Send(req *v1.StreamingDataCaptureUploadRequest) error {
	m.reqs = append(m.reqs, req)
	return nil
}

// CloseAndRecv signals stream completion on the closed channel, then returns
// an empty (successful) response.
// NOTE(review): the channel send is unbuffered from this side's perspective —
// it blocks until the test receives from m.closed, so tests must always drain
// the channel or this call will hang.
func (m *mockStreamingDCClient) CloseAndRecv() (*v1.StreamingDataCaptureUploadResponse, error) {
	m.closed <- struct{}{}
	return &v1.StreamingDataCaptureUploadResponse{}, nil
}

// CloseSend is a no-op provided to satisfy the client-stream interface.
func (m *mockStreamingDCClient) CloseSend() error {
	return nil
}

func getTestSyncerConstructorMock(client mockDataSyncServiceClient) datasync.ManagerConstructor {
return func(identity string, _ v1.DataSyncServiceClient, logger golog.Logger) (datasync.Manager, error) {
return datasync.NewManager(identity, client, logger)
Expand Down
Loading