Skip to content

Commit c04cefa

Browse files
randhidhexbabenicksanforddgottliebethanlookpotts
authored
[RSDK-9458] Remove 'Stream' from Camera interface (viamrobotics#4691)
Co-authored-by: hexbabe <[email protected]> Co-authored-by: sean yu <[email protected]> Co-authored-by: nicksanford <[email protected]> Co-authored-by: Dan Gottlieb <[email protected]> Co-authored-by: Ethan Look-Potts <[email protected]> Co-authored-by: abe-winter <[email protected]> Co-authored-by: Eliot Horowitz <[email protected]> Co-authored-by: Naomi Pentrel <[email protected]> Co-authored-by: Maxim Pertsov <[email protected]> Co-authored-by: nicksanford <[email protected]> Co-authored-by: Devin Hilly <[email protected]> Co-authored-by: martha-johnston <[email protected]> Co-authored-by: Kurt S <[email protected]> Co-authored-by: Vijay Vuyyuru <[email protected]> Co-authored-by: Peter LoVerso <[email protected]> Co-authored-by: Nicole Jung <[email protected]>
1 parent ddc16c8 commit c04cefa

21 files changed

+167
-191
lines changed

components/camera/camera.go

+9-13
Original file line numberDiff line numberDiff line change
@@ -79,12 +79,6 @@ type ImageMetadata struct {
7979
}
8080

8181
// A Camera is a resource that can capture frames.
82-
type Camera interface {
83-
resource.Resource
84-
VideoSource
85-
}
86-
87-
// VideoSource represents anything that can capture frames.
8882
// For more information, see the [camera component docs].
8983
//
9084
// Image example:
@@ -129,7 +123,9 @@ type Camera interface {
129123
// [Images method docs]: https://docs.viam.com/dev/reference/apis/components/camera/#getimages
130124
// [NextPointCloud method docs]: https://docs.viam.com/dev/reference/apis/components/camera/#getpointcloud
131125
// [Close method docs]: https://docs.viam.com/dev/reference/apis/components/camera/#close
132-
type VideoSource interface {
126+
type Camera interface {
127+
resource.Resource
128+
133129
// Image returns a byte slice representing an image that tries to adhere to the MIME type hint.
134130
// Image also may return metadata about the frame.
135131
Image(ctx context.Context, mimeType string, extra map[string]interface{}) ([]byte, ImageMetadata, error)
@@ -138,20 +134,20 @@ type VideoSource interface {
138134
// along with associated metadata (just timestamp for now). It's not for getting a time series of images from the same imager.
139135
Images(ctx context.Context) ([]NamedImage, resource.ResponseMetadata, error)
140136

141-
// Stream returns a stream that makes a best effort to return consecutive images
142-
// that may have a MIME type hint dictated in the context via gostream.WithMIMETypeHint.
143-
Stream(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error)
144-
145137
// NextPointCloud returns the next immediately available point cloud, not necessarily one
146138
// a part of a sequence. In the future, there could be streaming of point clouds.
147139
NextPointCloud(ctx context.Context) (pointcloud.PointCloud, error)
148140

149141
// Properties returns properties that are intrinsic to the particular
150142
// implementation of a camera.
151143
Properties(ctx context.Context) (Properties, error)
144+
}
152145

153-
// Close shuts down the resource and prevents further use.
154-
Close(ctx context.Context) error
146+
// VideoSource is a camera that has `Stream` embedded to directly integrate with gostream.
147+
// Note that generally, when writing camera components from scratch, embedding `Stream` is an anti-pattern.
148+
type VideoSource interface {
149+
Camera
150+
Stream(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error)
155151
}
156152

157153
// ReadImage reads an image from the given source that is immediately available.

components/camera/client_test.go

-77
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,6 @@ func TestClient(t *testing.T) {
179179
test.That(t, err, test.ShouldBeNil)
180180
camera1Client, err := camera.NewClientFromConn(context.Background(), conn, "", camera.Named(testCameraName), logger)
181181
test.That(t, err, test.ShouldBeNil)
182-
183182
frame, err := camera.DecodeImageFromCamera(context.Background(), rutils.MimeTypeRawRGBA, nil, camera1Client)
184183
test.That(t, err, test.ShouldBeNil)
185184
compVal, _, err := rimage.CompareImages(img, frame)
@@ -246,10 +245,6 @@ func TestClient(t *testing.T) {
246245
client2, err := resourceAPI.RPCClient(context.Background(), conn, "", camera.Named(failCameraName), logger)
247246
test.That(t, err, test.ShouldBeNil)
248247

249-
_, _, err = camera.ReadImage(context.Background(), client2)
250-
test.That(t, err, test.ShouldNotBeNil)
251-
test.That(t, err.Error(), test.ShouldContainSubstring, errGetImageFailed.Error())
252-
253248
_, _, err = client2.Image(context.Background(), "", nil)
254249
test.That(t, err, test.ShouldNotBeNil)
255250
test.That(t, err.Error(), test.ShouldContainSubstring, errGetImageFailed.Error())
@@ -572,78 +567,6 @@ func TestClientWithInterceptor(t *testing.T) {
572567
test.That(t, conn.Close(), test.ShouldBeNil)
573568
}
574569

575-
func TestClientStreamAfterClose(t *testing.T) {
576-
// Set up gRPC server
577-
logger := logging.NewTestLogger(t)
578-
listener, err := net.Listen("tcp", "localhost:0")
579-
test.That(t, err, test.ShouldBeNil)
580-
rpcServer, err := rpc.NewServer(logger, rpc.WithUnauthenticated())
581-
test.That(t, err, test.ShouldBeNil)
582-
583-
// Set up camera that can stream images
584-
img := image.NewNRGBA(image.Rect(0, 0, 4, 4))
585-
injectCamera := &inject.Camera{}
586-
injectCamera.PropertiesFunc = func(ctx context.Context) (camera.Properties, error) {
587-
return camera.Properties{}, nil
588-
}
589-
injectCamera.ImageFunc = func(ctx context.Context, mimeType string, extra map[string]interface{}) ([]byte, camera.ImageMetadata, error) {
590-
imgBytes, err := rimage.EncodeImage(ctx, img, mimeType)
591-
test.That(t, err, test.ShouldBeNil)
592-
return imgBytes, camera.ImageMetadata{MimeType: mimeType}, nil
593-
}
594-
595-
// Register CameraService API in our gRPC server.
596-
resources := map[resource.Name]camera.Camera{
597-
camera.Named(testCameraName): injectCamera,
598-
}
599-
cameraSvc, err := resource.NewAPIResourceCollection(camera.API, resources)
600-
test.That(t, err, test.ShouldBeNil)
601-
resourceAPI, ok, err := resource.LookupAPIRegistration[camera.Camera](camera.API)
602-
test.That(t, err, test.ShouldBeNil)
603-
test.That(t, ok, test.ShouldBeTrue)
604-
test.That(t, resourceAPI.RegisterRPCService(context.Background(), rpcServer, cameraSvc), test.ShouldBeNil)
605-
606-
// Start serving requests.
607-
go rpcServer.Serve(listener)
608-
defer rpcServer.Stop()
609-
610-
// Make client connection
611-
conn, err := viamgrpc.Dial(context.Background(), listener.Addr().String(), logger)
612-
test.That(t, err, test.ShouldBeNil)
613-
client, err := camera.NewClientFromConn(context.Background(), conn, "", camera.Named(testCameraName), logger)
614-
test.That(t, err, test.ShouldBeNil)
615-
616-
// Get a stream
617-
stream, err := client.Stream(context.Background())
618-
test.That(t, stream, test.ShouldNotBeNil)
619-
test.That(t, err, test.ShouldBeNil)
620-
621-
// Read from stream
622-
media, _, err := stream.Next(context.Background())
623-
test.That(t, media, test.ShouldNotBeNil)
624-
test.That(t, err, test.ShouldBeNil)
625-
626-
// Close client and read from stream
627-
test.That(t, client.Close(context.Background()), test.ShouldBeNil)
628-
media, _, err = stream.Next(context.Background())
629-
test.That(t, media, test.ShouldBeNil)
630-
test.That(t, err.Error(), test.ShouldContainSubstring, "context canceled")
631-
632-
// Get a new stream
633-
stream, err = client.Stream(context.Background())
634-
test.That(t, stream, test.ShouldNotBeNil)
635-
test.That(t, err, test.ShouldBeNil)
636-
637-
// Read from the new stream
638-
media, _, err = stream.Next(context.Background())
639-
test.That(t, media, test.ShouldNotBeNil)
640-
test.That(t, err, test.ShouldBeNil)
641-
642-
// Close client and connection
643-
test.That(t, client.Close(context.Background()), test.ShouldBeNil)
644-
test.That(t, conn.Close(), test.ShouldBeNil)
645-
}
646-
647570
// See modmanager_test.go for the happy path (aka, when the
648571
// client has a webrtc connection).
649572
func TestRTPPassthroughWithoutWebRTC(t *testing.T) {

components/camera/ffmpeg/ffmpeg.go

-1
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,6 @@ func init() {
7373
}
7474

7575
type ffmpegCamera struct {
76-
resource.Named
7776
gostream.VideoReader
7877
cancelFunc context.CancelFunc
7978
activeBackgroundWorkers sync.WaitGroup

components/camera/ffmpeg/ffmpeg_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"go.viam.com/test"
99
"go.viam.com/utils/artifact"
1010

11+
"go.viam.com/rdk/components/camera"
1112
"go.viam.com/rdk/logging"
1213
"go.viam.com/rdk/utils"
1314
)
@@ -20,7 +21,7 @@ func TestFFMPEGCamera(t *testing.T) {
2021
test.That(t, err, test.ShouldBeNil)
2122
test.That(t, err, test.ShouldBeNil)
2223
for i := 0; i < 5; i++ {
23-
_, _, err = cam.Image(ctx, utils.MimeTypeJPEG, nil)
24+
_, err = camera.DecodeImageFromCamera(ctx, utils.MimeTypeJPEG, nil, cam)
2425
test.That(t, err, test.ShouldBeNil)
2526
}
2627
test.That(t, cam.Close(context.Background()), test.ShouldBeNil)

components/camera/transformpipeline/classifier_test.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@ func buildRobotWithClassifier(logger logging.Logger) (robot.Robot, error) {
8282
return r, nil
8383
}
8484

85-
//nolint:dupl
8685
func TestClassifierSource(t *testing.T) {
8786
logger := logging.NewTestLogger(t)
8887
ctx, cancel := context.WithCancel(context.Background())
@@ -99,7 +98,9 @@ func TestClassifierSource(t *testing.T) {
9998
test.That(t, err, test.ShouldBeNil)
10099
defer classifier.Close(ctx)
101100

102-
resImg, _, err := camera.ReadImage(ctx, classifier)
101+
streamClassifier, ok := classifier.(camera.VideoSource)
102+
test.That(t, ok, test.ShouldBeTrue)
103+
resImg, _, err := camera.ReadImage(ctx, streamClassifier)
103104
test.That(t, err, test.ShouldBeNil)
104105
ovImg := rimage.ConvertImage(resImg)
105106

components/camera/transformpipeline/detector_test.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,6 @@ func buildRobotWithFakeCamera(logger logging.Logger) (robot.Robot, error) {
104104
return robotimpl.RobotFromConfigPath(context.Background(), newConfFile, logger)
105105
}
106106

107-
//nolint:dupl
108107
func TestColorDetectionSource(t *testing.T) {
109108
logger := logging.NewTestLogger(t)
110109
ctx, cancel := context.WithCancel(context.Background())
@@ -121,7 +120,7 @@ func TestColorDetectionSource(t *testing.T) {
121120
test.That(t, err, test.ShouldBeNil)
122121
defer detector.Close(ctx)
123122

124-
resImg, _, err := camera.ReadImage(ctx, detector)
123+
resImg, err := camera.DecodeImageFromCamera(ctx, rutils.MimeTypePNG, nil, detector)
125124
test.That(t, err, test.ShouldBeNil)
126125
ovImg := rimage.ConvertImage(resImg)
127126
test.That(t, ovImg.GetXY(852, 431), test.ShouldResemble, rimage.Red)
@@ -146,7 +145,7 @@ func BenchmarkColorDetectionSource(b *testing.B) {
146145
b.ResetTimer()
147146
// begin benchmarking
148147
for i := 0; i < b.N; i++ {
149-
_, _, _ = camera.ReadImage(ctx, detector)
148+
_, _ = camera.DecodeImageFromCamera(ctx, rutils.MimeTypeJPEG, nil, detector)
150149
}
151150
test.That(b, detector.Close(context.Background()), test.ShouldBeNil)
152151
}

components/camera/transformpipeline/mods_test.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -608,7 +608,8 @@ func BenchmarkColorRotate(b *testing.B) {
608608
am := utils.AttributeMap{
609609
"angle_degs": 180,
610610
}
611-
rs, stream, err := newRotateTransform(context.Background(), src, camera.ColorStream, am)
611+
vs := videoSourceFromCamera(context.Background(), src)
612+
rs, stream, err := newRotateTransform(context.Background(), vs, camera.ColorStream, am)
612613
test.That(b, err, test.ShouldBeNil)
613614
test.That(b, stream, test.ShouldEqual, camera.ColorStream)
614615

@@ -633,7 +634,8 @@ func BenchmarkDepthRotate(b *testing.B) {
633634
am := utils.AttributeMap{
634635
"angle_degs": 180,
635636
}
636-
rs, stream, err := newRotateTransform(context.Background(), src, camera.DepthStream, am)
637+
vs := videoSourceFromCamera(context.Background(), src)
638+
rs, stream, err := newRotateTransform(context.Background(), vs, camera.DepthStream, am)
637639
test.That(b, err, test.ShouldBeNil)
638640
test.That(b, stream, test.ShouldEqual, camera.DepthStream)
639641

components/camera/transformpipeline/pipeline.go

+45-12
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,14 @@ import (
1212
"go.opencensus.io/trace"
1313

1414
"go.viam.com/rdk/components/camera"
15+
"go.viam.com/rdk/gostream"
1516
"go.viam.com/rdk/logging"
1617
"go.viam.com/rdk/pointcloud"
1718
"go.viam.com/rdk/resource"
1819
"go.viam.com/rdk/rimage"
1920
"go.viam.com/rdk/rimage/transform"
2021
"go.viam.com/rdk/robot"
22+
camerautils "go.viam.com/rdk/robot/web/stream/camera"
2123
"go.viam.com/rdk/utils"
2224
)
2325

@@ -47,11 +49,12 @@ func init() {
4749
if err != nil {
4850
return nil, fmt.Errorf("no source camera for transform pipeline (%s): %w", sourceName, err)
4951
}
50-
src, err := newTransformPipeline(ctx, source, newConf, actualR, logger)
52+
vs := videoSourceFromCamera(ctx, source)
53+
src, err := newTransformPipeline(ctx, vs, conf.ResourceName().AsNamed(), newConf, actualR, logger)
5154
if err != nil {
5255
return nil, err
5356
}
54-
return camera.FromVideoSource(conf.ResourceName(), src, logger), nil
57+
return src, nil
5558
},
5659
})
5760
}
@@ -84,9 +87,36 @@ func (cfg *transformConfig) Validate(path string) ([]string, error) {
8487
return deps, nil
8588
}
8689

90+
type videoSource struct {
91+
camera.Camera
92+
vs gostream.VideoSource
93+
}
94+
95+
func (sc *videoSource) Stream(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error) {
96+
if sc.vs != nil {
97+
return sc.vs.Stream(ctx, errHandlers...)
98+
}
99+
return sc.Stream(ctx, errHandlers...)
100+
}
101+
102+
// videoSourceFromCamera is a hack to allow us to use Stream to pipe frames through the pipeline
103+
// and still implement a camera resource.
104+
// We prefer this methodology over passing Image bytes because each transform desires a image.Image over
105+
// a raw byte slice. To use Image would be to wastefully encode and decode the frame multiple times.
106+
func videoSourceFromCamera(ctx context.Context, cam camera.Camera) camera.VideoSource {
107+
if streamCam, ok := cam.(camera.VideoSource); ok {
108+
return streamCam
109+
}
110+
return &videoSource{
111+
Camera: cam,
112+
vs: camerautils.VideoSourceFromCamera(ctx, cam),
113+
}
114+
}
115+
87116
func newTransformPipeline(
88117
ctx context.Context,
89118
source camera.VideoSource,
119+
named resource.Named,
90120
cfg *transformConfig,
91121
r robot.Robot,
92122
logger logging.Logger,
@@ -98,7 +128,7 @@ func newTransformPipeline(
98128
return nil, errors.New("pipeline has no transforms in it")
99129
}
100130
// check if the source produces a depth image or color image
101-
img, release, err := camera.ReadImage(ctx, source)
131+
img, err := camera.DecodeImageFromCamera(ctx, "", nil, source)
102132

103133
var streamType camera.ImageType
104134
if err != nil {
@@ -110,41 +140,44 @@ func newTransformPipeline(
110140
} else {
111141
streamType = camera.ColorStream
112142
}
113-
if release != nil {
114-
release()
115-
}
116143
// loop through the pipeline and create the image flow
117144
pipeline := make([]camera.VideoSource, 0, len(cfg.Pipeline))
118-
lastSource := source
145+
lastSource := videoSourceFromCamera(ctx, source)
119146
for _, tr := range cfg.Pipeline {
120147
src, newStreamType, err := buildTransform(ctx, r, lastSource, streamType, tr)
121148
if err != nil {
122149
return nil, err
123150
}
124-
pipeline = append(pipeline, src)
125-
lastSource = src
151+
streamSrc := videoSourceFromCamera(ctx, src)
152+
pipeline = append(pipeline, streamSrc)
153+
lastSource = streamSrc
126154
streamType = newStreamType
127155
}
128156
cameraModel := camera.NewPinholeModelWithBrownConradyDistortion(cfg.CameraParameters, cfg.DistortionParameters)
129157
return camera.NewVideoSourceFromReader(
130158
ctx,
131-
transformPipeline{pipeline, lastSource, cfg.CameraParameters, logger},
159+
transformPipeline{named, pipeline, lastSource, cfg.CameraParameters, logger},
132160
&cameraModel,
133161
streamType,
134162
)
135163
}
136164

137165
type transformPipeline struct {
166+
resource.Named
138167
pipeline []camera.VideoSource
139-
src camera.VideoSource
168+
src camera.Camera
140169
intrinsicParameters *transform.PinholeCameraIntrinsics
141170
logger logging.Logger
142171
}
143172

144173
func (tp transformPipeline) Read(ctx context.Context) (image.Image, func(), error) {
145174
ctx, span := trace.StartSpan(ctx, "camera::transformpipeline::Read")
146175
defer span.End()
147-
return camera.ReadImage(ctx, tp.src)
176+
img, err := camera.DecodeImageFromCamera(ctx, "", nil, tp.src)
177+
if err != nil {
178+
return nil, func() {}, err
179+
}
180+
return img, func() {}, nil
148181
}
149182

150183
func (tp transformPipeline) NextPointCloud(ctx context.Context) (pointcloud.PointCloud, error) {

0 commit comments

Comments
 (0)