diff --git a/CHANGELOG.md b/CHANGELOG.md index 507f17e49..7649c892d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Added +- `tt tcm status`: added command to check TCM runtime status (modes: `watchdog` or `interactive`). +- `tt tcm stop`: add command for graceful termination of TCM processes (modes: `watchdog` or `interactive`). + ### Changed ### Fixed @@ -755,4 +758,4 @@ Additionally, several fixes were implemented to improve stability. - Module ``tt create``, to create an application from a template. - Module ``tt build``, to build an application. - Module ``tt install``, to install tarantool/tt. -- Module ``tt remove``, to remove tarantool/tt. +- Module ``tt remove``, to remove tarantool/tt. \ No newline at end of file diff --git a/cli/cmd/tcm.go b/cli/cmd/tcm.go index 9f9f6ec8a..fc0d7a5a2 100644 --- a/cli/cmd/tcm.go +++ b/cli/cmd/tcm.go @@ -2,19 +2,29 @@ package cmd import ( "errors" + "fmt" + "log" "os" "os/exec" + "path/filepath" "time" + "github.com/jedib0t/go-pretty/v6/table" + "github.com/jedib0t/go-pretty/v6/text" "github.com/spf13/cobra" "github.com/tarantool/tt/cli/cmdcontext" - "github.com/tarantool/tt/cli/modules" + "github.com/tarantool/tt/cli/process_utils" tcmCmd "github.com/tarantool/tt/cli/tcm" - "github.com/tarantool/tt/cli/util" + libwatchdog "github.com/tarantool/tt/lib/watchdog" ) var tcmCtx = tcmCmd.TcmCtx{} +const ( + tcmPidFile = "tcm.pid" + watchdogPidFile = "watchdog.pid" +) + func newTcmStartCmd() *cobra.Command { var tcmCmd = &cobra.Command{ Use: "start", @@ -22,12 +32,7 @@ func newTcmStartCmd() *cobra.Command { Long: `Start to the tcm. 
tt tcm start --watchdog tt tcm start --path`, - Run: func(cmd *cobra.Command, args []string) { - cmdCtx.CommandName = cmd.Name() - err := modules.RunCmd(&cmdCtx, cmd.CommandPath(), &modulesInfo, internalStartTcm, args) - util.HandleCmdErr(cmd, err) - - }, + Run: RunModuleFunc(internalStartTcm), } tcmCmd.Flags().StringVar(&tcmCtx.Executable, "path", "", "the path to the tcm binary file") tcmCmd.Flags().BoolVar(&tcmCtx.Watchdog, "watchdog", false, "enables the watchdog") @@ -35,6 +40,27 @@ func newTcmStartCmd() *cobra.Command { return tcmCmd } +func newTcmStatusCmd() *cobra.Command { + var tcmCmd = &cobra.Command{ + Use: "status", + Short: "Status tcm application", + Long: `Status to the tcm. + tt tcm status`, + Run: RunModuleFunc(internalTcmStatus), + } + return tcmCmd +} + +func newTcmStopCmd() *cobra.Command { + var tcmCmd = &cobra.Command{ + Use: "stop", + Short: "Stop tcm application", + Long: `Stop to the tcm. tt tcm stop`, + Run: RunModuleFunc(internalTcmStop), + } + return tcmCmd +} + func NewTcmCmd() *cobra.Command { var tcmCmd = &cobra.Command{ Use: "tcm", @@ -42,6 +68,8 @@ func NewTcmCmd() *cobra.Command { } tcmCmd.AddCommand( newTcmStartCmd(), + newTcmStatusCmd(), + newTcmStopCmd(), ) return tcmCmd } @@ -49,26 +77,28 @@ func NewTcmCmd() *cobra.Command { func startTcmInteractive() error { tcmApp := exec.Command(tcmCtx.Executable) - tcmApp.Stdout = os.Stdout - tcmApp.Stderr = os.Stderr - - if err := tcmApp.Run(); err != nil { + if err := tcmApp.Start(); err != nil { return err } - return nil -} + if tcmApp == nil || tcmApp.Process == nil { + return errors.New("process is not running") + } -func startTcmUnderWatchDog() error { - wd, err := tcmCmd.NewWatchdog(5 * time.Second) + err := process_utils.CreatePIDFile(tcmPidFile, tcmApp.Process.Pid) if err != nil { return err } + log.Printf("(INFO): Interactive process PID %d written to %s\n", tcmApp.Process.Pid, tcmPidFile) + return nil +} + +func startTcmUnderWatchDog() error { + wd := 
libwatchdog.NewWatchdog(tcmPidFile, watchdogPidFile, 5*time.Second) if err := wd.Start(tcmCtx.Executable); err != nil { return err } - return nil } @@ -87,11 +117,61 @@ func internalStartTcm(cmdCtx *cmdcontext.CmdCtx, args []string) error { if err := startTcmInteractive(); err != nil { return err } + } else { + if err := startTcmUnderWatchDog(); err != nil { + return err + } } - if err := startTcmUnderWatchDog(); err != nil { + return nil +} + +func internalTcmStatus(cmdCtx *cmdcontext.CmdCtx, args []string) error { + pidAbsPath, err := filepath.Abs(tcmPidFile) + if err != nil { return err } + if _, err := os.Stat(pidAbsPath); err != nil { + return fmt.Errorf("path does not exist: %v", err) + } + + ts := table.NewWriter() + ts.SetOutputMirror(os.Stdout) + + ts.AppendHeader( + table.Row{"APPLICATION", "STATUS", "PID"}) + + ts.SetColumnConfigs([]table.ColumnConfig{ + {Number: 1, Align: text.AlignLeft, AlignHeader: text.AlignLeft}, + {Number: 2, Align: text.AlignLeft, AlignHeader: text.AlignLeft}, + {Number: 3, Align: text.AlignLeft, AlignHeader: text.AlignLeft}, + {Number: 4, Align: text.AlignLeft, AlignHeader: text.AlignLeft}, + }) + + status := process_utils.ProcessStatus(pidAbsPath) + + ts.AppendRows([]table.Row{ + {"TCM", status.Status, status.PID}, + }) + ts.Render() + return nil +} + +func internalTcmStop(cmdCtx *cmdcontext.CmdCtx, args []string) error { + if isExists, _ := process_utils.ExistsAndRecord(watchdogPidFile); isExists { + _, err := process_utils.StopProcess(watchdogPidFile) + if err != nil { + return err + } + log.Println("Watchdog and TCM stopped") + } else { + _, err := process_utils.StopProcess(tcmPidFile) + if err != nil { + return err + } + log.Println("TCM stopped") + } + return nil } diff --git a/cli/process_utils/process_utils.go b/cli/process_utils/process_utils.go index c49e274a1..96e1ac170 100644 --- a/cli/process_utils/process_utils.go +++ b/cli/process_utils/process_utils.go @@ -104,6 +104,27 @@ func CheckPIDFile(pidFileName string) 
error {
 	return nil
 }
 
+// ExistsAndRecord reports whether pidFileName exists and records the PID of a live process.
+// It returns false with a nil error when the file is absent or the process is not alive.
+// It returns a wrapped error when the file cannot be inspected or its PID cannot be read.
+func ExistsAndRecord(pidFileName string) (bool, error) {
+	if _, err := os.Stat(pidFileName); err == nil {
+		// The PID file exists; a failed liveness check counts the same as a dead process.
+		pid, err := GetPIDFromFile(pidFileName)
+		if err != nil {
+			return false, fmt.Errorf(`PID file exists, but PID can't be read. Error: "%w"`, err)
+		}
+		if res, _ := IsProcessAlive(pid); res {
+			return true, nil
+		}
+	} else if !os.IsNotExist(err) {
+		return false, fmt.Errorf(`something went wrong while trying to read the `+
+			`PID file. Error: "%w"`, err)
+	}
+
+	return false, nil
+}
+
 // CreatePIDFile checks that the instance PID file is absent or
 // deprecated and creates a new one. Returns an error on failure.
 func CreatePIDFile(pidFileName string, pid int) error {
diff --git a/cli/process_utils/process_utils_test.go b/cli/process_utils/process_utils_test.go
new file mode 100644
index 000000000..198d3f83f
--- /dev/null
+++ b/cli/process_utils/process_utils_test.go
@@ -0,0 +1,36 @@
+package process_utils
+
+import (
+	"os"
+	"os/exec"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func Test_ExistsAndRecord(t *testing.T) {
+	testFile := "test.pid"
+	invalid := "invalid.pid"
+	cmd := exec.Command("sleep", "10")
+
+	t.Cleanup(func() {
+		os.Remove(testFile)
+	})
+
+	err := cmd.Start()
+	require.NoError(t, err)
+
+	err = CreatePIDFile(testFile, cmd.Process.Pid)
+	require.NoError(t, err)
+
+	status, err := ExistsAndRecord(testFile)
+	require.NoError(t, err)
+	require.True(t, status)
+
+	err = cmd.Process.Kill()
+	require.NoError(t, err)
+
+	statusInvalid, err := ExistsAndRecord(invalid)
+	require.False(t, statusInvalid)
+	require.NoError(t, err)
+}
diff --git a/cli/tcm/tcm.go b/cli/tcm/tcm.go
index cd03a882b..133dbbb9d 100644
---
a/cli/tcm/tcm.go +++ b/cli/tcm/tcm.go @@ -1,6 +1,13 @@ package tcm +// TcmCtx holds parameters and state for managing the TCM process and its watchdog. type TcmCtx struct { + // Path to the TCM executable file. Executable string - Watchdog bool + // Path to the file storing the TCM process PID. + TcmPidFile string + // Flag indicating whether the watchdog is enabled. + Watchdog bool + // Path to the file storing the watchdog process PID. + WathdogPidFile string } diff --git a/cli/tcm/watchdog.go b/cli/tcm/watchdog.go deleted file mode 100644 index dd4c7f453..000000000 --- a/cli/tcm/watchdog.go +++ /dev/null @@ -1,158 +0,0 @@ -package tcm - -import ( - "context" - "errors" - "fmt" - "log" - "os" - "os/exec" - "os/signal" - "path/filepath" - "sync" - "syscall" - "time" -) - -// Watchdog manages the lifecycle of a process. -type Watchdog struct { - // The command to execute and monitor. - cmd *exec.Cmd - // Time to wait before restarting the process. - restartTimeout time.Duration - // Flag to indicate if the Watchdog should stop. - shouldStop bool - // Mutex to protect access to shouldStop. - stopMutex sync.Mutex - // WaitGroup to wait for all goroutines to finish. - doneBarrier sync.WaitGroup - // File to store the process PID. - pidFile string -} - -// NewWatchdog creates a new Watchdog instance. -func NewWatchdog(restartTimeout time.Duration) (*Watchdog, error) { - return &Watchdog{ - restartTimeout: restartTimeout, - pidFile: "tcm/pidFile.pid", - }, nil -} - -// Start starts the process and monitors its execution. -func (wd *Watchdog) Start(bin string, args ...string) error { - wd.doneBarrier.Add(1) - defer wd.doneBarrier.Done() - - signalCtx, signalCancel := context.WithCancel(context.Background()) - defer signalCancel() - - go wd.handleSignals(signalCtx, signalCancel) - - for { - wd.stopMutex.Lock() - if wd.shouldStop { - wd.stopMutex.Unlock() - return nil - } - wd.stopMutex.Unlock() - - wd.cmd = exec.Command(bin, args...) 
- wd.cmd.Stdout = os.Stdout - wd.cmd.Stderr = os.Stderr - - log.Println("(INFO): Starting process...") - if err := wd.cmd.Start(); err != nil { - log.Printf("(ERROR): Failed to start process: %v\n", err) - return err - } - - if err := wd.writePIDToFile(); err != nil { - log.Printf("(ERROR): Failed to write PID to file: %v\n", err) - return err - } - - err := wd.cmd.Wait() - if err != nil { - var exitErr *exec.ExitError - if errors.As(err, &exitErr) { - log.Printf("(WARN): Process exited with error: %v\n", exitErr) - } else { - log.Printf("(ERROR): Process failed: %v\n", err) - return err - } - } else { - log.Println("(INFO): Process completed successfully.") - } - - wd.stopMutex.Lock() - if wd.shouldStop { - wd.stopMutex.Unlock() - return nil - } - wd.stopMutex.Unlock() - - log.Printf("(INFO): Waiting for %s before restart...\n", wd.restartTimeout) - time.Sleep(wd.restartTimeout) - } -} - -// Stop stops the process and shuts down the Watchdog. -func (wd *Watchdog) Stop() { - wd.stopMutex.Lock() - wd.shouldStop = true - if wd.cmd != nil && wd.cmd.Process != nil { - log.Println("(INFO): Stopping process...") - if err := wd.cmd.Process.Signal(syscall.SIGTERM); err != nil { - log.Printf("(ERROR): Failed to stop process: %v\n", err) - } - } - wd.stopMutex.Unlock() - - wd.doneBarrier.Wait() - os.RemoveAll(filepath.Dir(wd.pidFile)) - log.Println("(INFO): Watchdog stopped.") -} - -// handleSignals listens for OS signals and stops the Watchdog gracefully. -func (wd *Watchdog) handleSignals(ctx context.Context, cancel context.CancelFunc) { - signalChan := make(chan os.Signal, 1) - signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM) - - select { - case <-signalChan: - log.Println("(INFO): Received stop signal.") - wd.Stop() - cancel() - case <-ctx.Done(): - return - } -} - -// writePIDToFile writes the PID of the process to a file. 
-func (wd *Watchdog) writePIDToFile() error { - if wd.cmd == nil || wd.cmd.Process == nil { - return errors.New("process is not running") - } - - pid := wd.cmd.Process.Pid - pidData := fmt.Sprintf("%d", pid) - - dir := filepath.Dir(wd.pidFile) - if err := os.MkdirAll(dir, os.ModePerm); err != nil { - return err - } - - file, err := os.Create(wd.pidFile) - if err != nil { - return fmt.Errorf("failed to create PID file: %v", err) - } - defer file.Close() - - _, err = file.WriteString(pidData) - if err != nil { - return err - } - - log.Printf("(INFO): PID %d written to %s\n", pid, wd.pidFile) - return nil -} diff --git a/cli/tcm/watchdog_test.go b/cli/tcm/watchdog_test.go deleted file mode 100644 index 83408ba28..000000000 --- a/cli/tcm/watchdog_test.go +++ /dev/null @@ -1,69 +0,0 @@ -package tcm - -import ( - "fmt" - "os" - "os/exec" - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -func TestWatchdogStartProcess(t *testing.T) { - watchdog, err := NewWatchdog(1 * time.Second) - require.NoError(t, err) - - go func() { - watchdog.Start("sleep", "5") - require.NoError(t, err) - }() - - time.Sleep(2 * time.Second) - - _, err = os.Stat(watchdog.pidFile) - require.NoError(t, err) - - watchdog.Stop() -} - -func TestWatchdogRestartProcess(t *testing.T) { - watchdog, err := NewWatchdog(1 * time.Second) - require.NoError(t, err) - - go func() { - err := watchdog.Start("sleep", "1") - require.NoError(t, err) - }() - - time.Sleep(3 * time.Second) - - _, err = os.Stat(watchdog.pidFile) - require.NoError(t, err) - - watchdog.Stop() -} - -func TestWritePIDToFile(t *testing.T) { - pidFile := "/tmp/watchdog_test.pid" - defer os.Remove(pidFile) - - cmd := exec.Command("sleep", "1") - err := cmd.Start() - require.NoError(t, err) - defer cmd.Process.Kill() - - watchdog := &Watchdog{ - cmd: cmd, - pidFile: pidFile, - } - - err = watchdog.writePIDToFile() - require.NoError(t, err) - - pidData, err := os.ReadFile(pidFile) - require.NoError(t, err) - - expectedPID := 
fmt.Sprintf("%d", cmd.Process.Pid)
-	require.Equal(t, expectedPID, string(pidData))
-}
diff --git a/lib/watchdog/watchdog.go b/lib/watchdog/watchdog.go
new file mode 100644
index 000000000..4409977f7
--- /dev/null
+++ b/lib/watchdog/watchdog.go
@@ -0,0 +1,280 @@
+package watchdog
+
+import (
+	"context"
+	"errors"
+	"log"
+	"os"
+	"os/exec"
+	"os/signal"
+	"sync"
+	"sync/atomic"
+	"syscall"
+	"time"
+
+	"github.com/tarantool/tt/cli/process_utils"
+)
+
+// Watchdog manages a child process, ensuring reliable startup, automatic restarts on failure,
+// and graceful shutdown. It handles system signals, maintains PID file consistency,
+// and provides thread-safe operations for concurrent process management.
+type Watchdog struct {
+	// cmd is the child process command (protected by cmdMutex).
+	cmd *exec.Cmd
+	// restartTimeout defines delay before restart (0 = immediate).
+	restartTimeout time.Duration
+	// shouldStop is atomic flag to prevent restarts when true.
+	shouldStop atomic.Bool
+	// doneBarrier waits for goroutines during shutdown.
+	doneBarrier sync.WaitGroup
+	// pidFile stores child process PID (protected by pidFileMutex).
+	pidFile string
+	// wdPidFile stores watchdog's own PID.
+	wdPidFile string
+	// cmdMutex guards cmd and every read or replacement of startupComplete.
+	cmdMutex sync.Mutex
+	// pidFileMutex protects PID file access.
+	pidFileMutex sync.Mutex
+	// signalChan receives termination signals.
+	signalChan chan os.Signal
+	// processGroupPID stores Process Group ID for cleanup.
+	processGroupPID atomic.Int32
+	// startupComplete is closed once the current startup attempt has ended.
+	startupComplete chan struct{}
+}
+
+// NewWatchdog initializes a new Watchdog instance with the specified
+// PID file paths and restart timeout duration. It sets up channels
+// for signal notification and startup completion. Returns a pointer
+// to the created Watchdog.
+func NewWatchdog(pidFile, wdPidFile string, restartTimeout time.Duration) *Watchdog {
+	return &Watchdog{
+		pidFile:         pidFile,
+		wdPidFile:       wdPidFile,
+		restartTimeout:  restartTimeout,
+		signalChan:      make(chan os.Signal, 1),
+		startupComplete: make(chan struct{}),
+	}
+}
+
+// Start runs bin with args in its own process group and restarts it, after
+// restartTimeout, every time it exits, until Stop is called.
+//
+// Start blocks for the whole supervision period. It returns nil once a stop
+// was requested, and an error when the child cannot be started, the PID
+// files cannot be written, or waiting on the child fails with anything other
+// than *exec.ExitError. While it runs, SIGINT and SIGTERM sent to the
+// current process trigger Stop.
+func (wd *Watchdog) Start(bin string, args ...string) error {
+	wd.doneBarrier.Add(1)
+	defer wd.doneBarrier.Done()
+
+	// On every exit path, release a Stop call that is parked on the current
+	// startup gate; without this it would block forever.
+	defer wd.releaseStartup()
+
+	// NOTE(review): cancel is only invoked by this defer, so the ctx.Done()
+	// branches in the supervision loop cannot fire before Start returns.
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	signal.Notify(wd.signalChan, syscall.SIGINT, syscall.SIGTERM)
+	defer signal.Stop(wd.signalChan)
+
+	// Turn the first termination signal into a Stop; ctx ends the goroutine
+	// when Start returns without having received one.
+	go func() {
+		select {
+		case sig := <-wd.signalChan:
+			if !wd.shouldStop.Load() {
+				log.Printf("(INFO): Received signal: %v", sig)
+				wd.Stop()
+			}
+		case <-ctx.Done():
+		}
+	}()
+
+	for {
+		if wd.shouldStop.Load() {
+			return nil
+		}
+
+		cmd := exec.Command(bin, args...)
+		// A dedicated process group lets terminateProcess signal the child
+		// together with everything it spawned.
+		cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
+
+		wd.cmdMutex.Lock()
+		wd.cmd = cmd
+		startErr := cmd.Start()
+		if startErr == nil {
+			// Setpgid with a zero Pgid makes the group ID equal the child PID.
+			wd.processGroupPID.Store(int32(cmd.Process.Pid))
+		}
+		wd.cmdMutex.Unlock()
+		if startErr != nil {
+			log.Printf("(ERROR): Failed to start process: %v", startErr)
+			return startErr
+		}
+
+		if err := wd.writePIDFiles(); err != nil {
+			log.Printf("(ERROR): Failed to write PID files: %v", err)
+			_ = wd.terminateProcess() // Best effort; the PID-file error is reported.
+			// Reap the child in the background so it does not stay a zombie,
+			// without blocking Start on a child that ignores SIGTERM.
+			go func() { _ = cmd.Wait() }()
+			return err
+		}
+
+		log.Println("(INFO): Process started successfully")
+		wd.releaseStartup()
+
+		// Wait in a goroutine so the select below can also observe ctx. The
+		// local cmd is used because wd.cmd is shared with terminateProcess.
+		waitChan := make(chan error, 1)
+		go func() { waitChan <- cmd.Wait() }()
+
+		select {
+		case err := <-waitChan:
+			if wd.shouldStop.Load() {
+				return nil
+			}
+
+			var exitErr *exec.ExitError
+			switch {
+			case err == nil:
+				log.Println("(INFO): Process completed successfully.")
+			case errors.As(err, &exitErr):
+				// The child ran and exited with a failure status: restart it.
+				log.Printf("(WARN): Process exited with error: %v", err)
+			default:
+				log.Printf("(ERROR): Process failed: %v", err)
+				return err
+			}
+
+		case <-ctx.Done():
+			_ = wd.terminateProcess()
+			return nil
+		}
+
+		if wd.shouldStop.Load() {
+			return nil
+		}
+
+		log.Printf("(INFO): Waiting %s before restart...", wd.restartTimeout)
+		select {
+		case <-time.After(wd.restartTimeout):
+		case <-ctx.Done():
+			return nil
+		}
+
+		// Arm a fresh startup gate for the next attempt. Stop reads the field
+		// under the same mutex, so the swap is not a data race.
+		wd.cmdMutex.Lock()
+		wd.startupComplete = make(chan struct{})
+		wd.cmdMutex.Unlock()
+	}
+}
+
+// Stop requests shutdown: it prevents further restarts, sends SIGTERM to the
+// managed process and blocks until Start has returned. Only the first call
+// does the work; later or concurrent calls return immediately.
+//
+// Stop waits for the startup attempt in progress, so it blocks indefinitely
+// if Start is never called on this Watchdog.
+func (wd *Watchdog) Stop() {
+	if !wd.shouldStop.CompareAndSwap(false, true) {
+		return // Already stopping or stopped.
+	}
+
+	// Snapshot the gate under the mutex Start holds while replacing it.
+	wd.cmdMutex.Lock()
+	startup := wd.startupComplete
+	wd.cmdMutex.Unlock()
+
+	// Do not signal a child that is still being started: wait until Start
+	// either finished the attempt or gave up and released the gate.
+	select {
+	case <-startup:
+	default:
+		log.Println("(INFO): Waiting for process startup...")
+		<-startup
+	}
+
+	_ = wd.terminateProcess()
+
+	// Stop signal delivery but leave the channel open: nothing ranges over
+	// it, and a closed channel would panic on a later signal delivery.
+	signal.Stop(wd.signalChan)
+
+	wd.doneBarrier.Wait()
+
+	log.Println("(INFO): Watchdog stopped.")
+}
+
+// terminateProcess sends SIGTERM to the child's process group, or to the child
+// alone when no group ID was recorded. A child that is already gone is not an error.
+func (wd *Watchdog) terminateProcess() error {
+	wd.cmdMutex.Lock()
+	defer wd.cmdMutex.Unlock()
+
+	if wd.cmd == nil || wd.cmd.Process == nil {
+		return nil
+	}
+
+	log.Println("(INFO): Stopping process...")
+
+	var err error
+	if pgid := int(wd.processGroupPID.Load()); pgid > 0 {
+		// A negative PID addresses the whole process group.
+		err = syscall.Kill(-pgid, syscall.SIGTERM)
+	} else {
+		err = wd.cmd.Process.Signal(syscall.SIGTERM)
+	}
+	if errors.Is(err, syscall.ESRCH) || errors.Is(err, os.ErrProcessDone) {
+		return nil
+	}
+	return err
+}
+
+// releaseStartup closes the current startup gate unless it is already closed.
+// Only Start calls it, so the check-then-close sequence cannot double-close.
+func (wd *Watchdog) releaseStartup() {
+	wd.cmdMutex.Lock()
+	gate := wd.startupComplete
+	wd.cmdMutex.Unlock()
+
+	if gate == nil {
+		return
+	}
+	select {
+	case <-gate:
+	default:
+		close(gate)
+	}
+}
+
+// writePIDFiles creates PID files for both the monitored process and the watchdog itself.
+func (wd *Watchdog) writePIDFiles() error { + wd.pidFileMutex.Lock() + defer wd.pidFileMutex.Unlock() + + if wd.cmd == nil || wd.cmd.Process == nil { + return errors.New("process is not running") + } + + if err := process_utils.CreatePIDFile(wd.pidFile, wd.cmd.Process.Pid); err != nil { + return err + } + log.Printf("(INFO): Process PID %d written to %s", wd.cmd.Process.Pid, wd.pidFile) + + if isExistsAndRecord, _ := process_utils.ExistsAndRecord(wd.wdPidFile); !isExistsAndRecord { + if err := process_utils.CreatePIDFile(wd.wdPidFile, os.Getpid()); err != nil { + return err + } + } + + log.Printf("(INFO): Watchdog PID %d written to %s", os.Getpid(), wd.wdPidFile) + + return nil +} diff --git a/lib/watchdog/watchdog_test.go b/lib/watchdog/watchdog_test.go new file mode 100644 index 000000000..823a8cbc9 --- /dev/null +++ b/lib/watchdog/watchdog_test.go @@ -0,0 +1,153 @@ +package watchdog + +import ( + "os" + "os/exec" + "path/filepath" + "syscall" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func cleanupPidFiles() { + os.Remove("test.pid") + os.Remove("wd.pid") +} + +func verifyProcessRunning(t *testing.T, wd *Watchdog) { + wd.cmdMutex.Lock() + defer wd.cmdMutex.Unlock() + + if wd.cmd == nil || wd.cmd.Process == nil { + t.Fatal("process should be running") + } +} + +func verifyNoErrors(t *testing.T, errChan chan error) { + select { + case err := <-errChan: + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + case <-time.After(500 * time.Millisecond): + t.Fatal("timeout waiting for Start to return") + } +} + +func TestWatchdog_Successful(t *testing.T) { + wd := NewWatchdog("test.pid", "wd.pid", 100*time.Millisecond) + t.Cleanup(cleanupPidFiles) + + cmd := exec.Command("sleep", "1") + errChan := make(chan error, 1) + + go func() { errChan <- wd.Start(cmd.Path, cmd.Args[1:]...) 
}() + + // Wait for process to start + time.Sleep(200 * time.Millisecond) + + // Verify process is running + verifyProcessRunning(t, wd) + + // Stop the watchdog + wd.Stop() + verifyNoErrors(t, errChan) +} + +func TestWatchdog_EarlyTermination(t *testing.T) { + wd := NewWatchdog("test.pid", "wd.pid", time.Second) + t.Cleanup(cleanupPidFiles) + + cmd := exec.Command("sleep", "10") + errChan := make(chan error, 1) + + go func() { errChan <- wd.Start(cmd.Path, cmd.Args[1:]...) }() + + // Wait for process to start + time.Sleep(200 * time.Millisecond) + + // Stop while process is running + wd.Stop() + verifyNoErrors(t, errChan) +} + +func TestWatchdog_ProcessRestart(t *testing.T) { + wd := NewWatchdog("test.pid", "wd.pid", 100*time.Millisecond) + t.Cleanup(cleanupPidFiles) + + cmd := exec.Command("false") + errChan := make(chan error, 1) + + go func() { errChan <- wd.Start(cmd.Path, cmd.Args[1:]...) }() + + // Wait for at least one restart + time.Sleep(300 * time.Millisecond) + + // Should still be running (restarting) + if wd.shouldStop.Load() { + t.Fatal("watchdog should not be stopped") + } + + wd.Stop() + verifyNoErrors(t, errChan) +} + +// TestWatchdog_SignalHandling tests that the watchdog can handle system signals. +// It verifies that sending a SIGTERM signal to the watchdog's signal channel +// causes the watchdog to stop the monitored process within the expected time frame. 
+func TestWatchdog_SignalHandling(t *testing.T) {
+	pidFile := filepath.Join(t.TempDir(), "test.pid")
+	wdPidFile := filepath.Join(t.TempDir(), "watchdog.pid")
+
+	wd := NewWatchdog(pidFile, wdPidFile, time.Second)
+
+	// Hand Start's result to the test goroutine: require.NoError ends in
+	// t.FailNow, which must not be called from any other goroutine.
+	errChan := make(chan error, 1)
+	go func() { errChan <- wd.Start("sleep", "10") }()
+
+	time.Sleep(100 * time.Millisecond)
+
+	wd.signalChan <- syscall.SIGTERM
+
+	// Block until Start returns or the deadline passes.
+	select {
+	case err := <-errChan:
+		require.NoError(t, err)
+	case <-time.After(500 * time.Millisecond):
+		t.Error("Watchdog didn't stop on SIGTERM")
+	}
+}
+
+// TestWatchdog_WritePIDFiles verifies that the Watchdog's writePIDFiles
+// method successfully creates the expected PID files for both the monitored
+// process and the watchdog itself. It starts a test process, assigns it to
+// the watchdog, and checks if the PID files are correctly created in the
+// specified temporary directories.
+func TestWatchdog_WritePIDFiles(t *testing.T) {
+	pidFile := filepath.Join(t.TempDir(), "test.pid")
+	wdPidFile := filepath.Join(t.TempDir(), "watchdog.pid")
+
+	wd := &Watchdog{
+		pidFile:   pidFile,
+		wdPidFile: wdPidFile,
+	}
+
+	cmd := exec.Command("sleep", "1")
+	require.NoError(t, cmd.Start())
+	// Kill and reap the helper so it cannot outlive the test.
+	t.Cleanup(func() {
+		_ = cmd.Process.Kill()
+		_ = cmd.Wait()
+	})
+	wd.cmd = cmd
+	require.NoError(t, wd.writePIDFiles())
+
+	_, err := os.Stat(pidFile)
+	require.NoError(t, err)
+
+	_, err = os.Stat(wdPidFile)
+	require.NoError(t, err)
+}
diff --git a/test/integration/tcm/test_tcm.py b/test/integration/tcm/test_tcm.py index f5613422f..898f8763b 100644 --- a/test/integration/tcm/test_tcm.py +++ b/test/integration/tcm/test_tcm.py @@ -1,54 +1,139 @@ -from subprocess import PIPE, Popen +import os +from subprocess import PIPE, STDOUT, Popen, run -from utils import skip_if_tarantool_ce, wait_for_lines_in_output +from utils import skip_if_tcm_not_supported, wait_for_lines_in_output TcmStartCommand = ("tcm", "start") TcmStartWatchdogCommand = ("tcm", "start", "--watchdog") +TcmStatusCommand = ("tcm", "status") +TcmStopCommand = ("tcm", "stop") -def
test_tcm_start_success(tt_cmd): - skip_if_tarantool_ce() +def test_tcm_start_success(tt_cmd, tmp_path): + skip_if_tcm_not_supported() - cmd = [str(tt_cmd), *TcmStartCommand] - print(f"Run: {' '.join(cmd)}") + start_cmd = [tt_cmd, *TcmStartCommand] + print(f"Run: {start_cmd}") tcm = Popen( - cmd, + start_cmd, + cwd=tmp_path, + text=True, + encoding="utf-8", + stdout=PIPE, + stderr=STDOUT, + ) + + output = wait_for_lines_in_output(tcm.stdout, ["(INFO): Interactive process PID"]) + + assert tcm.pid + + with open(os.path.join(tmp_path, 'tcm.pid'), 'r') as f: + tcm_pid = f.read().strip() + assert f'(INFO): Interactive process PID {tcm_pid} written to tcm.pid' in output.strip() + + cmdStatus = [str(tt_cmd), *TcmStatusCommand] + print(f"Run: {' '.join(cmdStatus)}") + + status = Popen( + cmdStatus, + cwd=tmp_path, text=True, encoding="utf-8", stdout=PIPE, - stderr=PIPE, + stderr=STDOUT, ) - wait_for_lines_in_output(tcm.stdout, ["TCM_CLUSTER_CONNECTION_RATE_LIMIT"]) - tcm.terminate() - tcm.wait() + output = wait_for_lines_in_output(status.stdout, ["TCM", "RUNNING"]) + assert "RUNNING" in output + cmdStop = [str(tt_cmd), *TcmStopCommand] + print(f"Run: {' '.join(cmdStop)}") + + stop = Popen( + cmdStop, + cwd=tmp_path, + text=True, + encoding="utf-8", + stdout=PIPE, + stderr=STDOUT, + ) + + output = wait_for_lines_in_output(stop.stdout, ["TCM"]) + + assert "TCM stopped" in output.strip() assert tcm.poll() is not None -def test_tcm_start_with_watchdog_success(tt_cmd): - skip_if_tarantool_ce() +def test_tcm_start_with_watchdog_success(tt_cmd, tmp_path): + skip_if_tcm_not_supported() cmd = [str(tt_cmd), *TcmStartWatchdogCommand] print(f"Run: {' '.join(cmd)}") tcm = Popen( cmd, + cwd=tmp_path, text=True, encoding="utf-8", stdout=PIPE, - stderr=PIPE, + stderr=STDOUT, ) - wait_for_lines_in_output(tcm.stdout, ["connecting to storage..."]) - tcm.terminate() - tcm.wait() + output = wait_for_lines_in_output(tcm.stdout, ["(INFO): Process started successfully"]) + assert "(INFO):
Process started successfully" in output.strip() - assert tcm.pid is not None + cmdStatus = [str(tt_cmd), *TcmStatusCommand] + print(f"Run: {' '.join(cmdStatus)}") + + status = run( + cmdStatus, + cwd=tmp_path, + text=True, + encoding="utf-8", + stdout=PIPE, + stderr=STDOUT, + ) + + with open(os.path.join(tmp_path, 'tcm.pid'), 'r') as f: + tcm_pid = f.read().strip() + + assert all(s in status.stdout for s in ("TCM", "RUNNING", tcm_pid)) tcm.terminate() tcm.wait() + assert tcm.pid is not None assert tcm.poll() is not None + + start_cmd = [tt_cmd, *TcmStartCommand] + print(f"Run: {start_cmd}") + + tcm = Popen( + start_cmd, + cwd=tmp_path, + text=True, + encoding="utf-8", + stdout=PIPE, + stderr=STDOUT, + ) + + output = wait_for_lines_in_output(tcm.stdout, ["(INFO): Interactive process PID"]) + assert tcm.pid + + with open(os.path.join(tmp_path, 'tcm.pid'), 'r') as f: + tcm_pid = f.read().strip() + assert f'(INFO): Interactive process PID {tcm_pid} written to tcm.pid' in output.strip() + + tcmDouble = Popen( + start_cmd, + cwd=tmp_path, + text=True, + encoding="utf-8", + stdout=PIPE, + stderr=STDOUT, + ) + + output = wait_for_lines_in_output(tcmDouble.stdout, ["(INFO):Process PID"]) + assert tcm.pid diff --git a/test/utils.py b/test/utils.py index e7ec46305..6286f5501 100644 --- a/test/utils.py +++ b/test/utils.py @@ -581,6 +581,14 @@ def is_cluster_app_supported(): return major >= 3 +def skip_if_tcm_not_supported(): + if not is_tarantool_ee(): + pytest.skip("Tarantool Enterprise required") + + if is_tarantool_less_3(): + pytest.skip("TCM not supported") + + def is_tuple_format_supported(): major, minor = get_tarantool_version() return major > 3 or (major == 3 and minor >= 2)