Skip to content

Commit

Permalink
refactor code and add some documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
floreks committed May 15, 2024
1 parent 3be8730 commit d12aa6e
Show file tree
Hide file tree
Showing 8 changed files with 224 additions and 93 deletions.
4 changes: 2 additions & 2 deletions cmd/harness/args/args.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ var (
argStackRunID = pflag.String("stack-run-id", helpers.GetPluralEnv(EnvStackRunID, ""), "ID of the Stack Run to execute")
argWorkingDir = pflag.String("working-dir", helpers.GetPluralEnv(EnvWorkingDir, defaultWorkingDir), "Working directory used to prepare the environment")
argTimeout = pflag.String("timeout", helpers.GetPluralEnv(EnvTimeout, defaultTimeout), "Timeout after which run will be cancelled")
argLogFlushFrequency = pflag.String("log-flush-frequency", helpers.GetPluralEnv(EnvLogFlushFrequency, defaultLogFlushFrequency), "")
argLogFlushBufferSize = pflag.Int("log-flush-buffer-size", helpers.ParseIntOrDie(helpers.GetPluralEnv(EnvLogFlushBufferSize, defaultLogFlushBufferSize)), "")
argLogFlushFrequency = pflag.String("log-flush-frequency", helpers.GetPluralEnv(EnvLogFlushFrequency, defaultLogFlushFrequency), "Frequency at which logs should be flushed if buffer is not full")
argLogFlushBufferSize = pflag.Int("log-flush-buffer-size", helpers.ParseIntOrDie(helpers.GetPluralEnv(EnvLogFlushBufferSize, defaultLogFlushBufferSize)), "Buffer size to use for log flushing (in kilobytes)")
)

func init() {
Expand Down
92 changes: 92 additions & 0 deletions internal/helpers/buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package helpers

import (
"bytes"
"sync"
"time"

"k8s.io/apimachinery/pkg/util/wait"
)

// Buffer is an internal buffer wrapper to ensure that Write/Read
// operations are thread-safe and cannot be run at the same time.
// It also starts a small goroutine that notifies about buffer
// size changes through channel. See Updated method for more information.
type Buffer struct {
*bytes.Buffer

size int
lastSize int
updateChan chan struct{}
mu sync.Mutex
}

// Write implements io.Writer interface.
// It ensures thread-safe execution.
func (b *Buffer) Write(p []byte) (n int, err error) {
b.mu.Lock()
defer b.mu.Unlock()
n, err = b.Buffer.Write(p)
b.size = b.Buffer.Len()
return
}

// Next overrides bytes.Buffer method.
// It ensures thread-safe execution.
func (b *Buffer) Next(n int) []byte {
b.mu.Lock()
defer b.mu.Unlock()
return b.Buffer.Next(n)
}

// ReadBytes overrides bytes.Buffer method.
// It ensures thread-safe execution.
func (b *Buffer) ReadBytes(delim byte) (line []byte, err error) {
b.mu.Lock()
defer b.mu.Unlock()
return b.Buffer.ReadBytes(delim)
}

// String overrides bytes.Buffer method.
// It ensures thread-safe execution.
func (b *Buffer) String() string {
b.mu.Lock()
defer b.mu.Unlock()
return b.Buffer.String()
}

// Len overrides bytes.Buffer method.
// It ensures thread-safe execution.
func (b *Buffer) Len() int {
b.mu.Lock()
defer b.mu.Unlock()
return b.Buffer.Len()
}

// Updated returns a channel that receives signal
// every time buffer size gets updated.
func (b *Buffer) Updated() chan struct{} {
return b.updateChan
}

func (b *Buffer) startSizeWatcher() {
go wait.Until(func() {
if b.size != b.lastSize {
b.updateChan <- struct{}{}
b.lastSize = b.size
}
}, 100*time.Millisecond, wait.NeverStop)
}

func (b *Buffer) init() *Buffer {
b.startSizeWatcher()

return b
}

func NewBuffer() *Buffer {
return (&Buffer{
Buffer: bytes.NewBuffer([]byte{}),
updateChan: make(chan struct{}),
}).init()
}
51 changes: 30 additions & 21 deletions pkg/harness/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,30 +137,39 @@ func (in *stackRunController) executables(ctx context.Context) []exec.Executable
})

return algorithms.Map(in.stackRun.Steps, func(step *gqlclient.RunStepFragment) exec.Executable {
in.wg.Add(1)
consoleWriter := sink.NewConsoleLogWriter(
ctx,
in.consoleClient,
append(
in.sinkOptions,
sink.WithID(step.ID),
sink.WithOnFinish(in.onLogWriterFinish),
sink.WithStopChan(in.stopChan),
)...,
)

return exec.NewExecutable(
step.Cmd,
exec.WithDir(in.dir),
exec.WithEnv(in.stackRun.Env()),
exec.WithArgs(step.Args),
exec.WithArgsModifier(in.tool.Modifier(step.Stage).Args),
exec.WithID(step.ID),
exec.WithCustomOutputSink(consoleWriter),
)
return in.toExecutable(ctx, step)
})
}

func (in *stackRunController) toExecutable(ctx context.Context, step *gqlclient.RunStepFragment) exec.Executable {
in.wg.Add(1)
consoleWriter := sink.NewConsoleLogWriter(
ctx,
in.consoleClient,
append(
in.sinkOptions,
sink.WithID(step.ID),
sink.WithOnFinish(in.onLogWriterFinish),
sink.WithStopChan(in.stopChan),
)...,
)

argsModifier := in.tool.Modifier(step.Stage).Args
args := step.Args
if argsModifier != nil {
args = argsModifier(args)
}

return exec.NewExecutable(
step.Cmd,
exec.WithDir(in.dir),
exec.WithEnv(in.stackRun.Env()),
exec.WithArgs(args),
exec.WithID(step.ID),
exec.WithLogSink(consoleWriter),
)
}

func (in *stackRunController) onLogWriterFinish() {
klog.V(log.LogLevelTrace).InfoS("log writer finished")
in.wg.Done()
Expand Down
32 changes: 17 additions & 15 deletions pkg/harness/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,17 @@ import (
)

func (in *executable) Run(ctx context.Context) error {
cmd := exec.CommandContext(ctx, in.command, in.arguments()...)
writer := in.writer()
cmd := exec.CommandContext(ctx, in.command, in.args...)
w := in.writer()
defer in.close(in.logSink)

// Configure additional writers so that we can simultaneously write output
// to multiple destinations
// Note: We need to use the same writer for stdout and stderr to guarantee
// thread-safe writing, otherwise output from stdout and stderr could be
// written concurrently and get reordered.
cmd.Stdout = writer
cmd.Stderr = writer
cmd.Stdout = w
cmd.Stderr = w

// Configure environment of the executable.
// Root process environment is used as a base and passed in env vars
Expand All @@ -41,7 +42,7 @@ func (in *executable) Run(ctx context.Context) error {
}

func (in *executable) RunWithOutput(ctx context.Context) ([]byte, error) {
cmd := exec.CommandContext(ctx, in.command, in.arguments()...)
cmd := exec.CommandContext(ctx, in.command, in.args...)

// Configure environment of the executable.
// Root process environment is used as a base and passed in env vars
Expand All @@ -58,7 +59,7 @@ func (in *executable) RunWithOutput(ctx context.Context) ([]byte, error) {
}

func (in *executable) Command() string {
return fmt.Sprintf("%s %s", in.command, strings.Join(in.arguments(), " "))
return fmt.Sprintf("%s %s", in.command, strings.Join(in.args, " "))
}

func (in *executable) ID() string {
Expand All @@ -69,20 +70,21 @@ func (in *executable) ID() string {
return in.id
}

func (in *executable) arguments() []string {
if in.argsModifier != nil {
return in.argsModifier(in.args)
func (in *executable) writer() io.Writer {
if in.logSink != nil {
return io.MultiWriter(os.Stdout, in.logSink)
}

return in.args
return os.Stdout
}

func (in *executable) writer() io.Writer {
if in.standardLogSink != nil {
return io.MultiWriter(os.Stdout, in.standardLogSink)
func (in *executable) close(w io.WriteCloser) {
if w == nil {
return
}

return os.Stdout
if err := w.Close(); err != nil {
klog.ErrorS(err, "failed to close writer")
}
}

func NewExecutable(command string, options ...Option) Executable {
Expand Down
10 changes: 2 additions & 8 deletions pkg/harness/exec/exec_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ func WithDir(workingDirectory string) Option {
}
}

func WithCustomOutputSink(sink io.Writer) Option {
func WithLogSink(sink io.WriteCloser) Option {
return func(e *executable) {
e.standardLogSink = sink
e.logSink = sink
}
}

Expand All @@ -28,12 +28,6 @@ func WithArgs(args []string) Option {
}
}

func WithArgsModifier(f ArgsModifier) Option {
return func(e *executable) {
e.argsModifier = f
}
}

func WithID(id string) Option {
return func(e *executable) {
e.id = id
Expand Down
9 changes: 4 additions & 5 deletions pkg/harness/exec/exec_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,10 @@ type executable struct {
// args
args []string

// argsModifier
argsModifier ArgsModifier

// standardLogSink
standardLogSink io.Writer
// logSink is a custom writer that can be used to forward
// executable output. It does not stop output from being forwarded
// to the os.Stdout.
logSink io.WriteCloser
}

type Option func(*executable)
Expand Down
Loading

0 comments on commit d12aa6e

Please sign in to comment.