Skip to content

Commit

Permalink
RSDK-9878: Move webService into web.go. (#4766)
Browse files Browse the repository at this point in the history
Co-authored-by: Abe Winter <[email protected]>
  • Loading branch information
dgottlieb and abe-winter authored Jan 31, 2025
1 parent 8acdd7f commit d65bbc4
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 70 deletions.
28 changes: 1 addition & 27 deletions robot/web/stream/stream.go → robot/web/stream/backoff.go
Original file line number Diff line number Diff line change
@@ -1,41 +1,15 @@
//go:build !no_cgo || android

// Package webstream provides controls for streaming from the web server.
package webstream

import (
"context"
"errors"
"time"

"github.com/pkg/errors"
"go.viam.com/utils"

"go.viam.com/rdk/gostream"
"go.viam.com/rdk/logging"
)

// streamVideoSource starts a stream from a video source with a throttled error handler.
func streamVideoSource(
ctx context.Context,
source gostream.VideoSource,
stream gostream.Stream,
backoffOpts *BackoffTuningOptions,
logger logging.Logger,
) error {
return gostream.StreamVideoSourceWithErrorHandler(ctx, source, stream, backoffOpts.getErrorThrottledHandler(logger, stream.Name()), logger)
}

// streamAudioSource starts a stream from an audio source with a throttled error handler.
func streamAudioSource(
ctx context.Context,
source gostream.AudioSource,
stream gostream.Stream,
backoffOpts *BackoffTuningOptions,
logger logging.Logger,
) error {
return gostream.StreamAudioSourceWithErrorHandler(ctx, source, stream, backoffOpts.getErrorThrottledHandler(logger, "audio"), logger)
}

// BackoffTuningOptions represents a set of parameters for determining exponential
// backoff when receiving multiple simultaneous errors.
//
Expand Down
33 changes: 33 additions & 0 deletions robot/web/stream/stream_c.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
//go:build !no_cgo || android

// Package webstream provides controls for streaming from the web server.
package webstream

import (
"context"

"go.viam.com/rdk/gostream"
"go.viam.com/rdk/logging"
)

// streamVideoSource starts a stream from a video source with a throttled error handler.
func streamVideoSource(
ctx context.Context,
source gostream.VideoSource,
stream gostream.Stream,
backoffOpts *BackoffTuningOptions,
logger logging.Logger,
) error {
return gostream.StreamVideoSourceWithErrorHandler(ctx, source, stream, backoffOpts.getErrorThrottledHandler(logger, stream.Name()), logger)
}

// streamAudioSource starts a stream from an audio source with a throttled error handler.
func streamAudioSource(
ctx context.Context,
source gostream.AudioSource,
stream gostream.Stream,
backoffOpts *BackoffTuningOptions,
logger logging.Logger,
) error {
return gostream.StreamAudioSourceWithErrorHandler(ctx, source, stream, backoffOpts.getErrorThrottledHandler(logger, "audio"), logger)
}
34 changes: 34 additions & 0 deletions robot/web/stream/stream_notc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//go:build no_cgo && !android

// Package webstream provides controls for streaming from the web server.
package webstream

import (
"context"

"github.com/pkg/errors"
"go.viam.com/rdk/gostream"
"go.viam.com/rdk/logging"
)

// streamVideoSource starts a stream from a video source with a throttled error handler.
func streamVideoSource(
ctx context.Context,
source gostream.VideoSource,
stream gostream.Stream,
backoffOpts *BackoffTuningOptions,
logger logging.Logger,
) error {
return errors.New("not implemented for non-cgo")
}

// streamAudioSource starts a stream from an audio source with a throttled error handler.
func streamAudioSource(
ctx context.Context,
source gostream.AudioSource,
stream gostream.Stream,
backoffOpts *BackoffTuningOptions,
logger logging.Logger,
) error {
return errors.New("not implemented for non-cgo")
}
23 changes: 23 additions & 0 deletions robot/web/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"go.viam.com/rdk/robot"
grpcserver "go.viam.com/rdk/robot/server"
weboptions "go.viam.com/rdk/robot/web/options"
webstream "go.viam.com/rdk/robot/web/stream"
rutils "go.viam.com/rdk/utils"
)

Expand Down Expand Up @@ -80,6 +81,28 @@ type Service interface {
Stats() any
}

type webService struct {
resource.Named

mu sync.Mutex
r robot.Robot
rpcServer rpc.Server
modServer rpc.Server

// Will be nil on non-cgo builds.
streamServer *webstream.Server
services map[resource.API]resource.APIResourceCollection[resource.Resource]
opts options
addr string
modAddr string
logger logging.Logger
cancelCtx context.Context
cancelFunc func()
isRunning bool
webWorkers sync.WaitGroup
modWorkers sync.WaitGroup
}

var internalWebServiceName = resource.NewName(
resource.APINamespaceRDKInternal.WithServiceType("web"),
"builtin",
Expand Down
22 changes: 0 additions & 22 deletions robot/web/web_c.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@ import (
"bytes"
"context"
"net/http"
"sync"

"github.com/pkg/errors"
streampb "go.viam.com/api/stream/v1"
"go.viam.com/utils/rpc"

"go.viam.com/rdk/gostream"
"go.viam.com/rdk/logging"
Expand All @@ -37,26 +35,6 @@ func New(r robot.Robot, logger logging.Logger, opts ...Option) Service {
return webSvc
}

type webService struct {
resource.Named

mu sync.Mutex
r robot.Robot
rpcServer rpc.Server
modServer rpc.Server
streamServer *webstream.Server
services map[resource.API]resource.APIResourceCollection[resource.Resource]
opts options
addr string
modAddr string
logger logging.Logger
cancelCtx context.Context
cancelFunc func()
isRunning bool
webWorkers sync.WaitGroup
modWorkers sync.WaitGroup
}

// Reconfigure pulls resources and updates the stream server audio and video streams with the new resources.
func (svc *webService) Reconfigure(ctx context.Context, deps resource.Dependencies, _ resource.Config) error {
svc.mu.Lock()
Expand Down
21 changes: 0 additions & 21 deletions robot/web/web_notc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@ package web

import (
"context"
"sync"

"go.viam.com/rdk/logging"
"go.viam.com/rdk/resource"
"go.viam.com/rdk/robot"
"go.viam.com/utils/rpc"
)

// New returns a new web service for the given robot.
Expand All @@ -29,25 +27,6 @@ func New(r robot.Robot, logger logging.Logger, opts ...Option) Service {
return webSvc
}

type webService struct {
resource.Named

mu sync.Mutex
r robot.Robot
rpcServer rpc.Server
modServer rpc.Server
services map[resource.API]resource.APIResourceCollection[resource.Resource]
opts options
addr string
modAddr string
logger logging.Logger
cancelCtx context.Context
cancelFunc func()
isRunning bool
webWorkers sync.WaitGroup
modWorkers sync.WaitGroup
}

// Update updates the web service when the robot has changed.
func (svc *webService) Reconfigure(ctx context.Context, deps resource.Dependencies, _ resource.Config) error {
svc.mu.Lock()
Expand Down

0 comments on commit d65bbc4

Please sign in to comment.