Skip to content

Commit

Permalink
update console writer
Browse files Browse the repository at this point in the history
  • Loading branch information
zreigz committed Apr 29, 2024
1 parent 4fc237c commit 748bfc3
Show file tree
Hide file tree
Showing 15 changed files with 439 additions and 47 deletions.
30 changes: 15 additions & 15 deletions cmd/harness/args/args.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,33 +14,33 @@ import (
)

const (
EnvConsoleUrl = "CONSOLE_URL"
EnvConsoleToken = "CONSOLE_TOKEN"
EnvStackRunID = "STACK_RUN_ID"
EnvWorkingDir = "WORKING_DIR"
EnvTimeout = "TIMEOUT"
EnvLogFlushFrequency = "LOG_FLUSH_FREQUENCY"
EnvConsoleUrl = "CONSOLE_URL"
EnvConsoleToken = "CONSOLE_TOKEN"
EnvStackRunID = "STACK_RUN_ID"
EnvWorkingDir = "WORKING_DIR"
EnvTimeout = "TIMEOUT"
EnvLogFlushFrequency = "LOG_FLUSH_FREQUENCY"
EnvLogFlushBufferSize = "LOG_FLUSH_BUFFER_SIZE"

defaultWorkingDir = "stackrun"

// Defaults to 180 minute for run cancellation
defaultTimeout = "180m"
defaultTimeout = "180m"
defaultTimeoutDuration = 180 * time.Minute

// Log related defaults
defaultLogFlushFrequency = "5s"
defaultLogFlushFrequency = "5s"
defaultLogFlushFrequencyDuration = 5 * time.Second
defaultLogFlushBufferSize = "4096"
defaultLogFlushBufferSize = "4096"
)

var (
argConsoleUrl = pflag.String("console-url", helpers.GetPluralEnv(EnvConsoleUrl, ""), "URL to the extended Console API, i.e. https://console.onplural.sh/ext/gql")
argConsoleToken = pflag.String("console-token", helpers.GetPluralEnv(EnvConsoleToken, ""), "Deploy token to the Console API")
argStackRunID = pflag.String("stack-run-id", helpers.GetPluralEnv(EnvStackRunID, ""), "ID of the Stack Run to execute")
argWorkingDir = pflag.String("working-dir", helpers.GetPluralEnv(EnvWorkingDir, defaultWorkingDir), "Working directory used to prepare the environment")
argTimeout = pflag.String("timeout", helpers.GetPluralEnv(EnvTimeout, defaultTimeout), "Timeout after which run will be cancelled")
argLogFlushFrequency = pflag.String("log-flush-frequency", helpers.GetPluralEnv(EnvLogFlushFrequency, defaultLogFlushFrequency), "")
argConsoleUrl = pflag.String("console-url", helpers.GetPluralEnv(EnvConsoleUrl, ""), "URL to the extended Console API, i.e. https://console.onplural.sh/ext/gql")
argConsoleToken = pflag.String("console-token", helpers.GetPluralEnv(EnvConsoleToken, ""), "Deploy token to the Console API")
argStackRunID = pflag.String("stack-run-id", helpers.GetPluralEnv(EnvStackRunID, ""), "ID of the Stack Run to execute")
argWorkingDir = pflag.String("working-dir", helpers.GetPluralEnv(EnvWorkingDir, defaultWorkingDir), "Working directory used to prepare the environment")
argTimeout = pflag.String("timeout", helpers.GetPluralEnv(EnvTimeout, defaultTimeout), "Timeout after which run will be cancelled")
argLogFlushFrequency = pflag.String("log-flush-frequency", helpers.GetPluralEnv(EnvLogFlushFrequency, defaultLogFlushFrequency), "")
argLogFlushBufferSize = pflag.Int("log-flush-buffer-size", helpers.ParseIntOrDie(helpers.GetPluralEnv(EnvLogFlushBufferSize, defaultLogFlushBufferSize)), "")
)

Expand Down
1 change: 1 addition & 0 deletions cmd/harness/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func main() {
)

ctrl, err := controller.NewStackRunController(
controller.WithContext(ctx),
controller.WithStackRun(args.StackRunID()),
controller.WithConsoleClient(consoleClient),
controller.WithFetchClient(fetchClient),
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/orcaman/concurrent-map/v2 v2.0.1
github.com/osteele/liquid v1.3.2
github.com/pkg/errors v0.9.1
github.com/pluralsh/console-client-go v0.5.0
github.com/pluralsh/console-client-go v0.5.2
github.com/pluralsh/controller-reconcile-helper v0.0.4
github.com/pluralsh/gophoenix v0.1.3-0.20231201014135-dff1b4309e34
github.com/pluralsh/polly v0.1.7
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -526,8 +526,8 @@ github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg=
github.com/pluralsh/console-client-go v0.5.0 h1:MfhO7tZMFbeYasb0YShZY+HCDOtJ7SPWW8KUTWzODnc=
github.com/pluralsh/console-client-go v0.5.0/go.mod h1:eyCiLA44YbXiYyJh8303jk5JdPkt9McgCo5kBjk4lKo=
github.com/pluralsh/console-client-go v0.5.2 h1:vDiKzZ/vPFivr9TIXSSi/6Q1nOrH4y1huE5XkrCJ3D0=
github.com/pluralsh/console-client-go v0.5.2/go.mod h1:eyCiLA44YbXiYyJh8303jk5JdPkt9McgCo5kBjk4lKo=
github.com/pluralsh/controller-reconcile-helper v0.0.4 h1:1o+7qYSyoeqKFjx+WgQTxDz4Q2VMpzprJIIKShxqG0E=
github.com/pluralsh/controller-reconcile-helper v0.0.4/go.mod h1:AfY0gtteD6veBjmB6jiRx/aR4yevEf6K0M13/pGan/s=
github.com/pluralsh/gophoenix v0.1.3-0.20231201014135-dff1b4309e34 h1:ab2PN+6if/Aq3/sJM0AVdy1SYuMAnq4g20VaKhTm/Bw=
Expand Down
2 changes: 1 addition & 1 deletion internal/helpers/fetch_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

const (
defaultFetchTmpDirPattern = "fetch"
defaultFetchTimeout = 15 * time.Second
defaultFetchTimeout = 15 * time.Second
)

type FetchOption func(*fetchClient)
Expand Down
11 changes: 5 additions & 6 deletions pkg/harness/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,12 @@ func (in *stackRunController) executables() []exec.Executable {
return cmp.Compare(s1.Index, s2.Index)
})

// Initialize a single console writer for all executables
consoleWriter := sink.NewConsoleLogWriter(
in.consoleClient,
append(in.sinkOptions, sink.WithID(in.stackRunID))...,
)

return algorithms.Map(in.stackRun.Steps, func(step *gqlclient.RunStepFragment) exec.Executable {
consoleWriter := sink.NewConsoleLogWriter(
in.ctx,
in.consoleClient,
append(in.sinkOptions, sink.WithID(step.ID))...,
)
return exec.NewExecutable(
step.Cmd,
exec.WithDir(in.dir),
Expand Down
8 changes: 8 additions & 0 deletions pkg/harness/controller/controller_options.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package controller

import (
"context"

"github.com/pluralsh/deployment-operator/internal/helpers"
console "github.com/pluralsh/deployment-operator/pkg/client"
"github.com/pluralsh/deployment-operator/pkg/harness/sink"
Expand Down Expand Up @@ -35,3 +37,9 @@ func WithSinkOptions(options ...sink.Option) Option {
s.sinkOptions = options
}
}

func WithContext(ctx context.Context) Option {
return func(s *stackRunController) {
s.ctx = ctx
}
}
2 changes: 2 additions & 0 deletions pkg/harness/controller/controller_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ type Controller interface {
}

type stackRunController struct {
ctx context.Context

sync.Mutex

// errChan
Expand Down
2 changes: 1 addition & 1 deletion pkg/harness/controller/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (in *executor) run(ctx context.Context, executable exec.Executable) (retErr
in.preRun(executable.ID())

if err := executable.Run(ctx); err != nil {
retErr = fmt.Errorf("command execution failed: %s: err: %s", executable.Command(), err)
retErr = fmt.Errorf("command execution failed: %s: err: %w", executable.Command(), err)
}

return in.postRun(executable.ID(), retErr)
Expand Down
4 changes: 2 additions & 2 deletions pkg/harness/controller/executor_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (

type executor struct {
startQueue []exec.Executable
start sync.Mutex
started bool
start sync.Mutex
started bool

// errChan is the error channel passed by the caller
// when the executor is created.
Expand Down
8 changes: 4 additions & 4 deletions pkg/harness/errors/error.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package errors

import (
"errors"
"errors"
)

var (
ErrTimeout = errors.New("timed out")
ErrTimeout = errors.New("timed out")
ErrRemoteCancel = errors.New("cancelled remotely")
ErrNotFound = errors.New("resource not found")
ErrTerminated = errors.New("process has been terminated")
ErrNotFound = errors.New("resource not found")
ErrTerminated = errors.New("process has been terminated")
)
4 changes: 2 additions & 2 deletions pkg/harness/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ func (in *executable) stdout() io.Writer {
func NewExecutable(command string, options ...Option) Executable {
result := &executable{
command: command,
args: make([]string, 0),
env: make([]string, 0),
args: make([]string, 0),
env: make([]string, 0),
}

for _, o := range options {
Expand Down
36 changes: 26 additions & 10 deletions pkg/harness/sink/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sink

import (
"bytes"
"context"
"io"
"time"

Expand All @@ -12,36 +13,45 @@ import (
)

func (in *ConsoleWriter) Write(p []byte) (n int, err error) {
n, err = in.Buffer.Write(p)
in.bufferSizeChan <- in.Buffer.Len()
n, err = in.buffer.Write(p)
in.bufferSizeChan <- in.buffer.Len()
return
}

// bufferedFlush sends logs to the console only when available
// logs size is greater or equal to bufferSizeLimit.
func (in *ConsoleWriter) bufferedFlush() {
n := in.Buffer.Len()
n := in.buffer.Len()
if n < in.bufferSizeLimit {
return
}

klog.V(log.LogLevelTrace).InfoS("flushing logs", "buffer_size", n, "limit", in.bufferSizeLimit)
// flush logs
read := n
if read > in.bufferSizeLimit {
read = in.bufferSizeLimit
}
if err := in.client.AddStackRunLogs(in.id, string(in.buffer.Next(read))); err != nil {
klog.Error(err)
}
}

// flush sends logs to the console.
// When ignoreLimit is true it send all available logs to the console,
// otherwise it sends logs up to the bufferSizeLimit.
func (in *ConsoleWriter) flush(ignoreLimit bool) {
n := in.Buffer.Len()
n := in.buffer.Len()
if n <= 0 {
return
}

if ignoreLimit {
klog.V(log.LogLevelTrace).InfoS("flushing all remaining logs", "buffer_size", n)

// flush all logs
if err := in.client.AddStackRunLogs(in.id, in.buffer.String()); err != nil {
klog.Error(err)
}
return
}

Expand All @@ -51,16 +61,21 @@ func (in *ConsoleWriter) flush(ignoreLimit bool) {
if read > in.bufferSizeLimit {
read = in.bufferSizeLimit
}

_ = string(in.Buffer.Next(read))
if err := in.client.AddStackRunLogs(in.id, string(in.buffer.Next(read))); err != nil {
klog.Error(err)
}
}

func (in *ConsoleWriter) readAsync() {
if in.ticker == nil {
return
}
defer in.ticker.Stop()

for {
// TODO: add case for graceful shutdown and try to flush remaining logs before exit
select {
case <-in.ctx.Done():
in.flush(true)
case <-in.bufferSizeChan:
in.bufferedFlush()
case <-in.ticker.C:
Expand All @@ -84,9 +99,10 @@ func (in *ConsoleWriter) init() io.Writer {
return in
}

func NewConsoleLogWriter(client console.Client, options ...Option) io.Writer {
func NewConsoleLogWriter(ctx context.Context, client console.Client, options ...Option) io.Writer {
result := &ConsoleWriter{
Buffer: bytes.NewBuffer([]byte{}),
ctx: ctx,
buffer: bytes.NewBuffer([]byte{}),
client: client,
bufferSizeChan: make(chan int),
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/harness/sink/console_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,20 @@ package sink

import (
"bytes"
"context"
"time"

console "github.com/pluralsh/deployment-operator/pkg/client"
)

const (
defaultBufferSizeLimit = 4096 // in kilobytes
defaultThrottleTime = 5 * time.Second
defaultThrottleTime = 5 * time.Second
)

type ConsoleWriter struct {
*bytes.Buffer
ctx context.Context
buffer *bytes.Buffer
// id is a stack run id that logs should be appended to
id string
// client ...
Expand All @@ -28,5 +30,4 @@ type ConsoleWriter struct {
ticker *time.Ticker
}


type Option func(*ConsoleWriter)
Loading

0 comments on commit 748bfc3

Please sign in to comment.