Skip to content

Commit

Permalink
Address PR feedback
Browse files Browse the repository at this point in the history
michaelsauter committed Mar 30, 2022

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent fbd4ca7 commit 51dbe63
Showing 2 changed files with 28 additions and 24 deletions.
50 changes: 27 additions & 23 deletions internal/manager/queue.go
Original file line number Diff line number Diff line change
@@ -13,7 +13,7 @@ import (
)

// pipelineRunQueue manages multiple queues. These queues
// can be polled in vertain intervals.
// can be polled in certain intervals.
type pipelineRunQueue struct {
queues map[string]bool
pollInterval time.Duration
@@ -31,14 +31,9 @@ func (q *pipelineRunQueue) StartPolling(pt QueueAdvancer, identifier string, max
}
q.queues[identifier] = true

maxInitialWaitSeconds := int(maxInitialWait.Seconds())
var ticker *time.Ticker
if maxInitialWaitSeconds > 1 {
initialWaitSeconds := rand.Intn(maxInitialWaitSeconds-1) + 1
ticker = time.NewTicker(time.Duration(initialWaitSeconds) * time.Second)
} else {
ticker = time.NewTicker(time.Second)
}
wait(maxInitialWait)

ticker := time.NewTicker(q.pollInterval)
go func() {
for {
select {
@@ -47,8 +42,6 @@ func (q *pipelineRunQueue) StartPolling(pt QueueAdvancer, identifier string, max
ticker.Stop()
return
case <-ticker.C:
ticker.Stop()
ticker = time.NewTicker(q.pollInterval)
q.logger.Debugf("Advancing queue for %s ...", identifier)
queueLength, err := pt.AdvanceQueue(identifier)
if err != nil {
@@ -78,18 +71,6 @@ type Queue struct {
TektonClient tektonClient.ClientPipelineRunInterface
}

// needsQueueing checks if any run has either:
// - pending status set OR
// - is progressing
func needsQueueing(pipelineRuns *tekton.PipelineRunList) bool {
for _, pr := range pipelineRuns.Items {
if pr.Spec.Status == tekton.PipelineRunSpecStatusPending || pipelineRunIsProgressing(pr) {
return true
}
}
return false
}

// AdvanceQueue starts the oldest pending pipeline run if there is no
// progressing pipeline run at the moment.
// It returns the queue length.
@@ -136,8 +117,31 @@ func (s *Server) AdvanceQueue(repository string) (int, error) {
return len(pendingPrs), nil
}

// needsQueueing checks if any run has either:
// - pending status set OR
// - is progressing
func needsQueueing(pipelineRuns *tekton.PipelineRunList) bool {
for _, pr := range pipelineRuns.Items {
if pr.Spec.Status == tekton.PipelineRunSpecStatusPending || pipelineRunIsProgressing(pr) {
return true
}
}
return false
}

// pipelineRunIsProgressing returns true if the PR is not done, not pending,
// not cancelled, and not timed out.
func pipelineRunIsProgressing(pr tekton.PipelineRun) bool {
return !(pr.IsDone() || pr.IsPending() || pr.IsCancelled() || pr.IsTimedOut())
}

// wait waits for up to maxInitialWait. The exact wait time is
// pseudo-randomized if maxInitialWait is longer than one second.
func wait(maxInitialWait time.Duration) {
initialWait := time.Second
if maxInitialWait > time.Second {
initialWait = time.Duration(rand.Intn(int(maxInitialWait.Seconds())-1) + 1)
}
timer := time.NewTimer(initialWait)
<-timer.C
}
2 changes: 1 addition & 1 deletion internal/manager/server.go
Original file line number Diff line number Diff line change
@@ -185,7 +185,7 @@ func NewServer(serverConfig ServerConfig) (*Server, error) {
s.Logger.Infof(
"Checking for pending pipeline runs for repository %s ...", r,
)
s.RunQueue.StartPolling(s, r, 30*time.Second)
s.RunQueue.StartPolling(s, r, 10*time.Second)
}
}()
return s, nil

0 comments on commit 51dbe63

Please sign in to comment.