Skip to content

Commit

Permalink
Merge branch 'issue-411' (fix #411)
Browse files Browse the repository at this point in the history
  • Loading branch information
rhysd committed Apr 12, 2024
2 parents a707edc + bf5fbfa commit 59569c8
Show file tree
Hide file tree
Showing 11 changed files with 190 additions and 61 deletions.
18 changes: 9 additions & 9 deletions linter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,18 @@ func TestLinterLintOK(t *testing.T) {
}

proj := &Project{root: dir}
shellcheck, err := execabs.LookPath("shellcheck")
if err != nil {
t.Skip("skipped because \"shellcheck\" command does not exist in system")
}

pyflakes, err := execabs.LookPath("pyflakes")
if err != nil {
t.Skip("skipped because \"pyflakes\" command does not exist in system")
}
shellcheck, _ := execabs.LookPath("shellcheck")
pyflakes, _ := execabs.LookPath("pyflakes")

for _, f := range fs {
t.Run(filepath.Base(f), func(t *testing.T) {
if strings.Contains(f, "shellcheck") && shellcheck == "" {
t.Skip("skipping", f, "because \"shellcheck\" command does not exist in system")
}
if strings.Contains(f, "pyflakes") && pyflakes == "" {
t.Skip("skipping", f, "because \"pyflakes\" command does not exist in system")
}

opts := LinterOptions{
Shellcheck: shellcheck,
Pyflakes: pyflakes,
Expand Down
97 changes: 61 additions & 36 deletions process.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,51 +12,52 @@ import (
"golang.org/x/sys/execabs"
)

// concurrentProcess is a manager to run process concurrently. Since running process consumes OS
// resources, running too many processes concurrently causes some issues. On macOS, making too many
// process makes the parent process hang (see issue #3). And running processes which open files can
// cause the error "pipe: too many files to open". To avoid it, this type manages how many processes
// are run at once.
type concurrentProcess struct {
ctx context.Context
sema *semaphore.Weighted
wg sync.WaitGroup
}

// newConcurrentProcess creates a new ConcurrentProcess instance. The `par` argument represents how
// many processes can be run in parallel. It is recommended to use the value returned from
// runtime.NumCPU() for the argument.
func newConcurrentProcess(par int) *concurrentProcess {
return &concurrentProcess{
ctx: context.Background(),
sema: semaphore.NewWeighted(int64(par)),
}
// cmdExecution represents a single command line execution: the executable to run, its
// arguments, the text fed to its standard input, and how its output is collected.
type cmdExecution struct {
// cmd is the executable handed to exec.Command.
cmd string
// args are the command line arguments handed to exec.Command along with cmd.
args []string
// stdin is the text written to the process's stdin pipe before its output is collected.
stdin string
// combineOutput selects CombinedOutput() (stdout and stderr together) instead of Output()
// (stdout only) when collecting the output of the process.
combineOutput bool
}

func runProcessWithStdin(exe string, args []string, stdin string) ([]byte, error) {
cmd := exec.Command(exe, args...)
func (e *cmdExecution) run() ([]byte, error) {
cmd := exec.Command(e.cmd, e.args...)
cmd.Stderr = nil

p, err := cmd.StdinPipe()
if err != nil {
return nil, fmt.Errorf("could not make stdin pipe for %s process: %w", exe, err)
return nil, fmt.Errorf("could not make stdin pipe for %s process: %w", e.cmd, err)
}
if _, err := io.WriteString(p, stdin); err != nil {
if _, err := io.WriteString(p, e.stdin); err != nil {
p.Close()
return nil, fmt.Errorf("could not write to stdin of %s process: %w", exe, err)
return nil, fmt.Errorf("could not write to stdin of %s process: %w", e.cmd, err)
}
p.Close()

stdout, err := cmd.Output()
var stdout []byte
if e.combineOutput {
stdout, err = cmd.CombinedOutput()
} else {
stdout, err = cmd.Output()
}

if err != nil {
if exitErr, ok := err.(*exec.ExitError); ok {
code := exitErr.ExitCode()

stderr := exitErr.Stderr
if e.combineOutput {
stderr = stdout
}

if code < 0 {
return nil, fmt.Errorf("%s was terminated. stderr: %q", exe, exitErr.Stderr)
return nil, fmt.Errorf("%s was terminated. stderr: %q", e.cmd, stderr)
}

if len(stdout) == 0 {
return nil, fmt.Errorf("%s exited with status %d but stdout was empty. stderr: %q", exe, code, exitErr.Stderr)
return nil, fmt.Errorf("%s exited with status %d but stdout was empty. stderr: %q", e.cmd, code, stderr)
}

// Reaches here when exit status is non-zero and stdout is not empty, shellcheck successfully found some errors
} else {
return nil, err
Expand All @@ -66,12 +67,33 @@ func runProcessWithStdin(exe string, args []string, stdin string) ([]byte, error
return stdout, nil
}

func (proc *concurrentProcess) run(eg *errgroup.Group, exe string, args []string, stdin string, callback func([]byte, error) error) {
// concurrentProcess is a manager to run processes concurrently. Since running a process consumes
// OS resources, running too many processes concurrently causes some issues. On macOS, spawning too
// many processes makes the parent process hang (see issue #3). And running processes which open
// files can cause the error "pipe: too many files to open". To avoid these issues, this type limits
// how many processes are run at once.
type concurrentProcess struct {
// ctx is the context passed to sema.Acquire when a slot for a new process is requested.
ctx context.Context
// sema is the weighted semaphore bounding the number of processes running at the same time.
sema *semaphore.Weighted
// wg counts the goroutines started by run(); presumably wait() blocks on it — confirm
// against the wait() implementation.
wg sync.WaitGroup
}

// newConcurrentProcess builds a concurrentProcess whose semaphore admits at most `par` processes
// at the same time. The instance uses a background context. Passing the value returned from
// runtime.NumCPU() as `par` is recommended.
func newConcurrentProcess(par int) *concurrentProcess {
	limit := semaphore.NewWeighted(int64(par))

	proc := new(concurrentProcess)
	proc.ctx = context.Background()
	proc.sema = limit
	return proc
}

func (proc *concurrentProcess) run(eg *errgroup.Group, exec *cmdExecution, callback func([]byte, error) error) {
proc.sema.Acquire(proc.ctx, 1)
proc.wg.Add(1)
eg.Go(func() error {
defer proc.wg.Done()
stdout, err := runProcessWithStdin(exe, args, stdin)
stdout, err := exec.run()
proc.sema.Release(1)
return callback(stdout, err)
})
Expand All @@ -84,14 +106,15 @@ func (proc *concurrentProcess) wait() {

// newCommandRunner creates new external command runner for given executable. The executable path
// is resolved in this function.
func (proc *concurrentProcess) newCommandRunner(exe string) (*externalCommand, error) {
func (proc *concurrentProcess) newCommandRunner(exe string, combineOutput bool) (*externalCommand, error) {
p, err := execabs.LookPath(exe)
if err != nil {
return nil, err
}
cmd := &externalCommand{
proc: proc,
exe: p,
proc: proc,
exe: p,
combineOutput: combineOutput,
}
return cmd, nil
}
Expand All @@ -101,16 +124,18 @@ func (proc *concurrentProcess) newCommandRunner(exe string) (*externalCommand, e
// by using errgroup.Group. The wait() method must be called at the end for checking if some fatal
// error occurred.
type externalCommand struct {
proc *concurrentProcess
eg errgroup.Group
exe string
proc *concurrentProcess
eg errgroup.Group
exe string
combineOutput bool
}

// run runs the command with given arguments and stdin. The callback function is called after the
// process runs. First argument is stdout and the second argument is an error while running the
// process.
func (cmd *externalCommand) run(args []string, stdin string, callback func([]byte, error) error) {
cmd.proc.run(&cmd.eg, cmd.exe, args, stdin, callback)
exec := &cmdExecution{cmd.exe, args, stdin, cmd.combineOutput}
cmd.proc.run(&cmd.eg, exec, callback)
}

// wait waits until all goroutines for this command finish. Note that it does not wait for
Expand Down
81 changes: 68 additions & 13 deletions process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,30 @@ import (
"runtime"
"strings"
"sync"
"sync/atomic" // Note: atomic.Bool was added at Go 1.19
"testing"
"time"
)

func testStartEchoCommand(t *testing.T, proc *concurrentProcess, done *bool) {
func testStartEchoCommand(t *testing.T, proc *concurrentProcess, done *atomic.Bool) {
t.Helper()

*done = false
done.Store(false)
echo := testSkipIfNoCommand(t, proc, "echo")
echo.run([]string{}, "", func(b []byte, err error) error {
if err != nil {
t.Error(err)
return err
}
*done = true
done.Store(true)
return nil
})
// This function does not wait the command finishes
}

func testSkipIfNoCommand(t *testing.T, p *concurrentProcess, cmd string) *externalCommand {
t.Helper()
c, err := p.newCommandRunner(cmd)
c, err := p.newCommandRunner(cmd, false)
if err != nil {
t.Skipf("%s command is necessary to run this test: %s", cmd, err)
}
Expand Down Expand Up @@ -218,8 +219,8 @@ func TestProcessErrorCommandNotFound(t *testing.T) {
return nil
})

var echoDone bool
testStartEchoCommand(t, p, &echoDone)
echoDone := &atomic.Bool{}
testStartEchoCommand(t, p, echoDone)

err := c.wait()
if err == nil || !strings.Contains(err.Error(), "yay! error found!") {
Expand All @@ -228,7 +229,7 @@ func TestProcessErrorCommandNotFound(t *testing.T) {

p.wait()

if !echoDone {
if !echoDone.Load() {
t.Fatal("a command following the error did not run")
}
}
Expand All @@ -245,8 +246,8 @@ func TestProcessErrorInCallback(t *testing.T) {
return fmt.Errorf("dummy error")
})

var echoDone bool
testStartEchoCommand(t, p, &echoDone)
echoDone := &atomic.Bool{}
testStartEchoCommand(t, p, echoDone)

err := echo.wait()
if err == nil || err.Error() != "dummy error" {
Expand All @@ -255,7 +256,7 @@ func TestProcessErrorInCallback(t *testing.T) {

p.wait()

if !echoDone {
if !echoDone.Load() {
t.Fatal("a command following the error did not run")
}
}
Expand All @@ -275,8 +276,8 @@ func TestProcessErrorLinterFailed(t *testing.T) {
return nil
})

var echoDone bool
testStartEchoCommand(t, p, &echoDone)
echoDone := &atomic.Bool{}
testStartEchoCommand(t, p, echoDone)

err := ls.wait()
if err == nil {
Expand All @@ -289,7 +290,7 @@ func TestProcessErrorLinterFailed(t *testing.T) {

p.wait()

if !echoDone {
if !echoDone.Load() {
t.Fatal("a command following the error did not run")
}
}
Expand All @@ -316,3 +317,57 @@ func TestProcessRunConcurrentlyAndWait(t *testing.T) {

p.wait()
}

// TestProcessCombineStdoutAndStderr checks that a command runner with combineOutput enabled
// hands both the stdout and the stderr of the process to the callback.
func TestProcessCombineStdoutAndStderr(t *testing.T) {
	p := newConcurrentProcess(1)
	bash := testSkipIfNoCommand(t, p, "bash")
	bash.combineOutput = true
	script := "echo 'hello stdout'; echo 'hello stderr' >&2"
	// Buffered so that the callback never blocks on the send.
	done := make(chan string, 1)

	bash.run([]string{"-c", script}, "", func(b []byte, err error) error {
		// The callback runs in a goroutine other than the test goroutine, so t.Fatal must not
		// be called here. Always close the channel so that the receive below never hangs, and
		// report the failure through the returned error, which bash.wait() surfaces.
		defer close(done)
		if err != nil {
			return err
		}
		done <- string(b)
		return nil
	})

	out, ok := <-done
	if err := bash.wait(); err != nil {
		t.Fatal(err)
	}
	p.wait()

	if !ok {
		t.Fatal("callback finished without delivering the output of the command")
	}
	if !strings.Contains(out, "hello stdout") {
		t.Errorf("stdout was not captured: %q", out)
	}
	if !strings.Contains(out, "hello stderr") {
		t.Errorf("stderr was not captured: %q", out)
	}
}

// TestProcessCommandExitStatusNonZero runs the `false` command, which exits with a non-zero
// status without printing anything, and checks that the callback receives an error mentioning
// the exit status.
func TestProcessCommandExitStatusNonZero(t *testing.T) {
	proc := newConcurrentProcess(1)
	falseCmd := testSkipIfNoCommand(t, proc, "false")
	result := make(chan error)

	falseCmd.run([]string{}, "", func(_ []byte, runErr error) error {
		// Hand the error over to the test goroutine instead of failing the command group.
		result <- runErr
		return nil
	})

	runErr := <-result
	if waitErr := falseCmd.wait(); waitErr != nil {
		t.Fatal(waitErr)
	}
	proc.wait()

	if runErr == nil {
		t.Fatal("Error did not happen")
	}
	if msg := runErr.Error(); !strings.Contains(msg, "exited with status 1") {
		t.Fatalf("Unexpected error happened: %q", msg)
	}
}
14 changes: 12 additions & 2 deletions rule_pyflakes.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ func newRulePyflakes(cmd *externalCommand) *RulePyflakes {
// or relative/absolute file path. When the given executable is not found in system, it returns
// an error.
func NewRulePyflakes(executable string, proc *concurrentProcess) (*RulePyflakes, error) {
cmd, err := proc.newCommandRunner(executable)
// Combine output because pyflakes outputs lint errors to stdout and outputs syntax errors to stderr. (#411)
cmd, err := proc.newCommandRunner(executable, true)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -141,7 +142,16 @@ func (rule *RulePyflakes) parseNextError(stdout []byte, pos *Pos) ([]byte, error
// Eat "<stdin>:"
idx := bytes.Index(b, []byte("<stdin>:"))
if idx == -1 {
return nil, fmt.Errorf("error message from pyflakes does not start with \"<stdin>:\" while checking script at %s. stdout:\n%s", pos, stdout)
// Syntax errors from pyflakes consist of multiple lines. Skip parsing the subsequent lines. (#411)
// ```
// <stdin>:1:7: unexpected EOF while parsing
// print(
// ^
// ```
if idx := bytes.IndexByte(b, '\n'); idx >= 0 {
return b[idx+1:], nil
}
return nil, nil
}
b = b[idx+len("<stdin>:"):]

Expand Down
2 changes: 1 addition & 1 deletion rule_shellcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func newRuleShellcheck(cmd *externalCommand) *RuleShellcheck {
// name or relative/absolute file path. When the given executable is not found in system, it returns
// an error as 2nd return value.
func NewRuleShellcheck(executable string, proc *concurrentProcess) (*RuleShellcheck, error) {
cmd, err := proc.newCommandRunner(executable)
cmd, err := proc.newCommandRunner(executable, false)
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions testdata/err/pyflakes_job_default_shell.out
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/test\.yaml:9:9: pyflakes reported issue in this script: .+ \[pyflakes\]/
9 changes: 9 additions & 0 deletions testdata/err/pyflakes_job_default_shell.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
on: push
jobs:
job-level:
runs-on: ubuntu-latest
defaults:
run:
shell: python
steps:
- run: print(unknown_var)
3 changes: 3 additions & 0 deletions testdata/err/pyflakes_step_shell.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
/test\.yaml:7:9: pyflakes reported issue in this script: .+ \[pyflakes\]/
/test\.yaml:10:9: pyflakes reported issue in this script: .+ \[pyflakes\]/
/test\.yaml:13:9: pyflakes reported issue in this script: .+ \[pyflakes\]/
Loading

0 comments on commit 59569c8

Please sign in to comment.