Skip to content

Commit

Permalink
update console logs sink
Browse files Browse the repository at this point in the history
  • Loading branch information
floreks committed Apr 25, 2024
1 parent 4e8abbb commit 03fd09f
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 13 deletions.
7 changes: 5 additions & 2 deletions pkg/harness/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,10 @@ func (in *stackRunController) executables() []exec.Executable {
})

// Initialize a single console writer for all executables
consoleWriter := sink.NewConsoleLogWriter(in.consoleClient, in.sinkOptions...)
consoleWriter := sink.NewConsoleLogWriter(
in.consoleClient,
append(in.sinkOptions, sink.WithID(in.stackRunID))...,
)

return algorithms.Map(in.stackRun.Steps, func(step *gqlclient.RunStepFragment) exec.Executable {
return exec.NewExecutable(
Expand Down Expand Up @@ -199,7 +202,7 @@ func NewStackRunController(options ...Option) (Controller, error) {
runner := &stackRunController{
errChan: errChan,
finishedChan: finishedChan,
sinkOptions: make([]sink.Option, 0),
sinkOptions: make([]sink.Option, 0),
}

runner.executor = newExecutor(
Expand Down
64 changes: 58 additions & 6 deletions pkg/harness/sink/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sink
import (
"bytes"
"io"
"time"

"k8s.io/klog/v2"

Expand All @@ -16,26 +17,77 @@ func (in *ConsoleWriter) Write(p []byte) (n int, err error) {
return
}

// bufferedFlush sends logs to the console only when available
// logs size is greater or equal to bufferSizeLimit.
func (in *ConsoleWriter) bufferedFlush() {
n := in.Buffer.Len()
if n < in.bufferSizeLimit {
return
}

klog.V(log.LogLevelTrace).InfoS("flushing logs", "buffer_size", n, "limit", in.bufferSizeLimit)
// flush logs
}

// flush sends logs to the console.
// When ignoreLimit is true it send all available logs to the console,
// otherwise it sends logs up to the bufferSizeLimit.
func (in *ConsoleWriter) flush(ignoreLimit bool) {
n := in.Buffer.Len()
if n <= 0 {
return
}

if ignoreLimit {
klog.V(log.LogLevelTrace).InfoS("flushing all remaining logs", "buffer_size", n)

// flush all logs
return
}

// flush logs up to the limit
klog.V(log.LogLevelTrace).InfoS("flushing logs", "buffer_size", n, "limit", in.bufferSizeLimit)
read := n
if read > in.bufferSizeLimit {
read = in.bufferSizeLimit
}

_ = string(in.Buffer.Next(read))
}

func (in *ConsoleWriter) readAsync() {
defer in.ticker.Stop()

for {
select {
case bufferSize := <-in.bufferSizeChan:
klog.V(log.LogLevelTrace).InfoS("reading buffer", "size", bufferSize)
// TODO: add case for graceful shutdown and try to flush remaining logs before exit
select {
case <-in.bufferSizeChan:
in.bufferedFlush()
case <-in.ticker.C:
in.flush(false)
}
}
}

func (in *ConsoleWriter) init() io.Writer {
// TODO: init throttle, buffer, ticker
if in.throttle == 0 {
klog.Warningf("throttle cannot be set to 0, defaulting to: %d", defaultThrottleTime)
in.throttle = defaultThrottleTime
}

if in.bufferSizeLimit == 0 {
klog.Warningf("bufferSizeLimit cannot be set to 0, defaulting to: %d", defaultBufferSizeLimit)
in.bufferSizeLimit = defaultBufferSizeLimit
}

in.ticker = time.NewTicker(in.throttle)
return in
}

func NewConsoleLogWriter(client console.Client, options ...Option) io.Writer {
result := &ConsoleWriter{
Buffer: bytes.NewBuffer([]byte{}),
client: client,
Buffer: bytes.NewBuffer([]byte{}),
client: client,
bufferSizeChan: make(chan int),
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/harness/sink/console_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ func WithBufferSizeLimit(limit int) Option {
}
}

func WithName(name string) Option {
func WithID(id string) Option {
return func(writer *ConsoleWriter) {
writer.name = name
writer.id = id
}
}
11 changes: 8 additions & 3 deletions pkg/harness/sink/console_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,15 @@ import (
console "github.com/pluralsh/deployment-operator/pkg/client"
)

const (
defaultBufferSizeLimit = 4096 // in kilobytes
defaultThrottleTime = 5 * time.Second
)

type ConsoleWriter struct {
*bytes.Buffer
// name ...
name string
// id is a stack run id that logs should be appended to
id string
// client ...
client console.Client
// throttle controls how frequently logs will be flushed to its destination
Expand All @@ -20,7 +25,7 @@ type ConsoleWriter struct {
// bufferSizeChan
bufferSizeChan chan int
// ticker
ticker time.Ticker
ticker *time.Ticker
}


Expand Down

0 comments on commit 03fd09f

Please sign in to comment.