From 4794a927bcf1e2addebb720701ddea0aa3ed012a Mon Sep 17 00:00:00 2001 From: susmitaSanyal <118220271+susmitaSanyal@users.noreply.github.com> Date: Thu, 2 May 2024 16:36:22 -0400 Subject: [PATCH] Rsdk-7332: slow position reporting (#3856) --- .../gpsrtkserial/gpsrtkserial.go | 88 ++++++------------- 1 file changed, 29 insertions(+), 59 deletions(-) diff --git a/components/movementsensor/gpsrtkserial/gpsrtkserial.go b/components/movementsensor/gpsrtkserial/gpsrtkserial.go index 5cc59e5803d..7d4a20f2c80 100644 --- a/components/movementsensor/gpsrtkserial/gpsrtkserial.go +++ b/components/movementsensor/gpsrtkserial/gpsrtkserial.go @@ -104,25 +104,26 @@ type rtkSerial struct { cancelFunc func() activeBackgroundWorkers sync.WaitGroup - mu sync.Mutex - - ntripClient *gpsutils.NtripInfo - isConnectedToNtrip bool - isClosed bool err movementsensor.LastError lastposition movementsensor.LastPosition lastcompassheading movementsensor.LastCompassHeading InputProtocol string + isClosed bool + + mu sync.Mutex - cachedData *gpsutils.CachedData - correctionWriter io.ReadWriteCloser - writePath string - wbaud int - isVirtualBase bool - readerWriter *bufio.ReadWriter - writer io.Writer - reader io.Reader + // everything below this comment is protected by mu + isConnectedToNtrip bool + ntripClient *gpsutils.NtripInfo + cachedData *gpsutils.CachedData + correctionWriter io.ReadWriteCloser + writePath string + wbaud int + isVirtualBase bool + readerWriter *bufio.ReadWriter + writer io.Writer + reader io.Reader } // Reconfigure reconfigures attributes. @@ -229,19 +230,15 @@ func (g *rtkSerial) start() error { // getStream attempts to connect to ntrip stream. We give up after maxAttempts unsuccessful tries. func (g *rtkSerial) getStream(mountPoint string, maxAttempts int) error { - g.mu.Lock() - defer g.mu.Unlock() + success := false + attempts := 0 var rc io.ReadCloser var err error g.logger.Debug("Getting NTRIP stream") - for attempts := 0; attempts < maxAttempts; attempts++ { - if g.isClosed { - return g.err.Get() - } - + for !success && attempts < maxAttempts { select { case <-g.cancelCtx.Done(): return errors.New("Canceled") @@ -252,24 +249,22 @@ func (g *rtkSerial) getStream(mountPoint string, maxAttempts int) error { return g.ntripClient.Client.GetStream(mountPoint) }() if err == nil { - g.logger.Debug("Connected to stream") - - g.ntripClient.Stream = rc - return g.err.Get() + success = true } + attempts++ } - // If we get here, we had errors on every connection attempt. - // Errors about the old ICY protocol are not "real" errors, but all others are. - if !strings.Contains(err.Error(), "ICY") { + if err != nil { g.logger.Errorf("Can't connect to NTRIP stream: %s", err) return err } + g.logger.Debug("Connected to stream") + + g.mu.Lock() + defer g.mu.Unlock() - // The error was related to the old ICY protocol. Try storing the ReadCloser anyway. - g.logger.Warnf("Detected old HTTP protocol: %s", err) g.ntripClient.Stream = rc - return err + return g.err.Get() } // openPort opens the serial port for writing. @@ -282,9 +277,6 @@ func (g *rtkSerial) openPort() error { MinimumReadSize: 1, } - g.mu.Lock() - defer g.mu.Unlock() - if err := g.cancelCtx.Err(); err != nil { return err } @@ -460,25 +452,26 @@ func (g *rtkSerial) receiveAndWriteSerial() { g.mu.Lock() g.isConnectedToNtrip = true g.mu.Unlock() + continue } } } } +// Most of the movementsensor functions here don't have mutex locks since g.cachedData is protected by +// it's own mutex and not having mutex around g.err is alright. + // Position returns the current geographic location of the MOVEMENTSENSOR. func (g *rtkSerial) Position(ctx context.Context, extra map[string]interface{}) (*geo.Point, float64, error) { - g.mu.Lock() lastError := g.err.Get() if lastError != nil { lastPosition := g.lastposition.GetLastPosition() - g.mu.Unlock() if lastPosition != nil { return lastPosition, 0, nil } return geo.NewPoint(math.NaN(), math.NaN()), math.NaN(), lastError } - g.mu.Unlock() position, alt, err := g.cachedData.Position(ctx, extra) if err != nil { @@ -500,8 +493,6 @@ func (g *rtkSerial) Position(ctx context.Context, extra map[string]interface{}) // LinearVelocity passthrough. func (g *rtkSerial) LinearVelocity(ctx context.Context, extra map[string]interface{}) (r3.Vector, error) { - g.mu.Lock() - defer g.mu.Unlock() lastError := g.err.Get() if lastError != nil { return r3.Vector{}, lastError @@ -521,9 +512,6 @@ func (g *rtkSerial) LinearAcceleration(ctx context.Context, extra map[string]int // AngularVelocity passthrough. func (g *rtkSerial) AngularVelocity(ctx context.Context, extra map[string]interface{}) (spatialmath.AngularVelocity, error) { - g.mu.Lock() - defer g.mu.Unlock() - lastError := g.err.Get() if lastError != nil { return spatialmath.AngularVelocity{}, lastError @@ -534,9 +522,6 @@ func (g *rtkSerial) AngularVelocity(ctx context.Context, extra map[string]interf // CompassHeading passthrough. func (g *rtkSerial) CompassHeading(ctx context.Context, extra map[string]interface{}) (float64, error) { - g.mu.Lock() - defer g.mu.Unlock() - lastError := g.err.Get() if lastError != nil { return 0, lastError @@ -546,9 +531,6 @@ func (g *rtkSerial) CompassHeading(ctx context.Context, extra map[string]interfa // Orientation passthrough. func (g *rtkSerial) Orientation(ctx context.Context, extra map[string]interface{}) (spatialmath.Orientation, error) { - g.mu.Lock() - defer g.mu.Unlock() - lastError := g.err.Get() if lastError != nil { return spatialmath.NewZeroOrientation(), lastError @@ -558,9 +540,6 @@ func (g *rtkSerial) Orientation(ctx context.Context, extra map[string]interface{ // readFix passthrough. func (g *rtkSerial) readFix(ctx context.Context) (int, error) { - g.mu.Lock() - defer g.mu.Unlock() - lastError := g.err.Get() if lastError != nil { return 0, lastError @@ -570,9 +549,6 @@ func (g *rtkSerial) readFix(ctx context.Context) (int, error) { // readSatsInView returns the number of satellites in view. func (g *rtkSerial) readSatsInView(ctx context.Context) (int, error) { - g.mu.Lock() - defer g.mu.Unlock() - lastError := g.err.Get() if lastError != nil { return 0, lastError @@ -583,9 +559,6 @@ func (g *rtkSerial) readSatsInView(ctx context.Context) (int, error) { // Properties passthrough. func (g *rtkSerial) Properties(ctx context.Context, extra map[string]interface{}) (*movementsensor.Properties, error) { - g.mu.Lock() - defer g.mu.Unlock() - lastError := g.err.Get() if lastError != nil { return &movementsensor.Properties{}, lastError @@ -597,9 +570,6 @@ func (g *rtkSerial) Properties(ctx context.Context, extra map[string]interface{} // Accuracy passthrough. func (g *rtkSerial) Accuracy(ctx context.Context, extra map[string]interface{}) (*movementsensor.Accuracy, error, ) { - g.mu.Lock() - defer g.mu.Unlock() - lastError := g.err.Get() if lastError != nil { return nil, lastError