Skip to content

Commit

Permalink
working solution
Browse files Browse the repository at this point in the history
  • Loading branch information
TimHuynh committed Mar 11, 2024
1 parent 6c4bf6c commit 22b8836
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 20 deletions.
47 changes: 38 additions & 9 deletions executor/linux/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,17 +640,47 @@ func (c *client) StreamBuild(ctx context.Context) error {
defer func() {
c.Logger.Infof("CURRENT STATUS at 795 is %s", c.build.GetStatus())
c.Logger.Trace("waiting for stream functions to return")
if c.build.GetStatus() == constants.StatusSuccess {
ctx.Done()

// Logic applies to only streamService
// if stream service
// wait for 10 secs
// tailContainer for any logs
// if nothing and buildIsFinished and Status is finite
// cancelStreaming()
// if there's logs, continue to wait until logStreaming timeout is done
//go func() {
// err := streams.Wait()
// if err != nil {
// c.Logger.Errorf("error in a stream request, %v", err)
// }
//}()
//time.Now().UTC().Unix()
// Create a ticker to check stdout periodically

// Start a goroutine to periodically check buildInfo
go func() {
for {
select {
case <-delayedCtx.Done():
c.Logger.Debug("Ctx is done. Cancelling status update!")
// Ctx is done. Cancels status update
return
default:
context2.Status = c.build.GetStatus()
context2.TimeFinished = c.build.GetFinished()
}
}
}()

err := streams.Wait()
if err != nil {
c.Logger.Errorf("error in a stream request, %v", err)
}
context2.Status = c.build.GetStatus()

//err := streams.Wait()
//c.Logger.Infof("CURRENT STATUS at 802 is %s", c.build.GetStatus())
//if err != nil {
// c.Logger.Errorf("error in a stream request, %v", err)
//}
cancelStreaming()

//
//cancelStreaming()
c.Logger.Infof("CURRENT STATUS at 807 is %s", c.build.GetStatus())
//cancelStreaming()
// wait for context to be done before reporting that everything has returned.
Expand All @@ -677,7 +707,6 @@ func (c *client) StreamBuild(ctx context.Context) error {
c.Logger.Debug("not accepting any more stream requests as channel is closed")
return nil
}

streams.Go(func() error {
// update engine logger with step metadata
//
Expand Down
9 changes: 5 additions & 4 deletions executor/linux/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,11 +286,12 @@ func (c *client) StreamService(ctx context.Context, ctn *pipeline.Container) err
// write all the logs from the scanner
logs.Write(append(scanner.Bytes(), []byte("\n")...))
}
if c.build.GetStatus() == constants.StatusSuccess {
logger.Info("finished streaming logs service")

logger.Info("finished streaming logs service")

// close channel to stop processing logs
close(done)
// close channel to stop processing logs
close(done)
}

return scanner.Err()
}
Expand Down
43 changes: 36 additions & 7 deletions internal/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,52 +3,81 @@
package context

import (
"bufio"
"context"
"fmt"
"os"
"time"

"github.com/sirupsen/logrus"
)

var Status string
var TimeFinished int64

func WithDelayedCancelPropagation(parent context.Context, timeout time.Duration, name string, logger *logrus.Entry) (context.Context, context.CancelFunc) {
ctx, cancel := context.WithCancel(context.Background())

go func() {
var timer *time.Timer

// Create a ticker with a 1-second interval
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
// start the timer once the parent context is canceled
select {
case <-parent.Done():
logger.Infof("CURRENT STATUS at 21 is %s", Status)
logger.Tracef("parent context is done, starting %s timer for %s", name, timeout)
timer = time.NewTimer(timeout)

break
case <-ctx.Done():
logger.Infof("CURRENT STATUS at 27 is %s", Status)
logger.Tracef("%s finished before the parent context", name)

return
}

logger.Info("34 time is set")
// wait for the timer to elapse or the context to naturally finish.
// stop time ticker once finished.
select {
case <-timer.C:
logger.Infof("CURRENT STATUS at 36 is %s", Status)
logger.Tracef("%s timed out, propagating cancel to %s context", name, name)
ticker.Stop()
cancel()

return
case <-ticker.C:
stdout, stderr := bufio.NewScanner(os.Stdout), bufio.NewScanner(os.Stderr)

if stdout.Scan() || stderr.Scan() {
fmt.Println("There's content in stdout:", stdout.Text(), stderr.Text())
} else if Status == "success" && time.Now().UTC().Unix() >= TimeFinished {
cancel()
}
case <-ctx.Done():
logger.Infof("CURRENT STATUS at 42 is %s", Status)
logger.Tracef("%s finished, stopping timeout timer", name)
timer.Stop()

return
}
}()

logger.Infof("CURRENT STATUS at 50 is %s", Status)
//logger.Infof("CURRENT STATUS at 50 is %s", Status)
return ctx, cancel
}

//
//for {
//// Sleep for 10 seconds
//time.Sleep(10 * time.Second)
//stdout, stderr := bufio.NewScanner(os.Stdout), bufio.NewScanner(os.Stderr)
//
//if stdout.Scan() || stderr.Scan() {
//fmt.Println("There's content in stdout:", stdout.Text(), stderr.Text())
//continue
//} else if c.build.GetStatus() == constants.StatusSuccess && time.Now().UTC().Unix() >= c.build.GetFinished() {
//c.Logger.Info("Build succeed and already finished")
//cancelStreaming()
//fmt.Println("There's no content in stdout.")
//break
//}
//}

0 comments on commit 22b8836

Please sign in to comment.