From e1d637282b839bf2c71709ac0a4b09fa2df41b4e Mon Sep 17 00:00:00 2001 From: Michael Sauter Date: Mon, 21 Feb 2022 08:53:49 +0100 Subject: [PATCH] Implement queueing Pipeline runs belonging to one repository now run sequentially. If a pipeline run cannot start immediately, it is created as "pending", and a process is started to periodically check if it can start. Since the pipeline manager service may be restarted, it check on boot if there are any pending runs for the repositories under its control and starts the periodic check for those. We can improve on the design by adding a signal in the finish task of a pipeline run that the run will soon finish, reducing the up to 30s wait time for the next run. Closes #394. --- internal/manager/bitbucket.go | 14 +++ internal/manager/pipeline.go | 5 +- internal/manager/queue.go | 143 +++++++++++++++++++++ internal/manager/queue_test.go | 220 +++++++++++++++++++++++++++++++++ internal/manager/server.go | 44 ++++++- 5 files changed, 424 insertions(+), 2 deletions(-) create mode 100644 internal/manager/queue.go create mode 100644 internal/manager/queue_test.go diff --git a/internal/manager/bitbucket.go b/internal/manager/bitbucket.go index 51162db5..05f40ba3 100644 --- a/internal/manager/bitbucket.go +++ b/internal/manager/bitbucket.go @@ -86,3 +86,17 @@ func shouldSkip(bitbucketClient bitbucket.CommitClientInterface, projectKey, rep } return isCiSkipInCommitMessage(c.Message) } + +// getRepoNames retrieves the name of all repositories within the project +// identified by projectKey. +func getRepoNames(bitbucketClient bitbucket.RepoClientInterface, projectKey string) ([]string, error) { + repos := []string{} + rl, err := bitbucketClient.RepoList(projectKey) + if err != nil { + return repos, err + } + for _, n := range rl.Values { + repos = append(repos, n.Name) + } + return repos, nil +} diff --git a/internal/manager/pipeline.go b/internal/manager/pipeline.go index 2713c366..2e0d9459 100644 --- a/internal/manager/pipeline.go +++ b/internal/manager/pipeline.go @@ -33,7 +33,7 @@ const ( ) // createPipelineRun creates a PipelineRun resource -func createPipelineRun(tektonClient tektonClient.ClientPipelineRunInterface, ctxt context.Context, pData PipelineData) (*tekton.PipelineRun, error) { +func createPipelineRun(tektonClient tektonClient.ClientPipelineRunInterface, ctxt context.Context, pData PipelineData, needQueueing bool) (*tekton.PipelineRun, error) { pr, err := tektonClient.CreatePipelineRun(ctxt, &tekton.PipelineRun{ ObjectMeta: metav1.ObjectMeta{ GenerateName: fmt.Sprintf("%s-", pData.Name), @@ -59,6 +59,9 @@ func createPipelineRun(tektonClient tektonClient.ClientPipelineRunInterface, ctx if err != nil { return nil, err } + if needQueueing { + pr.Spec.Status = tekton.PipelineRunSpecStatusPending + } return pr, nil } diff --git a/internal/manager/queue.go b/internal/manager/queue.go new file mode 100644 index 00000000..b9c2afa5 --- /dev/null +++ b/internal/manager/queue.go @@ -0,0 +1,143 @@ +package manager + +import ( + "context" + "fmt" + "math/rand" + "sort" + "time" + + tektonClient "github.com/opendevstack/pipeline/internal/tekton" + "github.com/opendevstack/pipeline/pkg/logging" + tekton "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// pipelineRunQueue manages multiple queues. These queues +// can be polled in vertain intervals. +type pipelineRunQueue struct { + queues map[string]bool + pollInterval time.Duration + // logger is the logger to send logging messages to. + logger logging.LeveledLoggerInterface +} + +// StartPolling periodically checks status for given identifier. +// The time until the first time is not more than maxInitialWait. +func (q *pipelineRunQueue) StartPolling(pt QueueAdvancer, identifier string, maxInitialWait time.Duration) chan bool { + quit := make(chan bool) + if q.queues[identifier] { + close(quit) + return quit + } + q.queues[identifier] = true + + maxInitialWaitSeconds := int(maxInitialWait.Seconds()) + var ticker *time.Ticker + if maxInitialWaitSeconds > 1 { + initialWaitSeconds := rand.Intn(maxInitialWaitSeconds-1) + 1 + ticker = time.NewTicker(time.Duration(initialWaitSeconds) * time.Second) + } else { + ticker = time.NewTicker(time.Second) + } + go func() { + for { + select { + case <-quit: + q.queues[identifier] = false + ticker.Stop() + return + case <-ticker.C: + ticker.Stop() + ticker = time.NewTicker(q.pollInterval) + queueLength, err := pt.AdvanceQueue(identifier) + if err != nil { + q.logger.Warnf("error during poll tick: %s", err) + } + if queueLength == 0 { + close(quit) + } + } + } + }() + + return quit +} + +// QueueAdvancer is the interface passed to +// *pipelineRunQueue#StartPolling. +type QueueAdvancer interface { + // AdvanceQueue is called for each poll step. + AdvanceQueue(repository string) (int, error) +} + +// Queue represents a pipeline run Queue. Pipelines of one repository must +// not run in parallel. +type Queue struct { + TektonClient tektonClient.ClientPipelineRunInterface +} + +// needsQueueing checks if any run has either: +// - pending status set OR +// - is progressing +func needsQueueing(pipelineRuns *tekton.PipelineRunList) bool { + for _, pr := range pipelineRuns.Items { + if pr.Spec.Status == tekton.PipelineRunSpecStatusPending || pipelineRunIsProgressing(pr) { + return true + } + } + return false +} + +// AdvanceQueue starts the oldest pending pipeline run if there is no +// progressing pipeline run at the moment. +// It returns the queue length. +func (s *Server) AdvanceQueue(repository string) (int, error) { + s.Mutex.Lock() + defer s.Mutex.Unlock() + ctxt, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + pipelineRuns, err := listPipelineRuns(s.TektonClient, ctxt, repository) + if err != nil { + return 0, fmt.Errorf("could not retrieve existing pipeline runs: %w", err) + } + + var foundRunning bool + pendingPrs := []tekton.PipelineRun{} + for _, pr := range pipelineRuns.Items { + if pr.IsPending() { + pendingPrs = append(pendingPrs, pr) + continue + } + if pipelineRunIsProgressing(pr) { + foundRunning = true + continue + } + } + + if !foundRunning && len(pendingPrs) > 0 { + // update oldest pending PR + sortPipelineRunsDescending(pendingPrs) + oldestPR := pendingPrs[len(pendingPrs)-1] + pendingPrs = pendingPrs[:len(pendingPrs)-1] + oldestPR.Spec.Status = "" // remove pending status -> starts pipeline run + _, err := s.TektonClient.UpdatePipelineRun(ctxt, &oldestPR, metav1.UpdateOptions{}) + if err != nil { + return len(pendingPrs), fmt.Errorf("could not update pipeline run %s: %w", oldestPR.Name, err) + } + } + return len(pendingPrs), nil +} + +// pipelineRunIsProgressing returns true if the PR is not done, not pending, +// not cancelled, and not timed out. +func pipelineRunIsProgressing(pr tekton.PipelineRun) bool { + return !(pr.IsDone() || pr.IsPending() || pr.IsCancelled() || pr.IsTimedOut()) +} + +// sortPipelineRunsDescending sorts pipeline runs by time (descending) +func sortPipelineRunsDescending(pipelineRuns []tekton.PipelineRun) { + sort.Slice(pipelineRuns, func(i, j int) bool { + return pipelineRuns[j].CreationTimestamp.Time.Before(pipelineRuns[i].CreationTimestamp.Time) + }) +} diff --git a/internal/manager/queue_test.go b/internal/manager/queue_test.go new file mode 100644 index 00000000..2848d0e5 --- /dev/null +++ b/internal/manager/queue_test.go @@ -0,0 +1,220 @@ +package manager + +import ( + "testing" + "time" + + tektonClient "github.com/opendevstack/pipeline/internal/tekton" + tekton "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// fakeAdvancerDone is always done advancing the queue. +type fakeAdvancerDone struct { +} + +func (f *fakeAdvancerDone) AdvanceQueue(repository string) (int, error) { + return 0, nil +} + +func TestPollIdentifier(t *testing.T) { + p := &pipelineRunQueue{ + queues: map[string]bool{ + "a": true, + "b": false, + }, + pollInterval: time.Second, + } + f := &fakeAdvancerDone{} + p.StartPolling(f, "a", time.Second) + p.StartPolling(f, "b", time.Second) + if !p.queues["a"] { + t.Fatal("polling state for 'a' should be true") + } + if !p.queues["b"] { + t.Fatal("polling state for 'b' should be true") + } +} + +// fakeAdvancerSteps can be called a few times before it is done advancing the queue. +type fakeAdvancerSteps struct { + count int +} + +func (f *fakeAdvancerSteps) AdvanceQueue(repository string) (int, error) { + if f.count < 2 { + f.count++ + return 1, nil + } + return 0, nil +} + +func TestAdvanceQueueAndQuit(t *testing.T) { + p := &pipelineRunQueue{ + queues: map[string]bool{}, + pollInterval: time.Millisecond, + } + f := &fakeAdvancerSteps{} + done := p.StartPolling(f, "a", time.Second) + select { + case <-done: + t.Log("quit occured") + case <-time.After(5 * time.Second): + t.Fatal("quit should have occured") + } +} + +func TestAdvanceQueue(t *testing.T) { + tests := map[string]struct { + runs []*tekton.PipelineRun + wantStart string + wantPollDone bool + }{ + "none": { + runs: []*tekton.PipelineRun{}, + wantStart: "", + wantPollDone: true, + }, + "one cancelled, none pending": { + runs: []*tekton.PipelineRun{ + cancelledPipelineRun(t, "one", time.Now()), + }, + wantStart: "", + wantPollDone: true, + }, + "one cancelled, one pending": { + runs: []*tekton.PipelineRun{ + cancelledPipelineRun(t, "one", time.Now()), + pendingPipelineRun(t, "two", time.Now()), + }, + wantStart: "two", + wantPollDone: true, + }, + "one cancelled, two pending": { + runs: []*tekton.PipelineRun{ + cancelledPipelineRun(t, "one", time.Now()), + pendingPipelineRun(t, "two", time.Now().Add(time.Minute*-1)), + pendingPipelineRun(t, "three", time.Now().Add(time.Minute*-2)), + }, + wantStart: "three", + wantPollDone: false, + }, + "two pending": { + runs: []*tekton.PipelineRun{ + pendingPipelineRun(t, "one", time.Now().Add(time.Minute*-2)), + pendingPipelineRun(t, "two", time.Now().Add(time.Minute*-1)), + }, + wantStart: "one", + wantPollDone: false, + }, + "one timed out, one pending": { + runs: []*tekton.PipelineRun{ + timedOutPipelineRun(t, "one", time.Now().Add(time.Minute*-2)), + pendingPipelineRun(t, "two", time.Now().Add(time.Minute*-1)), + }, + wantStart: "two", + wantPollDone: true, + }, + "one running, one pending": { + runs: []*tekton.PipelineRun{ + runningPipelineRun(t, "one", time.Now().Add(time.Minute*-2)), + pendingPipelineRun(t, "two", time.Now().Add(time.Minute*-1)), + }, + wantStart: "", + wantPollDone: false, + }, + } + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + tclient := &tektonClient.TestClient{PipelineRuns: tc.runs} + s := &Server{TektonClient: tclient} + queueLength, err := s.AdvanceQueue("a") + if err != nil { + t.Fatal(err) + } + if tc.wantStart != "" { + if len(tclient.UpdatedPipelineRuns) != 1 { + t.Fatal("should have updated one run") + } + if tclient.UpdatedPipelineRuns[0] != tc.wantStart { + t.Fatalf("should have updated run '%s'", tc.wantStart) + } + } else { + if len(tclient.UpdatedPipelineRuns) > 0 { + t.Fatal("should not have updated any run") + } + } + if (queueLength == 0) != tc.wantPollDone { + t.Fatalf("want polling to be done: %v, but queue length is: %d", tc.wantPollDone, queueLength) + } + }) + } +} + +func pendingPipelineRun(t *testing.T, name string, creationTime time.Time) *tekton.PipelineRun { + pr := &tekton.PipelineRun{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + CreationTimestamp: metav1.Time{Time: creationTime}, + }, + Spec: tekton.PipelineRunSpec{ + Status: tekton.PipelineRunSpecStatusPending, + }, + } + if !pr.IsPending() { + t.Fatal("pr should be pending") + } + return pr +} + +func cancelledPipelineRun(t *testing.T, name string, creationTime time.Time) *tekton.PipelineRun { + pr := &tekton.PipelineRun{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + CreationTimestamp: metav1.Time{Time: creationTime}, + }, + Spec: tekton.PipelineRunSpec{ + Status: tekton.PipelineRunSpecStatusCancelled, + }, + } + if !pr.IsCancelled() || pr.IsPending() || pr.IsDone() || pr.IsTimedOut() { + t.Fatal("pr should be cancelled") + } + return pr +} + +func timedOutPipelineRun(t *testing.T, name string, creationTime time.Time) *tekton.PipelineRun { + // pipelineTimeout := pr.Spec.Timeout + // startTime := pr.Status.StartTime + pr := &tekton.PipelineRun{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + CreationTimestamp: metav1.Time{Time: creationTime}, + }, + Spec: tekton.PipelineRunSpec{ + Timeout: &metav1.Duration{Duration: time.Second}, + }, + Status: tekton.PipelineRunStatus{ + PipelineRunStatusFields: tekton.PipelineRunStatusFields{ + StartTime: &metav1.Time{Time: time.Now().Add(-2 * time.Second)}, + }, + }, + } + if !pr.IsTimedOut() || pr.IsPending() || pr.IsDone() || pr.IsCancelled() { + t.Fatal("pr should be timed out") + } + return pr +} + +func runningPipelineRun(t *testing.T, name string, creationTime time.Time) *tekton.PipelineRun { + pr := &tekton.PipelineRun{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + CreationTimestamp: metav1.Time{Time: creationTime}, + }, + } + if pr.IsDone() || pr.IsPending() || pr.IsTimedOut() || pr.IsCancelled() { + t.Fatal("pr should be running") + } + return pr +} diff --git a/internal/manager/server.go b/internal/manager/server.go index 76b01cb0..21012761 100644 --- a/internal/manager/server.go +++ b/internal/manager/server.go @@ -27,6 +27,8 @@ const ( allowedChangeRefType = "BRANCH" // letterBytes contains letters to use for random strings. letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" + // defaultQueuePollInterval defines the queue pill interval in seconds. + defaultQueuePollIntervalSeconds = 30 ) // Server represents this service, and is a global. @@ -44,6 +46,7 @@ type Server struct { BitbucketClient bitbucketInterface PipelineRunPruner PipelineRunPruner Mutex sync.Mutex + RunQueue *pipelineRunQueue Logger logging.LeveledLoggerInterface } @@ -80,6 +83,9 @@ type ServerConfig struct { BitbucketClient bitbucketInterface // PipelineRunPruner is responsible to prune pipeline runs. PipelineRunPruner PipelineRunPruner + // PollInterval defines the interval between polling for pending pipelines + // in order to start one if possible. + PollInterval time.Duration // Logger is the logger to send logging messages to. Logger logging.LeveledLoggerInterface } @@ -135,6 +141,14 @@ func NewServer(serverConfig ServerConfig) (*Server, error) { if serverConfig.Logger == nil { serverConfig.Logger = &logging.LeveledLogger{Level: logging.LevelError} } + runQueue := &pipelineRunQueue{ + queues: map[string]bool{}, + pollInterval: time.Duration(defaultQueuePollIntervalSeconds) * time.Second, + logger: serverConfig.Logger, + } + if serverConfig.PollInterval > 0 { + runQueue.pollInterval = serverConfig.PollInterval + } s := &Server{ KubernetesClient: serverConfig.KubernetesClient, TektonClient: serverConfig.TektonClient, @@ -148,8 +162,30 @@ func NewServer(serverConfig ServerConfig) (*Server, error) { TaskSuffix: serverConfig.TaskSuffix, StorageConfig: serverConfig.StorageConfig, PipelineRunPruner: serverConfig.PipelineRunPruner, + RunQueue: runQueue, Logger: serverConfig.Logger, } + + s.Logger.Infof( + "Retrieving repositories of Bitbucket project %s to check for pending pipeline runs ...\n", + s.Project, + ) + repos, err := getRepoNames(s.BitbucketClient, s.Project) + if err != nil { + s.Logger.Infof( + "could not retrieve repositories from Bitbucket to poll for pending pipeline runs: %s\n", + err, + ) + } + s.Logger.Debugf( + "Found %d repositories for which to check for pending pipeline runs.\n", len(repos), + ) + for _, r := range repos { + s.Logger.Infof( + "Checking for pending pipeline runs for repository %s ...\n", r, + ) + s.RunQueue.StartPolling(s, r, 30*time.Second) + } return s, nil } @@ -384,7 +420,8 @@ func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) { s.Logger.Infof(requestID, fmt.Sprintf("%+v", pData)) - _, err = createPipelineRun(s.TektonClient, r.Context(), pData) + needQueueing := needsQueueing(pipelineRuns) + _, err = createPipelineRun(s.TektonClient, r.Context(), pData, needQueueing) if err != nil { msg := "cannot create pipeline run" s.Logger.Errorf(requestID, fmt.Sprintf("%s: %s", msg, err)) @@ -392,6 +429,11 @@ func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) { return } + // if need queueing, then schedule poller + if needQueueing { + s.RunQueue.StartPolling(s, pData.Repository, 30*time.Second) + } + err = json.NewEncoder(w).Encode(pData) if err != nil { s.Logger.Errorf(requestID, fmt.Sprintf("cannot write body: %s", err))