Skip to content

Commit

Permalink
Address PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelsauter committed Mar 30, 2022
1 parent b675529 commit 1bfa1f1
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 12 deletions.
26 changes: 15 additions & 11 deletions internal/manager/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,25 @@ import (
)

// pipelineRunQueue manages multiple queues. These queues
// can be polled in vertain intervals.
// can be polled in certain intervals.
type pipelineRunQueue struct {
queues map[string]bool
pollInterval time.Duration
// logger is the logger to send logging messages to.
logger logging.LeveledLoggerInterface
}

// wait waits for up to maxInitialWait. The exact wait time is
// pseudo-randomized if maxInitialWait is longer than one second.
func wait(maxInitialWait time.Duration) {
initialWait := time.Second
if maxInitialWait > time.Second {
initialWait = time.Duration(rand.Intn(int(maxInitialWait.Seconds())-1) + 1)
}
timer := time.NewTimer(initialWait)
<-timer.C
}

// StartPolling periodically checks status for given identifier.
// The time until the first time is not more than maxInitialWait.
func (q *pipelineRunQueue) StartPolling(pt QueueAdvancer, identifier string, maxInitialWait time.Duration) chan bool {
Expand All @@ -31,14 +42,9 @@ func (q *pipelineRunQueue) StartPolling(pt QueueAdvancer, identifier string, max
}
q.queues[identifier] = true

maxInitialWaitSeconds := int(maxInitialWait.Seconds())
var ticker *time.Ticker
if maxInitialWaitSeconds > 1 {
initialWaitSeconds := rand.Intn(maxInitialWaitSeconds-1) + 1
ticker = time.NewTicker(time.Duration(initialWaitSeconds) * time.Second)
} else {
ticker = time.NewTicker(time.Second)
}
wait(maxInitialWait)

ticker := time.NewTicker(q.pollInterval)
go func() {
for {
select {
Expand All @@ -47,8 +53,6 @@ func (q *pipelineRunQueue) StartPolling(pt QueueAdvancer, identifier string, max
ticker.Stop()
return
case <-ticker.C:
ticker.Stop()
ticker = time.NewTicker(q.pollInterval)
q.logger.Debugf("Advancing queue for %s ...", identifier)
queueLength, err := pt.AdvanceQueue(identifier)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/manager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func NewServer(serverConfig ServerConfig) (*Server, error) {
s.Logger.Infof(
"Checking for pending pipeline runs for repository %s ...", r,
)
s.RunQueue.StartPolling(s, r, 30*time.Second)
s.RunQueue.StartPolling(s, r, 10*time.Second)
}
}()
return s, nil
Expand Down

0 comments on commit 1bfa1f1

Please sign in to comment.