Implement queueing
Pipeline runs belonging to one repository now run sequentially. If a
pipeline run cannot start immediately, it is created as "pending", and a
process is started to periodically check if it can start.

Since the pipeline manager service may be restarted, it checks on boot if
there are any pending runs for the repositories under its control and
starts the periodic check for those.

We can improve on the design by having the finish task of a pipeline run
signal that the run is about to complete, reducing the wait time of up to
30s before the next run starts.

Closes #394.
michaelsauter committed Mar 21, 2022

1 parent c23fe08 commit d2f5168
Showing 10 changed files with 993 additions and 538 deletions.
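The server.go diff is not rendered below, so as orientation, here is a minimal sketch of how a webhook request could flow through the new queueing pieces. The handler name, the RunQueue field and the 30s values are assumptions for illustration, not the exact wiring in server.go:

// Hypothetical webhook handling inside package manager (not the actual
// server.go code): decide on queueing, create the run, and poll if needed.
func (s *Server) createAndQueueRun(ctxt context.Context, pData PipelineData) error {
	// Existing runs of the same repository determine whether the new run
	// can start immediately or has to wait.
	prs, err := listPipelineRuns(s.TektonClient, ctxt, pData.Repository)
	if err != nil {
		return err
	}
	queue := needsQueueing(prs)

	// A queued run is created with spec.status set to pending so that
	// Tekton does not start it yet.
	if _, err := createPipelineRun(s.TektonClient, ctxt, pData, queue); err != nil {
		return err
	}

	// Periodically check (here: every 30s, assumed) whether the pending run
	// can start; polling stops by itself once the queue is empty.
	if queue {
		s.RunQueue.StartPolling(s, pData.Repository, 30*time.Second)
	}
	return nil
}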
102 changes: 102 additions & 0 deletions internal/manager/bitbucket.go
@@ -0,0 +1,102 @@
package manager

import (
"fmt"

"github.com/opendevstack/pipeline/pkg/bitbucket"
)

type bitbucketInterface interface {
bitbucket.CommitClientInterface
bitbucket.RawClientInterface
bitbucket.RepoClientInterface
}

type repository struct {
Project struct {
Key string `json:"key"`
} `json:"project"`
Slug string `json:"slug"`
}
type requestBitbucket struct {
EventKey string `json:"eventKey"`
Repository repository `json:"repository"`
Changes []struct {
Type string `json:"type"`
Ref struct {
ID string `json:"id"`
DisplayID string `json:"displayId"`
Type string `json:"type"`
} `json:"ref"`
FromHash string `json:"fromHash"`
ToHash string `json:"toHash"`
} `json:"changes"`
PullRequest *struct {
FromRef struct {
Repository repository `json:"repository"`
ID string `json:"id"`
DisplayID string `json:"displayId"`
LatestCommit string `json:"latestCommit"`
} `json:"fromRef"`
} `json:"pullRequest"`
Comment *struct {
Text string `json:"text"`
} `json:"comment"`
}

func getCommitSHA(bitbucketClient bitbucket.CommitClientInterface, project, repository, gitFullRef string) (string, error) {
commitList, err := bitbucketClient.CommitList(project, repository, bitbucket.CommitListParams{
Until: gitFullRef,
})
if err != nil {
return "", fmt.Errorf("could not get commit list: %w", err)
}
if len(commitList.Values) == 0 {
return "", fmt.Errorf("no commit found for ref %s", gitFullRef)
}
return commitList.Values[0].ID, nil
}

type prInfo struct {
ID int
Base string
}

func extractPullRequestInfo(bitbucketClient bitbucket.CommitClientInterface, projectKey, repositorySlug, gitCommit string) (prInfo, error) {
var i prInfo

prPage, err := bitbucketClient.CommitPullRequestList(projectKey, repositorySlug, gitCommit)
if err != nil {
return i, err
}

for _, v := range prPage.Values {
if !v.Open {
continue
}
i.ID = v.ID
i.Base = v.ToRef.ID
break
}

return i, nil
}

func shouldSkip(bitbucketClient bitbucket.CommitClientInterface, projectKey, repositorySlug, gitCommit string) bool {
c, err := bitbucketClient.CommitGet(projectKey, repositorySlug, gitCommit)
if err != nil {
return false
}
return isCiSkipInCommitMessage(c.Message)
}

// getRepoNames retrieves the names of all repositories within the project
// identified by projectKey.
func getRepoNames(bitbucketClient bitbucket.RepoClientInterface, projectKey string) ([]string, error) {
repos := []string{}
rl, err := bitbucketClient.RepoList(projectKey)
if err != nil {
return repos, err
}
for _, n := range rl.Values {
repos = append(repos, n.Name)
}
return repos, nil
}
319 changes: 319 additions & 0 deletions internal/manager/pipeline.go
@@ -0,0 +1,319 @@
package manager

import (
"context"
"crypto/sha1"
"fmt"
"regexp"
"strconv"
"strings"

tektonClient "github.com/opendevstack/pipeline/internal/tekton"
"github.com/opendevstack/pipeline/pkg/config"
tekton "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
)

const (
// Label prefix to use for labels applied by this service.
labelPrefix = "pipeline.opendevstack.org/"
// Label specifying the Bitbucket repository related to the pipeline.
repositoryLabel = labelPrefix + "repository"
// Label specifying the Git ref (e.g. branch) related to the pipeline.
gitRefLabel = labelPrefix + "git-ref"
// Label specifying the target stage of the pipeline.
stageLabel = labelPrefix + "stage"
// tektonAPIVersion specifies the Tekton API version in use
tektonAPIVersion = "tekton.dev/v1beta1"
// sharedWorkspaceName is the name of the workspace shared by all tasks
sharedWorkspaceName = "shared-workspace"
)

// createPipelineRun creates a PipelineRun resource. If needQueueing is true,
// the run is created with a pending status so that it does not start until
// the queue advances it.
func createPipelineRun(tektonClient tektonClient.ClientPipelineRunInterface, ctxt context.Context, pData PipelineData, needQueueing bool) (*tekton.PipelineRun, error) {
pr := &tekton.PipelineRun{
ObjectMeta: metav1.ObjectMeta{
GenerateName: fmt.Sprintf("%s-", pData.Name),
Labels: pipelineLabels(pData),
},
TypeMeta: metav1.TypeMeta{
APIVersion: tektonAPIVersion,
Kind: "PipelineRun",
},
Spec: tekton.PipelineRunSpec{
PipelineRef: &tekton.PipelineRef{Name: pData.Name},
ServiceAccountName: "pipeline", // TODO
Workspaces: []tekton.WorkspaceBinding{
{
Name: sharedWorkspaceName,
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: pData.PVC,
},
},
},
},
}
if needQueueing {
// Set the pending status before creation so that Tekton does not start
// the run until the queue advances it.
pr.Spec.Status = tekton.PipelineRunSpecStatusPending
}
return tektonClient.CreatePipelineRun(ctxt, pr, metav1.CreateOptions{})
}

// listPipelineRuns lists pipeline runs associated with repository.
func listPipelineRuns(tektonClient tektonClient.ClientPipelineRunInterface, ctxt context.Context, repository string) (*tekton.PipelineRunList, error) {
labelMap := map[string]string{repositoryLabel: repository}
return tektonClient.ListPipelineRuns(
ctxt, metav1.ListOptions{LabelSelector: labels.Set(labelMap).String()},
)
}

// makePipelineName generates the name of the pipeline.
// According to the Kubernetes label rules, a maximum of 63 characters is
// allowed, see https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set.
// Therefore, the name might be truncated. As this could cause clashes between
// similarly named branches, a short hash of the full name is appended to make
// collisions very unlikely.
// The pipeline name is cut at 55 chars to leave room for suffixes, e.g. the
// suffix added to pipeline run names through generateName.
func makePipelineName(component string, branch string) string {
// Remove all characters that are neither alphanumeric nor a hyphen
safeCharsRegex := regexp.MustCompile("[^-a-zA-Z0-9]+")
pipeline := component + "-" + safeCharsRegex.ReplaceAllString(
strings.Replace(branch, "/", "-", -1),
"",
)

// 55 is derived from K8s label max length minus room for generateName suffix.
pipeline = fitStringToMaxLength(pipeline, 55)
return strings.ToLower(pipeline)
}
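
Illustrative behaviour (the hash suffix for long names depends on the SHA-1 of the full name and is not spelled out here):

// Short combinations pass through unchanged apart from lowercasing and
// character filtering:
//   makePipelineName("myapp", "feature/ODS-123-new-login")
//   // => "myapp-feature-ods-123-new-login"
// Combinations longer than 55 characters are cut to 47 characters, then "-"
// plus the first 7 hex characters of the SHA-1 of the full name is appended,
// keeping the total length at 55.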

// fitStringToMaxLength ensures s is not longer than max.
// If s is longer than max, it shortens s and appends a unique, consistent
// suffix so that multiple invocations produce the same result. The length
// of the shortened string will be equal to max.
func fitStringToMaxLength(s string, max int) string {
if len(s) <= max {
return s
}
suffixLength := 7
shortened := s[0 : max-suffixLength-1]
h := sha1.New()
_, err := h.Write([]byte(s))
if err != nil {
return shortened
}
bs := h.Sum(nil)
suffix := fmt.Sprintf("%x", bs)
return fmt.Sprintf("%s-%s", shortened, suffix[0:suffixLength])
}

// pipelineLabels returns a map of labels to apply to pipelines and related runs.
func pipelineLabels(data PipelineData) map[string]string {
return map[string]string{
repositoryLabel: data.Repository,
gitRefLabel: data.GitRef,
stageLabel: data.Stage,
}
}

func assemblePipeline(odsConfig *config.ODS, data PipelineData, taskKind tekton.TaskKind, taskSuffix string) *tekton.Pipeline {

var tasks []tekton.PipelineTask
tasks = append(tasks, tekton.PipelineTask{
Name: "ods-start",
TaskRef: &tekton.TaskRef{Kind: taskKind, Name: "ods-start" + taskSuffix},
Workspaces: []tekton.WorkspacePipelineTaskBinding{
{Name: "source", Workspace: sharedWorkspaceName},
},
Params: []tekton.Param{
{
Name: "url",
Value: tekton.ArrayOrString{
StringVal: "$(params.git-repo-url)",
Type: tekton.ParamTypeString,
},
},
{
Name: "git-full-ref",
Value: tekton.ArrayOrString{
StringVal: "$(params.git-full-ref)",
Type: tekton.ParamTypeString,
},
},
{
Name: "project",
Value: tekton.ArrayOrString{
StringVal: "$(params.project)",
Type: tekton.ParamTypeString,
},
},
{
Name: "pr-key",
Value: tekton.ArrayOrString{
StringVal: "$(params.pr-key)",
Type: tekton.ParamTypeString,
},
},
{
Name: "pr-base",
Value: tekton.ArrayOrString{
StringVal: "$(params.pr-base)",
Type: tekton.ParamTypeString,
},
},
{
Name: "pipeline-run-name",
Value: tekton.ArrayOrString{
StringVal: "$(context.pipelineRun.name)",
Type: tekton.ParamTypeString,
},
},
{
Name: "environment",
Value: tekton.ArrayOrString{
StringVal: "$(params.environment)",
Type: tekton.ParamTypeString,
},
},
{
Name: "version",
Value: tekton.ArrayOrString{
StringVal: "$(params.version)",
Type: tekton.ParamTypeString,
},
},
},
})
if len(odsConfig.Pipeline.Tasks) > 0 {
odsConfig.Pipeline.Tasks[0].RunAfter = append(odsConfig.Pipeline.Tasks[0].RunAfter, "ods-start")
tasks = append(tasks, odsConfig.Pipeline.Tasks...)
}

var finallyTasks []tekton.PipelineTask
finallyTasks = append(finallyTasks, odsConfig.Pipeline.Finally...)

finallyTasks = append(finallyTasks, tekton.PipelineTask{
Name: "ods-finish",
TaskRef: &tekton.TaskRef{Kind: taskKind, Name: "ods-finish" + taskSuffix},
Workspaces: []tekton.WorkspacePipelineTaskBinding{
{Name: "source", Workspace: sharedWorkspaceName},
},
Params: []tekton.Param{
{
Name: "pipeline-run-name",
Value: tekton.ArrayOrString{
StringVal: "$(context.pipelineRun.name)",
Type: tekton.ParamTypeString,
},
},
{
Name: "aggregate-tasks-status",
Value: tekton.ArrayOrString{
StringVal: "$(tasks.status)",
Type: tekton.ParamTypeString,
},
},
},
})

p := &tekton.Pipeline{
ObjectMeta: metav1.ObjectMeta{
Name: data.Name,
Labels: pipelineLabels(data),
},
TypeMeta: metav1.TypeMeta{
APIVersion: tektonAPIVersion,
Kind: "Pipeline",
},
Spec: tekton.PipelineSpec{
Description: "ODS",
Params: []tekton.ParamSpec{
{
Name: "repository",
Type: "string",
Default: &tekton.ArrayOrString{
StringVal: data.Repository,
Type: tekton.ParamTypeString,
},
},
{
Name: "project",
Type: "string",
Default: &tekton.ArrayOrString{
StringVal: data.Project,
Type: tekton.ParamTypeString,
},
},
{
Name: "component",
Type: "string",
Default: &tekton.ArrayOrString{
StringVal: data.Component,
Type: tekton.ParamTypeString,
},
},
{
Name: "git-repo-url",
Type: "string",
Default: &tekton.ArrayOrString{
StringVal: data.GitURI,
Type: tekton.ParamTypeString,
},
},
{
Name: "git-full-ref",
Type: "string",
Default: &tekton.ArrayOrString{
StringVal: data.GitFullRef,
Type: tekton.ParamTypeString,
},
},
{
Name: "pr-key",
Type: "string",
Default: &tekton.ArrayOrString{
StringVal: strconv.Itoa(data.PullRequestKey),
Type: tekton.ParamTypeString,
},
},
{
Name: "pr-base",
Type: "string",
Default: &tekton.ArrayOrString{
StringVal: data.PullRequestBase,
Type: tekton.ParamTypeString,
},
},
{
Name: "environment",
Type: "string",
Default: &tekton.ArrayOrString{
StringVal: data.Environment,
Type: tekton.ParamTypeString,
},
},
{
Name: "version",
Type: "string",
Default: &tekton.ArrayOrString{
StringVal: data.Version,
Type: tekton.ParamTypeString,
},
},
},
Tasks: tasks,
Workspaces: []tekton.PipelineWorkspaceDeclaration{
{
Name: sharedWorkspaceName,
},
},
Finally: finallyTasks,
},
}
return p
}
6 changes: 1 addition & 5 deletions internal/manager/prune.go
@@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"sort"
"time"

tektonClient "github.com/opendevstack/pipeline/internal/tekton"
@@ -125,10 +124,7 @@ func (p *pipelineRunPrunerByStage) categorizePipelineRunsByStage(pipelineRuns []
// If all pipeline runs of one pipeline can be pruned, the pipeline is
// returned instead of the individual pipeline runs.
func (s *pipelineRunPrunerByStage) findPrunableResources(pipelineRuns []tekton.PipelineRun) *prunableResources {
// Sort pipeline runs by time (descending)
sort.Slice(pipelineRuns, func(i, j int) bool {
return pipelineRuns[j].CreationTimestamp.Time.Before(pipelineRuns[i].CreationTimestamp.Time)
})
sortPipelineRunsDescending(pipelineRuns)

// Apply cleanup to each bucket.
prunablePipelines := []string{}
109 changes: 29 additions & 80 deletions internal/manager/prune_test.go
@@ -77,86 +77,22 @@ func TestPrune(t *testing.T) {
t.Fatal(err)
}
prs := []tekton.PipelineRun{
{ // not pruned
ObjectMeta: metav1.ObjectMeta{
Name: "pr-a",
CreationTimestamp: metav1.Time{Time: time.Now().Add(time.Minute * -1)},
Labels: map[string]string{
stageLabel: config.DevStage,
tektonPipelineLabel: "p-one",
},
},
},
{ // would be pruned by maxKeepRuns, but is protected by minKeepHours
ObjectMeta: metav1.ObjectMeta{
Name: "pr-b",
CreationTimestamp: metav1.Time{Time: time.Now().Add(time.Minute * -3)},
Labels: map[string]string{
stageLabel: config.DevStage,
tektonPipelineLabel: "p-one",
},
},
},
{ // pruned
ObjectMeta: metav1.ObjectMeta{
Name: "pr-c",
CreationTimestamp: metav1.Time{Time: time.Now().Add(time.Hour * -4)},
Labels: map[string]string{
stageLabel: config.DevStage,
tektonPipelineLabel: "p-one",
},
},
},
{ // pruned through pipeline p-two
ObjectMeta: metav1.ObjectMeta{
Name: "pr-d",
CreationTimestamp: metav1.Time{Time: time.Now().Add(time.Hour * -5)},
Labels: map[string]string{
stageLabel: config.DevStage,
tektonPipelineLabel: "p-two",
},
},
},
{ // pruned through pipeline p-two
ObjectMeta: metav1.ObjectMeta{
Name: "pr-e",
CreationTimestamp: metav1.Time{Time: time.Now().Add(time.Hour * -6)},
Labels: map[string]string{
stageLabel: config.DevStage,
tektonPipelineLabel: "p-two",
},
},
},
{ // not pruned because different stage (QA)
ObjectMeta: metav1.ObjectMeta{
Name: "pr-e",
CreationTimestamp: metav1.Time{Time: time.Now()},
Labels: map[string]string{
stageLabel: config.QAStage,
tektonPipelineLabel: "p-three",
},
},
},
{ // not pruned because different stage (PROD)
ObjectMeta: metav1.ObjectMeta{
Name: "pr-f",
CreationTimestamp: metav1.Time{Time: time.Now().Add(time.Hour * -7)},
Labels: map[string]string{
stageLabel: config.ProdStage,
tektonPipelineLabel: "p-four",
},
},
},
{ // pruned
ObjectMeta: metav1.ObjectMeta{
Name: "pr-g",
CreationTimestamp: metav1.Time{Time: time.Now().Add(time.Hour * -8)},
Labels: map[string]string{
stageLabel: config.ProdStage,
tektonPipelineLabel: "p-four",
},
},
},
// not pruned
pipelineRun("pr-a", "p-one", config.DevStage, time.Now().Add(time.Minute*-1)),
// would be pruned by maxKeepRuns, but is protected by minKeepHours
pipelineRun("pr-b", "p-one", config.DevStage, time.Now().Add(time.Minute*-3)),
// pruned
pipelineRun("pr-c", "p-one", config.DevStage, time.Now().Add(time.Hour*-4)),
// pruned through pipeline p-two
pipelineRun("pr-d", "p-two", config.DevStage, time.Now().Add(time.Hour*-5)),
// pruned through pipeline p-two
pipelineRun("pr-e", "p-two", config.DevStage, time.Now().Add(time.Hour*-6)),
// not pruned because different stage (QA)
pipelineRun("pr-e", "p-three", config.QAStage, time.Now()),
// not pruned because different stage (PROD)
pipelineRun("pr-f", "p-four", config.ProdStage, time.Now().Add(time.Hour*-7)),
// pruned
pipelineRun("pr-g", "p-four", config.ProdStage, time.Now().Add(time.Hour*-8)),
}
err = p.Prune(context.TODO(), prs)
if err != nil {
@@ -170,3 +106,16 @@ func TestPrune(t *testing.T) {
t.Fatalf("p prune mismatch (-want +got):\n%s", diff)
}
}

func pipelineRun(name, pipeline string, stage config.Stage, creationTime time.Time) tekton.PipelineRun {
return tekton.PipelineRun{
ObjectMeta: metav1.ObjectMeta{
Name: name,
CreationTimestamp: metav1.Time{Time: creationTime},
Labels: map[string]string{
stageLabel: string(stage),
tektonPipelineLabel: pipeline,
},
},
}
}
61 changes: 61 additions & 0 deletions internal/manager/pvc.go
@@ -0,0 +1,61 @@
package manager

import (
"context"
"fmt"
"strings"

corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
// Annotation to set the storage provisioner for a PVC.
storageProvisionerAnnotation = "volume.beta.kubernetes.io/storage-provisioner"
// PVC finalizer.
pvcProtectionFinalizer = "kubernetes.io/pvc-protection"
)

// createPVCIfRequired creates a PVC for the pipeline workspace if it does not exist yet.
func (s *Server) createPVCIfRequired(ctxt context.Context, pData PipelineData) error {
_, err := s.KubernetesClient.GetPersistentVolumeClaim(ctxt, pData.PVC, metav1.GetOptions{})
if err != nil {
if !kerrors.IsNotFound(err) {
return fmt.Errorf("could not determine if %s already exists: %w", pData.PVC, err)
}
vm := corev1.PersistentVolumeFilesystem
pvc := &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: pData.PVC,
Labels: map[string]string{repositoryLabel: pData.Repository},
Finalizers: []string{pvcProtectionFinalizer},
Annotations: map[string]string{},
},
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceName(corev1.ResourceStorage): resource.MustParse(s.StorageConfig.Size),
},
},
StorageClassName: &s.StorageConfig.ClassName,
VolumeMode: &vm,
},
}
if s.StorageConfig.Provisioner != "" {
pvc.Annotations[storageProvisionerAnnotation] = s.StorageConfig.Provisioner
}
_, err := s.KubernetesClient.CreatePersistentVolumeClaim(ctxt, pvc, metav1.CreateOptions{})
if err != nil {
return err
}
}
return nil
}

func makePVCName(component string) string {
pvcName := fmt.Sprintf("ods-workspace-%s", strings.ToLower(component))
return fitStringToMaxLength(pvcName, 63) // K8s label max length to be on the safe side.
}
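
For illustration, a short usage sketch (component name chosen arbitrarily):

// Short component names map directly to a PVC name:
//   makePVCName("My-App") // => "ods-workspace-my-app"
// Longer results are shortened to 63 characters by fitStringToMaxLength,
// which appends a 7-character hash suffix.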
143 changes: 143 additions & 0 deletions internal/manager/queue.go
@@ -0,0 +1,143 @@
package manager

import (
"context"
"fmt"
"math/rand"
"sort"
"time"

tektonClient "github.com/opendevstack/pipeline/internal/tekton"
"github.com/opendevstack/pipeline/pkg/logging"
tekton "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// pipelineRunQueue manages multiple queues. These queues
// can be polled at regular intervals.
type pipelineRunQueue struct {
queues map[string]bool
pollInterval time.Duration
// logger is the logger to send logging messages to.
logger logging.LeveledLoggerInterface
}

// StartPolling periodically advances the queue for the given identifier.
// The time until the first check is not more than maxInitialWait.
func (q *pipelineRunQueue) StartPolling(pt QueueAdvancer, identifier string, maxInitialWait time.Duration) chan bool {
quit := make(chan bool)
if q.queues[identifier] {
close(quit)
return quit
}
q.queues[identifier] = true

maxInitialWaitSeconds := int(maxInitialWait.Seconds())
var ticker *time.Ticker
if maxInitialWaitSeconds > 1 {
initialWaitSeconds := rand.Intn(maxInitialWaitSeconds-1) + 1
ticker = time.NewTicker(time.Duration(initialWaitSeconds) * time.Second)
} else {
ticker = time.NewTicker(time.Second)
}
go func() {
for {
select {
case <-quit:
q.queues[identifier] = false
ticker.Stop()
return
case <-ticker.C:
ticker.Stop()
ticker = time.NewTicker(q.pollInterval)
queueLength, err := pt.AdvanceQueue(identifier)
if err != nil {
q.logger.Warnf("error during poll tick: %s", err)
}
if queueLength == 0 {
close(quit)
}
}
}
}()

return quit
}
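
A minimal usage sketch; the queue construction and the Logger field on Server are assumptions, the real setup lives in server.go:

// Hypothetical setup: srv is a *Server, which satisfies QueueAdvancer via
// AdvanceQueue; srv.Logger is assumed to implement LeveledLoggerInterface.
q := &pipelineRunQueue{
	queues:       map[string]bool{},
	pollInterval: 30 * time.Second,
	logger:       srv.Logger,
}
// The first check happens after a random wait of at most 30 seconds,
// subsequent checks every pollInterval. The returned channel is closed by
// the queue itself once AdvanceQueue reports an empty queue.
quit := q.StartPolling(srv, "my-repo", 30*time.Second)
<-quit // blocks until the queue for "my-repo" has drained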

// QueueAdvancer is the interface passed to
// *pipelineRunQueue#StartPolling.
type QueueAdvancer interface {
// AdvanceQueue is called for each poll step.
AdvanceQueue(repository string) (int, error)
}

// Queue represents a pipeline run queue. Pipeline runs of one repository must
// not run in parallel.
type Queue struct {
TektonClient tektonClient.ClientPipelineRunInterface
}

// needsQueueing checks whether any pipeline run either has a pending status
// set or is progressing.
func needsQueueing(pipelineRuns *tekton.PipelineRunList) bool {
for _, pr := range pipelineRuns.Items {
if pr.Spec.Status == tekton.PipelineRunSpecStatusPending || pipelineRunIsProgressing(pr) {
return true
}
}
return false
}

// AdvanceQueue starts the oldest pending pipeline run if there is no
// progressing pipeline run at the moment.
// It returns the queue length.
func (s *Server) AdvanceQueue(repository string) (int, error) {
s.Mutex.Lock()
defer s.Mutex.Unlock()
ctxt, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
pipelineRuns, err := listPipelineRuns(s.TektonClient, ctxt, repository)
if err != nil {
return 0, fmt.Errorf("could not retrieve existing pipeline runs: %w", err)
}

var foundRunning bool
pendingPrs := []tekton.PipelineRun{}
for _, pr := range pipelineRuns.Items {
if pr.IsPending() {
pendingPrs = append(pendingPrs, pr)
continue
}
if pipelineRunIsProgressing(pr) {
foundRunning = true
continue
}
}

if !foundRunning && len(pendingPrs) > 0 {
// update oldest pending PR
sortPipelineRunsDescending(pendingPrs)
oldestPR := pendingPrs[len(pendingPrs)-1]
pendingPrs = pendingPrs[:len(pendingPrs)-1]
oldestPR.Spec.Status = "" // remove pending status -> starts pipeline run
_, err := s.TektonClient.UpdatePipelineRun(ctxt, &oldestPR, metav1.UpdateOptions{})
if err != nil {
return len(pendingPrs), fmt.Errorf("could not update pipeline run %s: %w", oldestPR.Name, err)
}
}
return len(pendingPrs), nil
}

// pipelineRunIsProgressing returns true if the PR is not done, not pending,
// not cancelled, and not timed out.
func pipelineRunIsProgressing(pr tekton.PipelineRun) bool {
return !(pr.IsDone() || pr.IsPending() || pr.IsCancelled() || pr.IsTimedOut())
}

// sortPipelineRunsDescending sorts pipeline runs by time (descending)
func sortPipelineRunsDescending(pipelineRuns []tekton.PipelineRun) {
sort.Slice(pipelineRuns, func(i, j int) bool {
return pipelineRuns[j].CreationTimestamp.Time.Before(pipelineRuns[i].CreationTimestamp.Time)
})
}
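
The boot-time recovery described in the commit message lives in server.go (diff not rendered below); a rough sketch of the idea, with assumed field and helper names:

// Hypothetical boot-time check: for each repository of the project, resume
// polling if pipeline runs are pending or progressing from before a restart.
func (s *Server) resumeQueues(ctxt context.Context) error {
	repos, err := getRepoNames(s.BitbucketClient, s.ProjectKey)
	if err != nil {
		return err
	}
	for _, r := range repos {
		prs, err := listPipelineRuns(s.TektonClient, ctxt, r)
		if err != nil {
			return err
		}
		if needsQueueing(prs) {
			s.RunQueue.StartPolling(s, r, 30*time.Second)
		}
	}
	return nil
}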
220 changes: 220 additions & 0 deletions internal/manager/queue_test.go
@@ -0,0 +1,220 @@
package manager

import (
"testing"
"time"

tektonClient "github.com/opendevstack/pipeline/internal/tekton"
tekton "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// fakeAdvancerDone is always done advancing the queue.
type fakeAdvancerDone struct {
}

func (f *fakeAdvancerDone) AdvanceQueue(repository string) (int, error) {
return 0, nil
}

func TestPollIdentifier(t *testing.T) {
p := &pipelineRunQueue{
queues: map[string]bool{
"a": true,
"b": false,
},
pollInterval: time.Second,
}
f := &fakeAdvancerDone{}
p.StartPolling(f, "a", time.Second)
p.StartPolling(f, "b", time.Second)
if !p.queues["a"] {
t.Fatal("polling state for 'a' should be true")
}
if !p.queues["b"] {
t.Fatal("polling state for 'b' should be true")
}
}

// fakeAdvancerSteps can be called a few times before it is done advancing the queue.
type fakeAdvancerSteps struct {
count int
}

func (f *fakeAdvancerSteps) AdvanceQueue(repository string) (int, error) {
if f.count < 2 {
f.count++
return 1, nil
}
return 0, nil
}

func TestAdvanceQueueAndQuit(t *testing.T) {
p := &pipelineRunQueue{
queues: map[string]bool{},
pollInterval: time.Millisecond,
}
f := &fakeAdvancerSteps{}
done := p.StartPolling(f, "a", time.Second)
select {
case <-done:
t.Log("quit occured")
case <-time.After(5 * time.Second):
t.Fatal("quit should have occured")
}
}

func TestAdvanceQueue(t *testing.T) {
tests := map[string]struct {
runs []*tekton.PipelineRun
wantStart string
wantPollDone bool
}{
"none": {
runs: []*tekton.PipelineRun{},
wantStart: "",
wantPollDone: true,
},
"one cancelled, none pending": {
runs: []*tekton.PipelineRun{
cancelledPipelineRun(t, "one", time.Now()),
},
wantStart: "",
wantPollDone: true,
},
"one cancelled, one pending": {
runs: []*tekton.PipelineRun{
cancelledPipelineRun(t, "one", time.Now()),
pendingPipelineRun(t, "two", time.Now()),
},
wantStart: "two",
wantPollDone: true,
},
"one cancelled, two pending": {
runs: []*tekton.PipelineRun{
cancelledPipelineRun(t, "one", time.Now()),
pendingPipelineRun(t, "two", time.Now().Add(time.Minute*-1)),
pendingPipelineRun(t, "three", time.Now().Add(time.Minute*-2)),
},
wantStart: "three",
wantPollDone: false,
},
"two pending": {
runs: []*tekton.PipelineRun{
pendingPipelineRun(t, "one", time.Now().Add(time.Minute*-2)),
pendingPipelineRun(t, "two", time.Now().Add(time.Minute*-1)),
},
wantStart: "one",
wantPollDone: false,
},
"one timed out, one pending": {
runs: []*tekton.PipelineRun{
timedOutPipelineRun(t, "one", time.Now().Add(time.Minute*-2)),
pendingPipelineRun(t, "two", time.Now().Add(time.Minute*-1)),
},
wantStart: "two",
wantPollDone: true,
},
"one running, one pending": {
runs: []*tekton.PipelineRun{
runningPipelineRun(t, "one", time.Now().Add(time.Minute*-2)),
pendingPipelineRun(t, "two", time.Now().Add(time.Minute*-1)),
},
wantStart: "",
wantPollDone: false,
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
tclient := &tektonClient.TestClient{PipelineRuns: tc.runs}
s := &Server{TektonClient: tclient}
queueLength, err := s.AdvanceQueue("a")
if err != nil {
t.Fatal(err)
}
if tc.wantStart != "" {
if len(tclient.UpdatedPipelineRuns) != 1 {
t.Fatal("should have updated one run")
}
if tclient.UpdatedPipelineRuns[0] != tc.wantStart {
t.Fatalf("should have updated run '%s'", tc.wantStart)
}
} else {
if len(tclient.UpdatedPipelineRuns) > 0 {
t.Fatal("should not have updated any run")
}
}
if (queueLength == 0) != tc.wantPollDone {
t.Fatalf("want polling to be done: %v, but queue length is: %d", tc.wantPollDone, queueLength)
}
})
}
}

func pendingPipelineRun(t *testing.T, name string, creationTime time.Time) *tekton.PipelineRun {
pr := &tekton.PipelineRun{
ObjectMeta: metav1.ObjectMeta{
Name: name,
CreationTimestamp: metav1.Time{Time: creationTime},
},
Spec: tekton.PipelineRunSpec{
Status: tekton.PipelineRunSpecStatusPending,
},
}
if !pr.IsPending() {
t.Fatal("pr should be pending")
}
return pr
}

func cancelledPipelineRun(t *testing.T, name string, creationTime time.Time) *tekton.PipelineRun {
pr := &tekton.PipelineRun{
ObjectMeta: metav1.ObjectMeta{
Name: name,
CreationTimestamp: metav1.Time{Time: creationTime},
},
Spec: tekton.PipelineRunSpec{
Status: tekton.PipelineRunSpecStatusCancelled,
},
}
if !pr.IsCancelled() || pr.IsPending() || pr.IsDone() || pr.IsTimedOut() {
t.Fatal("pr should be cancelled")
}
return pr
}

func timedOutPipelineRun(t *testing.T, name string, creationTime time.Time) *tekton.PipelineRun {
// IsTimedOut compares Status.StartTime against Spec.Timeout, so both fields
// are set such that the run has exceeded its timeout.
pr := &tekton.PipelineRun{
ObjectMeta: metav1.ObjectMeta{
Name: name,
CreationTimestamp: metav1.Time{Time: creationTime},
},
Spec: tekton.PipelineRunSpec{
Timeout: &metav1.Duration{Duration: time.Second},
},
Status: tekton.PipelineRunStatus{
PipelineRunStatusFields: tekton.PipelineRunStatusFields{
StartTime: &metav1.Time{Time: time.Now().Add(-2 * time.Second)},
},
},
}
if !pr.IsTimedOut() || pr.IsPending() || pr.IsDone() || pr.IsCancelled() {
t.Fatal("pr should be timed out")
}
return pr
}

func runningPipelineRun(t *testing.T, name string, creationTime time.Time) *tekton.PipelineRun {
pr := &tekton.PipelineRun{
ObjectMeta: metav1.ObjectMeta{
Name: name,
CreationTimestamp: metav1.Time{Time: creationTime},
},
}
if pr.IsDone() || pr.IsPending() || pr.IsTimedOut() || pr.IsCancelled() {
t.Fatal("pr should be running")
}
return pr
}
541 changes: 88 additions & 453 deletions internal/manager/server.go

Large diffs are not rendered by default.

19 changes: 19 additions & 0 deletions internal/manager/server_test.go
@@ -264,6 +264,25 @@ func testServer(kc kubernetes.ClientInterface, tc tektonClient.ClientInterface,
return httptest.NewServer(http.HandlerFunc(server.HandleRoot)), nil
}

func TestDoneness(t *testing.T) {
pr := &tekton.PipelineRun{
Spec: tekton.PipelineRunSpec{
Status: tekton.PipelineRunSpecStatusPending,
},
}
if pr.IsDone() {
t.Fatal("expected not to be done when pending")
}
pr = &tekton.PipelineRun{
Spec: tekton.PipelineRunSpec{
Status: tekton.PipelineRunSpecStatusCancelled,
},
}
if pr.IsDone() {
t.Fatal("expected not to be done when cancelled")
}
}

func TestWebhookHandling(t *testing.T) {

tests := map[string]struct {
11 changes: 11 additions & 0 deletions pkg/bitbucket/test_client.go
@@ -9,6 +9,7 @@ import (
type TestClient struct {
Branches []Branch
Tags []Tag
Repos []Repo
Commits []Commit
PullRequests []PullRequest
// Files contains byte slices for filenames
@@ -40,6 +41,16 @@ func (c *TestClient) TagCreate(projectKey string, repositorySlug string, payload
return nil, errors.New("not implemented")
}

func (c *TestClient) RepoList(projectKey string) (*RepoPage, error) {
return &RepoPage{
Values: c.Repos,
}, nil
}

func (c *TestClient) RepoCreate(projectKey string, payload RepoCreatePayload) (*Repo, error) {
return nil, errors.New("not implemented")
}

func (c *TestClient) RawGet(project, repository, filename, gitFullRef string) ([]byte, error) {
if f, ok := c.Files[filename]; ok {
return f, nil
