From 819123b8ba7bb4e56fa8c90457a01f753c3ebac3 Mon Sep 17 00:00:00 2001 From: Cheuk <90270663+cheukt@users.noreply.github.com> Date: Fri, 10 Jan 2025 16:05:15 -0500 Subject: [PATCH] RSDK-9591 - Kill all lingering module process before exiting (#4657) --- go.mod | 2 +- go.sum | 4 +-- module/modmanager/manager.go | 24 +++++++++++++++ module/modmanager/manager_test.go | 45 +++++++++++++++++++++++++++++ module/modmaninterface/interface.go | 1 + robot/impl/local_robot.go | 6 ++++ robot/impl/local_robot_test.go | 28 ++++++++++++++++++ robot/impl/resource_manager.go | 32 +++++++++++++++++--- robot/impl/resource_manager_test.go | 1 + robot/robot.go | 5 ++++ web/server/entrypoint.go | 22 +++++++++++--- 11 files changed, 159 insertions(+), 11 deletions(-) diff --git a/go.mod b/go.mod index 83e5680b9af..2c6dba386b0 100644 --- a/go.mod +++ b/go.mod @@ -77,7 +77,7 @@ require ( go.uber.org/zap v1.27.0 go.viam.com/api v0.1.380 go.viam.com/test v1.2.4 - go.viam.com/utils v0.1.118 + go.viam.com/utils v0.1.120 goji.io v2.0.2+incompatible golang.org/x/image v0.19.0 golang.org/x/mobile v0.0.0-20240112133503-c713f31d574b diff --git a/go.sum b/go.sum index 55b2f7196c5..bea7feed60f 100644 --- a/go.sum +++ b/go.sum @@ -1517,8 +1517,8 @@ go.viam.com/api v0.1.380 h1:VgRHDlPBku+kjIp4omxmPNmRVZezytFUUOFJ2xpRFR8= go.viam.com/api v0.1.380/go.mod h1:g5eipXHNm0rQmW7DWya6avKcmzoypLmxnMlAaIsE5Ls= go.viam.com/test v1.2.4 h1:JYgZhsuGAQ8sL9jWkziAXN9VJJiKbjoi9BsO33TW3ug= go.viam.com/test v1.2.4/go.mod h1:zI2xzosHdqXAJ/kFqcN+OIF78kQuTV2nIhGZ8EzvaJI= -go.viam.com/utils v0.1.118 h1:Kp6ebrCBiYReeSC1XnWPTjtBJoTUsQ6YWAomQkQF/mE= -go.viam.com/utils v0.1.118/go.mod h1:g1CaEkf7aJCrSI/Sfkx+6cBS1+Y3fPT2pvMQbp7TTBI= +go.viam.com/utils v0.1.120 h1:qwHt053zgcg6HtdDpP6Aj6lkmxZYyL1w6d43Ef18uso= +go.viam.com/utils v0.1.120/go.mod h1:g1CaEkf7aJCrSI/Sfkx+6cBS1+Y3fPT2pvMQbp7TTBI= go4.org/unsafe/assume-no-moving-gc v0.0.0-20230525183740-e7c30c78aeb2 h1:WJhcL4p+YeDxmZWg141nRm7XC8IDmhz7lk5GpadO1Sg= go4.org/unsafe/assume-no-moving-gc v0.0.0-20230525183740-e7c30c78aeb2/go.mod h1:FftLjUGFEDu5k8lt0ddY+HcrH/qU/0qk+H8j9/nTl3E= gocv.io/x/gocv v0.25.0/go.mod h1:Rar2PS6DV+T4FL+PM535EImD/h13hGVaHhnCu1xarBs= diff --git a/module/modmanager/manager.go b/module/modmanager/manager.go index 79342435070..f5b05478f6e 100644 --- a/module/modmanager/manager.go +++ b/module/modmanager/manager.go @@ -211,6 +211,22 @@ func (mgr *Manager) Close(ctx context.Context) error { return err } +// Kill will kill all processes in the module's process group. +// This is best effort as we do not have a lock during this +// function. Taking the lock will mean that we may be blocked, +// and we do not want to be blocked. +func (mgr *Manager) Kill() { + if mgr.restartCtxCancel != nil { + mgr.restartCtxCancel() + } + // sync.Map's Range does not block other methods on the map; + // even f itself may call any method on the map. + mgr.modules.Range(func(_ string, mod *module) bool { + mod.killProcessGroup() + return true + }) +} + // Handles returns all the models for each module registered. func (mgr *Manager) Handles() map[string]modlib.HandlerMap { res := map[string]modlib.HandlerMap{} @@ -1247,6 +1263,14 @@ func (m *module) stopProcess() error { return nil } +func (m *module) killProcessGroup() { + if m.process == nil { + return + } + m.logger.Infof("Killing module: %s process", m.cfg.Name) + m.process.KillGroup() +} + func (m *module) registerResources(mgr modmaninterface.ModuleManager) { for api, models := range m.handles { if _, ok := resource.LookupGenericAPIRegistration(api.API); !ok { diff --git a/module/modmanager/manager_test.go b/module/modmanager/manager_test.go index 67235710bf6..b76e8263624 100644 --- a/module/modmanager/manager_test.go +++ b/module/modmanager/manager_test.go @@ -9,8 +9,10 @@ import ( "os" "os/exec" "path/filepath" + "runtime" "strconv" "sync/atomic" + "syscall" "testing" "time" @@ -362,6 +364,49 @@ func TestModManagerFunctions(t *testing.T) { } } +func TestModManagerKill(t *testing.T) { + // this test will not pass on windows as it relies on the UnixPid of the managed process + if runtime.GOOS == "windows" { + t.Skip() + } + modPath := rtestutils.BuildTempModule(t, "examples/customresources/demos/simplemodule") + logger, logs := logging.NewObservedTestLogger(t) + parentAddr := setupSocketWithRobot(t) + + ctx := context.Background() + mgr := setupModManager(t, ctx, parentAddr, logger, modmanageroptions.Options{}) + modCfg := config.Module{ + Name: "simple-module", + ExePath: modPath, + } + err := mgr.Add(ctx, modCfg) + test.That(t, err, test.ShouldBeNil) + + // get the module from the module map + mMgr, ok := mgr.(*Manager) + test.That(t, ok, test.ShouldBeTrue) + + mod, ok := mMgr.modules.Load(modCfg.Name) + test.That(t, ok, test.ShouldBeTrue) + + mgr.Kill() + + testutils.WaitForAssertion(t, func(tb testing.TB) { + test.That(tb, logs.FilterMessageSnippet("Killing module").Len(), + test.ShouldEqual, 1) + }) + + // in CI, we have to send another signal to make sure the cmd.Wait() in + // the manage goroutine actually returns. + // We do not care about the error if it is expected. + // maybe related to https://github.com/golang/go/issues/18874 + pid, err := mod.process.UnixPid() + test.That(t, err, test.ShouldBeNil) + if err := syscall.Kill(pid, syscall.SIGTERM); err != nil { + test.That(t, errors.Is(err, os.ErrProcessDone), test.ShouldBeFalse) + } +} + func TestModManagerValidation(t *testing.T) { ctx := context.Background() logger := logging.NewTestLogger(t) diff --git a/module/modmaninterface/interface.go b/module/modmaninterface/interface.go index 22f589ccee4..605910d2ba8 100644 --- a/module/modmaninterface/interface.go +++ b/module/modmaninterface/interface.go @@ -30,4 +30,5 @@ type ModuleManager interface { FirstRun(ctx context.Context, conf config.Module) error Close(ctx context.Context) error + Kill() } diff --git a/robot/impl/local_robot.go b/robot/impl/local_robot.go index 801c0b1f4a1..92269b31450 100644 --- a/robot/impl/local_robot.go +++ b/robot/impl/local_robot.go @@ -197,6 +197,12 @@ func (r *localRobot) Close(ctx context.Context) error { return err } +// Kill will attempt to kill any processes on the system started by the robot as quickly as possible. +// This operation is not clean and will not wait for completion. +func (r *localRobot) Kill() { + r.manager.Kill() +} + // StopAll cancels all current and outstanding operations for the robot and stops all actuators and movement. func (r *localRobot) StopAll(ctx context.Context, extra map[resource.Name]map[string]interface{}) error { // Stop all operations diff --git a/robot/impl/local_robot_test.go b/robot/impl/local_robot_test.go index a72f9a05d74..0d56947e3a6 100644 --- a/robot/impl/local_robot_test.go +++ b/robot/impl/local_robot_test.go @@ -2181,6 +2181,34 @@ func TestResourcelessModuleRemove(t *testing.T) { }) } +func TestKill(t *testing.T) { + // RSDK-9722: this test will not pass in CI as the managed process's manage goroutine + // will not return from Wait() and thus fail the goroutine leak detection. + t.Skip() + ctx := context.Background() + logger, logs := logging.NewObservedTestLogger(t) + + // Precompile module to avoid timeout issues when building takes too long. + testPath := rtestutils.BuildTempModule(t, "module/testmodule") + + cfg := &config.Config{ + Modules: []config.Module{ + { + Name: "mod", + ExePath: testPath, + }, + }, + } + r := setupLocalRobot(t, ctx, cfg, logger) + + r.Kill() + + testutils.WaitForAssertion(t, func(tb testing.TB) { + test.That(tb, logs.FilterMessageSnippet("Killing module").Len(), + test.ShouldEqual, 1) + }) +} + func TestCrashedModuleReconfigure(t *testing.T) { ctx := context.Background() logger, logs := logging.NewObservedTestLogger(t) diff --git a/robot/impl/resource_manager.go b/robot/impl/resource_manager.go index 8b711d2309d..8703c0e2606 100644 --- a/robot/impl/resource_manager.go +++ b/robot/impl/resource_manager.go @@ -8,6 +8,7 @@ import ( "os" "reflect" "strings" + "sync" "time" "github.com/jhump/protoreflect/desc" @@ -50,6 +51,9 @@ type resourceManager struct { resources *resource.Graph processManager pexec.ProcessManager processConfigs map[string]pexec.ProcessConfig + // modManagerLock controls access to the moduleManager and prevents a data race. + // This may happen if Kill() or Close() is called concurrently with startModuleManager. + modManagerLock sync.Mutex moduleManager modif.ModuleManager opts resourceManagerOptions logger logging.Logger @@ -127,7 +131,10 @@ func (manager *resourceManager) startModuleManager( PackagesDir: packagesDir, FTDC: manager.opts.ftdc, } - manager.moduleManager = modmanager.NewManager(ctx, parentAddr, logger, mmOpts) + modmanager := modmanager.NewManager(ctx, parentAddr, logger, mmOpts) + manager.modManagerLock.Lock() + manager.moduleManager = modmanager + manager.modManagerLock.Unlock() } // addRemote adds a remote to the manager. @@ -580,10 +587,13 @@ func (manager *resourceManager) Close(ctx context.Context) error { if err := manager.removeMarkedAndClose(ctx, excludeWebFromClose); err != nil { allErrs = multierr.Combine(allErrs, err) } - + // take a lock minimally to make a copy of the moduleManager. + manager.modManagerLock.Lock() + modManager := manager.moduleManager + manager.modManagerLock.Unlock() // moduleManager may be nil in tests, and must be closed last, after resources within have been closed properly above - if manager.moduleManager != nil { - if err := manager.moduleManager.Close(ctx); err != nil { + if modManager != nil { + if err := modManager.Close(ctx); err != nil { allErrs = multierr.Combine(allErrs, fmt.Errorf("error closing module manager: %w", err)) } } @@ -591,6 +601,20 @@ func (manager *resourceManager) Close(ctx context.Context) error { return allErrs } +// Kill attempts to kill all module processes. +func (manager *resourceManager) Kill() { + // TODO(RSDK-9709): Kill processes in processManager as well. + + // take a lock minimally to make a copy of the moduleManager. + manager.modManagerLock.Lock() + modManager := manager.moduleManager + manager.modManagerLock.Unlock() + // moduleManager may be nil in tests + if modManager != nil { + modManager.Kill() + } +} + // completeConfig process the tree in reverse order and attempts to build or reconfigure // resources that are wrapped in a placeholderResource. this function will attempt to // process resources concurrently when they do not depend on each other unless diff --git a/robot/impl/resource_manager_test.go b/robot/impl/resource_manager_test.go index 4a2e9c39c7a..8ed0c263a4a 100644 --- a/robot/impl/resource_manager_test.go +++ b/robot/impl/resource_manager_test.go @@ -1325,6 +1325,7 @@ func (fp *fakeProcess) Start(ctx context.Context) error { func (fp *fakeProcess) Stop() error { return nil } +func (fp *fakeProcess) KillGroup() {} func (fp *fakeProcess) Status() error { return nil diff --git a/robot/robot.go b/robot/robot.go index e3a8c7538df..652ff32abec 100644 --- a/robot/robot.go +++ b/robot/robot.go @@ -186,6 +186,11 @@ type LocalRobot interface { // RestartAllowed returns whether the robot can safely be restarted. RestartAllowed() bool + + // Kill will attempt to kill any processes on the system started by the robot as quickly as possible. + // This operation is not clean and will not wait for completion. + // Only use this if comfortable with leaking resources (in cases where exiting the program as quickly as possible is desired). + Kill() } // A RemoteRobot is a Robot that was created through a connection. diff --git a/web/server/entrypoint.go b/web/server/entrypoint.go index 6c2a2bed01e..0d430823217 100644 --- a/web/server/entrypoint.go +++ b/web/server/entrypoint.go @@ -13,6 +13,7 @@ import ( "runtime" "runtime/pprof" "slices" + "sync" "time" "github.com/invopop/jsonschema" @@ -348,7 +349,11 @@ func (s *robotServer) serveWeb(ctx context.Context, cfg *config.Config) (err err forceShutdown := make(chan struct{}) defer func() { <-forceShutdown }() - var cloudRestartCheckerActive chan struct{} + var ( + theRobot robot.LocalRobot + theRobotLock sync.Mutex + cloudRestartCheckerActive chan struct{} + ) rpcDialer := rpc.NewCachedDialer() defer func() { if cloudRestartCheckerActive != nil { @@ -378,6 +383,12 @@ func (s *robotServer) serveWeb(ctx context.Context, cfg *config.Config) (err err case <-doneServing: return true default: + theRobotLock.Lock() + robot := theRobot + theRobotLock.Unlock() + if robot != nil { + robot.Kill() + } s.logger.Fatalw("server failed to cleanly shutdown after deadline", "deadline", hungShutdownDeadline) return true } @@ -493,8 +504,11 @@ func (s *robotServer) serveWeb(ctx context.Context, cfg *config.Config) (err err cancel() return err } + theRobotLock.Lock() + theRobot = myRobot + theRobotLock.Unlock() defer func() { - err = multierr.Combine(err, myRobot.Close(context.Background())) + err = multierr.Combine(err, theRobot.Close(context.Background())) }() // watch for and deliver changes to the robot @@ -514,7 +528,7 @@ func (s *robotServer) serveWeb(ctx context.Context, cfg *config.Config) (err err // Use `fullProcessedConfig` as the initial config for the config watcher // goroutine, as we want incoming config changes to be compared to the full // config. - s.configWatcher(ctx, fullProcessedConfig, myRobot, watcher) + s.configWatcher(ctx, fullProcessedConfig, theRobot, watcher) }() // At end of this function, cancel context and wait for watcher goroutine // to complete. @@ -528,7 +542,7 @@ func (s *robotServer) serveWeb(ctx context.Context, cfg *config.Config) (err err if err != nil { return err } - return web.RunWeb(ctx, myRobot, options, s.logger) + return web.RunWeb(ctx, theRobot, options, s.logger) } // dumpResourceRegistrations prints all builtin resource registrations as a json array