Skip to content

Commit

Permalink
command: fix blocking fifo env collector (#643)
Browse files Browse the repository at this point in the history
Fixes #635
  • Loading branch information
adambabik authored Aug 6, 2024
1 parent 760505e commit b73a0f5
Show file tree
Hide file tree
Showing 8 changed files with 151 additions and 30 deletions.
44 changes: 44 additions & 0 deletions internal/command/command_inline_shell_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@
package command

import (
"context"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"

runnerv2alpha1 "github.com/stateful/runme/v3/pkg/api/gen/proto/go/runme/runner/v2alpha1"
)
Expand All @@ -19,6 +24,45 @@ func TestInlineShellCommand_CollectEnv(t *testing.T) {
testInlineShellCommandCollectEnv(t)
})

t.Run("KillCommandWhileUsingFifo", func(t *testing.T) {
envCollectorUseFifo = true

cfg := &ProgramConfig{
ProgramName: "bash",
Source: &runnerv2alpha1.ProgramConfig_Commands{
Commands: &runnerv2alpha1.ProgramConfig_CommandList{
Items: []string{
"export TEST_ENV=1",
"sleep 5",
},
},
},
Mode: runnerv2alpha1.CommandMode_COMMAND_MODE_INLINE,
}
sess := NewSession()
factory := NewFactory(WithLogger(zaptest.NewLogger(t)))

command, err := factory.Build(cfg, CommandOptions{Session: sess})
require.NoError(t, err)
err = command.Start(context.Background())
require.NoError(t, err)

errC := make(chan error, 1)
go func() {
<-time.After(time.Second)
errC <- command.Signal(os.Kill)
}()
err = <-errC
require.NoError(t, err)

err = command.Wait()
require.EqualError(t, err, "signal: killed")

got, ok := sess.GetEnv("TEST_ENV")
assert.False(t, ok)
assert.Equal(t, "", got)
})

t.Run("NonFifo", func(t *testing.T) {
envCollectorUseFifo = false
testInlineShellCommandCollectEnv(t)
Expand Down
2 changes: 1 addition & 1 deletion internal/command/command_terminal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestTerminalCommand_EnvPropagation(t *testing.T) {
require.NoError(t, err)
// Wait for the prompt before sending the next command.
expectContainLine(t, stdout, "$")
_, err = stdinW.Write([]byte{0x04}) // EOT
_, err = stdinW.Write([]byte("exit\n"))
require.NoError(t, err)

require.NoError(t, cmd.Wait())
Expand Down
2 changes: 1 addition & 1 deletion internal/command/env_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func splitNull(data []byte, atEOF bool) (advance int, token []byte, err error) {
if atEOF && len(data) == 0 {
return 0, nil, nil
}
if i := bytes.IndexByte(data, 0); i >= 0 {
if i := bytes.IndexByte(data, byte(0)); i >= 0 {
// We have a full null-terminated line.
return i + 1, data[0:i], nil
}
Expand Down
74 changes: 69 additions & 5 deletions internal/command/env_collector_fifo_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,23 @@ package command
import (
"encoding/hex"
"io"
"os"
"strings"
"syscall"
"time"

"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)

var sentinel = []byte("\x00")

type envCollectorFifo struct {
encKey []byte
encNonce []byte
preEnv []string
postEnv []string
readersDone map[string]chan struct{}
readersGroup *errgroup.Group
scanner envScanner
temp *tempDirectory
Expand All @@ -33,11 +39,10 @@ func newEnvCollectorFifo(
}

c := &envCollectorFifo{
encKey: encKey,
encNonce: encNonce,
readersGroup: new(errgroup.Group),
scanner: scanner,
temp: temp,
encKey: encKey,
encNonce: encNonce,
scanner: scanner,
temp: temp,
}

if c.init() != nil {
Expand All @@ -58,15 +63,23 @@ func (c *envCollectorFifo) init() error {
return errors.Wrap(err, "failed to create the post-exit fifo")
}

c.readersDone = map[string]chan struct{}{
c.prePath(): make(chan struct{}),
c.postPath(): make(chan struct{}),
}
c.readersGroup = &errgroup.Group{}

c.readersGroup.Go(func() error {
var err error
c.preEnv, err = c.read(c.prePath())
close(c.readersDone[c.prePath()])
return err
})

c.readersGroup.Go(func() error {
var err error
c.postEnv, err = c.read(c.postPath())
close(c.readersDone[c.postPath()])
return err
})

Expand All @@ -75,9 +88,25 @@ func (c *envCollectorFifo) init() error {

func (c *envCollectorFifo) Diff() (changed []string, deleted []string, _ error) {
defer c.temp.Cleanup()

g := new(errgroup.Group)

g.Go(func() error {
return c.ensureReaderDone(c.prePath())
})

g.Go(func() error {
return c.ensureReaderDone(c.postPath())
})

if err := g.Wait(); err != nil {
return nil, nil, err
}

if err := c.readersGroup.Wait(); err != nil {
return nil, nil, err
}

return diffEnvs(c.preEnv, c.postEnv)
}

Expand Down Expand Up @@ -116,3 +145,38 @@ func (c *envCollectorFifo) read(path string) ([]string, error) {
defer r.Close()
return c.scanner(r)
}

func (c *envCollectorFifo) ensureReaderDone(path string) error {
for {
select {
case <-c.readersDone[path]:
return nil
case <-time.After(time.Millisecond * 100):
err := c.writeSentinel(path)
if err != nil {
if errors.Is(err, errFifoNotAvailable) {
continue
}
return err
}
return nil
}
}
}

var errFifoNotAvailable = errors.New("fifo not available")

func (c *envCollectorFifo) writeSentinel(name string) error {
f, err := os.OpenFile(name, os.O_WRONLY|syscall.O_NONBLOCK, 0o600)
if err != nil {
if strings.Contains(err.Error(), "device not configured") {
// The FIFO is not opened for reading yet, or it was already closed.
// This is expected when writing a sentinel and we can ignore the error.
return errFifoNotAvailable
}
return errors.WithStack(err)
}
defer f.Close()
_, _ = f.Write(sentinel)
return nil
}
10 changes: 10 additions & 0 deletions internal/command/env_collector_fifo_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,13 @@ func TestEnvCollectorFifo(t *testing.T) {
require.Equal(t, []string{"ENV_2=2"}, changedEnv)
require.Equal(t, []string{"ENV_1"}, deletedEnv)
}

func TestEnvCollectorFifoWithoutWriter(t *testing.T) {
t.Parallel()

collector, err := newEnvCollectorFifo(scanEnv, nil, nil)
require.NoError(t, err)

_, _, err = collector.Diff()
require.NoError(t, err)
}
File renamed without changes.
2 changes: 1 addition & 1 deletion internal/command/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

var (
envCollectorEnableEncryption = true
envCollectorUseFifo = false
envCollectorUseFifo = true
)

type CommandOptions struct {
Expand Down
47 changes: 25 additions & 22 deletions internal/runnerv2service/service_execute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -847,38 +847,41 @@ func TestRunnerServiceServerExecute_WithStop(t *testing.T) {
errc := make(chan error)
go func() {
defer close(errc)
time.Sleep(500 * time.Millisecond)
time.Sleep(time.Second)
err := stream.Send(&runnerv2alpha1.ExecuteRequest{
Stop: runnerv2alpha1.ExecuteStop_EXECUTE_STOP_INTERRUPT,
})
errc <- err
}()
assert.NoError(t, <-errc)

result := <-execResult

// TODO(adamb): There should be no error.
assert.Contains(t, result.Err.Error(), "signal: interrupt")
assert.Equal(t, 130, result.ExitCode)
select {
case result := <-execResult:
// TODO(adamb): There should be no error.
assert.Contains(t, result.Err.Error(), "signal: interrupt")
assert.Equal(t, 130, result.ExitCode)

// Send one more request to make sure that the server
// is still running after sending SIGINT.
stream, err = client.Execute(context.Background())
require.NoError(t, err)
// Send one more request to make sure that the server
// is still running after sending SIGINT.
stream, err = client.Execute(context.Background())
require.NoError(t, err)

execResult = make(chan executeResult)
go getExecuteResult(stream, execResult)
execResult = make(chan executeResult)
go getExecuteResult(stream, execResult)

err = stream.Send(&runnerv2alpha1.ExecuteRequest{
Config: &runnerv2alpha1.ProgramConfig{
ProgramName: "echo",
Arguments: []string{"-n", "1"},
Mode: runnerv2alpha1.CommandMode_COMMAND_MODE_INLINE,
},
})
require.NoError(t, err)
result = <-execResult
assert.Equal(t, "1", string(result.Stdout))
err = stream.Send(&runnerv2alpha1.ExecuteRequest{
Config: &runnerv2alpha1.ProgramConfig{
ProgramName: "echo",
Arguments: []string{"-n", "1"},
Mode: runnerv2alpha1.CommandMode_COMMAND_MODE_INLINE,
},
})
require.NoError(t, err)
result = <-execResult
assert.Equal(t, "1", string(result.Stdout))
case <-time.After(5 * time.Second):
t.Fatal("expected the response early as the command got interrupted")
}
}

func TestRunnerServiceServerExecute_Winsize(t *testing.T) {
Expand Down

0 comments on commit b73a0f5

Please sign in to comment.