Skip to content

Commit 04576d8

Browse files
authored
RSDK-6340 Allow streams to restart after disconnect (viamrobotics#3467)
1 parent 00f39ae commit 04576d8

File tree

6 files changed

+158
-30
lines changed

6 files changed

+158
-30
lines changed

components/audioinput/client.go

+3
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ func NewClientFromConn(
4242
name resource.Name,
4343
logger logging.Logger,
4444
) (AudioInput, error) {
45+
// TODO(RSDK-6340): This client might still try to create audio streams after this
46+
// context is canceled. These subsequent audio streams will not work. To fix this,
47+
// use a channel instead of a context like we do in `component/audioinput/client.go`
4548
cancelCtx, cancel := context.WithCancel(context.Background())
4649
c := pb.NewAudioInputServiceClient(conn)
4750
return &client{

components/camera/client.go

+51-20
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,13 @@ import (
3131
type client struct {
3232
resource.Named
3333
resource.TriviallyReconfigurable
34-
resource.TriviallyCloseable
3534
mu sync.Mutex
3635
name string
3736
conn rpc.ClientConn
3837
client pb.CameraServiceClient
3938
logger logging.Logger
4039
activeBackgroundWorkers sync.WaitGroup
41-
cancelCtx context.Context
42-
cancel func()
40+
healthyClientCh chan struct{}
4341
}
4442

4543
// NewClientFromConn constructs a new Client from connection passed in.
@@ -50,16 +48,13 @@ func NewClientFromConn(
5048
name resource.Name,
5149
logger logging.Logger,
5250
) (Camera, error) {
53-
cancelCtx, cancel := context.WithCancel(context.Background())
5451
c := pb.NewCameraServiceClient(conn)
5552
return &client{
56-
Named: name.PrependRemote(remoteName).AsNamed(),
57-
name: name.ShortName(),
58-
conn: conn,
59-
client: c,
60-
logger: logger,
61-
cancelCtx: cancelCtx,
62-
cancel: cancel,
53+
Named: name.PrependRemote(remoteName).AsNamed(),
54+
name: name.ShortName(),
55+
conn: conn,
56+
client: c,
57+
logger: logger,
6358
}, nil
6459
}
6560

@@ -121,17 +116,40 @@ func (c *client) Stream(
121116
) (gostream.VideoStream, error) {
122117
ctx, span := trace.StartSpan(ctx, "camera::client::Stream")
123118

124-
cancelCtxWithMIME := gostream.WithMIMETypeHint(c.cancelCtx, gostream.MIMETypeHint(ctx, ""))
125-
streamCtx, stream, frameCh := gostream.NewMediaStreamForChannel[image.Image](cancelCtxWithMIME)
126-
119+
// RSDK-6340: The resource manager closes remote resources when the underlying
120+
// connection goes bad. However, when the connection is re-established, the client
121+
// objects these resources represent are not re-initialized/marked "healthy".
122+
// `healthyClientCh` helps track these transitions between healthy and unhealthy
123+
// states.
124+
//
125+
// When a new `client.Stream()` is created we will either use the existing
126+
// `healthyClientCh` or create a new one.
127+
//
128+
// The goroutine a `Stream()` method spins off will listen to its version of the
129+
// `healthyClientCh` to be notified when the connection has died so it can gracefully
130+
// terminate.
131+
//
132+
// When a connection becomes unhealthy, the resource manager will call `Close` on the
133+
// camera client object. Closing the client will:
134+
// 1. close its `client.healthyClientCh` channel
135+
// 2. wait for existing "stream" goroutines to drain
136+
// 3. nil out the `client.healthyClientCh` member variable
137+
//
138+
// New streams concurrent with closing cannot start until this drain completes. There
139+
// will never be stream goroutines from the old "generation" running concurrently
140+
// with those from the new "generation".
127141
c.mu.Lock()
128-
if err := c.cancelCtx.Err(); err != nil {
129-
c.mu.Unlock()
130-
return nil, err
142+
if c.healthyClientCh == nil {
143+
c.healthyClientCh = make(chan struct{})
131144
}
132-
c.activeBackgroundWorkers.Add(1)
145+
healthyClientCh := c.healthyClientCh
133146
c.mu.Unlock()
134147

148+
ctxWithMIME := gostream.WithMIMETypeHint(context.Background(), gostream.MIMETypeHint(ctx, ""))
149+
streamCtx, stream, frameCh := gostream.NewMediaStreamForChannel[image.Image](ctxWithMIME)
150+
151+
c.activeBackgroundWorkers.Add(1)
152+
135153
goutils.PanicCapturingGo(func() {
136154
streamCtx = trace.NewContext(streamCtx, span)
137155
defer span.End()
@@ -154,6 +172,11 @@ func (c *client) Stream(
154172
select {
155173
case <-streamCtx.Done():
156174
return
175+
case <-healthyClientCh:
176+
if err := stream.Close(ctxWithMIME); err != nil {
177+
c.logger.Warn("error closing stream", err)
178+
}
179+
return
157180
case frameCh <- gostream.MediaReleasePairWithError[image.Image]{
158181
Media: frame,
159182
Release: release,
@@ -290,10 +313,18 @@ func (c *client) DoCommand(ctx context.Context, cmd map[string]interface{}) (map
290313
return protoutils.DoFromResourceClient(ctx, c.client, c.name, cmd)
291314
}
292315

316+
// TODO(RSDK-6433): This method can be called more than once during a client's lifecycle.
317+
// For example, consider a case where a remote camera goes offline and then back online.
318+
// We will call `Close` on the camera client when we detect the disconnection to remove
319+
// active streams but then reuse the client when the connection is re-established.
293320
func (c *client) Close(ctx context.Context) error {
294321
c.mu.Lock()
295-
c.cancel()
296-
c.mu.Unlock()
322+
defer c.mu.Unlock()
323+
324+
if c.healthyClientCh != nil {
325+
close(c.healthyClientCh)
326+
}
297327
c.activeBackgroundWorkers.Wait()
328+
c.healthyClientCh = nil
298329
return nil
299330
}

components/camera/client_test.go

+72
Original file line numberDiff line numberDiff line change
@@ -551,3 +551,75 @@ func TestClientWithInterceptor(t *testing.T) {
551551

552552
test.That(t, conn.Close(), test.ShouldBeNil)
553553
}
554+
555+
func TestClientStreamAfterClose(t *testing.T) {
556+
// Set up gRPC server
557+
logger := logging.NewTestLogger(t)
558+
listener, err := net.Listen("tcp", "localhost:0")
559+
test.That(t, err, test.ShouldBeNil)
560+
rpcServer, err := rpc.NewServer(logger.AsZap(), rpc.WithUnauthenticated())
561+
test.That(t, err, test.ShouldBeNil)
562+
563+
// Set up camera that can stream images
564+
img := image.NewNRGBA(image.Rect(0, 0, 4, 4))
565+
injectCamera := &inject.Camera{}
566+
injectCamera.PropertiesFunc = func(ctx context.Context) (camera.Properties, error) {
567+
return camera.Properties{}, nil
568+
}
569+
injectCamera.StreamFunc = func(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error) {
570+
return gostream.NewEmbeddedVideoStreamFromReader(gostream.VideoReaderFunc(func(ctx context.Context) (image.Image, func(), error) {
571+
return img, func() {}, nil
572+
})), nil
573+
}
574+
575+
// Register CameraService API in our gRPC server.
576+
resources := map[resource.Name]camera.Camera{
577+
camera.Named(testCameraName): injectCamera,
578+
}
579+
cameraSvc, err := resource.NewAPIResourceCollection(camera.API, resources)
580+
test.That(t, err, test.ShouldBeNil)
581+
resourceAPI, ok, err := resource.LookupAPIRegistration[camera.Camera](camera.API)
582+
test.That(t, err, test.ShouldBeNil)
583+
test.That(t, ok, test.ShouldBeTrue)
584+
test.That(t, resourceAPI.RegisterRPCService(context.Background(), rpcServer, cameraSvc), test.ShouldBeNil)
585+
586+
// Start serving requests.
587+
go rpcServer.Serve(listener)
588+
defer rpcServer.Stop()
589+
590+
// Make client connection
591+
conn, err := viamgrpc.Dial(context.Background(), listener.Addr().String(), logger)
592+
test.That(t, err, test.ShouldBeNil)
593+
client, err := camera.NewClientFromConn(context.Background(), conn, "", camera.Named(testCameraName), logger)
594+
test.That(t, err, test.ShouldBeNil)
595+
596+
// Get a stream
597+
stream, err := client.Stream(context.Background())
598+
test.That(t, stream, test.ShouldNotBeNil)
599+
test.That(t, err, test.ShouldBeNil)
600+
601+
// Read from stream
602+
media, _, err := stream.Next(context.Background())
603+
test.That(t, media, test.ShouldNotBeNil)
604+
test.That(t, err, test.ShouldBeNil)
605+
606+
// Close client and read from stream
607+
test.That(t, client.Close(context.Background()), test.ShouldBeNil)
608+
media, _, err = stream.Next(context.Background())
609+
test.That(t, media, test.ShouldBeNil)
610+
test.That(t, err.Error(), test.ShouldContainSubstring, "context canceled")
611+
612+
// Get a new stream
613+
stream, err = client.Stream(context.Background())
614+
test.That(t, stream, test.ShouldNotBeNil)
615+
test.That(t, err, test.ShouldBeNil)
616+
617+
// Read from the new stream
618+
media, _, err = stream.Next(context.Background())
619+
test.That(t, media, test.ShouldNotBeNil)
620+
test.That(t, err, test.ShouldBeNil)
621+
622+
// Close client and connection
623+
test.That(t, client.Close(context.Background()), test.ShouldBeNil)
624+
test.That(t, conn.Close(), test.ShouldBeNil)
625+
}

components/camera/fake/camera.go

+24-9
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"image"
77
"image/color"
88
"math"
9+
"time"
910

1011
"github.com/golang/geo/r3"
1112
"github.com/pkg/errors"
@@ -57,11 +58,12 @@ func NewCamera(
5758
}
5859
resModel, width, height := fakeModel(newConf.Width, newConf.Height)
5960
cam := &Camera{
60-
Named: conf.ResourceName().AsNamed(),
61-
Model: resModel,
62-
Width: width,
63-
Height: height,
64-
logger: logger,
61+
Named: conf.ResourceName().AsNamed(),
62+
Model: resModel,
63+
Width: width,
64+
Height: height,
65+
Animated: newConf.Animated,
66+
logger: logger,
6567
}
6668
src, err := camera.NewVideoSourceFromReader(ctx, cam, resModel, camera.ColorStream)
6769
if err != nil {
@@ -72,8 +74,9 @@ func NewCamera(
7274

7375
// Config are the attributes of the fake camera config.
7476
type Config struct {
75-
Width int `json:"width,omitempty"`
76-
Height int `json:"height,omitempty"`
77+
Width int `json:"width,omitempty"`
78+
Height int `json:"height,omitempty"`
79+
Animated bool `json:"animated,omitempty"`
7780
}
7881

7982
// Validate checks that the config attributes are valid for a fake camera.
@@ -167,6 +170,7 @@ type Camera struct {
167170
Model *transform.PinholeCameraModel
168171
Width int
169172
Height int
173+
Animated bool
170174
cacheImage *image.RGBA
171175
cachePointCloud pointcloud.PointCloud
172176
logger logging.Logger
@@ -183,16 +187,27 @@ func (c *Camera) Read(ctx context.Context) (image.Image, func(), error) {
183187

184188
totalDist := math.Sqrt(math.Pow(0-width, 2) + math.Pow(0-height, 2))
185189

190+
tick := time.Now().UnixMilli() / 20
186191
var x, y float64
187192
for x = 0; x < width; x++ {
188193
for y = 0; y < height; y++ {
189194
dist := math.Sqrt(math.Pow(0-x, 2) + math.Pow(0-y, 2))
190195
dist /= totalDist
191196
thisColor := color.RGBA{uint8(255 - (255 * dist)), uint8(255 - (255 * dist)), uint8(0 + (255 * dist)), 255}
192-
img.Set(int(x), int(y), thisColor)
197+
198+
var px, py int
199+
if c.Animated {
200+
px = int(int64(x)+tick) % int(width)
201+
py = int(y)
202+
} else {
203+
px, py = int(x), int(y)
204+
}
205+
img.Set(px, py, thisColor)
193206
}
194207
}
195-
c.cacheImage = img
208+
if !c.Animated {
209+
c.cacheImage = img
210+
}
196211
return rimage.ConvertImage(img), func() {}, nil
197212
}
198213

gostream/media.go

+7
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,13 @@ func (ms *mediaStreamFromChannel[T]) Next(ctx context.Context) (T, func(), error
417417
defer span.End()
418418

419419
var zero T
420+
if ms.cancelCtx.Err() != nil {
421+
return zero, nil, ms.cancelCtx.Err()
422+
}
423+
if ctx.Err() != nil {
424+
return zero, nil, ctx.Err()
425+
}
426+
420427
select {
421428
case <-ms.cancelCtx.Done():
422429
return zero, nil, ms.cancelCtx.Err()

resource/resource.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ func (t TriviallyReconfigurable) Reconfigure(ctx context.Context, deps Dependenc
164164

165165
// TriviallyCloseable is to be embedded by any resource that does not care about
166166
// handling Closes. When is used, it is assumed that the resource does not need
167-
// to return errors when furture non-Close methods are called.
167+
// to return errors when future non-Close methods are called.
168168
type TriviallyCloseable struct{}
169169

170170
// Close always returns no error.

0 commit comments

Comments
 (0)