Skip to content

Commit

Permalink
RSDK-9591 - Kill all lingering module process before exiting (#4657)
Browse files Browse the repository at this point in the history
  • Loading branch information
cheukt authored Jan 10, 2025
1 parent e281172 commit 819123b
Show file tree
Hide file tree
Showing 11 changed files with 159 additions and 11 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ require (
go.uber.org/zap v1.27.0
go.viam.com/api v0.1.380
go.viam.com/test v1.2.4
go.viam.com/utils v0.1.118
go.viam.com/utils v0.1.120
goji.io v2.0.2+incompatible
golang.org/x/image v0.19.0
golang.org/x/mobile v0.0.0-20240112133503-c713f31d574b
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1517,8 +1517,8 @@ go.viam.com/api v0.1.380 h1:VgRHDlPBku+kjIp4omxmPNmRVZezytFUUOFJ2xpRFR8=
go.viam.com/api v0.1.380/go.mod h1:g5eipXHNm0rQmW7DWya6avKcmzoypLmxnMlAaIsE5Ls=
go.viam.com/test v1.2.4 h1:JYgZhsuGAQ8sL9jWkziAXN9VJJiKbjoi9BsO33TW3ug=
go.viam.com/test v1.2.4/go.mod h1:zI2xzosHdqXAJ/kFqcN+OIF78kQuTV2nIhGZ8EzvaJI=
go.viam.com/utils v0.1.118 h1:Kp6ebrCBiYReeSC1XnWPTjtBJoTUsQ6YWAomQkQF/mE=
go.viam.com/utils v0.1.118/go.mod h1:g1CaEkf7aJCrSI/Sfkx+6cBS1+Y3fPT2pvMQbp7TTBI=
go.viam.com/utils v0.1.120 h1:qwHt053zgcg6HtdDpP6Aj6lkmxZYyL1w6d43Ef18uso=
go.viam.com/utils v0.1.120/go.mod h1:g1CaEkf7aJCrSI/Sfkx+6cBS1+Y3fPT2pvMQbp7TTBI=
go4.org/unsafe/assume-no-moving-gc v0.0.0-20230525183740-e7c30c78aeb2 h1:WJhcL4p+YeDxmZWg141nRm7XC8IDmhz7lk5GpadO1Sg=
go4.org/unsafe/assume-no-moving-gc v0.0.0-20230525183740-e7c30c78aeb2/go.mod h1:FftLjUGFEDu5k8lt0ddY+HcrH/qU/0qk+H8j9/nTl3E=
gocv.io/x/gocv v0.25.0/go.mod h1:Rar2PS6DV+T4FL+PM535EImD/h13hGVaHhnCu1xarBs=
Expand Down
24 changes: 24 additions & 0 deletions module/modmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,22 @@ func (mgr *Manager) Close(ctx context.Context) error {
return err
}

// Kill will kill all processes in the module's process group.
// This is best effort as we do not have a lock during this
// function. Taking the lock will mean that we may be blocked,
// and we do not want to be blocked.
func (mgr *Manager) Kill() {
if mgr.restartCtxCancel != nil {
mgr.restartCtxCancel()
}
// sync.Map's Range does not block other methods on the map;
// even f itself may call any method on the map.
mgr.modules.Range(func(_ string, mod *module) bool {
mod.killProcessGroup()
return true
})
}

// Handles returns all the models for each module registered.
func (mgr *Manager) Handles() map[string]modlib.HandlerMap {
res := map[string]modlib.HandlerMap{}
Expand Down Expand Up @@ -1247,6 +1263,14 @@ func (m *module) stopProcess() error {
return nil
}

func (m *module) killProcessGroup() {
if m.process == nil {
return
}
m.logger.Infof("Killing module: %s process", m.cfg.Name)
m.process.KillGroup()
}

func (m *module) registerResources(mgr modmaninterface.ModuleManager) {
for api, models := range m.handles {
if _, ok := resource.LookupGenericAPIRegistration(api.API); !ok {
Expand Down
45 changes: 45 additions & 0 deletions module/modmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import (
"os"
"os/exec"
"path/filepath"
"runtime"
"strconv"
"sync/atomic"
"syscall"
"testing"
"time"

Expand Down Expand Up @@ -362,6 +364,49 @@ func TestModManagerFunctions(t *testing.T) {
}
}

func TestModManagerKill(t *testing.T) {
// this test will not pass on windows as it relies on the UnixPid of the managed process
if runtime.GOOS == "windows" {
t.Skip()
}
modPath := rtestutils.BuildTempModule(t, "examples/customresources/demos/simplemodule")
logger, logs := logging.NewObservedTestLogger(t)
parentAddr := setupSocketWithRobot(t)

ctx := context.Background()
mgr := setupModManager(t, ctx, parentAddr, logger, modmanageroptions.Options{})
modCfg := config.Module{
Name: "simple-module",
ExePath: modPath,
}
err := mgr.Add(ctx, modCfg)
test.That(t, err, test.ShouldBeNil)

// get the module from the module map
mMgr, ok := mgr.(*Manager)
test.That(t, ok, test.ShouldBeTrue)

mod, ok := mMgr.modules.Load(modCfg.Name)
test.That(t, ok, test.ShouldBeTrue)

mgr.Kill()

testutils.WaitForAssertion(t, func(tb testing.TB) {
test.That(tb, logs.FilterMessageSnippet("Killing module").Len(),
test.ShouldEqual, 1)
})

// in CI, we have to send another signal to make sure the cmd.Wait() in
// the manage goroutine actually returns.
// We do not care about the error if it is expected.
// maybe related to https://github.com/golang/go/issues/18874
pid, err := mod.process.UnixPid()
test.That(t, err, test.ShouldBeNil)
if err := syscall.Kill(pid, syscall.SIGTERM); err != nil {
test.That(t, errors.Is(err, os.ErrProcessDone), test.ShouldBeFalse)
}
}

func TestModManagerValidation(t *testing.T) {
ctx := context.Background()
logger := logging.NewTestLogger(t)
Expand Down
1 change: 1 addition & 0 deletions module/modmaninterface/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ type ModuleManager interface {
FirstRun(ctx context.Context, conf config.Module) error

Close(ctx context.Context) error
Kill()
}
6 changes: 6 additions & 0 deletions robot/impl/local_robot.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,12 @@ func (r *localRobot) Close(ctx context.Context) error {
return err
}

// Kill will attempt to kill any processes on the system started by the robot as quickly as possible.
// This operation is not clean and will not wait for completion.
func (r *localRobot) Kill() {
r.manager.Kill()
}

// StopAll cancels all current and outstanding operations for the robot and stops all actuators and movement.
func (r *localRobot) StopAll(ctx context.Context, extra map[resource.Name]map[string]interface{}) error {
// Stop all operations
Expand Down
28 changes: 28 additions & 0 deletions robot/impl/local_robot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2181,6 +2181,34 @@ func TestResourcelessModuleRemove(t *testing.T) {
})
}

func TestKill(t *testing.T) {
// RSDK-9722: this test will not pass in CI as the managed process's manage goroutine
// will not return from Wait() and thus fail the goroutine leak detection.
t.Skip()
ctx := context.Background()
logger, logs := logging.NewObservedTestLogger(t)

// Precompile module to avoid timeout issues when building takes too long.
testPath := rtestutils.BuildTempModule(t, "module/testmodule")

cfg := &config.Config{
Modules: []config.Module{
{
Name: "mod",
ExePath: testPath,
},
},
}
r := setupLocalRobot(t, ctx, cfg, logger)

r.Kill()

testutils.WaitForAssertion(t, func(tb testing.TB) {
test.That(tb, logs.FilterMessageSnippet("Killing module").Len(),
test.ShouldEqual, 1)
})
}

func TestCrashedModuleReconfigure(t *testing.T) {
ctx := context.Background()
logger, logs := logging.NewObservedTestLogger(t)
Expand Down
32 changes: 28 additions & 4 deletions robot/impl/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"reflect"
"strings"
"sync"
"time"

"github.com/jhump/protoreflect/desc"
Expand Down Expand Up @@ -50,6 +51,9 @@ type resourceManager struct {
resources *resource.Graph
processManager pexec.ProcessManager
processConfigs map[string]pexec.ProcessConfig
// modManagerLock controls access to the moduleManager and prevents a data race.
// This may happen if Kill() or Close() is called concurrently with startModuleManager.
modManagerLock sync.Mutex
moduleManager modif.ModuleManager
opts resourceManagerOptions
logger logging.Logger
Expand Down Expand Up @@ -127,7 +131,10 @@ func (manager *resourceManager) startModuleManager(
PackagesDir: packagesDir,
FTDC: manager.opts.ftdc,
}
manager.moduleManager = modmanager.NewManager(ctx, parentAddr, logger, mmOpts)
modmanager := modmanager.NewManager(ctx, parentAddr, logger, mmOpts)
manager.modManagerLock.Lock()
manager.moduleManager = modmanager
manager.modManagerLock.Unlock()
}

// addRemote adds a remote to the manager.
Expand Down Expand Up @@ -580,17 +587,34 @@ func (manager *resourceManager) Close(ctx context.Context) error {
if err := manager.removeMarkedAndClose(ctx, excludeWebFromClose); err != nil {
allErrs = multierr.Combine(allErrs, err)
}

// take a lock minimally to make a copy of the moduleManager.
manager.modManagerLock.Lock()
modManager := manager.moduleManager
manager.modManagerLock.Unlock()
// moduleManager may be nil in tests, and must be closed last, after resources within have been closed properly above
if manager.moduleManager != nil {
if err := manager.moduleManager.Close(ctx); err != nil {
if modManager != nil {
if err := modManager.Close(ctx); err != nil {
allErrs = multierr.Combine(allErrs, fmt.Errorf("error closing module manager: %w", err))
}
}

return allErrs
}

// Kill attempts to kill all module processes.
func (manager *resourceManager) Kill() {
// TODO(RSDK-9709): Kill processes in processManager as well.

// take a lock minimally to make a copy of the moduleManager.
manager.modManagerLock.Lock()
modManager := manager.moduleManager
manager.modManagerLock.Unlock()
// moduleManager may be nil in tests
if modManager != nil {
modManager.Kill()
}
}

// completeConfig process the tree in reverse order and attempts to build or reconfigure
// resources that are wrapped in a placeholderResource. this function will attempt to
// process resources concurrently when they do not depend on each other unless
Expand Down
1 change: 1 addition & 0 deletions robot/impl/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1325,6 +1325,7 @@ func (fp *fakeProcess) Start(ctx context.Context) error {
func (fp *fakeProcess) Stop() error {
return nil
}
func (fp *fakeProcess) KillGroup() {}

func (fp *fakeProcess) Status() error {
return nil
Expand Down
5 changes: 5 additions & 0 deletions robot/robot.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,11 @@ type LocalRobot interface {

// RestartAllowed returns whether the robot can safely be restarted.
RestartAllowed() bool

// Kill will attempt to kill any processes on the system started by the robot as quickly as possible.
// This operation is not clean and will not wait for completion.
// Only use this if comfortable with leaking resources (in cases where exiting the program as quickly as possible is desired).
Kill()
}

// A RemoteRobot is a Robot that was created through a connection.
Expand Down
22 changes: 18 additions & 4 deletions web/server/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"runtime"
"runtime/pprof"
"slices"
"sync"
"time"

"github.com/invopop/jsonschema"
Expand Down Expand Up @@ -348,7 +349,11 @@ func (s *robotServer) serveWeb(ctx context.Context, cfg *config.Config) (err err
forceShutdown := make(chan struct{})
defer func() { <-forceShutdown }()

var cloudRestartCheckerActive chan struct{}
var (
theRobot robot.LocalRobot
theRobotLock sync.Mutex
cloudRestartCheckerActive chan struct{}
)
rpcDialer := rpc.NewCachedDialer()
defer func() {
if cloudRestartCheckerActive != nil {
Expand Down Expand Up @@ -378,6 +383,12 @@ func (s *robotServer) serveWeb(ctx context.Context, cfg *config.Config) (err err
case <-doneServing:
return true
default:
theRobotLock.Lock()
robot := theRobot
theRobotLock.Unlock()
if robot != nil {
robot.Kill()
}
s.logger.Fatalw("server failed to cleanly shutdown after deadline", "deadline", hungShutdownDeadline)
return true
}
Expand Down Expand Up @@ -493,8 +504,11 @@ func (s *robotServer) serveWeb(ctx context.Context, cfg *config.Config) (err err
cancel()
return err
}
theRobotLock.Lock()
theRobot = myRobot
theRobotLock.Unlock()
defer func() {
err = multierr.Combine(err, myRobot.Close(context.Background()))
err = multierr.Combine(err, theRobot.Close(context.Background()))
}()

// watch for and deliver changes to the robot
Expand All @@ -514,7 +528,7 @@ func (s *robotServer) serveWeb(ctx context.Context, cfg *config.Config) (err err
// Use `fullProcessedConfig` as the initial config for the config watcher
// goroutine, as we want incoming config changes to be compared to the full
// config.
s.configWatcher(ctx, fullProcessedConfig, myRobot, watcher)
s.configWatcher(ctx, fullProcessedConfig, theRobot, watcher)
}()
// At end of this function, cancel context and wait for watcher goroutine
// to complete.
Expand All @@ -528,7 +542,7 @@ func (s *robotServer) serveWeb(ctx context.Context, cfg *config.Config) (err err
if err != nil {
return err
}
return web.RunWeb(ctx, myRobot, options, s.logger)
return web.RunWeb(ctx, theRobot, options, s.logger)
}

// dumpResourceRegistrations prints all builtin resource registrations as a json array
Expand Down

0 comments on commit 819123b

Please sign in to comment.