Skip to content
This repository has been archived by the owner on Apr 4, 2023. It is now read-only.

Feed the stdout and stderr of the Pilot subprocess into glog #167

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions pkg/pilot/cassandra/v3/pilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package v3

import (
"fmt"
"os"
"os/exec"

"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -56,8 +55,6 @@ func (p *Pilot) CmdFunc(pilot *v1alpha1.Pilot) (*exec.Cmd, error) {
// The /run.sh script is unique to gcr.io/google-samples/cassandra:v12.
// TODO: Add support for other Cassandra images with different entry points.
cmd := exec.Command("/run.sh")
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
return cmd, nil
}

Expand Down
2 changes: 0 additions & 2 deletions pkg/pilot/elasticsearch/v5/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ func (p *Pilot) CmdFunc(pilot *v1alpha1.Pilot) (*exec.Cmd, error) {
}

cmd := exec.Command("elasticsearch")
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.Env = p.env().Strings()

return cmd, nil
Expand Down
50 changes: 49 additions & 1 deletion pkg/pilot/genericpilot/processmanager/process.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package processmanager

import (
"bufio"
"fmt"
"os"
"os/exec"
"sync"

"github.com/golang/glog"
)

type Interface interface {
Expand Down Expand Up @@ -41,13 +45,56 @@ type adapter struct {

doneCh chan struct{}
doneErr error
wg sync.WaitGroup
}

var _ Interface = &adapter{}

func (p *adapter) startCommandOutputLoggers() error {
stdout, err := p.cmd.StdoutPipe()
if err != nil {
return err
}
p.wg.Add(1)
go func() {
defer p.wg.Done()
in := bufio.NewScanner(stdout)
for in.Scan() {
glog.Infoln(in.Text())
}
err := in.Err()
if err != nil {
glog.Error(err)
}
}()

stderr, err := p.cmd.StderrPipe()
if err != nil {
return err
}
p.wg.Add(1)
go func() {
defer p.wg.Done()
in := bufio.NewScanner(stderr)
for in.Scan() {
glog.Errorln(in.Text())
}
err := in.Err()
if err != nil {
glog.Error(err)
}
}()
return nil
}

// Start will start the underlying subprocess
func (p *adapter) Start() error {
if err := p.cmd.Start(); err != nil {
err := p.startCommandOutputLoggers()
if err != nil {
return err
}

if err = p.cmd.Start(); err != nil {
return fmt.Errorf("error starting process: %s", err.Error())
}
go p.startWait()
Expand All @@ -71,6 +118,7 @@ func (p *adapter) Stop() error {
// If the subprocess has not been started yet, the returned chan will
// not close until the subprocess has been started and then stopped.
func (p *adapter) Wait() <-chan struct{} {
defer p.wg.Wait()
return p.doneCh
}

Expand Down