Skip to content

Commit

Permalink
Implement read --follow (#700)
Browse files Browse the repository at this point in the history
* Implement `read --follow`

* Avoid timeout on `--watch`

* Simplify logic

* Improve timeout logic

* Apply suggestions from code review

Co-authored-by: Domas Monkus <[email protected]>

Co-authored-by: Domas Monkus <[email protected]>
Co-authored-by: Daniel Barnes <[email protected]>
  • Loading branch information
3 people authored Oct 21, 2022
1 parent 110a740 commit d53e68f
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 55 deletions.
101 changes: 69 additions & 32 deletions cmd/leo/read/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package read
import (
"context"
"fmt"
"os"
"strings"
"time"

"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
Expand All @@ -12,12 +14,19 @@ import (
"terraform-provider-iterative/task/common"
)

type status string

const (
statusQueued status = "queued"
statusSucceeded status = "succeeded"
statusFailed status = "failed"
statusRunning status = "running"
)

type Options struct {
Parallelism int
Timestamps bool
Status bool
Events bool
Logs bool
Follow bool
}

func New(cloud *common.Cloud) *cobra.Command {
Expand All @@ -35,9 +44,7 @@ func New(cloud *common.Cloud) *cobra.Command {

cmd.Flags().IntVar(&o.Parallelism, "parallelism", 1, "parallelism")
cmd.Flags().BoolVar(&o.Timestamps, "timestamps", false, "display timestamps")
cmd.Flags().BoolVar(&o.Status, "status", true, "read status")
cmd.Flags().BoolVar(&o.Logs, "logs", false, "read logs")
cmd.MarkFlagsMutuallyExclusive("status", "logs")
cmd.Flags().BoolVar(&o.Follow, "follow", false, "follow logs")

return cmd
}
Expand All @@ -50,7 +57,6 @@ func (o *Options) Run(cmd *cobra.Command, args []string, cloud *common.Cloud) er
}

ctx, cancel := context.WithTimeout(context.Background(), cloud.Timeouts.Read)
defer cancel()

id, err := common.ParseIdentifier(args[0])
if err != nil {
Expand All @@ -62,39 +68,70 @@ func (o *Options) Run(cmd *cobra.Command, args []string, cloud *common.Cloud) er
return err
}

if err := tsk.Read(ctx); err != nil {
return err
}
var last int
for {
if err := tsk.Read(ctx); err != nil {
return err
}

switch {
case o.Logs:
return o.printLogs(ctx, tsk)
case o.Status:
return o.printStatus(ctx, tsk)
}
logs, err := o.getLogs(ctx, tsk)
if err != nil {
return err
}

status, err := o.getStatus(ctx, tsk)
if err != nil {
return err
}

return nil
if delta := strings.Join(logs[last:], "\n"); delta != "" {
fmt.Println(delta)
last = len(logs)
}

switch o.Follow {
case true:
// disable debug logs for subsequent iterations
logrus.SetLevel(logrus.WarnLevel)
// create a new context to reset timeout on every iteration
ctx, cancel = context.WithTimeout(context.Background(), cloud.Timeouts.Read)
defer cancel()
case false:
return nil
}

switch status {
case statusSucceeded:
os.Exit(0)
case statusFailed:
os.Exit(1)
default:
time.Sleep(3 * time.Second)
}
}
}

func (o *Options) printLogs(ctx context.Context, tsk task.Task) error {
func (o *Options) getLogs(ctx context.Context, tsk task.Task) ([]string, error) {
logs, err := tsk.Logs(ctx)
if err != nil {
return err
return nil, err
}

var result []string

for _, log := range logs {
for _, line := range strings.Split(strings.Trim(log, "\n"), "\n") {
if !o.Timestamps {
_, line, _ = strings.Cut(line, " ")
}
fmt.Println(line)
result = append(result, line)
}
}

return nil
return result, nil
}

func (o *Options) printStatus(ctx context.Context, tsk task.Task) error {
func (o *Options) getStatus(ctx context.Context, tsk task.Task) (status, error) {
for _, event := range tsk.Events(ctx) {
line := fmt.Sprintf("%s: %s", event.Code, strings.Join(event.Description, " "))
if o.Timestamps {
Expand All @@ -106,21 +143,21 @@ func (o *Options) printStatus(ctx context.Context, tsk task.Task) error {

status, err := tsk.Status(ctx)
if err != nil {
return err
return "", err
}

message := "queued"
result := statusQueued

if status["succeeded"] >= o.Parallelism {
message = "succeeded"
if status[common.StatusCodeSucceeded] >= o.Parallelism {
result = statusSucceeded
}
if status["failed"] > 0 {
message = "failed"
if status[common.StatusCodeFailed] > 0 {
result = statusFailed
}
if status["running"] >= o.Parallelism {
message = "running"
if status[common.StatusCodeActive] >= o.Parallelism {
result = statusRunning
}

fmt.Println(message)
return nil
logrus.Debug(result)
return result, nil
}
8 changes: 0 additions & 8 deletions task/aws/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,10 +299,6 @@ func (t *Task) Delete(ctx context.Context) error {
}

func (t *Task) Logs(ctx context.Context) ([]string, error) {
if err := t.Read(ctx); err != nil {
return nil, err
}

return machine.Logs(ctx, t.DataSources.Credentials.Resource["RCLONE_REMOTE"])
}

Expand Down Expand Up @@ -346,10 +342,6 @@ func (t *Task) Events(ctx context.Context) []common.Event {
}

func (t *Task) Status(ctx context.Context) (common.Status, error) {
if err := t.Read(ctx); err != nil {
return nil, err
}

return machine.Status(ctx, t.DataSources.Credentials.Resource["RCLONE_REMOTE"], t.Attributes.Status)
}

Expand Down
7 changes: 0 additions & 7 deletions task/az/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,10 +291,6 @@ func (t *Task) Delete(ctx context.Context) error {
}

func (t *Task) Logs(ctx context.Context) ([]string, error) {
if err := t.Read(ctx); err != nil {
return nil, err
}

return machine.Logs(ctx, t.DataSources.Credentials.Resource["RCLONE_REMOTE"])
}

Expand Down Expand Up @@ -338,9 +334,6 @@ func (t *Task) Events(ctx context.Context) []common.Event {
}

func (t *Task) Status(ctx context.Context) (common.Status, error) {
if err := t.Read(ctx); err != nil {
return nil, err
}
return machine.Status(ctx, t.DataSources.Credentials.Resource["RCLONE_REMOTE"], t.Attributes.Status)
}

Expand Down
8 changes: 0 additions & 8 deletions task/gcp/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,10 +371,6 @@ func (t *Task) Delete(ctx context.Context) error {
}

func (t *Task) Logs(ctx context.Context) ([]string, error) {
if err := t.Read(ctx); err != nil {
return nil, err
}

return machine.Logs(ctx, t.DataSources.Credentials.Resource["RCLONE_REMOTE"])
}

Expand Down Expand Up @@ -418,10 +414,6 @@ func (t *Task) Events(ctx context.Context) []common.Event {
}

func (t *Task) Status(ctx context.Context) (common.Status, error) {
if err := t.Read(ctx); err != nil {
return nil, err
}

return machine.Status(ctx, t.DataSources.Credentials.Resource["RCLONE_REMOTE"], t.Attributes.Status)
}

Expand Down

0 comments on commit d53e68f

Please sign in to comment.