Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DATA-1649]/[DATA-1647] Add GetPointCloudMap collector and use streaming sync rpc for large files. #2703

Merged
merged 16 commits into from
Jul 28, 2023
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ require (
go.uber.org/atomic v1.10.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.24.0
go.viam.com/api v0.1.159
go.viam.com/api v0.1.164
go.viam.com/test v1.1.1-0.20220913152726-5da9916c08a2
go.viam.com/utils v0.1.38
goji.io v2.0.2+incompatible
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1567,8 +1567,8 @@ go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY=
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
go.viam.com/api v0.1.159 h1:yjqEV9rT4FFqWAH7KsRPjLVdCAuNy2bmaGRn2j/Sb4E=
go.viam.com/api v0.1.159/go.mod h1:CwLz82Ie4+Z2lSH2F0oQGViP4/TV9uxjJs+rqHvFWE8=
go.viam.com/api v0.1.164 h1:CYR35bAQAueU0DCXRCJMj/DxGZsHMjOxSIJ+4eMWc/Q=
go.viam.com/api v0.1.164/go.mod h1:CwLz82Ie4+Z2lSH2F0oQGViP4/TV9uxjJs+rqHvFWE8=
go.viam.com/test v1.1.1-0.20220913152726-5da9916c08a2 h1:oBiK580EnEIzgFLU4lHOXmGAE3MxnVbeR7s1wp/F3Ps=
go.viam.com/test v1.1.1-0.20220913152726-5da9916c08a2/go.mod h1:XM0tej6riszsiNLT16uoyq1YjuYPWlRBweTPRDanIts=
go.viam.com/utils v0.1.38 h1:Xc5UsEOYjX4WTcnku4vPD9JFKlu6NjdDmA3AY8qnySA=
Expand Down
18 changes: 9 additions & 9 deletions services/datamanager/builtin/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ type builtIn struct {
logger golog.Logger
captureDir string
captureDisabled bool
collectors map[componentMethodMetadata]*collectorAndConfig
collectors map[resourceMethodMetadata]*collectorAndConfig
lock sync.Mutex
backgroundWorkers sync.WaitGroup
waitAfterLastModifiedMillis int
Expand Down Expand Up @@ -118,7 +118,7 @@ func NewBuiltIn(
Named: conf.ResourceName().AsNamed(),
logger: logger,
captureDir: viamCaptureDotDir,
collectors: make(map[componentMethodMetadata]*collectorAndConfig),
collectors: make(map[resourceMethodMetadata]*collectorAndConfig),
syncIntervalMins: 0,
additionalSyncPaths: []string{},
tags: []string{},
Expand Down Expand Up @@ -179,8 +179,8 @@ type collectorAndConfig struct {

// Identifier for a particular collector: resource name, method parameters,
// and method metadata.
type componentMethodMetadata struct {
ComponentName string
type resourceMethodMetadata struct {
ResourceName string
MethodParams string
MethodMetadata data.MethodMetadata
}
Expand All @@ -196,7 +196,7 @@ func getDurationFromHz(captureFrequencyHz float32) time.Duration {
// Initialize a collector for the component/method or update it if it has previously been created.
// Return the component/method metadata which is used as a key in the collectors map.
func (svc *builtIn) initializeOrUpdateCollector(
md componentMethodMetadata,
md resourceMethodMetadata,
config *datamanager.DataCaptureConfig,
) (
*collectorAndConfig, error,
Expand Down Expand Up @@ -366,11 +366,11 @@ func (svc *builtIn) Reconfigure(
// Service is disabled, so close all collectors and clear the map so we can instantiate new ones if we enable this service.
if svc.captureDisabled {
svc.closeCollectors()
svc.collectors = make(map[componentMethodMetadata]*collectorAndConfig)
svc.collectors = make(map[resourceMethodMetadata]*collectorAndConfig)
}

// Initialize or add collectors based on changes to the component configurations.
newCollectors := make(map[componentMethodMetadata]*collectorAndConfig)
newCollectors := make(map[resourceMethodMetadata]*collectorAndConfig)
if !svc.captureDisabled {
for _, resConf := range svcConfig.ResourceConfigs {
if resConf.Resource == nil {
Expand All @@ -384,8 +384,8 @@ func (svc *builtIn) Reconfigure(
MethodName: resConf.Method,
}

componentMethodMetadata := componentMethodMetadata{
ComponentName: resConf.Name.ShortName(),
componentMethodMetadata := resourceMethodMetadata{
ResourceName: resConf.Name.ShortName(),
MethodMetadata: methodMetadata,
MethodParams: fmt.Sprintf("%v", resConf.AdditionalParams),
}
Expand Down
198 changes: 184 additions & 14 deletions services/datamanager/builtin/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,25 +338,22 @@ func TestArbitraryFileUpload(t *testing.T) {
name: "scheduled sync of arbitrary files should work",
manualSync: false,
scheduleSyncDisabled: false,
serviceFail: false,
},
{
name: "manual sync of arbitrary files should work",
manualSync: true,
scheduleSyncDisabled: true,
serviceFail: false,
},
{
name: "running manual and scheduled sync concurrently should work and not lead to duplicate uploads",
manualSync: true,
scheduleSyncDisabled: false,
serviceFail: false,
},
{
name: "if an error response is received from the backend, local files should not be deleted",
manualSync: false,
scheduleSyncDisabled: false,
serviceFail: false,
serviceFail: true,
},
}

Expand All @@ -372,11 +369,13 @@ func TestArbitraryFileUpload(t *testing.T) {
dmsvc, r := newTestDataManager(t)
dmsvc.SetWaitAfterLastModifiedMillis(0)
defer dmsvc.Close(context.Background())
f := atomic.Bool{}
f.Store(tc.serviceFail)
mockClient := mockDataSyncServiceClient{
succesfulDCRequests: make(chan *v1.DataCaptureUploadRequest, 100),
failedDCRequests: make(chan *v1.DataCaptureUploadRequest, 100),
fileUploads: make(chan *mockFileUploadClient, 100),
fail: &atomic.Bool{},
fail: &f,
}
dmsvc.SetSyncerConstructor(getTestSyncerConstructorMock(mockClient))
cfg, deps := setupConfig(t, disabledTabularCollectorConfigPath)
Expand Down Expand Up @@ -416,10 +415,12 @@ func TestArbitraryFileUpload(t *testing.T) {
var fileUploads []*mockFileUploadClient
var urs []*v1.FileUploadRequest
// Get the successful requests
wait := time.After(time.Second * 5)
wait := time.After(time.Second * 3)
select {
case <-wait:
t.Fatalf("timed out waiting for sync request")
if !tc.serviceFail {
t.Fatalf("timed out waiting for sync request")
}
case r := <-mockClient.fileUploads:
fileUploads = append(fileUploads, r)
select {
Expand All @@ -430,12 +431,7 @@ func TestArbitraryFileUpload(t *testing.T) {
}
}

// Validate error and URs.
remainingFiles := getAllFilePaths(additionalPathsDir)
if tc.serviceFail {
// Error case, file should not be deleted.
test.That(t, len(remainingFiles), test.ShouldEqual, 1)
} else {
if !tc.serviceFail {
// Validate first metadata message.
test.That(t, len(fileUploads), test.ShouldEqual, 1)
test.That(t, len(urs), test.ShouldBeGreaterThan, 0)
Expand All @@ -457,6 +453,138 @@ func TestArbitraryFileUpload(t *testing.T) {
// Validate file no longer exists.
waitUntilNoFiles(additionalPathsDir)
test.That(t, len(getAllFileInfos(additionalPathsDir)), test.ShouldEqual, 0)
test.That(t, dmsvc.Close(context.Background()), test.ShouldBeNil)
} else {
// Validate file still exists.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For here and in the other tests, in this failure case, we should check that len(fileUploads) (or streaming uploads) == 0.

The file could technically still exist since we didn't call waitUntilNoFiles.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, and moved waitUntilNoFiles up so it gets called in all cases

test.That(t, len(getAllFileInfos(additionalPathsDir)), test.ShouldEqual, 1)
}
})
}
}

func TestStreamingDCUpload(t *testing.T) {
tests := []struct {
name string
serviceFail bool
}{
{
name: "A data capture file greater than MaxUnaryFileSize should be successfully uploaded" +
"via the streaming rpc.",
serviceFail: false,
},
{
name: "if an error response is received from the backend, local files should not be deleted",
serviceFail: true,
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
// Set up server.
mockClock := clk.NewMock()
clock = mockClock
tmpDir := t.TempDir()

// Set up data manager.
dmsvc, r := newTestDataManager(t)
defer dmsvc.Close(context.Background())
var cfg *Config
var deps []string
captureInterval := time.Millisecond * 10
cfg, deps = setupConfig(t, enabledBinaryCollectorConfigPath)

// Set up service config with just capture enabled.
cfg.CaptureDisabled = false
cfg.ScheduledSyncDisabled = true
cfg.SyncIntervalMins = syncIntervalMins
cfg.CaptureDir = tmpDir

resources := resourcesFromDeps(t, r, deps)
err := dmsvc.Reconfigure(context.Background(), resources, resource.Config{
ConvertedAttributes: cfg,
})
test.That(t, err, test.ShouldBeNil)

// Capture an image, then close.
mockClock.Add(captureInterval)
err = dmsvc.Close(context.Background())
test.That(t, err, test.ShouldBeNil)

// Get all captured data.
_, capturedData, err := getCapturedData(tmpDir)
test.That(t, err, test.ShouldBeNil)

// Turn dmsvc back on with capture disabled.
newDMSvc, r := newTestDataManager(t)
defer newDMSvc.Close(context.Background())
f := atomic.Bool{}
f.Store(tc.serviceFail)
mockClient := mockDataSyncServiceClient{
streamingDCUploads: make(chan *mockStreamingDCClient, 10),
fail: &f,
}
// Set max unary file size to 1 byte, so it uses the streaming rpc.
datasync.MaxUnaryFileSize = 1
newDMSvc.SetSyncerConstructor(getTestSyncerConstructorMock(mockClient))
cfg.CaptureDisabled = true
cfg.ScheduledSyncDisabled = true
resources = resourcesFromDeps(t, r, deps)
err = newDMSvc.Reconfigure(context.Background(), resources, resource.Config{
ConvertedAttributes: cfg,
})
test.That(t, err, test.ShouldBeNil)

// Call sync.
err = newDMSvc.Sync(context.Background(), nil)
test.That(t, err, test.ShouldBeNil)

// Wait for upload requests.
var uploads []*mockStreamingDCClient
var urs []*v1.StreamingDataCaptureUploadRequest
// Get the successful requests
wait := time.After(time.Second * 3)
select {
case <-wait:
if !tc.serviceFail {
t.Fatalf("timed out waiting for sync request")
}
case r := <-mockClient.streamingDCUploads:
uploads = append(uploads, r)
select {
case <-wait:
t.Fatalf("timed out waiting for sync request")
case <-r.closed:
urs = append(urs, r.reqs...)
}
}

// Validate error and URs.
remainingFiles := getAllFilePaths(tmpDir)
if tc.serviceFail {
// Error case, file should not be deleted.
test.That(t, len(remainingFiles), test.ShouldEqual, 1)
} else {
// Validate first metadata message.
test.That(t, len(uploads), test.ShouldEqual, 1)
test.That(t, len(urs), test.ShouldBeGreaterThan, 0)
actMD := urs[0].GetMetadata()
test.That(t, actMD, test.ShouldNotBeNil)
test.That(t, actMD.GetUploadMetadata(), test.ShouldNotBeNil)
test.That(t, actMD.GetSensorMetadata(), test.ShouldNotBeNil)
test.That(t, actMD.GetUploadMetadata().Type, test.ShouldEqual, v1.DataType_DATA_TYPE_BINARY_SENSOR)
test.That(t, actMD.GetUploadMetadata().PartId, test.ShouldNotBeBlank)

// Validate ensuing data messages.
dataRequests := urs[1:]
var actData []byte
for _, d := range dataRequests {
actData = append(actData, d.GetData()...)
}
test.That(t, actData, test.ShouldResemble, capturedData[0].GetBinary())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome, elegant check.


// Validate file no longer exists.
waitUntilNoFiles(tmpDir)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to you adding this helper function before, makes tests extremely readable.

test.That(t, len(getAllFileInfos(tmpDir)), test.ShouldEqual, 0)
}
test.That(t, dmsvc.Close(context.Background()), test.ShouldBeNil)
})
Expand Down Expand Up @@ -661,6 +789,7 @@ type mockDataSyncServiceClient struct {
succesfulDCRequests chan *v1.DataCaptureUploadRequest
failedDCRequests chan *v1.DataCaptureUploadRequest
fileUploads chan *mockFileUploadClient
streamingDCUploads chan *mockStreamingDCClient
fail *atomic.Bool
}

Expand Down Expand Up @@ -690,7 +819,26 @@ func (c mockDataSyncServiceClient) FileUpload(ctx context.Context, opts ...grpc.
return nil, errors.New("oh no error")
}
ret := &mockFileUploadClient{closed: make(chan struct{})}
c.fileUploads <- ret
select {
case <-ctx.Done():
return nil, ctx.Err()
case c.fileUploads <- ret:
}
return ret, nil
}

func (c mockDataSyncServiceClient) StreamingDataCaptureUpload(ctx context.Context,
opts ...grpc.CallOption,
) (v1.DataSyncService_StreamingDataCaptureUploadClient, error) {
if c.fail.Load() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[question] Why are you checking for a context cancelation in the fail.Load() in DataCaptureUpload but not these streaming requests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call, we should be. Added here and in FileUpload

return nil, errors.New("oh no error")
}
ret := &mockStreamingDCClient{closed: make(chan struct{})}
select {
case <-ctx.Done():
return nil, ctx.Err()
case c.streamingDCUploads <- ret:
}
return ret, nil
}

Expand All @@ -711,6 +859,28 @@ func (m *mockFileUploadClient) CloseAndRecv() (*v1.FileUploadResponse, error) {
}

// CloseSend signals that the client has finished sending on the mock file
// upload stream by sending on m.closed, then reports success.
// NOTE(review): the closed channel appears unbuffered; if CloseAndRecv also
// signals on the same channel and both are invoked, the second send blocks
// until another receive — confirm each test receives once per signal.
func (m *mockFileUploadClient) CloseSend() error {
m.closed <- struct{}{}
return nil
}

// mockStreamingDCClient is a test double for the streaming data capture
// upload client. It records every request passed to Send so tests can assert
// on the full upload sequence, and signals on closed when the stream is
// closed via CloseAndRecv or CloseSend.
type mockStreamingDCClient struct {
reqs []*v1.StreamingDataCaptureUploadRequest // requests recorded by Send, in call order
closed chan struct{} // signaled on CloseAndRecv/CloseSend
grpc.ClientStream // embedded to satisfy the remaining stream interface methods
}

// Send records req in m.reqs for later inspection by the test and always
// reports success; no request is ever transmitted anywhere.
func (m *mockStreamingDCClient) Send(req *v1.StreamingDataCaptureUploadRequest) error {
m.reqs = append(m.reqs, req)
return nil
}

// CloseAndRecv signals stream closure on m.closed, then returns an empty
// success response. The send blocks until a test goroutine receives from
// m.closed (the channel is created unbuffered in the tests above).
func (m *mockStreamingDCClient) CloseAndRecv() (*v1.StreamingDataCaptureUploadResponse, error) {
m.closed <- struct{}{}
return &v1.StreamingDataCaptureUploadResponse{}, nil
}

// CloseSend signals stream closure on m.closed and reports success.
// NOTE(review): both CloseAndRecv and CloseSend send on the same unbuffered
// channel; if a code path invokes both, the second send blocks until another
// receive occurs — confirm callers receive once per signal.
func (m *mockStreamingDCClient) CloseSend() error {
m.closed <- struct{}{}
return nil
}

Expand Down
5 changes: 2 additions & 3 deletions services/datamanager/datacapture/data_capture_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@ import (

// TODO Data-343: Reorganize this into a more standard interface/package, and add tests.

// TODO: this is all way too complicated i think. Just keep track of read/write offsets

// File extensions for Viam data capture files and the collector method names
// whose captured payloads are treated as binary sensor data (see getDataType).
const (
InProgressFileExt = ".prog" // extension for capture files still being written
FileExt = ".capture" // extension for completed data capture files
readImage = "ReadImage" // camera method producing binary data
nextPointCloud = "NextPointCloud" // point cloud method producing binary data
getPointCloudMap = "GetPointCloudMap" // SLAM map method producing binary data
)

// File is the data structure containing data captured by collectors. It is backed by a file on disk containing
Expand Down Expand Up @@ -240,7 +239,7 @@ func getFileTimestampName() string {
// TODO DATA-246: Implement this in some more robust, programmatic way.
func getDataType(methodName string) v1.DataType {
switch methodName {
case nextPointCloud, readImage:
case nextPointCloud, readImage, getPointCloudMap:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whew good catch!

return v1.DataType_DATA_TYPE_BINARY_SENSOR
default:
return v1.DataType_DATA_TYPE_TABULAR_SENSOR
Expand Down
Loading